서론
카프카는 신뢰성 있고 확장 가능한 대용량 데이터 처리 시스템으로 알려져 있습니다. 이 시스템은 분산 아키텍처를 기반으로 하여 많은 양의 데이터를 효율적으로 처리하고 저장할 수 있습니다. 자바는 카프카와 함께 사용할 수 있는 많은 기능과 라이브러리를 제공하므로 대용량 데이터 처리 및 저장에 적합한 언어입니다.
카프카와 자바 연동
카프카와 자바를 연동하기 위해서는 먼저 카프카의 자바 클라이언트 라이브러리를 추가해야 합니다. Maven을 사용한다면 다음과 같이 의존성을 추가할 수 있습니다:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
프로듀서로 데이터 전송하기
카프카 프로듀서는 데이터를 생성하여 카프카 클러스터로 전송합니다. 자바를 사용하여 카프카 프로듀서를 구현하는 예제는 다음과 같습니다:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
private static final String TOPIC = "my_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "key_" + i;
String value = "value_" + i;
producer.send(new ProducerRecord<>(TOPIC, key, value));
}
producer.close();
}
}
위의 예제에서는 KafkaProducer
클래스를 사용하여 프로듀서를 생성하고 Kafka 클러스터로 메시지를 전송합니다. ProducerRecord
를 사용하여 메시지의 토픽, 키(key)와 값(value)을 지정할 수 있습니다.
컨슈머로 데이터 수신하기
카프카 컨슈머는 카프카 클러스터로부터 데이터를 수신합니다. 자바를 사용하여 카프카 컨슈머를 구현하는 예제는 다음과 같습니다:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC = "my_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my_consumer_group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
int partition = record.partition();
long offset = record.offset();
System.out.println("key=" + key + ", value=" + value + ", partition=" + partition + ", offset=" + offset);
}
}
}
}
위의 예제에서는 KafkaConsumer
클래스를 사용하여 컨슈머를 생성하고 해당 토픽의 메시지를 수신합니다. ConsumerRecords
를 통해 받은 메시지들을 순회하며 메시지의 키, 값, 파티션, 오프셋 정보를 출력합니다.
결론
자바를 사용하여 카프카와 연동하여 대용량 데이터를 처리하고 저장하는 것은 매우 강력한 도구입니다. 이 문서에서는 카프카 프로듀서와 컨슈머를 자바로 구현하는 예제를 제공하였습니다. 더 많은 카프카 기능을 활용하고자 한다면 공식 문서와 샘플 코드를 참고하시기 바랍니다.
참고 자료
- 카프카 공식 문서: https://kafka.apache.org/documentation/
- 카프카 자바 클라이언트 API 문서: https://kafka.apache.org/28/javadoc/