[java] Kafka Streams를 사용한 데이터 전송과 변환

이번 포스트에서는 Kafka Streams를 사용하여 데이터를 전송하고 변환하는 방법에 대해 알아보겠습니다. Kafka Streams는 Apache Kafka를 기반으로 한 데이터 스트리밍 라이브러리로, 데이터의 실시간 처리와 변환을 간단하게 할 수 있습니다.

필요한 라이브러리 및 환경 설정

먼저 Kafka Streams를 사용하기 위해서는 몇 가지 라이브러리를 추가해야 합니다. Maven을 사용하는 경우, pom.xml 파일에 다음과 같이 의존성을 추가해주세요.

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

또한 Kafka 클러스터에 연결하기 위해 필요한 설정들을 application.properties 파일에 작성해야 합니다.

bootstrap.servers=localhost:9092
application.id=my-streams-app

데이터 전송하기

Kafka Streams를 사용하여 데이터를 전송하려면 먼저 Kafka Producer를 생성해야 합니다. StreamsBuilder 클래스를 사용하여 Kafka Streams 애플리케이션을 생성하고, 데이터를 처리하고 전송하는 논리를 작성할 수 있습니다.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class DataStreamer {

    public static void main(String[] args) {
        // Kafka Streams 애플리케이션 설정
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "data-streamer");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // StreamsBuilder 객체 생성
        StreamsBuilder builder = new StreamsBuilder();

        // 데이터 소스 토픽을 스트림으로 읽기
        KStream<String, String> inputStream = builder.stream("input-topic");

        // 처리 로직 작성
        inputStream.foreach((key, value) -> {
            // 전송할 데이터를 생성하는 로직 작성
            String transformedValue = transform(value);

            // 동일한 키로 다른 토픽에 전송
            builder.stream("output-topic").to(key, transformedValue);
        });

        // Kafka Streams 애플리케이션 실행
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

    private static String transform(String value) {
        // 데이터를 변환하는 로직을 작성해주세요.
        // 예: value 값을 대문자로 변환하는 경우
        return value.toUpperCase();
    }
}

위의 예제 코드에서는 input-topic에서 데이터를 읽고, 각 데이터를 처리한 후 output-topic으로 전송하는 간단한 예제를 보여줍니다. transform 메소드는 입력값을 변환하는 로직을 작성하는 곳으로, 원하는 데이터 변환 방식에 따라 수정해주시면 됩니다.

데이터 변환하기

데이터를 변환하기 위해서는 Kafka Streams의 map 메소드를 사용할 수 있습니다. map 메소드는 데이터 스트림의 각 요소에 대해 지정된 변환 함수를 적용하고, 변환된 결과를 새로운 스트림으로 반환합니다.

inputStream.mapValues(value -> transform(value)).to("output-topic");

위의 코드는 입력 스트림의 각 값에 대해 transform 메소드를 적용하고, 결과를 output-topic으로 전송하는 예제입니다.

결론

Kafka Streams를 사용하여 데이터를 전송하고 변환하는 방법을 간단히 알아보았습니다. Kafka Streams는 강력한 기능을 제공하며, 실시간 데이터 처리 및 변환에 매우 유용합니다. 이를 통해 데이터를 실시간으로 효과적으로 처리하고 다른 시스템과의 통합이 가능해집니다.

더 많은 정보를 원하시면 Kafka Streams 문서를 참고해주세요.