[java] 자바로 카프카에서 메시지 핸들링

카프카는 대규모 데이터 스트리밍 플랫폼으로, 대량의 데이터를 효율적으로 처리하기 위해 설계되었습니다. 자바는 카프카 클라이언트를 사용하여 메시지를 송수신하고 핸들링할 수 있습니다. 이번 포스트에서는 자바로 카프카에서 메시지 핸들링하는 방법을 알아보겠습니다.

카프카 클라이언트 의존성 추가하기

먼저, Maven이나 Gradle을 사용하여 프로젝트에 카프카 클라이언트 의존성을 추가해야합니다. Maven을 사용하는 경우 pom.xml 파일에 다음 의존성을 추가합니다.

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

Gradle을 사용하는 경우 build.gradle 파일에 다음 의존성을 추가합니다.

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
}

프로듀서 구현하기

카프카에서 프로듀서는 메시지를 생성하고 브로커에 전송합니다. 아래는 자바로 카프카 프로듀서를 구현하는 예제입니다.

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String bootstrapServers = "localhost:9092";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String key = Integer.toString(i);
            String value = "Message " + i;

            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Message sent successfully. Offset: " + metadata.offset());
                } else {
                    System.out.println("Failed to send message: " + exception.getMessage());
                }
            });
        }

        producer.close();
    }
}

위의 예제에서는 my-topic이라는 토픽으로 0부터 9까지의 메시지를 전송합니다. 메시지 전송 후에는 메시지의 성공적인 전송 여부에 따라 콜백 함수를 실행합니다.

컨슈머 구현하기

카프카에서 컨슈머는 브로커로부터 메시지를 소비하고 처리합니다. 아래는 자바로 카프카 컨슈머를 구현하는 예제입니다.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String bootstrapServers = "localhost:9092";
        String groupId = "my-group";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("group.id", groupId);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition partition = new TopicPartition(topicName, 0);
        consumer.assign(Collections.singletonList(partition));

        consumer.seekToBeginning(Collections.singletonList(partition));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message - Key: " + record.key() + ", Value: " + record.value());
            }
        }
    }
}

위의 예제에서는 my-topic이라는 토픽의 메시지를 수신하고 출력합니다. poll() 메서드를 사용하여 메시지를 주기적으로 가져옵니다.

마무리

위에서는 자바로 카프카에서 메시지를 송수신하고 핸들링하는 방법에 대해 알아보았습니다. 프로듀서를 통해 메시지를 전송하고, 컨슈머를 통해 메시지를 수신하는 방법을 익힐 수 있습니다. 카프카 클라이언트의 다양한 기능을 사용하여 데이터 스트리밍 애플리케이션을 구축할 수 있습니다.

더 자세한 내용은 Apache Kafka 공식 문서를 참조하세요.