[java] 카프카의 자바 API 사용하기

카프카(Kafka)는 분산형 스트리밍 플랫폼으로서 대용량의 실시간 데이터 스트림을 처리하기 위한 도구입니다. 카프카를 사용하면 다양한 소스에서 데이터를 수집하고 다양한 대상에 데이터를 전송할 수 있습니다. 이번에는 카프카의 자바 API를 사용하여 데이터를 생산하고 소비하는 방법에 대해 알아보겠습니다.

카프카 자바 API 추가하기

카프카 자바 API를 사용하기 위해 먼저 해당 API를 프로젝트에 추가해야 합니다. Maven을 사용한다면 pom.xml 파일에 다음과 같은 의존성을 추가하면 됩니다:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

Gradle을 사용한다면 build.gradle 파일에 다음과 같은 의존성을 추가하면 됩니다:

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
}

데이터 생산하기

카프카에서 데이터를 생산하려면 생산자(Producer) 객체를 생성하고 데이터를 보내는 코드를 작성해야 합니다. 다음은 간단한 예제 코드입니다:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        String topic = "my-topic";
        String key = "my-key";
        String value = "Hello, Kafka!";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("Sent message: " + recordMetadata.topic() +
                            ", partition: " + recordMetadata.partition() +
                            ", offset: " + recordMetadata.offset());
                }
            }
        });

        producer.close();
    }
}

위의 코드에서는 bootstrap.servers 프로퍼티를 통해 카프카 브로커의 주소를 설정하고, key.serializervalue.serializer 프로퍼티를 통해 메시지의 키와 값을 직렬화합니다. 그리고 ProducerRecord 객체를 생성하여 생산할 메시지를 설정한 후 producer.send() 메서드를 호출하여 메시지를 보냅니다. 보내는 과정에서 문제가 발생하면 Callback 객체를 통해 처리할 수 있습니다.

데이터 소비하기

카프카에서 데이터를 소비하려면 소비자(Consumer) 객체를 생성하고 데이터를 받아오는 코드를 작성해야 합니다. 다음은 간단한 예제 코드입니다:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "my-group");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        String topic = "my-topic";

        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " +
                        "topic: " + record.topic() +
                        ", partition: " + record.partition() +
                        ", offset: " + record.offset() +
                        ", key: " + record.key() +
                        ", value: " + record.value());
            }
        }

        consumer.close();
    }
}

위의 코드에서는 bootstrap.servers 프로퍼티를 통해 카프카 브로커의 주소를 설정하고, group.id 프로퍼티를 통해 소비자 그룹을 설정합니다. 그리고 subscribe() 메서드를 호출하여 소비할 토픽을 지정하고, poll() 메서드를 호출하여 메시지를 받아옵니다. 받은 메시지는 반복문을 통해 처리할 수 있습니다.

결론

이렇게 카프카의 자바 API를 사용하여 데이터를 생산하고 소비하는 방법을 알아보았습니다. 카프카는 많은 기능과 유연성을 제공하므로 다양한 상황에서 실시간 데이터 스트림을 처리할 수 있습니다. 카프카를 사용하면 대용량의 데이터를 안정적으로 처리할 수 있으며, 자바 API를 사용하면 쉽게 카프카를 통합할 수 있습니다.

더 자세한 정보는 카프카 공식 문서를 참고하시기 바랍니다.