[java] 자바에서 카프카와 함께하는 데이터 처리

카프카(Kafka)는 대용량 실시간 데이터 스트리밍 플랫폼으로, 분산 메시지 큐 시스템입니다. 많은 기업들이 카프카를 사용하여 데이터를 신속하고 효율적으로 처리하고 있습니다. 자바는 카프카 클라이언트 라이브러리를 통해 간단하게 카프카와 통신할 수 있습니다.

카프카와 자바 프로듀서

카프카 프로듀서는 데이터를 카프카 브로커로 보내는 역할을 합니다. 자바에서는 KafkaProducer 클래스를 사용하여 카프카 프로듀서를 만들 수 있습니다. 아래는 자바에서 카프카 프로듀서를 생성하고 메시지를 보내는 예제 코드입니다.

```java import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample { public static void main(String[] args) { // 카프카 프로듀서 설정 Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

    // 카프카 프로듀서 생성
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    // 메시지 보내기
    String topic = "my_topic";
    String data = "Hello, Kafka!";
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, data);
    producer.send(record);

    // 카프카 프로듀서 종료
    producer.close();
} } \```

위의 코드에서는 bootstrap.servers 속성을 통해 카프카 브로커의 주소를 설정하고, key.serializervalue.serializer 속성을 통해 메시지의 키와 값을 직렬화합니다. KafkaProducer 객체를 생성한 후 send 메서드를 호출하여 메시지를 보냅니다. 마지막으로 close 메서드를 호출하여 카프카 프로듀서를 종료합니다.

카프카와 자바 컨슈머

카프카 컨슈머는 카프카 브로커에서 데이터를 가져오는 역할을 합니다. 자바에서는 KafkaConsumer 클래스를 사용하여 카프카 컨슈머를 생성하고 데이터를 가져올 수 있습니다. 아래는 자바에서 카프카 컨슈머를 생성하고 데이터를 가져오는 예제 코드입니다.

```java import java.util.Properties; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaConsumerExample { public static void main(String[] args) { // 카프카 컨슈머 설정 Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“group.id”, “my_group”); props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

    // 카프카 컨슈머 생성
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // 토픽 구독
    String topic = "my_topic";
    consumer.subscribe(Arrays.asList(topic));

    // 데이터 가져오기
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
} } \```

위의 코드에서는 bootstrap.servers 속성을 통해 카프카 브로커의 주소를 설정하고, group.id 속성을 통해 컨슈머 그룹을 지정합니다. KafkaConsumer 객체를 생성한 후 subscribe 메서드를 호출하여 토픽을 구독합니다. poll 메서드를 통해 데이터를 가져올 수 있습니다.

결론

이 글에서는 자바에서 카프카와 함께하는 데이터 처리에 대해 알아보았습니다. 카프카 프로듀서를 사용하여 데이터를 보내고, 카프카 컨슈머를 사용하여 데이터를 가져올 수 있습니다. 자바를 통해 카프카를 활용하면 대용량 데이터를 신속하게 처리할 수 있습니다.

더 자세한 내용은 아파치 카프카 공식 문서를 참고하시기 바랍니다.