[java] 자바로 카프카 데이터 파이프라인 구축하기

개요

카프카(Kafka)는 대용량의 실시간 데이터 스트리밍 플랫폼으로, 대규모 데이터 처리 및 분석에 많이 사용됩니다. 이번에는 자바를 사용하여 카프카 데이터 파이프라인을 구축하는 방법에 대해 알아보겠습니다.

카프카 프로듀서 구현하기

먼저, 카프카 프로듀서(Producer)를 자바로 구현해보겠습니다. 아래는 간단한 예시 코드입니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 카프카 프로듀서 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 카프카 프로듀서 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 메시지 전송
        String topic = "my-topic";
        String message = "Hello, Kafka!";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        producer.send(record);

        // 프로듀서 종료
        producer.close();
    }
}

위 코드에서는 bootstrap.servers를 카프카 브로커의 주소로 설정하고, key.serializervalue.serializer를 메시지 키와 값의 직렬화 방식으로 설정합니다. 그리고 KafkaProducer 객체를 생성하여 메시지를 전송하고, 전송이 완료되면 프로듀서를 종료합니다.

카프카 컨슈머 구현하기

이어서, 카프카 컨슈머(Consumer)를 자바로 구현해보겠습니다. 아래는 간단한 예시 코드입니다.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 카프카 컨슈머 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 카프카 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 구독할 토픽 설정
        String topic = "my-topic";
        consumer.subscribe(Collections.singleton(topic));

        // 메시지 수신 및 처리
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
                // TODO: 메시지 처리 로직 구현
            }
        }
    }
}

위 코드에서는 bootstrap.servers를 카프카 브로커의 주소로 설정하고, group.id를 컨슈머 그룹의 아이디로 설정합니다. 그리고 KafkaConsumer 객체를 생성하여 토픽을 구독하고, 메시지를 수신하여 처리하는 로직을 구현합니다.

실행하기

위에서 구현한 프로듀서와 컨슈머 코드를 각각 실행해보면, 프로듀서가 메시지를 카프카에 전송하고, 컨슈머가 해당 메시지를 수신하여 출력하는 것을 확인할 수 있습니다.

결론

이번 포스트에서는 자바를 사용하여 카프카 데이터 파이프라인을 구축하는 방법을 알아보았습니다. 카프카를 통해 대량의 데이터를 신속하고 안정적으로 처리하고 분석할 수 있으므로, 실시간 데이터 파이프라인 구축에 많은 도움이 됩니다.

더 자세한 내용은 아래 링크를 참고하시기 바랍니다:

Happy coding!