[java] Kafka Streams와 데이터 압축 및 압축 해제 방법

Kafka Streams는 실시간 스트리밍 애플리케이션을 구축하기 위한 라이브러리입니다. 데이터 처리를 위해 Kafka 메시지 브로커에 압축된 데이터를 전송하거나, 압축된 데이터를 Kafka Streams에서 처리해야 할 때가 있습니다. 본 문서에서는 Kafka Streams에서 데이터를 압축하거나 압축을 해제하는 방법을 알아보겠습니다.

데이터 압축 설정

Kafka Streams를 사용하여 데이터를 압축하려면 Kafka 프로듀서에게 압축 설정을 전달해야 합니다. 이를 위해 다음과 같이 Properties 객체를 생성하고 compression.type 값을 설정합니다.

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
// ...

compression.type에서 지원되는 압축 방식으로는 gzip, snappy, lz4 등이 있습니다. 위의 예제에서는 gzip을 사용했습니다.

데이터 압축 해제

Kafka Streams에서 압축된 데이터를 처리하려면 KStream 또는 KTable 객체에 대해 uncompress() 메서드를 호출해야 합니다. 이렇게 하면 데이터가 압축 해제됩니다.

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

KStream<String, String> stream = builder.stream("inputTopic");
KStream<String, String> uncompressedStream = stream.uncompress();

KTable<String, String> table = builder.table("inputTopic");
KTable<String, String> uncompressedTable = table.uncompress();

uncompress() 메서드는 압축이 해제된 데이터를 반환하는 KStream 또는 KTable 객체를 생성합니다.

참고 자료