[python] Celery의 작업 결과를 실시간으로 Elasticsearch에 저장하는 방법은 어떻게 되는가?

Celery는 Python으로 작성된 분산 작업 처리 라이브러리이며, Elasticsearch는 실시간 검색 및 분석을 위한 분산형 오픈 소스 검색 엔진입니다. Celery를 사용하여 실행한 작업 결과를 실시간으로 Elasticsearch에 저장하는 방법을 알아보겠습니다.

  1. Celery 설치 및 설정

    먼저 Celery 라이브러리를 설치합니다.

    pip install celery
    

    Celery를 설정하기 위해 celery.py 파일을 생성하고, 다음과 같이 설정합니다.

    from celery import Celery
    
    app = Celery('myapp', broker='amqp://guest@localhost//', backend='elasticsearch://localhost:9200')
    
    app.conf.task_serializer = 'json'
    app.conf.result_serializer = 'json'
    app.conf.accept_content = ['json']
    app.conf.result_backend = 'elasticsearch://localhost:9200'
    app.conf.result_elasticsearch_index = 'celery'
    app.conf.result_elasticsearch_type = 'task'
    

    위 설정에서 broker는 Celery가 메시지를 교환할 RabbitMQ 또는 Redis와 같은 브로커를 지정합니다. backend는 작업 결과를 저장할 백엔드를 지정하는데, 여기서는 Elasticsearch를 사용합니다.

  2. 작업 정의 및 실행

    Celery 작업을 정의하기 위해 tasks.py 파일을 생성하고, 다음과 같이 작업을 정의합니다.

    from celery import shared_task
       
    @shared_task
    def add(x, y):
        result = x + y
        # Elasticsearch에 작업 결과 저장
        app.send_task('yourapp.tasks.save_result', [result])
        return result
    

    위 코드에서 save_result라는 작업을 실행하여 작업 결과를 Elasticsearch에 저장합니다.

    작업을 실행하기 위해 Celery worker를 시작합니다.

    celery -A yourapp.celery worker --loglevel=info
    
  3. 작업 결과 저장

    작업 결과를 Elasticsearch에 저장하기 위해 save_result 작업을 정의합니다.

    @shared_task
    def save_result(result):
        from elasticsearch import Elasticsearch
        es = Elasticsearch()
    
        # Elasticsearch에 작업 결과 저장
        es.index(index='celery', doc_type='task', body={'result': result})
    

    위 코드에서는 Elasticsearch 클라이언트를 사용하여 celery 인덱스에 task 문서 타입으로 작업 결과를 저장합니다.

  4. 결과 확인

    작업 결과를 Elasticsearch에서 확인하기 위해 다음과 같이 쿼리를 실행할 수 있습니다.

    from elasticsearch import Elasticsearch
    es = Elasticsearch()
    
    response = es.search(index='celery', doc_type='task')
    for hit in response['hits']['hits']:
        print(hit['_source']['result'])
    

    위 코드는 celery 인덱스의 task 문서 타입의 모든 결과를 출력합니다.

이렇게 Celery와 Elasticsearch를 함께 사용하여 작업 결과를 실시간으로 저장하고 검색할 수 있습니다. Elasticsearch를 사용하면 작업 결과를 쉽게 분석하고 실시간으로 모니터링할 수 있습니다. Celery와 Elasticsearch의 더 자세한 내용은 공식 문서를 참고하시기 바랍니다.

참고 자료: