[java] Kafka Streams의 상태 저장 기능과 사용 방법

Kafka Streams는 데이터 처리 애플리케이션을 구축하기 위한 강력한 라이브러리입니다. 이를 통해 데이터 흐름을 쉽게 처리하고 간단한 연산을 수행할 수 있습니다.

그러나 Kafka Streams는 상태를 제한적으로 저장할 수 있는데, 이는 일부 애플리케이션에서는 문제가 될 수 있습니다. 왜냐하면 많은 상태가 메모리에 저장되지 않고, 새로운 이벤트가 발생할 때마다 상태가 다시 계산되기 때문입니다. 이는 처리량과 지연 시간에 영향을 미칠 수 있습니다.

따라서 Kafka Streams는 상태 저장 기능을 제공합니다. 이를 통해 상태를 영구적으로 저장하고 복구할 수 있으며, 애플리케이션의 처리량과 불안정한 네트워크 연결 등의 문제로 인해 중단되는 상황에 대비할 수 있습니다.

상태 저장 기능 사용 방법

  1. 상태 저장 스토어 생성 Kafka Streams 애플리케이션에서 사용할 상태 저장 스토어를 생성해야 합니다. 이 스토어는 Kafka Streams의 Topology 객체에 등록됩니다.
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, String>> stateStoreBuilder = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("myStateStore"),
    Serdes.String(),
    Serdes.String()
);
builder.addStateStore(stateStoreBuilder);
  1. 스토어에 상태 저장 Kafka Streams 애플리케이션에서 상태를 변경할 때마다 해당 상태를 스토어에 저장해야 합니다. 예를 들어, Processor 객체 내에서 아래와 같이 사용할 수 있습니다.
public class MyProcessor implements Processor<String, String> {
    private KeyValueStore<String, String> stateStore;

    @Override
    public void init(ProcessorContext context) {
        this.stateStore = (KeyValueStore<String, String>) context.getStateStore("myStateStore");
    }

    @Override
    public void process(String key, String value) {
        // 상태 변경 로직 수행
        stateStore.put(key, value);
    }

    @Override
    public void close() {
        // 스토어 정리 로직 수행
        stateStore.close();
    }
}
  1. 애플리케이션 시작 시 상태 복구 Kafka Streams 애플리케이션을 시작할 때, 저장된 상태를 복구해야 합니다. 이를 위해 KafkaStreams 객체를 생성하고 restore() 메서드를 사용해 상태를 복구해야 합니다.
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, config);

streams.cleanUp(); // 상태 초기화
streams.start(); // 애플리케이션 시작

// 복구된 상태로 애플리케이션을 실행하기 위해 복구 메서드 호출
streams.restore(new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
        // 상태 복구 시작
    }

    @Override
    public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
        // 일부 상태 복구 진행 상황 로그 출력
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
        // 상태 복구 완료
    }
});

위와 같이 상태 저장 기능을 사용하면 Kafka Streams 애플리케이션에서 중요한 상태를 영구적으로 저장하고 복구할 수 있습니다. 이를 통해 애플리케이션의 안정성과 성능을 향상시킬 수 있습니다.

참고 자료: