[java] 자바로 카프카에서 발생하는 에러 로그 처리하기

카프카는 대규모 데이터 스트리밍 플랫폼으로, 분산된 아키텍처로 구성되어 있습니다. 하지만 카프카에서는 에러가 발생할 수도 있으며, 이를 효과적으로 처리하는 것은 매우 중요합니다. 이번 포스트에서는 자바를 사용하여 카프카에서 발생하는 에러 로그를 처리하는 방법에 대해 알아보겠습니다.

1. 에러 로그 수신

먼저, 카프카에서 발생하는 에러 로그를 수신하기 위해 카프카 컨슈머를 생성해야 합니다. 아래는 컨슈머를 생성하는 예제 코드입니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

public class KafkaErrorLogConsumer {
    public static void main(String[] args) {
        // 카프카 컨슈머 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "error-log-consumer");
        // 기타 설정 추가

        // 카프카 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 에러 토픽 구독
        consumer.subscribe(Collections.singletonList("error-log-topic"));

        // 메시지 수신 및 처리
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 에러 로그 처리 로직 구현
                System.out.printf("Received error log: key = %s, value = %s%n", record.key(), record.value());
            }
        }
    }
}

위 예제 코드에서는 KafkaErrorLogConsumer 클래스를 통해 카프카 컨슈머를 생성하고, “error-log-topic”을 구독하여 에러 로그를 수신하도록 설정하였습니다. 실제 에러 처리 로직은 for 루프 내에 구현하시면 됩니다.

2. 에러 로그 처리

에러 로그를 받은 후에는 해당 에러를 적절하게 처리해야 합니다. 에러 처리 방법은 에러의 종류와 상황에 따라 다를 수 있지만, 보통은 아래와 같은 방법들을 사용합니다.

실제로 위의 예제 코드에서는 에러 로그를 받은 후에 “Received error log”라는 메시지를 간단히 출력하고 있습니다. 이 부분에서 실제 에러 처리 로직을 구현하시면 됩니다.

3. 예외 처리

카프카에서 발생하는 에러는 예외로 처리되기 때문에, 이에 대한 예외 처리를 적절히 구현해야 합니다. 예외 처리 방법은 에러 처리와 유사하며, 예외 발생 시 어떤 작업을 수행할지 결정하면 됩니다.

아래는 예외 처리를 추가한 예제 코드입니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

public class KafkaErrorLogConsumer {
    public static void main(String[] args) {
        // 카프카 컨슈머 설정
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "error-log-consumer");
        // 기타 설정 추가

        // 카프카 컨슈머 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 에러 토픽 구독
        consumer.subscribe(Collections.singletonList("error-log-topic"));

        // 메시지 수신 및 처리
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // 예외 처리 구현
                    try {
                        // 에러 로그 처리 로직 구현
                        System.out.printf("Received error log: key = %s, value = %s%n", record.key(), record.value());
                    } catch (Exception e) {
                        System.err.println("Error occurred while processing error log: " + e.getMessage());
                        // 예외 처리 로직 구현
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}

위 예제 코드에서는 에러 처리 로직 구간을 try-catch 문으로 감싸 에러 발생 시에 대한 예외 처리를 추가하였습니다. 예외가 발생했을 때는 해당 예외에 대한 처리를 구현하시면 됩니다.

결론

카프카에서 발생하는 에러 로그를 효과적으로 처리하기 위해 자바를 사용하여 에러 로그를 수신하고 처리하는 방법에 대해 알아보았습니다. 에러 로그를 적절하게 처리함으로써 카프카 클러스터의 안정성을 확보하고, 문제를 미리 예방할 수 있습니다.