[java] 자바로 카프카에서 발생하는 스트림 처리

이 글에서는 자바를 사용하여 카프카(Kafka)에서 발생하는 스트림(stream) 데이터를 처리하는 방법에 대해 알아보겠습니다.

1. 카프카 스트림 처리란?

카프카는 대용량 실시간 이벤트 데이터를 처리하기 위한 분산 스트리밍 플랫폼입니다. 카프카 스트림 처리는 카프카에서 발생하는 데이터를 실시간으로 읽어와서 처리하는 작업을 의미합니다. 이를 통해 데이터 파이프라인을 구축하고 데이터를 필터링, 변환, 집계 등 다양한 작업을 수행할 수 있습니다.

2. 카프카 스트림 처리 예제 코드

아래는 자바를 사용하여 카프카 스트림 처리를 하는 예제 코드입니다.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

import java.util.Arrays;
import java.util.Properties;

public class StreamProcessingExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        KTable<String, Long> wordCounts = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count(Materialized.as("word-counts"));

        wordCounts.toStream().foreach((word, count) -> System.out.println("Word: " + word + ", Count: " + count));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

위의 예제 코드는 “input-topic”이라는 카프카 토픽에서 데이터를 읽어와서 각 단어의 빈도수를 계산하는 작업을 수행합니다. 데이터는 소문자로 변환된 후 구분자를 기준으로 단어 단위로 분리됩니다. 그리고 각 단어의 빈도수가 “word-counts”라는 테이블에 저장되고 콘솔에 출력됩니다.

3. 참고 자료

이번 글에서는 카프카에서 발생하는 스트림 데이터를 자바로 처리하는 방법에 대해 알아보았습니다. 카프카 스트림 처리는 실시간 데이터 처리에 유용한 기능이므로, 관심이 있다면 깊이 공부해보시기를 추천드립니다.