[java] 자바로 카프카 데이터 유실 방지하기

카프카(Kafka)는 대용량의 실시간 데이터 스트리밍을 처리하기 위한 분산 메시지 큐 시스템입니다. 하지만 카프카의 특성상 데이터 유실이 발생할 수 있으며, 이는 실시간 데이터 스트리밍 시스템에서 치명적인 문제가 될 수 있습니다. 이번 포스트에서는 자바를 사용하여 카프카 데이터 유실을 방지하는 방법에 대해 알아보겠습니다.

1. 메시지 전송 확인

카프카에서 메시지를 전송할 때는 ProducerRecord 객체를 사용합니다. 이 때 ProducerRecord 객체에는 메시지를 전송하는 토픽, 파티션 및 메시지 자체가 포함됩니다. 데이터 유실을 방지하기 위해서는 메시지 전송 결과를 확인해야 합니다.

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.*;

public class KafkaProducerExample {
    private static final String TOPIC = "my_topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key", "value");

        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("Error while sending message to Kafka");
                    exception.printStackTrace();
                } else {
                    System.out.println("Message sent to Kafka");
                }
            }
        });
        producer.close();
    }
}

위의 예제에서 send() 메서드의 콜백에서 메시지 전송 결과를 확인하게 됩니다. onCompletion() 메서드는 성공적으로 메시지가 전송된 경우에는 metadata 객체와 함께 호출되고, 실패한 경우에는 exception 객체를 받게 됩니다.

2. 오프셋 관리하기

카프카에서 각 파티션은 메시지의 오프셋(offset)을 가지고 있습니다. 오프셋은 해당 파티션의 메시지의 순서를 나타내는 값으로, 이를 이용하여 데이터 유실을 감지할 수 있습니다.

자바에서 카프카 오프셋을 관리하기 위해서는 KafkaConsumer 객체를 사용해야 합니다. 아래의 예제를 통해 오프셋을 확인하는 방법을 알아보겠습니다.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
import java.util.*;

public class KafkaConsumerExample {
    private static final String TOPIC = "my_topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_consumer_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value() + ", offset: " + record.offset());
            }

            consumer.commitSync();
        }
    }
}

위의 예제에서 poll() 메서드로 메시지를 가져온 후, 각 메시지의 오프셋을 확인하여 데이터 유실을 방지할 수 있습니다. commitSync() 메서드로 오프셋을 커밋하여 메시지 처리의 일관성을 보장할 수 있습니다.

결론

카프카를 이용한 실시간 데이터 스트리밍 시스템에서 데이터의 유실은 치명적인 문제가 될 수 있습니다. 자바를 사용하여 메시지 전송 결과를 확인하고, 오프셋을 관리함으로써 데이터 유실을 방지할 수 있습니다. 이를 통해 안정적인 데이터 처리 시스템을 구축할 수 있습니다.

참고문서: