kafka 컨슈머

Kafka는 대용량 데이터를 안정적으로 처리하기 위한 분산 시스템입니다. 이 시스템에서 컨슈머는 데이터를 처리하고 소비하는 역할을 합니다. Kafka 컨슈머는 프로듀서가 생성한 메시지를 읽어오고, 로직에 따라 처리하거나 저장하는 등의 작업을 수행합니다.

Kafka 컨슈머 작동 방식

Kafka 컨슈머는 토픽(topic)과 파티션(partition)에 대한 구독(subscription)을 설정하여 메시지를 읽어옵니다. 컨슈머는 오프셋(offset)이라고 불리는 메시지의 위치 정보를 유지하며, 이를 통해 이전에 이미 처리한 메시지들을 건너뛰고 최신 메시지들만 처리합니다.

컨슈머는 Kafka의 브로커들과 연결되어 메시지를 요청하고 수신합니다. 수신한 메시지는 주로 처리를 위해 다른 시스템이나 애플리케이션으로 전달됩니다. 컨슈머 그룹(consumer group)이라는 개념을 통해 컨슈머들을 적절하게 조직화하고 데이터를 병렬로 처리할 수 있습니다.

Kafka 컨슈머 개발하기

Kafka 컨슈머를 개발하기 위해서는 Kafka 클라이언트 라이브러리를 사용해야 합니다. Java를 기반으로 한 프로덕션 환경에서는 Kafka Consumer API를 사용할 수 있습니다. 다른 언어에서도 Kafka를 사용할 수 있는 라이브러리들이 있으므로 해당 언어에 맞는 클라이언트 라이브러리를 선택하면 됩니다.

아래는 Java를 사용하여 Kafka 컨슈머를 개발하는 예제 코드입니다.

import org.apache.kafka.clients.consumer.*;

import java.util.Properties;

public class KafkaConsumerExample {

   public static void main(String[] args) {
      
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "my-consumer-group");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("my-topic"));
      
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }
      }
   }
}

위의 예제 코드는 Kafka 컨슈머를 생성하고 “my-topic” 토픽에 구독하는 방법을 보여줍니다. 메시지를 수신하면 오프셋, 키, 값 정보를 출력합니다.

결론

Kafka 컨슈머는 대용량 데이터를 안정적으로 처리하기 위한 중요한 역할을 합니다. 이 글에서는 Kafka 컨슈머의 개요와 작동 방식, 그리고 개발을 위한 예제 코드를 살펴보았습니다. Kafka를 사용하여 안정적인 데이터 소비를 구현할 수 있도록 컨슈머를 잘 이해하고 활용해보세요.

#Kafka #컨슈머