[java] 자바로 카프카에서 발생하는 네트워크 문제 해결하기

카프카는 대규모 데이터 스트리밍 플랫폼으로, 네트워크 문제가 발생할 수 있습니다. 이 문제를 해결하는 가장 중요한 단계는 디버깅과 모니터링입니다. 이 글에서는 자바를 사용하여 카프카 네트워크 문제를 해결하는 방법을 알아보겠습니다.

1. 네트워크 연결 확인

먼저, 카프카 클러스터에 정상적으로 연결되었는지 확인해야 합니다. 자바에서는 카프카 클러스터와의 연결을 설정하는 kafka-clients 라이브러리를 사용할 수 있습니다. 다음은 카프카 클러스터와의 연결을 확인하는 예제 코드입니다.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicListing;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaConnectionChecker {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092");

        try (AdminClient adminClient = AdminClient.create(properties)) {
            ListTopicsResult listTopicsResult = adminClient.listTopics();

            for (TopicListing topicListing : listTopicsResult.listings().get()) {
                System.out.println(topicListing.name());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

위 코드는 bootstrap.servers 설정을 통해 카프카 클러스터에 연결하고, listTopics 메서드를 사용하여 모든 토픽을 가져오는 예제입니다. 연결에 성공했다면, 토픽 목록이 출력될 것입니다.

2. 네트워크 대기 시간 설정

카프카에서는 네트워크 연결이 끊길 경우, 어떤 시간까지 대기해야 하는지 설정할 수 있습니다. 이를 통해 네트워크 문제가 임시적으로 발생하는 경우에도 적절히 대응할 수 있습니다. 자바에서는 kafka-clients 라이브러리의 max.block.ms 설정을 사용하여 대기 시간을 설정할 수 있습니다. 아래 코드는 max.block.ms 값을 5000ms로 설정하는 예제입니다.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        properties.put(ConsumerConfig.MAX_BLOCK_MS_CONFIG, "5000");

        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            // Kafka Consumer 동작 코드 작성
        }
    }
}

위 코드에서 MAX_BLOCK_MS_CONFIG를 5000으로 설정하면, 네트워크 연결이 끊길 경우 최대 5초까지 대기하게 됩니다.

3. 컨슈머 재시작 로직 구현

네트워크 문제가 발생한 경우, 카프카 컨슈머를 재시작하는 로직을 구현해야 합니다. 자바에서는 Exception 처리를 통해 예외상황에 대응할 수 있습니다. 아래 코드는 카프카 컨슈머를 예외처리하여 재시작하는 예제입니다.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        while (true) {
            try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

                // Kafka Consumer 처리 로직 작성
                
                kafkaConsumer.commitSync();
            } catch (Exception e) {
                e.printStackTrace();
                // Sleep or retry logic
            }
        }
    }
}

위 코드는 컨슈머를 감싸는 while문을 통해 컨슈머 재시작 로직을 구현하였습니다. 예외가 발생하면 스택 트레이스를 출력하고, 필요에 따라 슬립 또는 재시도 로직을 추가할 수 있습니다.

결론

카프카에서 발생하는 네트워크 문제는 디버깅과 모니터링을 통해 해결할 수 있습니다. 자바를 사용하여 카프카 클러스터에 정상적으로 연결되었는지 확인하고, 대기 시간 설정 및 컨슈머 재시작 로직을 구현할 수 있습니다.