이번 예제에서는 Python의 aiohttp 라이브러리를 사용하여 비동기적으로 웹 크롤러의 결과를 Telegram으로 전송하는 방법에 대해 알아보겠습니다.
1. Telegram Bot 생성
먼저 Telegram에서 Bot을 생성해야 합니다. Telegram에서 BotFather
를 검색하고, BotFather에게 /newbot
명령어를 보내서 새로운 Bot을 생성합니다. BotFather는 Token을 제공해줄 것입니다. 이 Token은 우리가 Python 코드에서 사용할 것입니다.
2. aiohttp를 사용하여 웹 크롤러 작성
이제 aiohttp를 사용하여 웹 크롤러를 작성해보겠습니다. 아래는 간단한 예제입니다.
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'http://example.com',
'http://google.com',
'http://bing.com'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
# 크롤링 결과를 Telegram으로 전송하는 함수 호출
send_to_telegram(results)
def send_to_telegram(results):
# Telegram Bot에게 결과 전송하는 코드 작성
pass
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
위의 코드에서 fetch
함수는 aiohttp를 사용하여 비동기적으로 URL을 가져오는 함수입니다. main
함수에서는 fetch
함수를 병렬로 실행하고, 결과를 수집한 후 send_to_telegram
함수를 호출하여 Telegram으로 결과를 전송하도록 구현할 수 있습니다.
3. Telegram으로 결과 전송하기
Telegram으로 결과를 전송하기 위해서는 Bot이 필요합니다. 아래는 Telegram Bot API를 통해 Bot이 메시지를 전송하는 코드입니다.
import requests
def send_to_telegram(results):
bot_token = 'YOUR_BOT_TOKEN'
chat_id = 'YOUR_CHAT_ID'
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
headers = {'Content-Type': 'application/json'}
for result in results:
message = f"크롤링 결과: {result}"
data = {'chat_id': chat_id, 'text': message}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
print("메시지 전송 성공!")
else:
print("메시지 전송 실패.")
# 코드에서 'YOUR_BOT_TOKEN'과 'YOUR_CHAT_ID'는 각각 생성한 Bot의 Token과 대화방(Chat)의 ID로 대체되어야 합니다.
위의 코드에서 bot_token
은 BotFather에서 생성한 Token을 사용하고, chat_id
는 메시지를 전송할 대화방의 ID를 사용해야 합니다.
4. 실행 및 결과 확인
모든 설정이 완료되었다면 Python 스크립트를 실행하여 웹 크롤러를 동작시킬 수 있습니다. 크롤링 결과는 Telegram으로 전송되며, 메시지 전송 성공 여부는 출력됩니다.
이제 aiohttp를 사용하여 비동기적으로 웹 크롤러 결과를 Telegram으로 전송할 수 있습니다. 이를 활용해 새로운 프로젝트를 시작하거나 기존 프로젝트에 적용해보세요!