[java] 카프카 프로듀서와 컨슈머를 자바로 작성하기

Apache Kafka는 대용량의 실시간 데이터를 처리하기 위한 분산 메시지 큐 시스템입니다. 카프카는 프로듀서(Producer)와 컨슈머(Consumer)를 이용하여 데이터를 생성하고 처리하는 아키텍처를 제공합니다. 이번 포스트에서는 자바를 사용하여 카프카 프로듀서와 컨슈머를 작성하는 방법에 대해 알아보겠습니다.

1. 카프카 프로듀서 작성하기

카프카 프로듀서는 데이터를 생성하여 카프카 클러스터에 전송하는 역할을 수행합니다. 자바로 카프카 프로듀서를 작성하기 위해서는 Kafka 클라이언트 라이브러리를 사용해야 합니다. 아래는 간단한 예제 코드입니다.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 카프카 프로듀서 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaProducer 객체 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 메시지 전송
        String topic = "my-topic";
        String key = "my-key";
        String value = "Hello, Kafka!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record);

        // KafkaProducer 객체 종료
        producer.close();
    }
}

위 예제 코드에서는 KafkaProducer 클래스를 이용하여 카프카에 메시지를 전송하는 예제를 보여줍니다. 프로듀서 설정에는 bootstrap.servers, key.serializer, value.serializer 등이 필요하며, 이를 적절히 설정해야 합니다. 메시지를 전송하기 위해서는 ProducerRecord 객체를 생성하여 send() 메서드로 전송합니다. 마지막으로 KafkaProducer 객체를 종료하기 전에 close() 메서드로 클리어해야 합니다.

2. 카프카 컨슈머 작성하기

카프카 컨슈머는 카프카 클러스터에서 생성된 데이터를 읽어오는 역할을 수행합니다. 자바로 카프카 컨슈머를 작성하기 위해서는 마찬가지로 Kafka 클라이언트 라이브러리를 사용해야 합니다. 아래는 간단한 예제 코드입니다.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 카프카 컨슈머 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my-group");

        // KafkaConsumer 객체 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 구독 시작
        consumer.subscribe(Arrays.asList("my-topic"));

        // 메시지 읽기
        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }

        // KafkaConsumer 객체 종료
        consumer.close();
    }
}

위 예제 코드에서는 KafkaConsumer 클래스를 이용하여 카프카에서 메시지를 읽어오는 예제를 보여줍니다. 컨슈머 설정에는 bootstrap.servers, key.deserializer, value.deserializer, group.id 등이 필요하며, 이를 적절히 설정해야 합니다. 컨슈머는 subscribe() 메서드를 이용하여 토픽을 구독하고, poll() 메서드로 메시지를 읽어옵니다. 읽어온 메시지는 ConsumerRecord 객체로써 다양한 정보를 포함하고 있습니다. 마지막으로 KafkaConsumer 객체를 종료하기 전에 close() 메서드로 클리어해야 합니다.

3. 결론

이번 포스트에서는 카프카 프로듀서와 컨슈머를 자바로 작성하는 방법에 대해 알아보았습니다. 카프카를 사용하면 대용량의 데이터를 실시간으로 처리할 수 있으며, 이를 자바로 구현할 수 있는 간단한 예제 코드를 제공했습니다. 카프카를 활용하여 다양한 실시간 데이터 처리 시스템을 구축할 수 있으며, 자바를 통해 이를 구현할 수 있습니다.