[java] 카프카에서 자바를 사용한 대용량 데이터 처리 및 저장

서론

카프카는 신뢰성 있고 확장 가능한 대용량 데이터 처리 시스템으로 알려져 있습니다. 이 시스템은 분산 아키텍처를 기반으로 하여 많은 양의 데이터를 효율적으로 처리하고 저장할 수 있습니다. 자바는 카프카와 함께 사용할 수 있는 많은 기능과 라이브러리를 제공하므로 대용량 데이터 처리 및 저장에 적합한 언어입니다.

카프카와 자바 연동

카프카와 자바를 연동하기 위해서는 먼저 카프카의 자바 클라이언트 라이브러리를 추가해야 합니다. Maven을 사용한다면 다음과 같이 의존성을 추가할 수 있습니다:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

프로듀서로 데이터 전송하기

카프카 프로듀서는 데이터를 생성하여 카프카 클러스터로 전송합니다. 자바를 사용하여 카프카 프로듀서를 구현하는 예제는 다음과 같습니다:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {

    private static final String TOPIC = "my_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            String key = "key_" + i;
            String value = "value_" + i;
            producer.send(new ProducerRecord<>(TOPIC, key, value));
        }

        producer.close();
    }
}

위의 예제에서는 KafkaProducer 클래스를 사용하여 프로듀서를 생성하고 Kafka 클러스터로 메시지를 전송합니다. ProducerRecord를 사용하여 메시지의 토픽, 키(key)와 값(value)을 지정할 수 있습니다.

컨슈머로 데이터 수신하기

카프카 컨슈머는 카프카 클러스터로부터 데이터를 수신합니다. 자바를 사용하여 카프카 컨슈머를 구현하는 예제는 다음과 같습니다:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {

    private static final String TOPIC = "my_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my_consumer_group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                int partition = record.partition();
                long offset = record.offset();

                System.out.println("key=" + key + ", value=" + value + ", partition=" + partition + ", offset=" + offset);
            }
        }
    }
}

위의 예제에서는 KafkaConsumer 클래스를 사용하여 컨슈머를 생성하고 해당 토픽의 메시지를 수신합니다. ConsumerRecords를 통해 받은 메시지들을 순회하며 메시지의 키, 값, 파티션, 오프셋 정보를 출력합니다.

결론

자바를 사용하여 카프카와 연동하여 대용량 데이터를 처리하고 저장하는 것은 매우 강력한 도구입니다. 이 문서에서는 카프카 프로듀서와 컨슈머를 자바로 구현하는 예제를 제공하였습니다. 더 많은 카프카 기능을 활용하고자 한다면 공식 문서와 샘플 코드를 참고하시기 바랍니다.

참고 자료