[python] Celery로 비동기적으로 Slack 알림을 보내는 방법은 어떻게 되는가?

Celery는 Python으로 작성된 분산 작업 큐/타스크 큐 라이브러리입니다. Slack은 팀 커뮤니케이션 도구로 널리 사용되며, 비동기적으로 알림을 보내고 싶을 때 Celery를 Slack와 통합할 수 있습니다. 이번 블로그에서는 Celery를 사용하여 Slack 알림을 비동기적으로 보내는 방법에 대해 알아보겠습니다.

Slack API 토큰 생성

  1. Slack 애플리케이션을 생성하고, 해당 애플리케이션에 대한 API 토큰을 생성합니다.
  2. Slack 애플리케이션에 접속하여, 애플리케이션 설정 페이지에서 “OAuth & Permissions”를 선택합니다.
  3. “Scopes” 섹션에서 알림을 보내고자 하는 채널에 대한 권한을 부여합니다. “chat:write” 스코프를 선택하세요.
  4. “OAuth Tokens & Redirect URLs” 섹션에서 “Bot User OAuth Token”을 복사합니다.

Celery 설정

Celery를 설치하고 초기화합니다.

pip install celery
from celery import Celery

# Celery 인스턴스 생성
celery = Celery('slack_app', broker='amqp://guest@localhost//', backend='rpc://')

알림 함수 작성

import requests
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

@celery.task
def send_slack_notification(channel, message):
    try:
        # Slack API 클라이언트 생성
        client = WebClient(token='YOUR_SLACK_TOKEN')

        # 알림 전송
        response = client.chat_postMessage(
            channel=channel,
            text=message
        )

        if not response['ok']:
            raise SlackApiError(response['error'])

    except SlackApiError as e:
        print(f"Slack API Error: {e.response['error']}")

    except Exception as e:
        print(f"Error: {str(e)}")

Celery 작업 실행

result = send_slack_notification.delay('CHANNEL_NAME', 'This is a test notification')
print(result.id)

Celery 워커 실행

celery -A slack_app worker --loglevel=info

Celery 워커를 실행하고 나면, 위의 코드를 사용하여 Slack 알림을 비동기적으로 보낼 수 있습니다. send_slack_notification.delay() 함수를 호출하여 알림을 보내며, delay() 함수는 알림을 즉시 보내지 않고 Celery 워커가 이를 처리할 수 있도록 예약합니다.

결론

Celery와 Slack을 통합하여 비동기적으로 알림을 보내는 방법에 대해 알아보았습니다. Celery를 사용하면 작업과 알림을 비동기적으로 처리할 수 있으며, Slack API를 활용하여 팀원들에게 중요한 정보를 신속하게 알릴 수 있습니다.