[python] Celery로 비동기적으로 Slack 알림을 보내는 방법은 어떻게 되는가?
Celery는 Python으로 작성된 분산 작업 큐/타스크 큐 라이브러리입니다. Slack은 팀 커뮤니케이션 도구로 널리 사용되며, 비동기적으로 알림을 보내고 싶을 때 Celery를 Slack와 통합할 수 있습니다. 이번 블로그에서는 Celery를 사용하여 Slack 알림을 비동기적으로 보내는 방법에 대해 알아보겠습니다.
Slack API 토큰 생성
- Slack 애플리케이션을 생성하고, 해당 애플리케이션에 대한 API 토큰을 생성합니다.
- Slack 애플리케이션에 접속하여, 애플리케이션 설정 페이지에서 “OAuth & Permissions”를 선택합니다.
- “Scopes” 섹션에서 알림을 보내고자 하는 채널에 대한 권한을 부여합니다. “chat:write” 스코프를 선택하세요.
- “OAuth Tokens & Redirect URLs” 섹션에서 “Bot User OAuth Token”을 복사합니다.
Celery 설정
Celery를 설치하고 초기화합니다.
pip install celery
from celery import Celery
# Celery 인스턴스 생성
celery = Celery('slack_app', broker='amqp://guest@localhost//', backend='rpc://')
알림 함수 작성
import requests
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
@celery.task
def send_slack_notification(channel, message):
try:
# Slack API 클라이언트 생성
client = WebClient(token='YOUR_SLACK_TOKEN')
# 알림 전송
response = client.chat_postMessage(
channel=channel,
text=message
)
if not response['ok']:
raise SlackApiError(response['error'])
except SlackApiError as e:
print(f"Slack API Error: {e.response['error']}")
except Exception as e:
print(f"Error: {str(e)}")
Celery 작업 실행
result = send_slack_notification.delay('CHANNEL_NAME', 'This is a test notification')
print(result.id)
Celery 워커 실행
celery -A slack_app worker --loglevel=info
Celery 워커를 실행하고 나면, 위의 코드를 사용하여 Slack 알림을 비동기적으로 보낼 수 있습니다. send_slack_notification.delay()
함수를 호출하여 알림을 보내며, delay()
함수는 알림을 즉시 보내지 않고 Celery 워커가 이를 처리할 수 있도록 예약합니다.
결론
Celery와 Slack을 통합하여 비동기적으로 알림을 보내는 방법에 대해 알아보았습니다. Celery를 사용하면 작업과 알림을 비동기적으로 처리할 수 있으며, Slack API를 활용하여 팀원들에게 중요한 정보를 신속하게 알릴 수 있습니다.