[java] 카프카와 자바를 이용한 데이터 인덱싱

카프카는 분산 메시징 시스템으로 대용량의 실시간 데이터를 처리하는 데 매우 효과적입니다. 데이터를 신속하고 안정적으로 전달하고 처리하는 데에 사용됩니다. 이러한 특징을 이용하여 카프카를 데이터 인덱싱에 활용할 수 있습니다.

이 글에서는 자바를 사용하여 카프카에서 데이터를 수신하고, 수신한 데이터를 인덱싱하는 방법을 알아보겠습니다.

카프카에서 데이터 수신하기

카프카에서 데이터를 수신하기 위해서는 먼저 카프카 클러스터에 연결해야 합니다. 아래 코드는 자바를 사용하여 카프카 클러스터에 연결하는 예제입니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
                // 여기서 데이터를 인덱싱하는 작업을 수행합니다.
            }
        }
    }
}

위 코드에서 bootstrap.servers에는 카프카 클러스터의 주소를 입력해야 합니다. group.id는 카프카 컨슈머 그룹의 아이디입니다. key.deserializervalue.deserializer는 데이터의 직렬화 및 역직렬화에 사용할 클래스입니다.

데이터 인덱싱

카프카에서 데이터를 수신한 후, 해당 데이터를 인덱싱하기 위해서는 Elasticsearch나 Solr와 같은 검색 엔진을 사용할 수 있습니다. 이러한 검색 엔진을 사용하여 데이터를 인덱싱하고 검색할 수 있습니다.

아래는 Elasticsearch를 사용하여 데이터를 인덱싱하는 예제 코드입니다.

import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;

import java.io.IOException;

public class ElasticsearchIndexingExample {
    public static void main(String[] args) {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder("localhost:9200"));

        // 카프카에서 수신한 데이터를 인덱싱합니다.
        IndexRequest request = new IndexRequest("data-index")
                .source("field", "value");

        try {
            IndexResponse response = client.index(request);
            System.out.println("Indexed data: " + response.getId());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

위 코드에서 “localhost:9200”는 Elasticsearch 클러스터의 주소를 입력한 것입니다. source 메소드를 통해 인덱싱할 데이터를 지정할 수 있습니다.

마무리

이 글에서는 카프카와 자바를 이용하여 데이터를 수신하고, Elasticsearch를 이용하여 해당 데이터를 인덱싱하는 방법을 알아보았습니다. 카프카와 자바를 함께 사용하여 대용량의 데이터를 신속하게 처리할 수 있고, 검색 엔진을 사용하여 데이터를 효과적으로 검색할 수 있는 장점이 있습니다.

추가적인 자세한 내용은 Kafka 공식 문서와 Elasticsearch 공식 문서를 참고하시기 바랍니다.