아파치 카프카는 대규모의 실시간 데이터 스트리밍 플랫폼으로 많은 기업에서 사용되고 있습니다. 자바 개발자들은 아파치 카프카의 자바 클라이언트를 사용하여 카프카와 상호작용하고 실시간 데이터를 손쉽게 처리할 수 있습니다.
1. 카프카 자바 클라이언트 라이브러리 추가
먼저, 아파치 카프카의 자바 클라이언트 라이브러리를 프로젝트에 추가해야 합니다. 이를 위해 Maven을 사용하는 경우 pom.xml
파일에 다음 의존성을 추가합니다:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
Gradle을 사용하는 경우 build.gradle
파일에 다음 의존성을 추가합니다:
implementation 'org.apache.kafka:kafka-clients:2.8.0'
2. 카프카 클러스터에 연결
카프카 클러스터에 연결하기 위해 KafkaProducer
또는 KafkaConsumer
인스턴스를 생성해야 합니다. 이때 bootstrap.servers
프로퍼티에는 카프카 브로커의 호스트 및 포트 정보를 지정해야 합니다. 예를 들어, localhost:9092
와 같이 지정할 수 있습니다.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3. 메시지 보내기
KafkaProducer
를 사용하여 메시지를 카프카 토픽에 보낼 수 있습니다. send
메서드를 호출하여 메시지를 보내고 성공 여부를 확인할 수 있습니다.
String topic = "my-topic";
String key = "my-key";
String value = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);
try {
RecordMetadata metadata = future.get();
System.out.println("메시지 전송 성공! Offset: " + metadata.offset());
} catch (Exception e) {
System.out.println("메시지 전송 실패: " + e.getMessage());
}
4. 메시지 수신하기
KafkaConsumer
를 사용하여 특정 토픽에서 메시지를 수신할 수 있습니다. subscribe
메서드를 호출하여 원하는 토픽을 구독하고 poll
메서드를 호출하여 메시지를 가져옵니다.
consumer.subscribe(Collections.singleton("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("메시지 수신: " + record.value());
}
}
마무리
이상으로 아파치 카프카의 자바 클라이언트를 사용하여 카프카와 상호작용하는 방법을 알아보았습니다. 이를 통해 실시간 데이터를 효율적으로 처리하고 대규모 데이터 시스템을 구축할 수 있습니다.
더 많은 자세한 내용은 아파치 카프카 공식 문서를 참조하세요.