[java] Kafka Streams와 에너지 데이터 처리 방법
소개
Kafka Streams는 Apache Kafka를 기반으로 한 스트리밍 데이터 처리 플랫폼입니다. 이를 활용하여 에너지 데이터를 실시간으로 처리하는 방법을 살펴보겠습니다.
에너지 데이터 수집
에너지 데이터는 여러 소스에서 수집될 수 있습니다. 전기 사용량, 온도, 습도 등을 수집하기 위해 IoT 기기나 센서를 사용할 수 있습니다.
public class EnergyDataCollector {
private KafkaProducer<String, String> producer;
public EnergyDataCollector() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
public void collect(String topic, String data) {
producer.send(new ProducerRecord<>(topic, data));
}
}
위의 코드는 에너지 데이터를 수집하는 클래스 EnergyDataCollector
입니다. KafkaProducer
를 이용하여 데이터를 Kafka 클러스터로 전송합니다.
데이터 처리
Kafka Streams를 사용하여 수집된 에너지 데이터를 실시간으로 처리할 수 있습니다. 아래의 예제 코드는 전기 사용량 데이터를 요약하는 예제입니다.
public class EnergyDataProcessor {
private static final String INPUT_TOPIC = "energy-data";
private static final String OUTPUT_TOPIC = "summary-data";
public void process() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "energy-data-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Integer> summaryTable = builder.stream(INPUT_TOPIC)
.mapValues(value -> parseEnergy(value))
.groupByKey()
.reduce((v1, v2) -> v1 + v2)
.toTable();
summaryTable.toStream().to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private Integer parseEnergy(String value) {
// energy data parsing logic
// ...
}
}
위의 코드는 EnergyDataProcessor
클래스에서 Kafka Streams를 사용하여 에너지 데이터를 처리하는 예제입니다. KTable
을 사용하여 에너지 데이터를 요약합니다.
시각화
처리된 에너지 데이터를 시각화할 수 있습니다. 예를 들어, Grafana와 같은 도구를 사용하여 실시간으로 그래프를 생성할 수 있습니다.
결론
Kafka Streams를 활용하면 에너지 데이터를 실시간으로 처리하고 분석할 수 있습니다. 에너지 데이터 수집, 처리, 시각화에 대한 예제 코드와 간단한 설명을 통해 Kafka Streams의 활용을 알아보았습니다.