Celery와 Elasticsearch를 이용한 실시간 검색

소개

실시간 검색은 사용자가 쿼리를 입력하면 최신 데이터를 포함하여 즉각적으로 결과를 반환하는 기능입니다. 이를 구현하기 위해 Celery와 Elasticsearch를 결합할 수 있습니다. Celery는 비동기 작업 큐를 제공하고, Elasticsearch는 검색 엔진으로 사용됩니다.

Celery 설정

Celery를 사용하기 위해 먼저 Celery를 설치해야 합니다. 다음 명령을 사용하여 Celery를 설치합니다.

pip install celery

Celery는 작업자(worker)와 작업(queue)을 구성해야 합니다. Celery 설정 파일(celery.py)을 다음과 같이 작성합니다.

from celery import Celery

app = Celery('myapp', broker='amqp://guest@localhost//', backend='rpc://')

@app.task
def add(x, y):
    return x + y

위의 예시에서는 작업자와 작업을 추가하는 간단한 설정을 만들었습니다. 작업자는 RabbitMQ를 사용하는 것을 전제로 하고 있습니다.

Elasticsearch 설정

Elasticsearch를 사용하기 위해서는 Elasticsearch를 설치해야 합니다. Elasticsearch 공식 홈페이지에서 다운로드하여 설치할 수 있습니다.

Elasticsearch와 연결하는 Python 라이브러리 elasticsearch-py를 설치해야 합니다. 다음 명령을 사용하여 설치할 수 있습니다.

pip install elasticsearch

Elasticsearch 연결 및 인덱스 생성 코드는 다음과 같습니다.

from elasticsearch import Elasticsearch

es = Elasticsearch()

# 인덱스 생성
es.indices.create(index='my_index', ignore=400)

위의 코드에서는 Elasticsearch와 연결하고, ‘my_index’ 라는 인덱스를 생성하고 있습니다. 이 인덱스는 실시간 검색에 사용됩니다.

실시간 검색 구현

Celery와 Elasticsearch를 사용하여 실시간 검색을 구현하려면 다음 단계를 따릅니다.

  1. Celery 작업으로 색인 업데이트를 수행합니다.
    from celery import Celery
    from elasticsearch import Elasticsearch
    
    app = Celery('myapp', broker='amqp://guest@localhost//', backend='rpc://')
    es = Elasticsearch()
    
    @app.task
    def update_index(index_name, data):
        # Elasticsearch에 데이터 색인 업데이트
        res = es.index(index=index_name, body=data)
        return res['result']
    

    위의 코드에서는 Celery 작업으로 Elasticsearch에 데이터를 색인 업데이트하는 작업을 수행하는 함수를 정의하였습니다.

  2. 실시간 검색 수행하기 위한 API를 작성합니다.
    from flask import Flask, request, jsonify
    from celery import Celery
    
    app = Flask(__name__)
    app.config['CELERY_BROKER_URL'] = 'amqp://guest@localhost//'
    app.config['CELERY_RESULT_BACKEND'] = 'rpc://'
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    
    @celery.task
    def search(query):
        es = Elasticsearch()
        result = es.search(index='my_index', body=query)
        return result['hits']['hits']
    
    @app.route('/search', methods=['POST'])
    def perform_search():
        query = request.json.get('query')
        if query:
            result = search.delay(query)
            return jsonify({"status": "ok", "task_id": result.task_id}), 202
        else:
            return jsonify({"status": "fail", "message": "Query is missing"}), 400
    

    위의 코드에서는 Flask 웹 애플리케이션을 작성하고, /search 경로로 POST 요청을 받아 실시간 검색을 수행하는 API를 구현하였습니다.

결론

Celery와 Elasticsearch를 이용한 실시간 검색을 구현하는 방법에 대해 알아보았습니다. Celery는 비동기 작업 큐를 제공하고, Elasticsearch는 검색 엔진으로 사용됩니다. 이 두 가지 기술을 조합하여 실시간 검색을 구현할 수 있습니다. 이를 통해 사용자는 빠르게 최신 데이터를 검색할 수 있으며, 확장성과 유연성을 높일 수 있습니다.

#elasticsearch #celery