[java] 아파치 카프카의 자바 클라이언트 사용하기

아파치 카프카는 대규모의 실시간 데이터 스트리밍 플랫폼으로 많은 기업에서 사용되고 있습니다. 자바 개발자들은 아파치 카프카의 자바 클라이언트를 사용하여 카프카와 상호작용하고 실시간 데이터를 손쉽게 처리할 수 있습니다.

1. 카프카 자바 클라이언트 라이브러리 추가

먼저, 아파치 카프카의 자바 클라이언트 라이브러리를 프로젝트에 추가해야 합니다. 이를 위해 Maven을 사용하는 경우 pom.xml 파일에 다음 의존성을 추가합니다:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

Gradle을 사용하는 경우 build.gradle 파일에 다음 의존성을 추가합니다:

implementation 'org.apache.kafka:kafka-clients:2.8.0'

2. 카프카 클러스터에 연결

카프카 클러스터에 연결하기 위해 KafkaProducer 또는 KafkaConsumer 인스턴스를 생성해야 합니다. 이때 bootstrap.servers 프로퍼티에는 카프카 브로커의 호스트 및 포트 정보를 지정해야 합니다. 예를 들어, localhost:9092와 같이 지정할 수 있습니다.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

3. 메시지 보내기

KafkaProducer를 사용하여 메시지를 카프카 토픽에 보낼 수 있습니다. send 메서드를 호출하여 메시지를 보내고 성공 여부를 확인할 수 있습니다.

String topic = "my-topic";
String key = "my-key";
String value = "Hello, Kafka!";

ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);
try {
    RecordMetadata metadata = future.get();
    System.out.println("메시지 전송 성공! Offset: " + metadata.offset());
} catch (Exception e) {
    System.out.println("메시지 전송 실패: " + e.getMessage());
}

4. 메시지 수신하기

KafkaConsumer를 사용하여 특정 토픽에서 메시지를 수신할 수 있습니다. subscribe 메서드를 호출하여 원하는 토픽을 구독하고 poll 메서드를 호출하여 메시지를 가져옵니다.

consumer.subscribe(Collections.singleton("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("메시지 수신: " + record.value());
    }
}

마무리

이상으로 아파치 카프카의 자바 클라이언트를 사용하여 카프카와 상호작용하는 방법을 알아보았습니다. 이를 통해 실시간 데이터를 효율적으로 처리하고 대규모 데이터 시스템을 구축할 수 있습니다.

더 많은 자세한 내용은 아파치 카프카 공식 문서를 참조하세요.