RabbitMQ 소개

메시지큐들이 너무 많아서 뭐를 써야 할지 모르겠다.

RabbitMQ의 설치

Docker로 설치

Dockerfile은 아래와 같이 만든다. 처음에는 안만들어도 상관없지만, 어차피 나중에 만들 것이라서 함께 만든다.

## Dockerfile

FROM rabbitmq:3-management

EXPOSE 5672 15672

아래와 같이 이미지 이름을 my-rabbitmq로 하여 도커 이미지를 만든다.

## 이미지 만들기
$  docker build -t my-rabbitmq .

## 이미지 만들어졌는지 확인
$  docker image ls

이제 아래와 같은 docker-compose.xml 파일을 만든다. 컨테이너 이름은 c-my-rabbitmq로 했다.

## docker-compose.xml

version: '3'
services:
    my-rabbitmq:
        image: my-rabbitmq
        container_name: c-my-rabbitmq
        ports:
            - 5672:5672
            - 15672:15672
        restart: unless-stopped
        environment:
            RABBITMQ_DEFAULT_USER: admin
            RABBITMQ_DEFAULT_PASS: 1111

아래의 명령을 실행하면 컨테이너가 실행된다.

## 포그라운드로 도커 컨테이너 실행
$  docker-compose up --build

## 백그라운드로 도커 컨테이너 실행할 때는 -d(detach) 옵션 추가
$  docker-compose up -d --build

여기까지가 설치의 끝이다.

RabbitMQ의 설정

RabbitMQ를 설정하려면 RabbitMQ를 이해해야 한다. 그 중에 연결을 위해 반드시 필요한 포트번호에 대해서만 일단 적는다.

포트번호

RabbitMQ 관리자 페이지 접속

브라우저에서 아래의 URL에 접속한다.

ID/PW는 admin/1111

http://localhost:15672/

RabbitMQ Client 프로그램 작성

파이썬으로 RabbitMQ Client 프로그램을 작성하려면 파이썬용 RabbitMQ 라이브러리가 필요하다. 여기서는 pika를 사용한다. RabbitMQ 팀에서 추천하는 파이썬 라이브러리가 pika이다.

pika 설치

$  pip install pika

RabbitMQ에 메시지 보내기

아직 작성하지는 말고 그냥 눈으로 코드를 살펴보자. 연결을 하고 메시지를 보내는 샘플코드이다.

import pika

params = pika.URLParameters("amqp://admin:1111@localhost:5672/")
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.basic_publish(exchange='cafe.topic',
                      routing_key='order.coffee.1',
                      body=b'hello world')
connection.close()

RabbitMQ 튜토리얼

이제 대충 겉모양은 확인하였고 단계별로 테스트해보자.

아래의 RabbitMQ의 공식문서를 내 나름으로 정리해본다.

https://www.rabbitmq.com/getstarted.html

튜토리얼1 Hello world

RabbitMQ는 메시지를 받아서 전달하는 메시지 브로커다. 이것은 우체국처럼 생각할 수 있다. 누군가 우체국에 편지를 보내면, 그 편지가 수신자에게 도착한다. RabbitMQ는 우편함이자, 우체국이고, 배달원으로 생각할 수 있다.

연결 수립

아래는 RabbitMQ에 연결을 수립하는 코드이다.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

큐 만들기

연결이 되어 메시지를 보내면 메시지는 큐로 보내진다. 따라서 메시지를 보내기 전에 큐를 만들어야 한다. 관리자 페이지에서 만들 수도 있고, 프로그램에서 만들 수도 있다. 존재하지 않는 큐에 메시지를 보내면 RabbitMQ는 그냥 버린다.

아래는 hello 큐를 생성하는 코드이다. 큐의 이름이 hello이다.

channel.queue_declare(queue='hello')

큐에 메시지 보내기

이제 hello 큐에 메시지를 보낼 수 있다.

아래와 같이 디폴트 익스체인지를 사용하여 hello 큐에 “Hello world”를 보낸다.

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

연결 닫기

프로그램을 종료할 때는 아래와 같이 연결을 닫아주어야 한다.

connection.close()

메시지 받기

메시지를 수신하는 프로그램을 만들어보자.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

튜토리얼2 Work queue

라운드 로빈 디스패칭(Round-robin dispatching)

메시지 Ack 개념

메시지 손실에 대한 문제
자동 Ack

이전에 테스트 했던 소비자 코드는 다음과 같다. auto_ack=True로 설정되어 있어서 자동으로 Ack가 보내진다. 참고로 auto_ack의 기본값은 False이다.

channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)
수동 Ack
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep( body.count('.') )
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello',
                      auto_ack=False,
                      on_message_callback=callback)

메시지 내구성(Durability) 개념

channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

공정한 분배(Fair dispatch)

channel.basic_qos(prefetch_count=1)

지금까지 다룬 코드를 정리하면 다음과 같다.

## 생산자 new_task.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()
## 소비자 worker.py

import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

튜토리얼3 발행과 구독(publish/subscribe)

익스체인지(Exchange)

fanout

channel.exchange_declare(exchange='logs', exchange_type='fanout')

## fanout에서는 routing_key가 의미없다. 무시됨
channel.basic_publish(exchange='logs', routing_key='', body=message)

임시 큐들(Temporary queues)

result = channel.queue_declare(queue='', durable=False) # 랜덤큐 생성

## 랜덤 큐의 이름을 출력
print(result.method.queue) # ex: amq.gen-JzTY20BRgKO-HjmUJj0wLg
result = channel.queue_declare(queue='', exclusive=True)

바인딩(Bindings)

channel.queue_bind(exchange='logs', queue=result.method.queue)

소스코드

여기까지의 코드를 정리하면 아래와 같다.

아래의 소비자 프로그램에서

## 소비자 receive_logs.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

## 생산자 emit_log.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

튜토리얼4 라우팅(Routing)

바인딩(Bindings)

channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black') # binding key

다이렉트 익스체인지(Direct exchange)

로그 발행 - 생산자

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
log_level="error" # info or warning or error
channel.basic_publish(exchange='direct_logs',
                      routing_key=log_level,
                      body=message)

구독 - 소비자

## 콘솔에 출력하는 소비자
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for log_level in ['info','warn','error']:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=log_level)
## 디스크에 저장하는 소비자
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

log_level='error'
channel.queue_bind(exchange='direct_logs',
                    queue=queue_name,
                    routing_key=log_level)

소스코드

## 실행
$  python emit_log_direct.py error "Run. Run. Or it will explode."
## => [x] Sent 'error':'Run. Run. Or it will explode.'
## emit_log_direct.py - 생산자
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
## 실행
## 에러를 디스크에 저장
$  python receive_logs_direct.py error > logs_from_rabbit.log

## 에러를 콘솔에 출력
$  python receive_logs_direct.py info warn error
## receive_logs_direct.py - 소비자
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

튜토리얼5 Topic

topic 익스체인지

소스 코드

$  python emit_log_topic.py "kern.critical" "A critical kernel error"
## emit_log_topic.py - 생산자
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
$  python receive_logs_topic.py "#"
$  python receive_logs_topic.py "kern.*"
$  python receive_logs_topic.py "kern.*" "*.critical"

## receive_logs_topic.py - 소비자
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()