[python] FastAPI에서 Kafka와 연동하기

Kafka는 대용량의 실시간 데이터 스트리밍을 다루기 위한 분산 데이터 플랫폼입니다. 이 글에서는 FastAPI와 Kafka를 연동하여 실시간 데이터 처리를 수행하는 방법에 대해 알아보겠습니다.

1. Kafka Python 라이브러리 설치하기

Kafka와의 연동을 위해 우선 Kafka Python 라이브러리를 설치해야 합니다. 다음 명령어를 실행하여 라이브러리를 설치합니다.

pip install kafka-python

2. Kafka Producer 생성하기

데이터를 Kafka에 전송하기 위해 Producer를 생성해야 합니다. 다음은 FastAPI에서 Kafka Producer를 생성하는 예시 코드입니다.

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

async def send_data_to_kafka(data: dict):
    producer.send('my-topic', value=data)
    producer.flush()

위의 코드에서 bootstrap_servers는 Kafka 브로커의 호스트와 포트를 지정해야 합니다. value_serializer는 데이터를 JSON 형태로 직렬화하는 함수를 지정합니다.

send_data_to_kafka 함수는 FastAPI의 API 엔드포인트에서 호출되어 데이터를 Kafka로 전송합니다.

3. Kafka Consumer 생성하기

Kafka로부터 데이터를 수신하기 위해 Consumer를 생성해야 합니다. 다음은 FastAPI에서 Kafka Consumer를 생성하는 예시 코드입니다.

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    data = message.value
    # 데이터 처리 로직 작성

위의 코드에서 bootstrap_servers는 Kafka 브로커의 호스트와 포트를 지정해야 합니다. value_deserializer는 데이터를 JSON 형태로 역직렬화하는 함수를 지정합니다.

Consumer는 for 루프를 통해 지속적으로 메시지를 수신하고 데이터 처리 로직을 수행합니다.

4. FastAPI와의 통합

FastAPI와 Kafka를 연동하는 방법에는 여러 가지가 있습니다. 예를 들어, background tasks, WebSocket, asyncio 등을 이용하여 데이터를 Kafka로 전송하거나 수신할 수 있습니다. 그 중에서도 background tasks를 사용한 예시를 보여드리겠습니다.

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

@app.post("/send-data")
async def send_data(data: dict, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_data_to_kafka, data)
    return {"message": "Data sent to Kafka!"}

위의 코드에서 BackgroundTasks를 이용하여 데이터를 비동기적으로 Kafka로 전송하는 백그라운드 작업을 생성합니다.

이제 FastAPI 애플리케이션에서 POST /send-data 엔드포인트에 데이터를 전송하면 백그라운드 작업이 실행되어 데이터가 Kafka로 전송됩니다.

마무리

이 글에서는 FastAPI와 Kafka를 연동하는 방법에 대해 간단하게 알아보았습니다. FastAPI는 높은 퍼포먼스와 사용자 친화적인 개발 경험을 제공하므로 Kafka와 함께 사용하면 실시간 데이터 처리를 효과적으로 수행할 수 있습니다.

더 자세한 내용은 Kafka PythonFastAPI 공식 문서를 참고하시기 바랍니다.