[java] 자바로 카프카에서 메시지 큐 사용하기

카프카는 분산형 스트리밍 플랫폼으로, 대량의 실시간 데이터를 안정적으로 처리할 수 있는 기능을 제공합니다. 메시지 큐로 작동하여, 다양한 애플리케이션 간에 데이터를 신속하고 효율적으로 전송할 수 있습니다. 이번 블로그 포스트에서는 자바를 사용하여 카프카에서 메시지 큐를 사용하는 방법에 대해 알아보겠습니다.

카프카 설정하기

먼저, 카프카를 사용하기 위해 카프카 클라이언트를 빌드파일에 추가해야 합니다. Maven을 사용하는 경우, pom.xml 파일에 다음 의존성을 추가하세요:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.7.0</version>
    </dependency>
</dependencies>

메시지 보내기

메시지를 카프카에 보내기 위해 프로듀서를 만들어야 합니다. 다음은 자바로 카프카 프로듀서를 생성하는 예제 코드입니다:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {

        String bootstrapServers = "localhost:9092";
        String topic = "my-topic";
        String key = "my-key";
        String value = "Hello, Kafka!";

        // 카프카 프로듀서 설정
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 프로듀서 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 메시지 보내기
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("메시지 전송 완료: " + metadata.topic() + ", " + metadata.partition() + ", " + metadata.offset());
                }
            }
        });

        // 프로듀서 종료
        producer.close();
    }
}

위의 예제에서는 bootstrapServers, topic, key, value를 설정하고, KafkaProducer를 생성하여 메시지를 전송합니다. 메시지 전송 완료 시에는 Callback을 통해 확인할 수 있습니다.

메시지 받기

카프카에서 메시지를 받으려면 컨슈머를 만들어야 합니다. 다음은 자바로 카프카 컨슈머를 생성하는 예제 코드입니다:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {

        String bootstrapServers = "localhost:9092";
        String topic = "my-topic";
        String groupId = "my-group";

        // 카프카 컨슈머 설정
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 토픽 구독
        consumer.subscribe(Collections.singletonList(topic));

        // 메시지 받기
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("메시지 수신: " + record.topic() + ", " + record.partition() + ", " + record.offset() + ", " + record.value());
            }
        }
    }
}

위의 예제에서는 bootstrapServers, topic, groupId를 설정하고, KafkaConsumer를 생성하여 메시지를 수신합니다. 메시지는 poll 메서드를 통해 읽어올 수 있습니다.

결론

이 블로그 포스트에서는 자바를 사용하여 카프카에서 메시지 큐를 사용하는 방법에 대해 알아보았습니다. 카프카의 프로듀서와 컨슈머를 이용하여 메시지를 보내고 받을 수 있습니다. 추가로, 다양한 설정 옵션을 적용하여 카프카를 더욱 유연하게 사용할 수 있습니다.

더 자세한 내용은 아파치 카프카 공식 문서(https://kafka.apache.org/documentation/)를 참고하세요.