이번 글에서는 Apache Kafka를 사용하여 자바로 데이터 파이프라인을 구축하는 방법을 알아보겠습니다. 카프카는 분산 스트리밍 플랫폼으로, 대량의 데이터를 안정적이고 확장 가능한 방식으로 처리할 수 있도록 지원합니다.
1. 카프카 설치 및 설정
먼저, 카프카를 설치하고 설정하는 과정부터 시작합니다. 아래의 단계를 따라 진행해보세요.
1-1. 카프카 설치
먼저 Apache Kafka의 공식 웹사이트에서 카프카를 다운로드 받습니다. 다운로드 후 압축을 해제하면 카프카 폴더가 생성됩니다.
1-2. 카프카 설정
카프카 폴더에서 config
폴더로 이동한 후, server.properties
파일을 열어서 카프카의 설정을 수정합니다. 예를 들어, listeners
속성을 PLAINTEXT://localhost:9092
로 변경하여 로컬에서 접속할 수 있도록 설정할 수 있습니다.
2. 카프카 프로듀서 작성
이제 카프카 프로듀서를 작성해보겠습니다. 카프카 프로듀서는 데이터를 생성하여 카프카 토픽으로 보내는 역할을 합니다. 아래는 간단한 자바 코드 예시입니다.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 프로듀서 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 카프카 프로듀서 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 메시지 전송
ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key", "value");
producer.send(record);
// 카프카 프로듀서 종료
producer.close();
}
}
위의 코드는 my_topic
라는 토픽에 “key”와 “value”라는 메시지를 보내는 카프카 프로듀서를 작성한 예시입니다. 카프카 클러스터와 연결할 주소와 직렬화 설정은 Properties 객체에 지정하고, KafkaProducer
를 생성하여 메시지를 전송하고 프로듀서를 종료하는 간단한 코드입니다.
3. 카프카 컨슈머 작성
이번에는 카프카 컨슈머를 작성해보겠습니다. 컨슈머는 카프카 토픽에서 데이터를 소비하는 역할을 합니다. 아래는 간단한 자바 코드 예시입니다.
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 컨슈머 설정
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 카프카 컨슈머 생성
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 토픽 구독
consumer.subscribe(Collections.singletonList("my_topic"));
// 메시지 수신 및 처리
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 메시지 처리 로직 작성
}
// 카프카 컨슈머 종료
consumer.close();
}
}
위의 코드는 my_topic
토픽을 구독하는 카프카 컨슈머를 작성한 예시입니다. 마찬가지로 카프카 클러스터와 연결할 주소와 직렬화 설정은 Properties 객체에 지정하며, KafkaConsumer
를 생성하여 메시지를 수신하고 처리하는 간단한 코드입니다.
4. 실행 및 결과 확인
카프카 프로듀서와 컨슈머를 작성한 후, 실행하여 결과를 확인해보세요. 프로듀서가 메시지를 카프카 토픽으로 전송하고, 컨슈머가 해당 토픽에서 메시지를 수신하고 처리하는 것을 확인할 수 있습니다.
결론
이상으로 카프카를 사용하여 자바로 데이터 파이프라인을 구성하는 방법을 알아보았습니다. 카프카를 통해 대용량의 데이터를 안정적이고 확장 가능한 방식으로 처리할 수 있으며, 자바를 사용하여 간편하게 데이터 파이프라인을 구축할 수 있습니다.
더 자세한 내용은 Apache Kafka 공식 웹사이트를 참고하시기 바랍니다.
참고문헌:
- Kafka Documentation. https://kafka.apache.org/documentation/
- “카프카 한글 번역 문서” 개발자몪. https://kaafkateam.gitbook.io/kafka-kr