Celery는 Python으로 작성된 분산 작업 처리 라이브러리이며, Elasticsearch는 실시간 검색 및 분석을 위한 분산형 오픈 소스 검색 엔진입니다. Celery를 사용하여 실행한 작업 결과를 실시간으로 Elasticsearch에 저장하는 방법을 알아보겠습니다.
-
Celery 설치 및 설정
먼저 Celery 라이브러리를 설치합니다.
pip install celery
Celery를 설정하기 위해
celery.py
파일을 생성하고, 다음과 같이 설정합니다.from celery import Celery app = Celery('myapp', broker='amqp://guest@localhost//', backend='elasticsearch://localhost:9200') app.conf.task_serializer = 'json' app.conf.result_serializer = 'json' app.conf.accept_content = ['json'] app.conf.result_backend = 'elasticsearch://localhost:9200' app.conf.result_elasticsearch_index = 'celery' app.conf.result_elasticsearch_type = 'task'
위 설정에서
broker
는 Celery가 메시지를 교환할 RabbitMQ 또는 Redis와 같은 브로커를 지정합니다.backend
는 작업 결과를 저장할 백엔드를 지정하는데, 여기서는 Elasticsearch를 사용합니다. -
작업 정의 및 실행
Celery 작업을 정의하기 위해
tasks.py
파일을 생성하고, 다음과 같이 작업을 정의합니다.from celery import shared_task @shared_task def add(x, y): result = x + y # Elasticsearch에 작업 결과 저장 app.send_task('yourapp.tasks.save_result', [result]) return result
위 코드에서
save_result
라는 작업을 실행하여 작업 결과를 Elasticsearch에 저장합니다.작업을 실행하기 위해 Celery worker를 시작합니다.
celery -A yourapp.celery worker --loglevel=info
-
작업 결과 저장
작업 결과를 Elasticsearch에 저장하기 위해
save_result
작업을 정의합니다.@shared_task def save_result(result): from elasticsearch import Elasticsearch es = Elasticsearch() # Elasticsearch에 작업 결과 저장 es.index(index='celery', doc_type='task', body={'result': result})
위 코드에서는 Elasticsearch 클라이언트를 사용하여
celery
인덱스에task
문서 타입으로 작업 결과를 저장합니다. -
결과 확인
작업 결과를 Elasticsearch에서 확인하기 위해 다음과 같이 쿼리를 실행할 수 있습니다.
from elasticsearch import Elasticsearch es = Elasticsearch() response = es.search(index='celery', doc_type='task') for hit in response['hits']['hits']: print(hit['_source']['result'])
위 코드는
celery
인덱스의task
문서 타입의 모든 결과를 출력합니다.
이렇게 Celery와 Elasticsearch를 함께 사용하여 작업 결과를 실시간으로 저장하고 검색할 수 있습니다. Elasticsearch를 사용하면 작업 결과를 쉽게 분석하고 실시간으로 모니터링할 수 있습니다. Celery와 Elasticsearch의 더 자세한 내용은 공식 문서를 참고하시기 바랍니다.
참고 자료: