[java] Kafka Streams와 상태와의 소통 방법

Kafka Streams는 대용량의 데이터를 실시간으로 처리하기 위한 고수준 라이브러리입니다. 이 라이브러리는 Kafka 클러스터를 기반으로 동작하며, 데이터 처리 중에 상태를 유지할 수 있습니다. 이번 블로그에서는 Kafka Streams와 상태와의 소통 방법에 대해 알아보겠습니다.

상태를 관리하는 StateStore

Kafka Streams에서 상태를 관리하기 위해서는 StateStore를 사용합니다. StateStore는 Kafka Streams 애플리케이션의 상태를 저장하고 유지하는 역할을 합니다. StateStore는 여러 가지 타입으로 제공되며, 각 타입은 특정한 사용 사례에 적합합니다. 가장 일반적으로 사용되는 StateStore의 타입은 KeyValueStore입니다.

KeyValueStore를 사용한 상태 조회 및 갱신

KeyValueStore는 key-value 쌍으로 상태를 저장하는 StateStore입니다. KeyValueStore는 다음과 같은 API를 제공합니다.

이제 KeyValueStore를 사용하여 상태를 조회하고 갱신하는 코드를 살펴보겠습니다.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");

// KeyValueStore 생성
StoreBuilder<KeyValueStore<String, String>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-state-store"),
        Serdes.String(),
        Serdes.String()
    );
builder.addStateStore(storeBuilder);

// 상태 조회 및 갱신
input.process(
    () -> new MyProcessor("my-state-store"),
    "my-state-store"
);

// Topology 생성 및 Kafka Streams 실행
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

위의 코드에서 MyProcessororg.apache.kafka.streams.processor.Processor 인터페이스를 구현한 사용자 정의 프로세서입니다. 이 프로세서에서는 KeyValueStore를 사용하여 상태를 조회하고 갱신할 수 있습니다.

상태 업데이트 및 컴팩션

Kafka Streams에서는 StateStore에 저장된 상태를 업데이트하는 매커니즘을 제공합니다. 이를 통해 상태가 변경되었을 때, 갱신된 상태를 다른 처리 단계나 노드에 전파할 수 있습니다.

또한, Kafka Streams는 StateStore의 컴팩션을 지원하여 오래된 데이터를 정리할 수 있습니다. 이를 통해 상태 저장소의 용량을 효율적으로 관리 가능합니다.

결론

Kafka Streams는 상태를 유지하면서 대용량의 데이터를 처리하는데 효과적인 방법을 제공합니다. StateStore를 사용하여 상태를 조회하고 갱신할 수 있으며, 업데이트 및 컴팩션 메커니즘을 통해 상태를 관리할 수 있습니다. 이러한 기능들을 활용하여 Kafka Streams를 효율적으로 활용할 수 있습니다.

참고 자료

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> stateStore;

    public MyProcessor(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.stateStore = (KeyValueStore<String, String>) context.getStateStore(stateStoreName);
        
        // 10초마다 상태 저장소 컴팩션 실행
        this.context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                stateStore.compact();
            }
        });
    }

    @Override
    public void process(String key, String value) {
        // 상태 조회
        String oldValue = stateStore.get(key);

        // 상태 갱신
        stateStore.put(key, value);
        
        // 갱신된 상태 전파
        context.forward(key, value);

        // 추가 로직 수행
        // ...
    }

    @Override
    public void close() {
        // 리소스 정리
        stateStore.close();
    }
}