[java] Apache Avro의 메시징 시스템과 Java 연동

Apache Avro는 데이터 직렬화 및 RPC(Remote Procedure Call) 프레임워크로 널리 사용되는 오픈소스 프로젝트입니다. 이번 글에서는 Avro의 메시징 시스템을 사용하여 Java 언어로 메시지를 송수신하는 방법에 대해 알아보겠습니다.

1. Avro의 메시징 시스템

Avro의 메시징 시스템은 Pub-Sub(발행-구독) 패턴을 기반으로 동작합니다. 이를 통해 하나 이상의 프로듀서(송신자)가 메시지를 발행하고, 하나 이상의 컨슈머(수신자)가 메시지를 수신할 수 있습니다. 메시지는 Avro 스키마를 기반으로 직렬화되어 전달되며, 스키마는 직렬화된 데이터의 구조를 정의하는데 사용됩니다.

2. Avro 메시징 시스템 설정

Avro의 메시징 시스템을 사용하기 위해서는 다음과 같은 설정이 필요합니다.

3. Avro 메시징 시스템과 Java 연동

Java에서 Avro 메시징 시스템을 구현하기 위해 다음 단계를 따라야 합니다.

3.1. Avro 스키마 생성

먼저 메시지의 구조를 정의하는 Avro 스키마를 생성해야 합니다. Avro 스키마는 JSON 형식으로 작성되며, 필드 이름과 유형을 포함합니다. 예를 들어, 다음은 사용자 정보를 포함하는 Avro 스키마의 예입니다.

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"}
  ]
}

이 스키마는 사용자의 id, 이름, 이메일을 포함하고 있습니다.

3.2. 메시지 전송

Avro 메시징 시스템을 사용하여 메시지를 전송하려면 다음과 같은 단계를 따라야 합니다.

  1. Avro 메시지의 스키마를 등록합니다.
  2. 메시지 프로듀서를 생성하고, 해당 토픽에 메시지를 발행합니다.

다음은 Java에서 Avro 메시지를 발행하는 예제 코드입니다.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducer {

  public void sendMessage(String topic, Schema schema, GenericRecord message) {
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(...); // KafkaProducer 설정
    
    try {
      DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
      Encoder encoder = EncoderFactory.get().binaryEncoder(...); // Avro 메시지를 바이트로 인코딩
      writer.write(message, encoder);
      encoder.flush();
      byte[] serializedMessage = ... // 인코딩된 메시지를 byte 배열로 변환
      
      ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, serializedMessage);
      producer.send(record);
      
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      producer.close();
    }
  }
}

3.3. 메시지 수신

메시지를 수신하기 위해서는 다음과 같은 단계를 따르면 됩니다.

  1. Avro 메시지의 스키마를 등록합니다.
  2. 메시지 컨슈머를 생성하고, 해당 토픽에서 메시지를 구독합니다.
  3. 구독한 메시지를 역직렬화하여 사용합니다.

다음은 Java에서 Avro 메시지를 수신하는 예제 코드입니다.

import org.apache.kafka.clients.consumer.*;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public class AvroConsumer {

  public void receiveMessage(String topic, Schema schema) {
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(...); // KafkaConsumer 설정
    consumer.subscribe(topic);
    
    try {
      while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(...); // 메시지를 폴링하여 수신
        for (ConsumerRecord<String, byte[]> record : records) {
          byte[] serializedMessage = record.value();
          
          Decoder decoder = DecoderFactory.get().binaryDecoder(serializedMessage, null);
          SpecificDatumReader<GenericRecord> reader = new SpecificDatumReader<>(schema);
          GenericRecord message = reader.read(null, decoder); // 메시지 역직렬화
          
          // 수신한 메시지를 처리하는 로직 작성
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      consumer.close();
    }
  }
}

마무리

위에서 소개한 방법을 따라하면 Java 프로그램에서 Avro의 메시징 시스템을 사용할 수 있습니다. Avro를 통해 데이터를 구조화하고 직렬화하여 효율적인 메시지 교환을 구현할 수 있습니다. 자세한 내용은 Apache Avro 공식 문서를 참고하세요.