[java] Kafka Streams와 에너지 데이터 처리 방법

소개

Kafka Streams는 Apache Kafka를 기반으로 한 스트리밍 데이터 처리 플랫폼입니다. 이를 활용하여 에너지 데이터를 실시간으로 처리하는 방법을 살펴보겠습니다.

에너지 데이터 수집

에너지 데이터는 여러 소스에서 수집될 수 있습니다. 전기 사용량, 온도, 습도 등을 수집하기 위해 IoT 기기나 센서를 사용할 수 있습니다.

public class EnergyDataCollector {
    private KafkaProducer<String, String> producer;
    
    public EnergyDataCollector() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        producer = new KafkaProducer<>(props);
    }
    
    public void collect(String topic, String data) {
        producer.send(new ProducerRecord<>(topic, data));
    }
}

위의 코드는 에너지 데이터를 수집하는 클래스 EnergyDataCollector 입니다. KafkaProducer를 이용하여 데이터를 Kafka 클러스터로 전송합니다.

데이터 처리

Kafka Streams를 사용하여 수집된 에너지 데이터를 실시간으로 처리할 수 있습니다. 아래의 예제 코드는 전기 사용량 데이터를 요약하는 예제입니다.

public class EnergyDataProcessor {
    private static final String INPUT_TOPIC = "energy-data";
    private static final String OUTPUT_TOPIC = "summary-data";
    
    public void process() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "energy-data-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        KTable<String, Integer> summaryTable = builder.stream(INPUT_TOPIC)
                                        .mapValues(value -> parseEnergy(value))
                                        .groupByKey()
                                        .reduce((v1, v2) -> v1 + v2)
                                        .toTable();
                                        
        summaryTable.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
                
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
    
    private Integer parseEnergy(String value) {
        // energy data parsing logic
        // ...
    }
}

위의 코드는 EnergyDataProcessor 클래스에서 Kafka Streams를 사용하여 에너지 데이터를 처리하는 예제입니다. KTable을 사용하여 에너지 데이터를 요약합니다.

시각화

처리된 에너지 데이터를 시각화할 수 있습니다. 예를 들어, Grafana와 같은 도구를 사용하여 실시간으로 그래프를 생성할 수 있습니다.

결론

Kafka Streams를 활용하면 에너지 데이터를 실시간으로 처리하고 분석할 수 있습니다. 에너지 데이터 수집, 처리, 시각화에 대한 예제 코드와 간단한 설명을 통해 Kafka Streams의 활용을 알아보았습니다.