[java] 카프카에서 자바로 데이터 파이프라인 구성하기

이번 글에서는 Apache Kafka를 사용하여 자바로 데이터 파이프라인을 구축하는 방법을 알아보겠습니다. 카프카는 분산 스트리밍 플랫폼으로, 대량의 데이터를 안정적이고 확장 가능한 방식으로 처리할 수 있도록 지원합니다.

1. 카프카 설치 및 설정

먼저, 카프카를 설치하고 설정하는 과정부터 시작합니다. 아래의 단계를 따라 진행해보세요.

1-1. 카프카 설치

먼저 Apache Kafka의 공식 웹사이트에서 카프카를 다운로드 받습니다. 다운로드 후 압축을 해제하면 카프카 폴더가 생성됩니다.

1-2. 카프카 설정

카프카 폴더에서 config 폴더로 이동한 후, server.properties 파일을 열어서 카프카의 설정을 수정합니다. 예를 들어, listeners 속성을 PLAINTEXT://localhost:9092로 변경하여 로컬에서 접속할 수 있도록 설정할 수 있습니다.

2. 카프카 프로듀서 작성

이제 카프카 프로듀서를 작성해보겠습니다. 카프카 프로듀서는 데이터를 생성하여 카프카 토픽으로 보내는 역할을 합니다. 아래는 간단한 자바 코드 예시입니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 프로듀서 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 카프카 프로듀서 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 메시지 전송
        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "value");
        producer.send(record);

        // 카프카 프로듀서 종료
        producer.close();
    }
}

위의 코드는 my_topic라는 토픽에 “key”와 “value”라는 메시지를 보내는 카프카 프로듀서를 작성한 예시입니다. 카프카 클러스터와 연결할 주소와 직렬화 설정은 Properties 객체에 지정하고, KafkaProducer를 생성하여 메시지를 전송하고 프로듀서를 종료하는 간단한 코드입니다.

3. 카프카 컨슈머 작성

이번에는 카프카 컨슈머를 작성해보겠습니다. 컨슈머는 카프카 토픽에서 데이터를 소비하는 역할을 합니다. 아래는 간단한 자바 코드 예시입니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 컨슈머 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 카프카 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 토픽 구독
        consumer.subscribe(Collections.singletonList("my_topic"));

        // 메시지 수신 및 처리
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // 메시지 처리 로직 작성
        }

        // 카프카 컨슈머 종료
        consumer.close();
    }
}

위의 코드는 my_topic 토픽을 구독하는 카프카 컨슈머를 작성한 예시입니다. 마찬가지로 카프카 클러스터와 연결할 주소와 직렬화 설정은 Properties 객체에 지정하며, KafkaConsumer를 생성하여 메시지를 수신하고 처리하는 간단한 코드입니다.

4. 실행 및 결과 확인

카프카 프로듀서와 컨슈머를 작성한 후, 실행하여 결과를 확인해보세요. 프로듀서가 메시지를 카프카 토픽으로 전송하고, 컨슈머가 해당 토픽에서 메시지를 수신하고 처리하는 것을 확인할 수 있습니다.

결론

이상으로 카프카를 사용하여 자바로 데이터 파이프라인을 구성하는 방법을 알아보았습니다. 카프카를 통해 대용량의 데이터를 안정적이고 확장 가능한 방식으로 처리할 수 있으며, 자바를 사용하여 간편하게 데이터 파이프라인을 구축할 수 있습니다.

더 자세한 내용은 Apache Kafka 공식 웹사이트를 참고하시기 바랍니다.

참고문헌: