[java] 자바로 카프카 클러스터 확장하기

카프카는 대용량 실시간 데이터 스트리밍을 처리하기 위한 분산 메시지 큐 시스템으로 인기가 많습니다. 클러스터는 여러 대의 브로커로 구성되어 동작하며, 확장이 필요한 경우 새로운 브로커를 추가하여 클러스터의 용량을 늘릴 수 있습니다.

이 글에서는 자바를 사용하여 카프카 클러스터를 확장하는 방법에 대해 알아보겠습니다.

1. 카프카 라이브러리 추가하기

먼저, 카프카를 사용하기 위해 프로젝트에 카프카 클라이언트 라이브러리를 추가해야 합니다. Maven의 경우, pom.xml 파일에 다음 의존성을 추가합니다:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

Gradle의 경우, build.gradle 파일에 다음 의존성을 추가합니다:

implementation 'org.apache.kafka:kafka-clients:2.8.0'

2. 새로운 브로커 추가하기

클러스터에 새로운 브로커를 추가하는 방법은 여러 가지가 있지만, 가장 일반적인 방법은 server.properties 파일을 수정하는 것입니다.

새로운 브로커를 추가하기 위해 다음 단계를 수행합니다:

  1. 카프카 설치 디렉토리에서 config 폴더로 이동합니다.
  2. server.properties 파일을 복사하여 새로운 브로커의 설정 파일을 생성합니다.
  3. 생성한 설정 파일을 열고 다음 값을 수정합니다:
    • broker.id: 새로운 브로커의 고유 식별자
    • listeners: 새로운 브로커가 클라이언트와 통신할 때 사용할 호스트와 포트 지정
    • log.dirs: 새로운 브로커의 로그 파일을 저장할 디렉토리 경로
  4. 새로운 브로커의 설정 파일을 저장합니다.

3. 자바 코드에서 카프카 클러스터 설정하기

자바 코드에서 카프카 클러스터에 접속하기 위해 다음 단계를 수행합니다:

  1. Properties 객체를 생성하여 카프카 클러스터의 설정을 지정합니다. 다음 속성을 설정해야 합니다:
    • bootstrap.servers: 클러스터에 접속할 브로커의 호스트와 포트 지정
    • key.serializer: 메시지의 키를 직렬화할 클래스 지정
    • value.serializer: 메시지의 값을 직렬화할 클래스 지정
  2. KafkaProducer 또는 KafkaConsumer 객체를 생성합니다. 생성자에 앞서 생성한 Properties 객체를 전달합니다.

예를 들어, 다음은 카프카 클러스터에 메시지를 전송하는 자바 코드입니다:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "my_key", "my_value");
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Failed to send message: " + exception.getMessage());
            } else {
                System.out.println("Message sent successfully");
            }
        });

        producer.close();
    }
}

위의 코드는 my_topic 이라는 토픽에 my_keymy_value 를 가지는 메시지를 전송하는 예제입니다.

마무리

이제 자바를 사용하여 카프카 클러스터를 확장하는 방법을 알아보았습니다. 카프카의 공식 도큐먼트와 예제 코드를 참고하여 더 자세한 내용을 학습할 수 있습니다.

참고 문서: