[java] Kafka Streams의 데이터 처리 파이프라인 구성하기

Kafka Streams는 실시간 스트리밍 데이터 처리를 위한 클라이언트 라이브러리입니다. 이를 사용하여 데이터 처리 파이프라인을 구성할 수 있습니다. 이번 블로그 포스트에서는 Kafka Streams를 사용하여 Java로 데이터 처리 파이프라인을 구성하는 방법에 대해 알아보겠습니다.

1. Kafka Streams API를 추가하기

Kafka Streams를 사용하기 위해서는 우선 Maven 또는 Gradle 등의 의존성 관리 도구를 사용하여 Kafka Streams API를 프로젝트에 추가해야 합니다.

Maven의 경우 다음과 같이 pom.xml 파일에 아래의 의존성을 추가합니다.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
</dependency>

Gradle의 경우 다음과 같이 build.gradle 파일에 아래의 의존성을 추가합니다.

implementation 'org.apache.kafka:kafka-streams:2.8.0'

2. Kafka Streams 애플리케이션 구현하기

Kafka Streams 애플리케이션을 구현하기 위해서는 다음과 같은 단계를 따라야 합니다.

2.1. Topology 정의하기

먼저 데이터 처리의 흐름을 정의하기 위해 Topology을 생성해야 합니다. Topology은 스트림 처리를 위한 데이터 흐름을 나타내는 그래프입니다.

Topology topology = new Topology();

topology
    .addSource("source", "input-topic")
    .addProcessor("processor", MyProcessorSupplier(), "source")
    .addSink("sink", "output-topic", "processor");

위의 코드에서 addSource, addProcessor, addSink 메서드를 사용하여 각각 데이터 소스, 처리기, 싱크를 추가하고 이들을 연결하여 Topology를 구성합니다. input-topic은 입력 스트림이 되고, output-topic은 결과를 출력할 스트림입니다.

2.2. 스트림 처리 로직 구현하기

스트림의 각 레코드를 처리하는 로직은 Processor를 구현하여 정의할 수 있습니다. 아래는 간단한 예시입니다.

public class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            private ProcessorContext context;
            
            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public void process(String key, String value) {
                // 여기서 데이터 처리 로직을 구현합니다.
                String processedValue = value.toUpperCase();
                context.forward(key, processedValue);
            }

            @Override
            public void close() {
                // 리소스 정리 로직을 구현합니다.
            }
        };
    }
}

위의 코드에서 process 메서드에서 데이터 처리 로직을 구현합니다. context.forward 메서드를 사용하여 결과를 다음 단계로 전달합니다.

2.3. Kafka Streams 애플리케이션 실행하기

애플리케이션을 실행하기 위해 KafkaStreams 객체를 생성하고 start 메서드를 호출합니다.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

3. 마치며

이번 포스트에서는 Kafka Streams를 사용하여 Java로 데이터 처리 파이프라인을 구성하는 방법에 대해 알아보았습니다. 위의 코드를 참고하여 Kafka Streams를 활용하여 데이터를 처리하는 애플리케이션을 구현해 보세요!

더 자세한 내용은 Kafka Streams 문서를 참고하시기 바랍니다.