카프카는 대용량의 실시간 데이터 스트리밍을 처리하기 위한 분산 메시징 시스템으로 널리 사용되고 있습니다. 이번 포스트에서는 자바를 사용하여 카프카 브로커 클러스터를 관리하는 방법에 대해 알아보겠습니다.
1. 카프카 클라이언트 설정하기
카프카 브로커에 연결하기 위해서는 먼저 카프카 클라이언트를 설정해야 합니다. 다음과 같이 Maven을 사용하여 아파치 카프카 클라이언트 라이브러리를 프로젝트에 추가합니다.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
2. 카프카 토픽 생성하기
카프카에서는 데이터를 주고 받기 위해 토픽을 사용합니다. 토픽을 생성하기 위해서는 먼저 카프카 어드민 클라이언트를 생성해야 합니다. 다음은 자바 코드에서 토픽을 생성하는 예시입니다.
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
public class KafkaTopicManager {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
AdminClient adminClient = AdminClient.create(config);
String topicName = "my-topic";
int numPartitions = 1;
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
adminClient.createTopics(Collections.singletonList(newTopic));
adminClient.close();
}
}
위의 예시 코드에서는 AdminClient
를 사용하여 카프카 클러스터에 연결하고, NewTopic
객체를 생성하여 토픽을 생성합니다.
3. 카프카 메시지 전송하기
토픽이 생성되었다면 실제 데이터를 카프카에 전송해야 합니다. 자바를 사용하여 카프카에 메시지를 전송하는 방법은 다음과 같습니다.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaMessageProducer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "my-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String message = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record);
producer.close();
}
}
위의 예시 코드에서는 KafkaProducer
를 사용하여 카프카에 메시지를 전송합니다. ProducerRecord
객체를 생성하여 메시지를 전송하고, send
메서드를 사용하여 메시지를 보냅니다.
4. 카프카 메시지 수신하기
카프카에서 전송된 메시지를 수신하기 위해서는 컨슈머를 사용해야 합니다. 다음은 자바 코드에서 카프카 메시지를 수신하는 예시입니다.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaMessageConsumer {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "my-topic";
private static final String GROUP_ID = "my-consumer-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
}
}
}
}
위의 예시 코드에서는 KafkaConsumer
를 사용하여 카프카에서 메시지를 수신합니다. subscribe
메서드를 사용하여 해당 토픽을 구독하고, poll
메서드를 사용하여 메시지를 가져옵니다.
결론
이번 포스트에서는 자바를 사용하여 카프카 브로커 클러스터를 관리하는 방법에 대해 알아보았습니다. 카프카를 사용하여 대용량의 실시간 데이터 스트리밍을 처리하려는 경우, 자바를 통한 카프카 클러스터 관리는 매우 유용한 도구입니다.
더 자세한 사항은 아파치 카프카 공식 문서를 참조하시기 바랍니다.