오늘날, 많은 시스템은 단일 애플리케이션에서 다중 애플리케이션으로, 계산 중심에서 데이터 중심으로 확장되고 있습니다. 그에 따라, 외부 연동 서버를 이용도 증가하고 있으며 효율적인 관리 기술이 필요합니다. 다양한 외부 서버를 연동하면서 비동기 처리를 하기 위한 솔루션들이 많은데 이 중에 하나인 RabbitMQ를 알아봅니다.
* AMQP(Advanced Message Queuing Protocol)
먼저, MQ (Message Queue) 란 메시지 기반의 미들웨어로 메시지를 이용하여 여러 어플리케이션, 시스템, 서비스들을 연결해주는 솔루션입니다. MOM(Message Oriented Middleware) 를 구현한 솔루션으로 비동기 메시지를 사용하는 서비스들 사이에서 데이터를 교환해주는 역할을 합니다. MQ를 오픈소스로하여 표준 프로토콜이 AMQP이고 그 중 하나가 RabbitMQ입니다.
* RabbitMQ 구조
RabbitMQ의 라우팅 모델은 Exchange, Binding, Queue 총 3가지로 이루어 집니다. RabbitMQ를 사용할 때 publisher는 결코 queue에 직접적으로 메세지를 보내지 않습니다. publisher는 exchange에 메세지를 보냅니다. Exchange는 라우팅, binding, 헤더 특징에 근거해 적절한 queue로 메세지를 보냅니다. Exchange는 우리가 정의할 수 있는 메세지 라우팅 에이전트이며 binding은 exchange와 queue를 연결합니다.
Exchange는 메세지를 어떻게 라우팅 할지 정하는 일종의 알고리즘이며, Publisher(Producer)로부터 수신한 메시지를 큐에 분배하는 라우터 역할을 합니다.
Queue는 메시지를 메모리나 디스크에 저장했다가 Cusomer에게 메시지를 전달하는 역할을 합니다.
Binding은 exchange와 queue의 관계를 정의합니다.
*Exchagne Type
1. Direct Exchange
라우팅 키에 근거해 메세지는 binding 규칙에서 정해진 라우팅 키를 가진 queue로 이동됩니다. exchange의 라우팅 키와 binding queue는 완전히 동일해야 합니다. 메세지는 정확히 하나의 queue로만 이동합니다.
2. Fanout Exchange
메세지는 모든 이용가능한 queue에 라우팅됩니다. 라우팅 키가 제공되어도 무시됩니다. 따라서, 일종의 발행-구독 디자인패턴의 한 종류로도 볼 수 있습니다.
3. Topic Exchange
라우팅 키가 사용되지만, direct exchange와는 다르게 exchange와 queue의 라우팅 키들이 무조건 완벽하게 동일해야 할 필요가 없습니다. 와일드카드 같은 일반 정규식을 사용해서 exchange에서 여러개의 queue에 전송할 수 있습니다.
4. Header Exchange
라우팅 키 대신에 header에 있는 기준에 의해서 정해집니다. topic exchange와 비슷하지만, 라우팅 queue를 선택하기 위해 훨씬 더 복잡한 기준을 구체화 할 수 있습니다.
*RabbitMQ
RabbitMQ는 AMQP를 구현한 오픈 소스 메시지 브로커 소프트웨어로 Publisher(Producer)로부터 메시지를 받아 Consumer에게 라우트하는 것이 주된 역할입니다. 이를 이용하면 작업 큐, 발행 및 구독, 라우팅, 주제, 원격 프로시저 호출 등의 모델을 구현할 수 있으며, 이 글에서는 작업 큐를 구현하는 방법을 다룹니다.
*Work Queue
Work Queue의 기본 개념은 자원 집중적인 작업을 즉시 수행하고 완료될 때까지 기다려야 하는 것을 방지하는 것입니다. 대신에, 작업이 나중에 처리되도록 예약합니다. 작업을 메세지로 캡슐화하고 큐로 전송합니다. 백그라운드에서 실행되고 있는 작업자 프로세스는 작업을 꺼내고 최종적으로 일을 처리합니다. 많은 작업자들을 실행한다면, 작업들이 공유됩니다.
이 개념은 짧은 HTTP 요청동안 복잡한 작업을 처리하기 어려운 웹 어플리케이션에서 특히 효과적입니다.
만약 여러개의 작업자가 있다면, RabbitMQ는 각각 메세지를 consumer들에게 순차적으로 보냅니다. 이러한 방식은 라운드로빈(round-robin) 방식이라고 불리며 예를 들어 결과가 다음과 같습니다.
//1번 Queue
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
//2번 Queue
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
* Message acknowledgment
작업을 처리하는데 몇초가 걸립니다. 만약에 하나의 consumer가 오랜 작업을 시작하고, 부분적으로 끝내고 죽어버린다면 어떤 일이 벌어질까요? RabbitMQ는 메세지를 consumer에게 전달하면, 그 즉시 삭제표시를 합니다. 만약에 작업자를 죽인다면, 방금 처리한 메세지가 손실됩니다. 또한 해당 작업자에게 할당되었지만 아직 처리되지 않은 메세지들도 손실됩니다.
하지만, 손실을 방지하기 위해서, 작업자가 죽어버리면, 해당 작업들을 다른 작업자들에게 전송합니다.
메세지가 유실되지 않도록하기 위해, RabbitMQ는 message acknowledgments를 제공합니다. acknowledgement는 특정 메세지를 수신, 처리되었으며 RabbitMQ가 이를 삭제할 수 있음을 RabbitMQ에 알리기 위해 승인을 다시 보냅니다.
만약에 consumer가 ack을 다시 보내지 못하고 죽어버린다면, RabbitMQ는 메세지가 완전히 처리되지 않았고, 따라서 다시 queue에 넣어야한다고 이해합니다. 만약에 그 시점에 또 다른 consumer가 존재한다면, 해당 consumer에게 메세지를 전달합니다. 이에 따라, 작업자가 죽는 한이 있더라도, 메세지를 유실할 위험이 없습니다.
timeout(기본 30분)은 consumer 배송 승인에 강제됩니다. 이를 통해, 결코 배송이 승인되지 않은 버그성 consumer들을 감지할 수 있습니다.
...
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
...
autoAck은 false로 설정되어 있어야 합니다. 만약에 true로 설정되어 있으면, 메세지를 가져옴과 동시에 ack을 보내서 RabbitMQ 서버에서는 메세지가 사라집니다. 따라서 메세지를 가져와서 처리가 끝날 때 수동으로 ack를 보낼 수 있도록 false로 설정합니다. ack을 보내기 전까지는 이 메세지를 다른 consumer에게 주지도 않고, queue에서 제거하기도 않습니다. 정상처리가 안된다면 nack을 보내면 됩니다.
*Message durability
consumer가 죽는 경우 메세지 복구를 알아보았는데, 만약에 RabbitMQ 서버가 죽어버린다면 작업들은 어떻게 될까요? RabbitMQ가 중단되거나, 충돌하면 따로 처리하지 않는 이상 queue와 메세지들을 잊어버립니다. 이것을 방지하기 위해서는 queue와 메세지에 durability을 꼭 표시해야 합니다.
먼저, RabbitMQ가 재실행해도 queue가 다시 살아날 수 있도록 해야합니다.
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
durable을 true로 설정하여, task_queue가 RabbitMQ가 재시작하더라도 아무것도 손실되지 않도록 방지합니다.
다음으로, 메세지를 영구 저장하기 위해서, MessageProperties를 PERSISTENT_TEXT_PLAIN으로 설정합니다.
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
*주의
메세지를 영구적으로 표시한다고 해서 메세지가 손실되지 않음을 100% 보장할 수는 없습니다. 비록, RabbitMQ에게 메세지를 디스크에 저장하라고 명령해도, RabbitMQ가 메세지를 받고, 저장하지 못하는 짧은 순간이 있습니다. 또한, RabbitMQ는 모든 매새지를 fsync(2)하지 못하므로, 단지 캐시에만 저장되고 실제로 디스크에 저장이 되지 않을 수도 있습니다. 영구 보장은 강력하지 않지만, 단순한 작업 queue보다는 훌륭합니다. 더 강력한 보장을 위해서는 다음을 참고합니다.
https://www.rabbitmq.com/confirms.html
*Fair dispatch
만약에 2개의 작업자가 있는데, 홀수 메세지는 굉장히 무겁고, 짝수 메세지는 굉장히 가볍다면 어떤 일이 벌어질까요? 하나의 작업자는 끊임없이 바쁘고, 다른 작업자는 거의 일이 없을 것입니다. RabbitMQ는 이 사실을 전혀 알지 못하고 메세지를 균등하게 보내는 것에만 관심이 있습니다.
이유는 RabbitMQ는 메세지가 queue에 들어온 것을 기준으로 작업자에게 전송하기 때문입니다. consumer에 있는 unack 메세지는 확인하지 않습니다. 단지, 매번 n개의 메세지는 n개의 consumer에게 보내기만 합니다.
이를 해결하기 위해서는 basicQos의 prefetchCount를 1로 설정합니다. 이를 통해 RabbitMQ가 한번에 1개의 메세지만 보내도록 제한합니다. 즉, 하나의 일이 끝나고 ack를 받기 전까지는 새로운 메세지를 보내지 않습니다. 대신에, 그 시간에 작업을 하지 않는 다른 작업자에게 메세지를 전송합니다.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
*Windows에 설치하기
Windows 환경에 rabbitMQ를 설치하려면 먼저 Erlang이 설치되어 있어야 한다. 아래 사이트에서 Download Windows installer로 다운받습니다.
https://www.erlang.org/downloads
다음으로 rabbitMQ를 다운받습니다.
https://www.rabbitmq.com/install-windows.html#service
윈도우 버전의 Download를 실행합니다.
시작에서 rabbitMQ를 검색하고 관리자 권한으로 실행한다. 활성화를 시킵니다.
rabbitmq-plugins enable rabbitmq_management
rabiitmq를 종료시키고 설치후 다시 시작합니다.
> rabbitmq-service.bat stop
> rabbitmq-service.bat install
> rabbitmq-service.bat start
정상적으로 설치가 완료되었습니다.
* CentOs에서 설치하기
sudo yum install -y docker
sudo systemctl start docker
sudo chmod 666 /var/run/docker.sock
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
* 접속하기
http://localhost:15672/ 에 들어가서 guest/guest 로 접속합니다.
다음과 같이 Overview 화면이 나온다.
* 자주쓰는 포트번호 참고
epmd : 4369
Erlang Distribution : 25672
AMQP 0-9-1 Without and With TLS : 5671, 5672
Management Plugin : 15672
STOMP : 61613, 61614
MQTT : 1883, 8883
* 로그 파일 위치
C:\Users\계정명\AppData\Roaming\RabbitMQ\log
application.yml에 계정 정보, port 번호 등을 입력합니다
Queues 탭으로 이동해서 Add a new queue를 선택해 새로운 큐를 만든다. 이름은 CREATE_POST_QUEUE로 합니다.
Unacked란, Unacknowledgement의 약자입니다.
Unacknowledgement는 데이터의 안전을 위해 사용합니다. 작업자에게 메세지가 도착하고 성공적으로 처리됨을 보장합니다. Consumer가 작업 중간에 메세지를 분실을 하였고 Rabbit MQ를 확인하지 않더라도, 메시지는 궁극적으로 손실되지 않고 다시 처리할 수 있습니다.
생성된 큐의 결과는 다음과 같습니다.
rabbitmq를 이용해서 저장을 한다면 producer consumer에 의해 다음과 같이 실행됨을 알 수 있습니다.
* 코드로 확인하기
게시글 작성을 위한 Producer와 Consumer 구조를 간단하게 알아보겠습니다.
build.gradle에 rabbitMQ 사용을 위해 다음과 같은 의존성을 추가합니다
implementation 'org.springframework.boot:spring-boot-starter-amqp'
게시글을 작성하는 요청을 추가합니다.
@PostMapping("/post")
@ResponseStatus(HttpStatus.CREATED)
public Post create(@RequestBody Post post) throws JsonProcessingException {
String jsonPost = objectMapper.writeValueAsString(post);
producer.sendTo(jsonPost);
return post;
}
ObjectMapper를 통해서 자바의 클래스 객체를 json 형태로 변환하여 Producer에 전달합니다.
@Component
public class Producer {
private final RabbitTemplate rabbitTemplate;
public Producer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendTo(String message) {
this.rabbitTemplate.convertAndSend("CREATE_POST_QUEUE", message);
}
}
Producer에서는 RabbitTemplate을 통해서 Queue에 json 데이터를 전달합니다. CREATE_POST_QUEUE라고 만든 큐에 들어갑니다.
@Component
public class Consumer {
private final PostService postService;
private final ObjectMapper objectMapper;
public Consumer(PostService postService, ObjectMapper objectMapper) {
this.postService = postService;
this.objectMapper = objectMapper;
}
@RabbitListener(queues = "CREATE_POST_QUEUE")
public void handler(String message) throws JsonProcessingException {
Post post = objectMapper.readValue(message, Post.class);
post.setCreatedDate(LocalDateTime.now());
postService.create(post);
}
}
Consumer에서는 ObjectMapper로 json으로 받았던 데이터를 다시 객체로 바꾸고, 저장을 합니다. @RabbitListener를 통해서 우리가 만든 CREATE_POST_QUEUE를 항상 Listen하고 있다가 Producer에서 데이터를 보내면 처리하면 됩니다.
이렇게 Producer, Consumer로 작업하는 것이 일반적으로 DB에 I/O를 수행하는 것보다 나은 성능을 보여줍니다.
* 참고
https://m.blog.naver.com/tmondev/221051503100
'문제 해결, 기술 비교 > 개인프로젝트(북클럽)' 카테고리의 다른 글
sentry.io로 에러 로그 관리하기 (0) | 2022.06.01 |
---|---|
Elastic Search 연동 및 테스트하기 (0) | 2022.05.27 |
cerbot로 SSL 인증서 만들기 (0) | 2022.05.24 |
application.yml 분리하기 (0) | 2022.05.22 |
jenkins build시, slack 알람 연동하기 (0) | 2022.05.22 |