[java] 자바로 카프카를 활용한 실시간 데이터 스트리밍 처리 개발하기

카프카는 대용량 실시간 데이터 스트리밍 처리를 위한 분산 메시징 시스템으로 널리 사용되고 있습니다. 이번 글에서는 자바를 사용하여 카프카를 활용한 실시간 데이터 스트리밍 처리를 개발하는 방법에 대해 알아보겠습니다.

목차

카프카 소개

카프카는 LinkedIn에서 개발된 분산 메시징 시스템으로, 대용량의 실시간 데이터를 안전하게 처리할 수 있습니다. 카프카는 프로듀서(Producer)와 컨슈머(Consumer)를 통해 데이터를 실시간으로 전송하고 처리할 수 있습니다.

카프카 설치 및 설정

카프카를 사용하기 위해서는 먼저 설치와 설정 작업이 필요합니다. 카프카의 공식 홈페이지에서 다운로드 후 압축을 풀어주고, 서버를 실행시켜야 합니다. 자세한 설치 및 설정 방법은 카프카 설치 가이드를 참고해주세요.

자바로 카프카 프로듀서 개발하기

카프카 프로듀서는 데이터를 카프카로 전송하는 역할을 합니다. 자바로 카프카 프로듀서를 개발하기 위해서는 먼저 카프카 클라이언트 라이브러리를 프로젝트에 추가해야 합니다.

import org.apache.kafka.clients.producer.*;

public class KafkaProducerExample {

    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        try {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
                producer.send(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

위의 예제 코드는 카프카 프로듀서를 구현한 코드입니다. KafkaProducer 클래스를 사용하여 프로듀서 객체를 생성하고, send 메서드를 통해 메시지를 전송합니다.

자바로 카프카 컨슈머 개발하기

카프카 컨슈머는 카프카로부터 데이터를 소비하는 역할을 합니다. 자바로 카프카 컨슈머를 개발하기 위해서는 마찬가지로 카프카 클라이언트 라이브러리를 프로젝트에 추가해야 합니다.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    
    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        
        consumer.subscribe(Collections.singleton(TOPIC_NAME));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }
    }
}

위의 예제 코드는 카프카 컨슈머를 구현한 코드입니다. KafkaConsumer 클래스를 사용하여 컨슈머 객체를 생성하고, subscribe 메서드를 통해 메시지를 소비할 토픽을 등록합니다. 그 후 poll 메서드를 호출하여 메시지를 수신합니다.

실시간 데이터 스트리밍 처리

카프카를 활용한 실시간 데이터 스트리밍 처리를 위해서는 스트림 처리 엔진(예: Kafka Streams, Apache Flink)을 사용할 수 있습니다. 이를 통해 카프카로부터 데이터를 읽어와 복잡한 처리를 수행할 수 있습니다.

위에서 작성한 카프카 프로듀서와 컨슈머 코드를 사용하여 실시간 데이터 스트리밍 처리 애플리케이션을 개발할 수 있습니다.

결론

자바를 활용하여 카프카를 이용한 실시간 데이터 스트리밍 처리를 개발하는 방법에 대해 알아보았습니다. 카프카의 뛰어난 확장성과 유연성을 활용하여 다양한 실시간 데이터 처리 요구사항을 해결할 수 있습니다. 추가적인 자세한 내용은 카프카 공식 문서를 참고하시기 바랍니다.