[java] Akka와 Kafka의 통합

Akka와 Kafka는 모두 실시간 데이터 처리를 위한 인기있는 도구입니다. Akka는 액터 모델을 기반으로 하는 고성능, 분산 애플리케이션 개발을 위한 프레임워크이며, Kafka는 고성능의 메시지 큐 시스템입니다. 이 두 가지를 함께 사용하면 대규모 실시간 애플리케이션을 개발하고 운영하는 데 매우 유용합니다.

Akka-Stream-Kafka

Akka는 Akka-Stream-Kafka 모듈을 통해 Kafka와의 통합을 제공합니다. 이 모듈을 사용하면 Kafka로부터 메시지를 읽어들이거나 메시지를 Kafka로 전송할 수 있습니다. Akka-Stream-Kafka는 Akka Streams의 소재와 함께 메시지를 처리하는 일련의 스트림 워크플로를 정의할 수 있도록 도와줍니다.

예제

다음은 Akka-Stream-Kafka를 사용하여 Kafka에서 메시지를 읽고 처리하는 간단한 예제입니다:

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.scaladsl.Consumer;
import akka.kafka.scaladsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaIntegrationExample {
    public static void main(String[] args) {
        // ActorSystem 설정
        final ActorSystem system = ActorSystem.create("kafka-integration-example");

        // ActorMaterializer 설정
        final ActorMaterializer materializer = ActorMaterializer.create(system);

        // Kafka Consumer 설정
        final String bootstrapServers = "localhost:9092";
        final String topic = "my-topic";
        final String groupId = "my-group-id";

        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        final ConsumerSettings<String, String> consumerSettings =
                ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
                        .withBootstrapServers(bootstrapServers)
                        .withGroupId(groupId);

        // Kafka로부터 메시지를 읽어들이는 Source 생성
        final Source<ConsumerRecord<String, String>, Consumer.Control> kafkaSource =
                Consumer.plainSource(consumerSettings, Subscriptions.topics(topic));

        // 메시지 처리를 위한 로직 적용
        kafkaSource.map(record -> {
            // 메시지 처리 로직
            String key = record.key();
            String value = record.value();
            System.out.println("Received message: key=" + key + ", value=" + value);
            return record;
        }).runWith(Sink.ignore(), materializer);
    }
}

위의 예제는 Kafka로부터 메시지를 읽어들여 간단한 처리 로직을 적용하는 예제입니다. ConsumerSettings를 통해 Kafka Consumer를 설정하고, plainSource를 사용하여 Kafka로부터 메시지를 읽어들이는 Source를 생성합니다. 그 후 map을 사용하여 메시지 처리 로직을 정의하고, runWith를 호출하여 스트림을 실행합니다.

결론

Akka와 Kafka의 통합을 통해 실시간 데이터 처리 애플리케이션의 개발과 운영을 간편하게 할 수 있습니다. Akka-Stream-Kafka를 사용하면 Akka 액터 모델의 강력함과 Kafka의 메시징 기능을 협업하여 대규모 실시간 애플리케이션을 구축할 수 있습니다.