[Kafka] 3장. 카프카 프로듀서 : 카프카에 메시지 쓰기

Kafka 기초 다지기

출처 : 카프카 핵심 가이드 (O’Reilly)

목차

  1. 카프카 훑어보기
  2. 범용 메시지 큐와 비교하기
  3. 카프카 프로듀서 : 카프카에 메시지 쓰기
  4. 카프카 컨슈머 : 중요 개념
  5. 카프카 컨슈머 : 카프카에서 데이터 읽기
  6. 스키마 레지스트리
  7. 카프카 내부 메커니즘
  8. 신뢰성 있는 데이터 전달
  9. 데이터 파이프라인 구축하기

카프카 프로듀서 : 카프카에 메시지 쓰기

1. 프로듀서 개요

1) 카프카 프로듀서

2) 카프카 프로듀서의 작업 처리 개요

이미지

  1. 카프카에 쓰고자 하는 메시지를 가지는 ProducerRecode를 생성한다
  2. 직렬처리기 : 키와 값의 쌍으로 구성되는 메시지 객체들이 네트워크로 전송될 수 있도록 바이트 배열로 직렬화한다
  3. 해당 데이터는 파티셔너 컴포넌트에 전달된다.
  4. 같은 토픽과 파티션으로 전송될 레코드들을 모은 레코드 배치에 추가한다.
  5. 별개의 스레드가 그 배치를 카프카 브로커에게 전달한다.
  6. 브로커는 수신된 레코드의 메시지를 처리한 후 응답을 전송한다.
    • 메세지가 성공적으로 쓰이면 RecordMetadata를 반환한다.
    • 이 객체는 토픽, 파티션, 파티션 내부의 메시지 오프셋을 갖는다.
    • 실패하면 에러를 반환한다.

3) 프로듀서 설정하기

private Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", 	"org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", 	"org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = 
    new KafkaProducer<String, String> (kafkaProps);

2. 카프카에 메시지 전송하기

1) 간단한 예시

ProducerRecode<String, String> record = 
    new ProducerRecode<>("CustomerCountry", "Precision Products", "France");

try {
    producer.cend(record);
} catch (Exception e) {
    e.printStackTrace();
}

2) 동기식으로 메시지 전송하기

ProducerRecode<String, String> record = 
    new ProducerRecode<>("CustomerCountry", "Precision Products", "France");

try {
    producer.cend(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

3) 비동기식으로 메시지 전송하기

priate class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) P
        	e.printStackTrace();
    }
}

ProducerRecode<String, String> record = 
    new ProducerRecode<>("CustomerCountry", "Precision Products", "France");

producer.send(record, new DemoProducerCallback());

3. 직렬처리기

1) 기본 직렬처리기

2) 커스텀 직렬처리기

public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) {
        // 구성이 필요한 내용
    }
    
    @Override
    /**
    Customer의 직렬화는 다음과 같이 한다.
    - customerId를 나타내는 4바이트의 정수
    - customerName의 길이를 나타내는 4바이트의 정수
    - customerName의 내용이 들어간느 N바이트의 문자열
    */
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serialziedName;
            int stringSize;
            if (data == null)
                return null;
            else {
                if (data.getName() != null) {
                    serializedName = data.getName.getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException();
        }
    }
    
    @Override
    public void close() {
        // close 해줘야 할 내용
    }
}

3) 범용 직렬처리기 (Apache Avro)


4. 파티션

1) 기본 파티션

2) 커스텀 파티셔너 구현하기