[java] Kafka Streams와 데이터 적재 및 추출 방법

이전의 기술들과는 다르게 Kafka는 이벤트 중심과 스트리밍 데이터 처리를 위해 설계된 플랫폼입니다. Kafka Streams는 Kafka에서 데이터 스트림을 처리하기위한 클라이언트 라이브러리입니다. 이번 포스트에서는 Kafka Streams를 사용하여 데이터를 적재하고 추출하는 방법에 대해 알아보겠습니다.

데이터 적재

Kafka Streams를 사용하여 데이터를 적재하는 가장 간단한 방법은 Kafka Producer를 사용하여 데이터를 생성하고 Kafka Topic에 전송하는 것입니다. 이렇게 생성된 데이터는 Kafka Streams 애플리케이션에서 소비될 수 있습니다.

아래의 예제 코드를 통해 Kafka Producer를 사용하여 데이터를 적재하는 방법을 살펴보겠습니다.

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {

        String bootstrapServers = "localhost:9092";
        String topicName = "my-topic";

        // 설정
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer 생성
        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 데이터 생성 및 전송
        for (int i = 0; i < 10; i++) {
            String key = "key_" + i;
            String value = "value_" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, value);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("Record sent successfully");
                    }
                }
            });
        }

        producer.close();
    }
}

위의 예제에서는 KafkaProducer를 사용하여 Kafka Topic으로 데이터를 전송합니다.

데이터 추출

Kafka Streams를 사용하여 데이터를 추출하는 방법은 Kafka Consumer를 사용하여 Kafka Topic에서 데이터를 소비하는 것입니다. Kafka Streams 애플리케이션은 Consumer Group에 소속되어 데이터를 처리할 수 있으며, 여러 개의 Consumer Group을 사용하여 데이터를 병렬로 처리할 수도 있습니다.

아래의 예제 코드를 통해 Kafka Consumer를 사용하여 데이터를 추출하는 방법을 살펴보겠습니다.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {

        String bootstrapServers = "localhost:9092";
        String topicName = "my-topic";
        String groupId = "my-group";

        // 설정
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", groupId);

        // Consumer 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Topic 구독
        consumer.subscribe(Collections.singleton(topicName));

        // 데이터 소비
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received record - Key: " + record.key() + ", Value: " + record.value());
            }
            consumer.commitSync();
        }
    }
}

위의 예제에서는 KafkaConsumer를 사용하여 Kafka Topic으로부터 데이터를 소비합니다.

결론

위의 예제 코드를 통해 Kafka Streams를 사용하여 데이터를 적재하고 추출하는 방법을 알아보았습니다. Kafka Streams는 Kafka를 기반으로 한 스트리밍 데이터 처리를 위한 강력한 도구입니다. 데이터 적재 및 추출에 대한 이해는 Kafka Streams의 효과적인 활용에 큰 도움이 될 것입니다.

더 자세한 내용을 알고 싶다면 Kafka documentation을 참조해주세요.