[Kafka] 5장. 카프카 컨슈머 : 카프카에서 데이터 읽기

Kafka 기초 다지기

출처 : 카프카 핵심 가이드 (O’Reilly)

목차

  1. 카프카 훑어보기
  2. 범용 메시지 큐와 비교하기
  3. 카프카 프로듀서 : 카프카에 메시지 쓰기
  4. 카프카 컨슈머 : 중요 개념
  5. 카프카 컨슈머 : 카프카에서 데이터 읽기
  6. 스키마 레지스트리
  7. 카프카 내부 메커니즘
  8. 신뢰성 있는 데이터 전달
  9. 데이터 파이프라인 구축하기

카프카 컨슈머 : 카프카에서 데이터 읽기

1. 카프카 컨슈머 생성하기

1) KafkaConsumer

2) 컨슈머 설정하기

3) 컨슈머 생성하기

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "...StringDeserializer");
props.put("value.deserializer", "...StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

2. 토픽 구독과 폴링 루프

1) 토픽 구독

2) 폴링 루프

try {
    while (true) { // 많은 데이터를 읽기 위해 카프카 계속 폴링
        ConsumerRecords<String, String> records = consumer.poll(100); 
        // 여기서 100은 타임아웃 간격
        // 이 안에 poll()이 안이루어지면 죽은 것으로 간주됨
        // poll()은 레코드들이 저장된 List 반환
        
        for (ConsumerRecord<String, String> record : records) {
            // 할 일 하자...
            
            int updatedCount = 1;
            
            if (custCountryMap.containsValue(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);
            
            JSONObject json = new JSONObject(custCountryMap);            
        }
    }
} finally {
    customer.close(); // 컨슈머가 종료될 때는 항상 close()를 실행해야 한다!
}

3) 어떻게 폴링 루프를 벗어나야 할까?

public static void main(String args[]) {
    // ...
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            consumer.wakeup();
            
            try {
                mainThread.join();
            } catch (InterruptException e) {
                e.printStackTrace();
            }
        }
    });
    
    // ...
    
    try {
        // 루프를 계속 반복 실행... Ctrl + C 를 누르면 바로 앞의 코드에서
        // addShutdownHook()로 등록한 스레드의 run() 메소드가 실행됨
        
        while (true) {
            ConsumerRecords<String, String> records = movingAvg.consumer.poll(1000);
            
            for (ConsumerRecord<String, String> record : records) {
            	// 할 거 하자!   
            }
            for (TopicPartition tp : consumer.assignment()) {
                movingAvg.consumer.commitSync();
            }
        }
    } catch (WakeupException e) {
        // 컨슈머 스레드를 닫고 애플리케이션을 종료할 것이므로 이 에러는 무시한다.
    } finally {
        consumer.close(); // 컨슈머 스레드는 종료 전에 반드시 닫아야 한다!
    }    
}

4. 커밋하기

1) 현재의 오프셋 커밋하기

2) 비동기 커밋

3) 동기와 비동기 커밋 함께 사용하기

4) 특정 오프셋 커밋하기


5. 리밸런싱 리스너

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionAssigned(Collection<TopicPartition> partitions) {
    }
    
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 할 일 하자!
        consumer.commitSync(currentOffsets);
    }
}

6. 역직렬처리기

1) 기본 역직렬처리기

2) 커스텀 역직렬처리기

public class CustomerDeserializer implements Deserializer<Customer> {
    
    @Override
    public void configure(Map configs, boolean isKey) {}
    
    @Override
    public Customer deserialize(String topic, byte[] data) {
        int id; // 필드를 미리 정의해둬야 한다.
        int nameSize;
        String name;
        
        try {
            if (data == null)
            	return null;
            if (data.length < 0)
            	throw new SerializationException("...");
            	
            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            nameSize = buffer.getInt();
            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            name = new String(nameBytes, "UTF-8");
            
            return new Customer(id, name);
        } catch (Exception e) {
            throw new SerializationException("...");
        }
    }
    
    @Override
    public void close() {
    }
}