[java] 카프카에서 자바를 사용한 분산 데이터 처리

카프카(Kafka)는 대용량의 실시간 스트리밍 데이터를 처리하기 위한 분산 메시지 큐입니다. 자바(Java)는 카프카와 함께 사용하기 좋은 언어 중 하나입니다. 이번 글에서는 카프카에서 자바를 사용하여 분산 데이터 처리를 하는 방법에 대해 알아보겠습니다.

카프카 클라이언트 라이브러리 추가하기

먼저 카프카와 연동하기 위해 카프카 클라이언트 라이브러리를 프로젝트에 추가해야 합니다. Maven을 사용하는 경우 pom.xml 파일에 다음 의존성을 추가할 수 있습니다.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

Gradle을 사용하는 경우 build.gradle 파일에 다음 의존성을 추가할 수 있습니다.

implementation 'org.apache.kafka:kafka-clients:2.7.0'

프로듀서(Producer) 생성하기

카프카의 프로듀서는 데이터를 카프카 토픽(Topic)에 전송하는 역할을 합니다. 아래는 자바를 사용하여 카프카 프로듀서를 생성하는 코드입니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 카프카 클러스터의 주소 설정
        String bootstrapServers = "localhost:9092";
        
        // 프로듀서 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 프로듀서 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 메시지 전송
        String topic = "my-topic";
        String key = "key1";
        String value = "Hello, Kafka!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record);

        // 프로듀서 종료
        producer.close();
    }
}

위 코드에서는 bootstrap.servers, key.serializer, value.serializer 등의 프로듀서 설정을 지정한 후, KafkaProducer 객체를 생성하여 메시지를 전송하고 종료합니다.

컨슈머(Consumer) 생성하기

카프카의 컨슈머는 토픽에 저장된 메시지를 읽어오는 역할을 합니다. 아래는 자바를 사용하여 카프카 컨슈머를 생성하는 코드입니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 카프카 클러스터의 주소 설정
        String bootstrapServers = "localhost:9092";

        // 컨슈머 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 읽어올 토픽과 파티션 설정
        String topic = "my-topic";
        TopicPartition partition = new TopicPartition(topic, 0);
        consumer.assign(Collections.singletonList(partition));

        // 메시지 읽어오기
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> {
                System.out.println("Key: " + record.key() + ", Value: " + record.value() + ", Offset: " + record.offset());
            });
        }
    }
}

위 코드에서는 bootstrap.servers, group.id, key.deserializer, value.deserializer 등의 컨슈머 설정을 지정한 후, KafkaConsumer 객체를 생성하여 메시지를 읽어옵니다.

결론

자바를 사용하여 카프카와 연동하여 분산 데이터 처리를 할 수 있습니다. 프로듀서를 통해 데이터를 전송하고, 컨슈머를 통해 데이터를 읽어올 수 있으며, 이를 활용하여 다양한 분산 시스템을 구축할 수 있습니다.

더 자세한 카프카와 자바의 사용법은 공식 문서를 참고하시기 바랍니다.