Akka와 Kafka는 모두 실시간 데이터 처리를 위한 인기있는 도구입니다. Akka는 액터 모델을 기반으로 하는 고성능, 분산 애플리케이션 개발을 위한 프레임워크이며, Kafka는 고성능의 메시지 큐 시스템입니다. 이 두 가지를 함께 사용하면 대규모 실시간 애플리케이션을 개발하고 운영하는 데 매우 유용합니다.
Akka-Stream-Kafka
Akka는 Akka-Stream-Kafka 모듈을 통해 Kafka와의 통합을 제공합니다. 이 모듈을 사용하면 Kafka로부터 메시지를 읽어들이거나 메시지를 Kafka로 전송할 수 있습니다. Akka-Stream-Kafka는 Akka Streams의 소재와 함께 메시지를 처리하는 일련의 스트림 워크플로를 정의할 수 있도록 도와줍니다.
예제
다음은 Akka-Stream-Kafka를 사용하여 Kafka에서 메시지를 읽고 처리하는 간단한 예제입니다:
import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.scaladsl.Consumer;
import akka.kafka.scaladsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaIntegrationExample {
public static void main(String[] args) {
// ActorSystem 설정
final ActorSystem system = ActorSystem.create("kafka-integration-example");
// ActorMaterializer 설정
final ActorMaterializer materializer = ActorMaterializer.create(system);
// Kafka Consumer 설정
final String bootstrapServers = "localhost:9092";
final String topic = "my-topic";
final String groupId = "my-group-id";
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
final ConsumerSettings<String, String> consumerSettings =
ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId);
// Kafka로부터 메시지를 읽어들이는 Source 생성
final Source<ConsumerRecord<String, String>, Consumer.Control> kafkaSource =
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic));
// 메시지 처리를 위한 로직 적용
kafkaSource.map(record -> {
// 메시지 처리 로직
String key = record.key();
String value = record.value();
System.out.println("Received message: key=" + key + ", value=" + value);
return record;
}).runWith(Sink.ignore(), materializer);
}
}
위의 예제는 Kafka로부터 메시지를 읽어들여 간단한 처리 로직을 적용하는 예제입니다. ConsumerSettings
를 통해 Kafka Consumer를 설정하고, plainSource
를 사용하여 Kafka로부터 메시지를 읽어들이는 Source를 생성합니다. 그 후 map
을 사용하여 메시지 처리 로직을 정의하고, runWith
를 호출하여 스트림을 실행합니다.
결론
Akka와 Kafka의 통합을 통해 실시간 데이터 처리 애플리케이션의 개발과 운영을 간편하게 할 수 있습니다. Akka-Stream-Kafka를 사용하면 Akka 액터 모델의 강력함과 Kafka의 메시징 기능을 협업하여 대규모 실시간 애플리케이션을 구축할 수 있습니다.