[java] 카프카에서 자바를 사용한 상태 관리

카프카는 대규모의 데이터를 처리하기 위한 분산 메시지 큐 시스템입니다. 자바 언어를 사용하여 카프카를 활용하는 경우, 상태 관리를 효율적으로 처리하는 방법을 알아보겠습니다.

1. 상태 관리란?

상태 관리는 애플리케이션에서 발생하는 데이터의 상태를 추적하고 유지하는 것을 의미합니다. 예를 들어, 고객 주문 애플리케이션에서 주문 상태를 추적하고 업데이트하는 작업을 상태 관리라고 할 수 있습니다.

2. 카프카 스트림즈를 사용한 상태 관리

카프카 스트림즈는 카프카에서 데이터 스트림을 처리하기 위한 라이브러리입니다. 스트림 처리를 통해 입력 데이터를 처리하고 중간 상태를 유지하며 결과를 출력할 수 있습니다.

아래는 카프카 스트림즈를 사용하여 상태 관리를 구현하는 자바 코드의 예시입니다.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // 애플리케이션 ID 설정
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 카프카 브로커 설정

StreamsBuilder builder = new StreamsBuilder();

// 입력 토픽에 대한 KStream 생성
KStream<String, String> input = builder.stream("input-topic");

// 상태를 유지하기 위한 KTable 생성
KTable<String, Long> stateTable = input
    .groupByKey()
    .count();

// 결과를 출력하기 위한 KStream 생성
stateTable.toStream().to("output-topic");

// 스트림즈 애플리케이션 시작
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

위의 코드는 “input-topic”에서 데이터를 읽어와 상태를 유지하고, 그 결과를 “output-topic”으로 출력하는 예시입니다. 여기서는 주문 상태를 세는 간단한 기능을 수행하도록 구현되었습니다.

3. 상태 저장소를 사용한 상태 관리

때로는 카프카 스트림즈보다 좀 더 복잡한 상태 관리가 필요한 경우가 있습니다. 이 경우, 상태 저장소를 사용하여 상태를 관리하는 것이 더 유용할 수 있습니다.

아래는 상태 저장소를 사용하여 상태 관리를 구현하는 자바 코드의 예시입니다.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 카프카 브로커 설정

// 상태 저장소 설정
KeyValueStore<String, Long> stateStore = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("my-state-store"), // 상태 저장소 이름 설정
    Serdes.String(), // 키 데이터 직렬화 설정
    Serdes.Long() // 값 데이터 직렬화 설정
);

Topology topology = new Topology();
topology.addSource("source", "input-topic")
    .addProcessor("processor", () -> new MyProcessor(stateStore), "source")
    .addStateStore(stateStore, "processor") // 상태 저장소 연결 설정
    .addSink("sink", "output-topic", "processor");

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

위의 코드는 “input-topic”에서 데이터를 받아와 사용자 정의 프로세서인 MyProcessor를 거쳐 “output-topic”으로 전송하는 예시입니다. MyProcessor에서는 stateStore를 사용하여 상태를 관리할 수 있습니다.

결론

카프카에서 자바를 사용한 상태 관리는 카프카 스트림즈나 상태 저장소를 활용하여 간편하게 구현할 수 있습니다. 카프카는 대용량 데이터를 신속하게 처리하기 위한 강력한 툴이므로, 이를 효율적으로 활용할 수 있도록 상태 관리를 잘 구성하는 것이 중요합니다.


참고 자료: