[java] 카프카와 자바를 이용한 이벤트 로깅 처리하기

이벤트 로깅은 분산 시스템에서 발생하는 이벤트를 기록하고 추적하기 위해 중요한 역할을 합니다. 이에 따라 카프카는 대규모 이벤트 로깅 시스템을 구축하기 위한 인기 있는 플랫폼 중 하나입니다. 이 튜토리얼에서는 자바를 사용하여 카프카로 이벤트를 보내고 로깅을 처리하는 방법에 대해 알아보겠습니다.

1. 카프카 설치 및 설정

카프카를 사용하려면 먼저 카프카 클러스터를 설치하고 설정해야 합니다. Apache Kafka의 공식 웹사이트에서 최신 버전을 다운로드한 후, 압축을 해제합니다. 그리고 카프카의 설정 파일을 편집하여 브로커, 주키퍼, 토픽 등을 설정합니다.

2. Maven 프로젝트 설정

다음으로 Maven 프로젝트를 설정해야 합니다. 이를 위해 Maven pom.xml 파일에 Kafka 클라이언트 의존성을 추가합니다.

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

3. 이벤트 보내기

이제 자바를 사용하여 카프카로 이벤트를 보내는 방법을 알아보겠습니다. 먼저 Kafka 클라이언트를 초기화하고 카프카 클러스터에 연결합니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class EventProducer {
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC = "event-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key", "value");
            producer.send(record);
            System.out.println("Event sent successfully");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

위의 코드에서는 BOOTSTRAP_SERVERS에는 Kafka 브로커의 주소를, TOPIC에는 카프카 토픽의 이름을 설정합니다. 이벤트를 전송하기 위해 KafkaProducer 객체를 생성하고 ProducerRecord 객체를 생성하여 이벤트를 설정합니다. 그리고 producer.send()를 사용하여 이벤트를 전송합니다.

4. 이벤트 로깅 처리

마지막으로, 카프카로부터 이벤트를 수신하여 로깅을 처리하는 방법을 알아보겠습니다. 이를 위해 Kafka 컨슈머를 초기화하고 카프카 클러스터에 연결합니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class EventConsumer {
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC = "event-topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", "event-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        try {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                records.forEach(record -> {
                    System.out.println("Received event - key: " + record.key() + ", value: " + record.value());
                    // 로깅 처리
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

위의 코드에서는 BOOTSTRAP_SERVERS에는 Kafka 브로커의 주소를, TOPIC에는 카프카 토픽의 이름을 설정합니다. KafkaConsumer 객체를 생성하고 props를 사용하여 속성을 설정합니다. 그리고 consumer.subscribe()을 사용하여 컨슈머가 카프카 토픽을 구독하도록 설정하고, consumer.poll()를 사용하여 이벤트를 수신합니다. 그리고 각 이벤트를 로깅 처리합니다.

이제 카프카와 자바를 이용하여 이벤트 로깅 처리를 위한 간단한 시스템을 구축하는 방법에 대해 알아보았습니다.

참고 자료