RabbitMQ 매뉴얼
목차
- RabbitMQ 소개
- Work Queue 활용
- Pub/sub 기능 활용
- Routing 활용
- Topic 활용
작성일 : 2018.09.11
2. Work Queue 활용
Work Queues란?

작업 큐를 만들어서 time-consuming task를 여러 worker에게 분할한다.
-
resource-intentive한task를 행하면서 계속 끝나기를 기다리는 비효율적인 현상을 방지한다. -
대신에
task가 나중에 실행되도록 스케줄링 하는 것!-
task를message로 만들어서 큐에 보낸다. -
worker프로세스는 백그라운드에서task를 뽑아서 처리한다.worker가 여러 개일 경우,task들은 워커들 사이에 공유된다.
-
-
장점
-
쉽게 병행 처리를 할 수 있게 된다.
-
쉽게
worker의 scale-out을 만들어낼 수 있다.
-
Work Queues의 특징과 기능들
Round-Robin Dispatching
-
RabbitMQ는 기본적으로 라운드로빈 형식으로consumer에게 일을 준다.따라서 평균적으로 각
consumer들은 같은 수의 메시지들을 가지게 된다.
Message Acknowledgement
-
이 상황에서 한
cousumer가 엄청 긴task를 맡아서 하게 되었다면? 심지어 하다가 죽는다면?… 현재는
RabbitMQ가 메시지를 전달하고 나면 큐에서 해당 메시지를 즉시 삭제하기 때문에, 처리 중에 죽는 메시지는 유실된다.… 특정
worker에게 전달되었지만 아직 처리되지 않은 메시지도 날아간다. -
Message Acknowledgment : 이런 유실 현상을 방지하기 위한 방법이다.
-
특정 메시지를 수신하고 처리까지 마친 뒤에,
RabbitMQ에게 해당 메시지를 지워도 된다고 알려준다. -
consumer가ack를 보내지 않은 채로 죽는다면,RabbitMQ는 해당 메시지가 완벽히 처리되지 않은 걸로 이해하고 re-queue한다.그리고 다른
consumer에게 해당 메시지를 넘긴다. -
메시지가 오가는 채널과 같은 채널에
ack를 전송해야 함에 유의한다.
-
Message Durability
-
ack를 써도,RabbitMQ서버 자체가 죽어버리면 현재 메시지들을 또 잃게 될 수도 있다. -
큐와 메시지를 둘다 durable 하게 세팅해 이런 일을 막는다.
ch.assertQueue(큐_이름, { durable: true });ch.sendToQueue(큐_이름, 메시지, { persistent: true });
Fair Dispatch

- 라운드로빈 방식의 경우, 특정
worker에게 작업이 몰릴 수 있다는 단점이 있다. - 이를 방지하기 위해
prefetch()를 사용한다. ch.prefetch(1)- 해당
worker에게 한 번에 하나의 일만 주라고 하는 것! - 이전에 준 메시지에 대한
ack가 오지 않았으면 새 메시지를 할당하지 말고, 다음 순서에 있는worker엑 ㅔ넘기도록 한다.
- 해당
- 근데 이렇게 하다보면 밀린 일들이 처리가 되지 않아 큐가 꽉 찰 수 있음에 유의해야 한다.
new_task.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";
// 보낼 메시지의 내용을 argv로 받아온다.
ch.assertQueue(q, {durable: true});
// durable을 true로 설정해 RabbitMQ 서버가 죽어도 잃어버리지 않게 한다.
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
// persistent을 true로 설정해 잃어버리지 않게 한다.
console.log(" [x] Sent '%s'", msg);
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
worker.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'task_queue';
ch.assertQueue(q, {durable: true});
ch.prefetch(1);
// prefetch를 1로 설정해 1개 이상의 메시지를 동시에 받지 않도록 한다.
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000); // setTimeout을 통해 메시지를 처리하는 데 시간이 드는 '척'한다.
}, {noAck: false}); // noAck을 false로 설정해 RabbitMQ에게 ack를 전달하도록 한다.
});
});
참고 문헌
RabbitMQ 공식 홈페이지의 튜토리얼을 번역하며 공부한 내용입니다.