[java] 카프카에서 자바를 사용한 데이터 결합 및 분석

카프카(Kafka)는 대용량 실시간 데이터 스트리밍 플랫폼으로, 실시간 데이터를 처리하고 저장하기 위한 분산 시스템입니다. 자바는 카프카와 함께 사용할 수 있는 주요 프로그래밍 언어 중 하나입니다. 이 글에서는 카프카에서 자바를 사용하여 데이터를 결합하고 분석하는 방법을 알아보겠습니다.

카프카와 자바 연동

먼저, 카프카와 자바를 연동하기 위해서는 카프카 클라이언트를 사용해야 합니다. 이를 위해 프로젝트의 의존성에 아래의 카프카 클라이언트 라이브러리를 추가해야 합니다.

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
}

위의 의존성을 추가한 후, 카프카 클러스터에 연결하기 위한 설정을 작성해야 합니다. 아래는 카프카 클러스터의 호스트와 포트를 지정하는 예시입니다.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

이제 데이터를 소비자(consumer)로부터 결합하고 처리하기 위해 카프카 토픽에 구독(subscribe)할 수 있습니다. 아래는 토픽에 구독하여 데이터를 소비하는 예시입니다.

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 데이터 처리 로직
        }
    }
}

위의 코드에서 “my-topic”이라는 토픽에서 데이터를 가져와서 처리하는 로직을 작성하면 됩니다.

데이터 결합 및 분석

카프카에서 자바를 사용하여 데이터를 결합하고 분석하는 방법은 다양합니다. 예를 들어, 다른 토픽에서 가져온 데이터를 조인(join)하여 새로운 결과를 생성하거나, 특정 조건에 따라 데이터를 필터링(filtering)하는 등의 작업을 수행할 수 있습니다.

데이터 조인의 예시를 살펴보겠습니다. 먼저, 두 개의 토픽에서 데이터를 가져와 결합하는 코드를 작성합니다.

KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
consumer1.subscribe(Collections.singletonList("topic1"));

KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer2.subscribe(Collections.singletonList("topic2"));

while (true) {
    ConsumerRecords<String, String> records1 = consumer1.poll(Duration.ofMillis(100));
    ConsumerRecords<String, String> records2 = consumer2.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record1 : records1) {
        // record1 데이터 처리 로직

        for (ConsumerRecord<String, String> record2 : records2) {
            // record2 데이터 처리 로직
        }
    }
}

위의 코드에서는 topic1과 topic2에서 가져온 데이터를 이중 for문을 통해 결합하고 처리하는 로직을 작성할 수 있습니다.

데이터 분석의 예시로는 특정 조건에 따라 데이터를 필터링하는 작업을 들 수 있습니다. 아래는 특정 조건을 만족하는 데이터만 처리하는 예시입니다.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        if (record.value().equals("filterCondition")) {
            // 데이터 처리 로직
        }
    }
}

위의 코드에서는 “filterCondition” 값을 가진 데이터만 처리하는 로직을 작성할 수 있습니다.

결론

이와 같이 카프카에서 자바를 사용하여 데이터를 결합하고 분석하는 방법을 알아보았습니다. 카프카와 자바를 함께 사용하면 대량의 실시간 스트리밍 데이터를 효과적으로 처리할 수 있습니다. 자바를 활용하여 다양한 데이터 분석 작업을 수행할 수 있으며, 카프카 클라이언트를 통해 카프카와의 연동을 쉽게 구현할 수 있습니다.

참고 문서: Kafka 클라이언트 사용하기