[Kafka] Chapter 11. Hands-On with Schema Registry Using Docker

Kafka Fundamentals


1. Preparation
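
Every container in this post joins a user-defined Docker network named confluent through --net=confluent, so that network has to exist before the first docker run. A minimal sketch, assuming the network has not been created yet; the helper name ensure_network is made up for illustration:

```shell
# ensure_network is a hypothetical helper, not part of Docker or Confluent tooling.
# It creates the user-defined network only when it does not exist yet.
ensure_network() {
  name="$1"
  if docker network inspect "$name" >/dev/null 2>&1; then
    echo "network $name already exists"
  else
    docker network create "$name" >/dev/null && echo "network $name created"
  fi
}

# Usage: ensure_network confluent
```

Running docker network create confluent once by hand does the same job; the wrapper only makes the step safe to repeat.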

Start ZooKeeper

docker run -d \
  --net=confluent \
  --name=zookeeper \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  confluentinc/cp-zookeeper:5.1.0

The client port is set to 2181.


Start Kafka

docker run -d \
  --net=confluent \
  --name=kafka \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:5.1.0

2181 is the ZooKeeper client port configured above.


Start Schema Registry

docker run -d \
--net=confluent \
--name=schema-registry \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 \
confluentinc/cp-schema-registry:5.1.0

The listener port is set to 8081.
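
Schema Registry needs a moment to start, and the later steps assume it is already answering on port 8081. A minimal sketch of a readiness check, assuming curl is available in the shell where it runs; wait_for_url and WAIT_INTERVAL are hypothetical names, while /subjects is the Schema Registry endpoint that lists registered subjects:

```shell
# wait_for_url is a hypothetical helper: it polls a URL until curl succeeds
# or the attempt budget runs out.
wait_for_url() {
  url="$1"
  attempts="${2:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    if curl -sf "$url" >/dev/null; then
      echo "ready after $((i + 1)) attempt(s)"
      return 0
    fi
    i=$((i + 1))
    sleep "${WAIT_INTERVAL:-1}"   # pause between attempts, in seconds
  done
  echo "not ready after $attempts attempts" >&2
  return 1
}

# Usage (from a container on the confluent network):
#   wait_for_url http://schema-registry:8081/subjects
```

The hostname schema-registry only resolves inside the confluent network, so the call belongs in a container attached to it, such as the interactive one started in section 3.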


2. Create a test topic and send messages

Create a topic

docker run \
--net=confluent \
--rm confluentinc/cp-kafka:5.1.0 \
kafka-topics --create --topic foo --partitions 1 --replication-factor 1 \
--if-not-exists --zookeeper zookeeper:2181

Created topic "foo".


Verify the topic

docker run \
--net=confluent \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-topics --describe --topic foo --zookeeper zookeeper:2181

Topic:foo  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: foo  Partition: 0  Leader: 1001  Replicas: 1001  Isr: 1001


Publish new data

docker run \
--net=confluent \
--rm \
confluentinc/cp-kafka:5.1.0 \
bash -c "seq 42 | kafka-console-producer --request-required-acks 1 \
--broker-list kafka:9092 --topic foo && echo 'Produced 42 messages.'"

Produced 42 messages.


Consume the published data (built-in consumer)

docker run \
--net=confluent \
--rm \
confluentinc/cp-kafka:5.1.0 \
kafka-console-consumer --bootstrap-server kafka:9092 --topic foo --from-beginning --max-messages 42

1 .... 42 Processed a total of 42 messages


3. Using Schema Registry

Start a container in interactive (-it) mode

docker run -it --net=confluent --rm confluentinc/cp-schema-registry:5.1.0 bash

Schema validation

/usr/bin/kafka-avro-console-producer \
  --broker-list kafka:9092 --topic bar \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Error deserializing json to Avro of schema {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}

Sending a value that does not match the schema makes the producer fail with the error above.
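
The Avro console producer reads one JSON document per line from standard input and checks each one against value.schema. A minimal sketch of input that should pass validation; the helper gen_records is hypothetical, and the value1, value2, ... payloads are chosen to match the records fetched through the REST Proxy in section 4:

```shell
# gen_records is a hypothetical helper: it prints N JSON lines that conform to
# the "myrecord" schema (a single string field named f1).
gen_records() {
  count="$1"
  i=1
  while [ "$i" -le "$count" ]; do
    printf '{"f1": "value%d"}\n' "$i"
    i=$((i + 1))
  done
}

# Print three sample records, one per line.
gen_records 3
```

Inside the interactive container, the output can be piped straight into the kafka-avro-console-producer command shown earlier. A line that uses a different field name, such as {"f2": "value1"}, would be expected to trigger the deserialization error instead.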


4. Using the REST Proxy

Start the REST Proxy

docker run -d \
--net=confluent \
--name=kafka-rest \
-e KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_REST_LISTENERS=http://0.0.0.0:8082 \
-e KAFKA_REST_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
-e KAFKA_REST_HOST_NAME=kafka-rest \
confluentinc/cp-kafka-rest:5.1.0

Start a container in interactive (-it) mode

docker run -it --net=confluent --rm confluentinc/cp-schema-registry:5.1.0 bash

Create a consumer instance

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
  --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "smallest"}' \
  http://kafka-rest:8082/consumers/my_avro_consumer

{"instance_id":"my_consumer_instance","base_uri":"http://kafka-rest:8082/consumers/my_avro_consumer/instances/my_consumer_instance"}


Fetch data from the bar topic

curl -X GET -H "Accept: application/vnd.kafka.avro.v1+json" \
  http://kafka-rest:8082/consumers/my_avro_consumer/instances/my_consumer_instance/topics/bar

[{"key":null,"value":{"f1":"value1"},"partition":0,"offset":0},{"key":null,"value":{"f1":"value2"},"partition":0,"offset":1},{"key":null,"value":{"f1":"value3"},"partition":0,"offset":2}]
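
A REST Proxy consumer instance is stateful: it stays registered on the proxy until it is deleted or times out. A sketch of the cleanup step, assuming the v1 API used above; instance_url and delete_consumer are hypothetical helper names, and the URL layout follows the base_uri returned when the instance was created:

```shell
# instance_url is a hypothetical helper that rebuilds the base_uri returned by
# the REST Proxy: <proxy>/consumers/<group>/instances/<instance>.
instance_url() {
  printf '%s/consumers/%s/instances/%s\n' "$1" "$2" "$3"
}

# delete_consumer sends the DELETE request that releases the consumer instance.
delete_consumer() {
  curl -s -X DELETE "$(instance_url "$1" "$2" "$3")"
}

# Usage:
#   delete_consumer http://kafka-rest:8082 my_avro_consumer my_consumer_instance
```

Deleting the instance once the data has been read lets the consumer leave its group right away instead of waiting for the idle timeout.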


5. Getting started with Control Center

Start Control Center first

docker run -d \
--name=control-center \
--net=confluent \
--ulimit nofile=16384:16384 \
-p 9021:9021 \
-v /tmp/control-center/data:/var/lib/confluent-control-center \
-e CONTROL_CENTER_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e CONTROL_CENTER_BOOTSTRAP_SERVERS=kafka:9092 \
-e CONTROL_CENTER_REPLICATION_FACTOR=1 \
-e CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS=1 \
-e CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS=1 \
-e CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS=2 \
-e CONTROL_CENTER_CONNECT_CLUSTER=http://kafka-connect:8082 \
confluentinc/cp-enterprise-control-center:5.1.0

Control Center serves an HTTP interface on port 9021, published to the host with -p 9021:9021. When running under Docker, open http://${host-ip}:9021 in a browser.


Create a test topic

docker run \
--net=confluent \
--rm confluentinc/cp-kafka:5.1.0 \
kafka-topics --create --topic c3-test --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181

Created topic "c3-test".


Send messages with the console producer

while true;
do
  docker run \
    --net=confluent \
    --rm \
    -e CLASSPATH=/usr/share/java/monitoring-interceptors/monitoring-interceptors-5.1.0.jar \
    confluentinc/cp-kafka-connect:5.1.0 \
    bash -c 'seq 10000 | kafka-console-producer --request-required-acks 1 --broker-list kafka:9092 --topic c3-test --producer-property interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor --producer-property acks=1 && echo "Produced 10000 messages."'
  sleep 10;
done

Produced 10000 messages


Consume data with the console consumer

OFFSET=0
while true;
do
  docker run \
    --net=confluent \
    --rm \
    -e CLASSPATH=/usr/share/java/monitoring-interceptors/monitoring-interceptors-5.1.0.jar \
    confluentinc/cp-kafka-connect:5.1.0 \
    bash -c 'kafka-console-consumer --consumer-property group.id=qs-consumer --consumer-property interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor --bootstrap-server kafka:9092 --topic c3-test --offset '$OFFSET' --partition 0 --max-messages=1000'
  sleep 1;
  OFFSET=$((OFFSET + 1000))
done

1 .... 1000 Processed a total of 1000 messages

Afterwards, the progress can be followed in the Control Center UI.
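
The consumer loop reads partition 0 in fixed windows: each pass starts at OFFSET, stops after 1000 messages, and then advances OFFSET by 1000. The arithmetic can be sketched as follows; offset_window is a hypothetical helper introduced only to make the windows explicit:

```shell
# offset_window is a hypothetical helper: it prints the first and last offset
# read by a given pass of the consumer loop (passes are numbered from 0).
offset_window() {
  pass="$1"
  batch="${2:-1000}"   # mirrors --max-messages=1000
  first=$((pass * batch))
  last=$((first + batch - 1))
  echo "$first $last"
}

offset_window 0
offset_window 3
```

The first call prints 0 999 and the second prints 3000 3999, so the fourth pass of the loop picks up exactly where the third one stopped.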