[java] 카프카에서 자바를 사용한 데이터 플로우 관리하기

소개

카프카(Kafka)는 대용량의 실시간 데이터 스트리밍을 처리하기 위한 분산 스트리밍 플랫폼입니다. 자바는 카프카와 함께 사용되는 가장 일반적인 프로그래밍 언어 중 하나이며, 데이터 플로우를 관리하기 위해 자바를 사용하는 방법을 알아보겠습니다.

카프카 Java 클라이언트

카프카 Java 클라이언트는 카프카와 상호 작용하기 위한 자바 라이브러리입니다. 이 라이브러리를 사용하여 데이터를 생성하고 소비하는 프로듀서와 컨슈머를 만들 수 있습니다.

프로듀서(Producer) 작성하기

프로듀서는 카프카 토픽에 데이터를 생성하는 역할을 합니다. 아래는 카프카 프로듀서를 생성하고 데이터를 보내는 예제 코드입니다.

import org.apache.kafka.clients.producer.*;

public class SimpleProducer {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String message = "Hello Kafka!";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
        producer.send(record);

        producer.close();
    }
}

컨슈머(Consumer) 작성하기

컨슈머는 카프카 토픽에서 데이터를 소비하는 역할을 합니다. 아래는 카프카 컨슈머를 생성하고 데이터를 소비하는 예제 코드입니다.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class SimpleConsumer {
    public static void main(String[] args) {
        String topicName = "my-topic";
        String groupId = "my-group";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
            }
        }

        consumer.close();
    }
}

카프카 관리하기

카프카를 사용하는 동안 데이터 플로우의 상태를 모니터링하고 관리해야 할 수도 있습니다. 이를 위해 카프카 관리 도구인 카프카 컨트롤 센터(Kafka Control Center)를 사용할 수 있습니다.

카프카 컨트롤 센터 설치

카프카 컨트롤 센터는 Confluent 사에서 제공하는 웹 기반 관리 도구입니다. 아래는 카프카 컨트롤 센터를 설치하는 방법입니다.

  1. Confluent Platform 다운로드 페이지(https://www.confluent.io/download)에서 카프카 컨트롤 센터를 다운로드합니다.
  2. 압축을 해제하고 실행을 위한 디렉토리로 이동합니다.
  3. etc/kafka-control-center/kafka-control-center.properties 파일을 열고 필요한 구성을 수정합니다.
  4. bin/kafka-control-center-start 명령어를 실행하여 카프카 컨트롤 센터를 시작합니다.

카프카 컨트롤 센터 사용하기

카프카 컨트롤 센터를 사용하면 토픽 생성, 컨슈머 그룹 모니터링, 알림 설정 등 다양한 작업을 수행할 수 있습니다. 웹 브라우저에서 http://localhost:9021로 접속하여 카프카 컨트롤 센터에 로그인합니다.

결론

자바를 사용하여 카프카에서 데이터 플로우를 관리하는 방법에 대해 알아보았습니다. 프로듀서와 컨슈머를 작성하여 데이터를 생성하고 소비하는 방법을 배웠으며, 카프카 컨트롤 센터를 사용하여 데이터 플로우를 모니터링하고 관리하는 방법도 알아보았습니다. 카프카와 자바를 함께 사용하여 실시간 데이터 스트리밍 애플리케이션을 개발해 보세요.

참고 자료