카프카는 분산 메시징 시스템으로서, 대량의 데이터를 안정적이고 확장 가능한 방식으로 처리할 수 있는 기능을 제공합니다. 이 기능을 활용하기 위해 자바로 카프카 이벤트를 처리하는 방법에 대해 알아보겠습니다.
1. 카프카 클라이언트 설정
먼저, 카프카를 사용하기 위해 카프카 클라이언트를 설정해야 합니다. Maven을 사용한다면 pom.xml
파일에 아래의 의존성을 추가합니다.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
2. 카프카 이벤트 송수신
카프카에서 이벤트를 송수신하기 위해 KafkaProducer
와 KafkaConsumer
를 사용합니다. 아래의 예제는 간단한 이벤트를 송신하는 코드입니다.
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaEventProducer {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public void produceEvent() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "my-key", "my-value");
producer.send(record);
}
}
public static void main(String[] args) {
KafkaEventProducer producer = new KafkaEventProducer();
producer.produceEvent();
}
}
위의 코드에서는 KafkaProducer
를 생성하고, ProducerRecord
를 생성하여 해당 레코드를 송신하고 있습니다. 이벤트를 수신하기 위해 아래의 예제 코드를 사용할 수 있습니다.
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class KafkaEventConsumer {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public void consumeEvent() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
consumer.subscribe(Collections.singleton(TOPIC_NAME));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Key: " + record.key() + ", Value: " + record.value());
// 이벤트 처리 로직을 작성하세요.
}
}
}
public static void main(String[] args) {
KafkaEventConsumer consumer = new KafkaEventConsumer();
consumer.consumeEvent();
}
}
위의 코드에서는 KafkaConsumer
를 생성하고, subscribe
메서드로 토픽을 구독한 후, poll
메서드로 메시지를 수신하고 있습니다. 수신한 메시지는 ConsumerRecord
로 반환되며, 이를 활용하여 이벤트 처리 로직을 구현할 수 있습니다.
3. 실행 및 결과 확인
카프카 클라이언트 설정을 마친 후, KafkaEventProducer
클래스와 KafkaEventConsumer
클래스를 각각 실행하면, 이벤트의 송수신이 정상적으로 이루어집니다. 송신된 이벤트는 KafkaEventConsumer
에서 수신되고, 결과가 출력됩니다.
4. 마무리
이번 포스트에서는 자바로 카프카 이벤트를 처리하는 방법을 알아보았습니다. 카프카 클라이언트를 설정하고, KafkaProducer
와 KafkaConsumer
를 활용하여 이벤트를 송수신하는 예제 코드를 제공했습니다. 이를 통해 카프카를 사용하여 안정적이고 확장 가능한 이벤트 처리가 가능함을 알 수 있습니다.
더 자세한 정보는 Apache Kafka Documentation을 참고하세요.