[java] 카프카와 자바를 이용한 실시간 데이터 분석

소개

지금은 대용량의 실시간 데이터가 생성되고 분석되는 시대입니다. 이를 위해 데이터 플랫폼과 분석 도구가 필요한데, 카프카와 자바를 사용하여 실시간 데이터 분석을 구현할 수 있습니다. 이 글에서는 카프카를 활용하여 자바로 실시간 데이터를 수집하고 분석하는 방법을 알아보겠습니다.

카프카란?

카프카는 LinkedIn에서 개발된 분산 스트리밍 플랫폼으로, 대량의 실시간 데이터를 안정적으로 처리하고 전달하기 위해 사용됩니다. 카프카는 메시지 큐 형태로 동작하며, 생산자-소비자 모델을 기반으로 동작합니다.

자바로 카프카 사용하기

카프카를 자바에서 사용하기 위해서는 Apache Kafka 클라이언트를 Maven 또는 Gradle을 통해 프로젝트에 추가해야 합니다.

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
}

카프카에 데이터를 보내기 위해서는 프로듀서를 생성하고, 데이터를 보내는 방법을 알아야 합니다.

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        try {
            String message = "Hello, Kafka!";
            producer.send(new ProducerRecord<>(TOPIC_NAME, message));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

카프카에서 데이터 읽어오기

카프카에서 데이터를 읽어오기 위해서는 컨슈머를 생성하고, 데이터를 읽어오는 방법을 알아야 합니다.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    private static final String TOPIC_NAME = "my-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "my-group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", GROUP_ID);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(TOPIC_NAME));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // 데이터 처리 로직 작성
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

위의 예제에서는 “my-topic” 토픽에서 데이터를 읽어와서 처리하는 부분을 작성하면 됩니다.

결론

카프카와 자바를 이용하여 실시간 데이터 분석을 구현하는 방법을 알아보았습니다. 카프카는 대용량 데이터 처리에 용이한 분산 스트리밍 플랫폼으로, 자바에서도 손쉽게 사용할 수 있습니다. 이를 통해 실시간 데이터를 수집하고 분석하는 애플리케이션을 개발할 수 있습니다. 참고 자료를 참조하여 더 자세한 내용을 학습해보시기 바랍니다.

참고 자료