[java] 자바로 카프카에서 발생하는 로그 수집과 분석

카프카(Kafka)는 대량의 데이터를 신속하게 처리하기 위한 분산 스트리밍 플랫폼입니다. 카프카를 사용하면 여러 애플리케이션 간에 실시간으로 데이터를 전송하고 처리할 수 있습니다. 이러한 대량의 데이터를 처리하는 과정에서 로그 데이터의 수집과 분석은 중요한 과제입니다. 이번 글에서는 자바를 사용하여 카프카에서 발생하는 로그 데이터를 수집하고 분석하는 방법에 대해 알아보겠습니다.

1. 카프카에서 로그 데이터 수집하기

카프카에서 로그 데이터를 수집하기 위해서는 카프카 프로듀서(Producer)를 사용해야 합니다. 자바를 사용하여 카프카 프로듀서를 구현하는 방법은 다음과 같습니다.

import org.apache.kafka.clients.producer.*;

public class KafkaLogProducer {
    private static final String TOPIC_NAME = "log_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                String log = generateLog(i);
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, log);
                producer.send(record);
            }
        }
    }

    private static String generateLog(int index) {
        return "Log message " + index;
    }
}

위 코드는 KafkaLogProducer 클래스에서 카프카 프로듀서를 생성하고 로그 데이터를 생성하여 카프카 토픽에 전송하는 예시입니다. 카프카 클러스터의 주소(bootstrap.servers)와 메시지를 직렬화하는 방식(key.serializervalue.serializer)을 설정해야 합니다. ProducerRecord를 사용하여 메시지를 생성하고 send 메서드를 통해 카프카 토픽에 전송합니다.

2. 카프카에서 로그 데이터 분석하기

카프카에서 전송된 로그 데이터를 분석하기 위해서는 카프카 컨슈머(Consumer)를 사용해야 합니다. 자바를 사용하여 카프카 컨슈머를 구현하는 방법은 다음과 같습니다.

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaLogConsumer {
    private static final String TOPIC_NAME = "log_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "log_consumer_group";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", GROUP_ID);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                String log = record.value();
                // 로그 데이터 처리 로직 구현
                System.out.println("Received log: " + log);
            }
        }
    }
}

위 코드는 KafkaLogConsumer 클래스에서 카프카 컨슈머를 생성하고 토픽에서 로그 데이터를 수신하여 처리하는 예시입니다. 카프카 클러스터의 주소(bootstrap.servers)와 컨슈머 그룹의 식별자(group.id)를 설정해야 합니다. ConsumerRecords로부터 수신된 메시지를 읽고 처리하는 로직을 구현할 수 있습니다.

결론

이번 글에서는 자바를 사용하여 카프카에서 발생하는 로그 데이터의 수집과 분석에 대해 알아보았습니다. 카프카 프로듀서를 사용하여 로그 데이터를 생성하고, 카프카 컨슈머를 사용하여 로그 데이터를 수신하고 처리하는 방법에 대해 살펴보았습니다. 카프카를 통해 로그 데이터를 효율적으로 관리하고 분석함으로써 애플리케이션의 성능 향상과 문제 해결에 큰 도움을 줄 수 있습니다.

참고 자료