[java] 트랜스포머와 프로세서 API를 활용한 Kafka Streams 데이터 처리

kafka_streams

Kafka Streams는 실시간 스트리밍 데이터 처리를 위한 분산 스트림 처리 프레임워크입니다. 이를 통해 Kafka 메시지 스트림을 처리하고 변환하고 집계할 수 있습니다. 이번 포스트에서는 Kafka Streams의 핵심 API 중 하나인 트랜스포머(Transformer)와 프로세서(Processor) API를 소개하고, 데이터 처리에 어떻게 사용되는지 살펴보겠습니다.

1. 트랜스포머 API

트랜스포머 API는 Kafka Streams에서 데이터 스트림을 변환하기 위해 사용됩니다. 이 API를 사용하면 입력 스트림에 변환을 적용하여 출력 스트림을 생성할 수 있습니다. 트랜스포머 API는 입력과 출력 타입을 지정할 수 있기 때문에 유연하게 데이터 변환 작업을 구현할 수 있습니다.

다음은 트랜스포머 API를 사용하여 입력 스트림의 데이터를 대문자로 변환하는 예제입니다:

public class UpperCaseTransformer implements Transformer<byte[], byte[], KeyValue<byte[], byte[]>> {

    @Override
    public void init(ProcessorContext context) {
        // 초기화 작업 수행
    }

    @Override
    public KeyValue<byte[], byte[]> transform(byte[] key, byte[] value) {
        // 입력 데이터 변환 작업 수행 (예: 대문자로 변환)
        byte[] transformedValue = new String(value).toUpperCase().getBytes();
        return KeyValue.pair(key, transformedValue);
    }

    @Override
    public void close() {
        // 정리 작업 수행
    }
}

위의 예제에서는 Transformer 인터페이스를 구현하여 트랜스포머 클래스를 생성합니다. init 메서드는 트랜스포머 초기화 작업을 수행하고, transform 메서드는 입력 데이터를 변환하여 변환된 데이터를 반환합니다. close 메서드는 트랜스포머 종료 시 정리 작업을 수행합니다.

2. 프로세서 API

프로세서 API는 Kafka Streams에서 스트림 데이터를 직접 처리하기 위해 사용됩니다. 트랜스포머 API와 마찬가지로 입력 스트림에서 데이터를 읽고 처리한 다음 출력 스트림을 생성할 수 있습니다. 프로세서 API는 입력과 출력 타입을 지정할 수 있으며, 필요한 경우 상태 저장소를 사용하여 상태를 유지할 수 있습니다.

다음은 프로세서 API를 사용하여 입력 스트림의 데이터를 집계하는 예제입니다:

public class WordCountProcessor implements Processor<byte[], byte[]> {
    
    private ProcessorContext context;
    private KeyValueStore<String, Integer> wordCountStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.wordCountStore = (KeyValueStore<String, Integer>) context.getStateStore("word-count-store");
    }

    @Override
    public void process(byte[] key, byte[] value) {
        // 입력 데이터 처리 작업 수행 (예: 단어 카운트)
        String[] words = new String(value).split(" ");

        for (String word : words) {
            Integer count = wordCountStore.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            wordCountStore.put(word, count);
        }
    }

    @Override
    public void close() {
        // 정리 작업 수행
    }
}

위의 예제에서는 Processor 인터페이스를 구현하여 프로세서 클래스를 생성합니다. init 메서드에서는 프로세서 초기화 및 상태 저장소를 초기화합니다. process 메서드에서는 입력 데이터를 처리하여 필요한 작업을 수행합니다. close 메서드에서는 프로세서 종료 시 정리 작업을 수행합니다.

3. 데이터 처리 예제

트랜스포머와 프로세서 API를 함께 사용하여 데이터 스트림을 처리하는 예제를 살펴보겠습니다. 아래 예제는 입력 스트림에서 단어를 대문자로 변환한 뒤 단어의 빈도를 계산하는 작업을 수행하는 코드입니다.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

StreamsBuilder builder = new StreamsBuilder();
KTable<byte[], byte[]> inputTable = builder.table("input-topic");

inputTable
    .transform(()-> new UpperCaseTransformer(), "transformer-store")
    .groupByKey()
    .count(Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("count-store"))
    .toStream()
    .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

위의 코드는 Kafka Streams 애플리케이션을 시작하여 입력 토픽에서 데이터를 읽고 트랜스포머 API를 사용하여 데이터를 변환한 후, 그룹화하고 집계한 결과를 출력 토픽으로 전송합니다.

결론

트랜스포머와 프로세서 API를 활용하여 Kafka Streams에서 데이터 처리 작업을 수행할 수 있습니다. 이를 통해 실시간 스트리밍 데이터를 변환하고 집계하는 다양한 작업을 간편하게 구현할 수 있습니다. Kafka Streams의 강력한 API를 활용하여 실시간 데이터 처리에 도전해보세요!

참고 자료