[java] 카프카에서 자바를 사용한 데이터 흐름 제어

소개

아파치 카프카(Kafka)는 분산형 스트리밍 플랫폼으로 대용량의 실시간 데이터 흐름을 처리하는 데 사용됩니다. 카프카를 사용하여 데이터를 제어하려면 자바 언어를 사용하여 프로그램을 작성해야합니다. 이 문서에서는 카프카에서 자바를 사용하여 데이터 흐름을 제어하는 방법에 대해 설명하겠습니다.

카프카 클라이언트 라이브러리 추가

카프카 클라이언트를 사용하기 위해 먼저 해당 라이브러리를 프로젝트에 추가해야합니다. 아래의 Maven 의존성을 프로젝트의 pom.xml 파일에 추가하여 카프카 클라이언트를 사용할 수 있습니다.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.1</version>
</dependency>

데이터 생성

카프카에서 데이터를 생성하려면 Producer를 사용합니다. 자바에서는 KafkaProducer 클래스를 사용하여 데이터를 생성할 수 있습니다. 아래의 코드는 KafkaProducer를 사용하여 데이터를 생성하는 예제입니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class DataProducer {
    public static void main(String[] args) {
        // 카프카 클러스터의 주소와 기타 설정 정보를 설정합니다.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 프로듀서를 생성합니다.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 데이터를 생성하여 토픽에 전송합니다.
        for (int i = 0; i < 10; i++) {
            String key = "key" + i;
            String value = "value" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", key, value);
            producer.send(record);
        }

        // 프로듀서를 종료합니다.
        producer.close();
    }
}

위의 예제 코드에서는 KafkaProducer 객체를 생성한 후, 반복문을 사용하여 토픽에 데이터를 전송하고, 마지막으로 프로듀서를 종료합니다. 이와 같은 방식으로 데이터를 생성할 수 있습니다.

데이터 소비

카프카에서 데이터를 소비하려면 Consumer를 사용합니다. 자바에서는 KafkaConsumer 클래스를 사용하여 데이터를 소비할 수 있습니다. 아래의 코드는 KafkaConsumer를 사용하여 데이터를 소비하는 예제입니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class DataConsumer {
    public static void main(String[] args) {
        // 카프카 클러스터의 주소와 기타 설정 정보를 설정합니다.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my_consumer_group");

        // 컨슈머를 생성합니다.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 토픽을 구독합니다.
        consumer.subscribe(Collections.singletonList("my_topic"));

        // 데이터를 소비합니다.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record ->
                System.out.printf("Received message: key=%s, value=%s, topic=%s, partition=%s, offset=%s\n",
                        record.key(), record.value(), record.topic(), record.partition(), record.offset())
            );
        }
    }
}

위의 예제 코드에서는 KafkaConsumer 객체를 생성한 후, subscribe 메서드를 사용하여 토픽을 구독합니다. 그 후 무한 루프를 돌면서 데이터를 소비합니다. 소비된 데이터를 처리하는 방식은 forEach 메서드 내에서 정의할 수 있습니다.

결론

이 문서에서는 카프카에서 자바를 사용한 데이터 흐름 제어에 대해 알아보았습니다. 데이터 생성과 소비에 대한 간단한 예제 코드를 제공했으며, 이를 기반으로 실제 프로젝트에서 카프카를 활용할 수 있습니다. 카프카 클라이언트 라이브러리의 다양한 기능과 설정에 대해서는 아파치 카프카 공식문서를 참고하시기 바랍니다.