[DDDStart] 10장. 이벤트

10. 이벤트

 

1. 시스템 간 강결합의 문제

이벤트를 소개하기 앞서서 이벤트를 왜 사용하는가에 대해 알 수 있다.

여기서는 고객의 상품 구매 취소에 대한 환불 처리를 예시로 든다.

고객이 구매 취소를 했다

class Order {
    
    //...
    
    @Transactional
    public void cancel(RefuncService refundService) { // 주문 취소
        verifyNotYetShipped();
        this.state = OrderState.CANCELED;
        
        this.refundStatus = State.REFUND_STARTED;
        try {
            refundService.refund(getPaymentId()); // 외부의 환불 시스템 (문제점 2)
            this.refundStatus = State.REFUND_COMPLETED;
        } catch(Exception ex) {
           	// ??? 문제점 1
        }
    }
    
    //...
}

 

 

이벤트를 사용하면 이러한 문제점들을 해결 할 수 있다.

특히, 비동기 이벤트를 사용하면 두 시스템 간의 결합을 크게 낮출 수 있다.

 

 

2. 이벤트 개요

이벤트 : “과거에 벌어진 어떤 것”
ex) 암호를 변경 했음, 주문을 취소 했음

 

요구사항 중에 ~ 할 때, ~가 발생하면, 만약 ~하면 과 같은 요구사항은 도메인의 상태 변경과 관련된 경우가 많고 이런 요구사항을 이벤트를 이용해서 구현할 수 있다.

ex) 주문을 취소할 때 이메일을 보낸다 -> 주문이 취소됨 이벤트를 활용하면 된다.

 

<이벤트 구성="" 요소="">

이벤트 생성 주체 이벤트 디스패처 이벤트 핸들러
도메인 객체
(엔터티, 밸류, 도메인 서비스 등)
상태가 바뀌면 관련 이벤트를 발생
이벤트 생성 주체와 이벤트 핸들러를 연결시켜줌
이벤트 디스패처의 구현방식에 따라 이벤트 생성과 처리를 동기비동기로 실행
이벤트 생성 주체가 발생한 이벤트에 반응한다
이벤트에 담긴 데이터를 이용해서 기능 실행

 

<이벤트 구성="">

 

ex) 고객이 배송지를 변경

 

Event (ShippingInfoChangedEvent)

@Getter
@RequiredArgsConstructor
public class ShippingInfoChangedEvent { // 이벤트 종류 (과거 시제를 사용)
    private final String orderNumber; // 추가 데이터
    private final ShippingInfo newShippingInfo; // 추가 데이터
    private long timeStamp; // 이벤트 발생 시간
}

 

이벤트 생성 주체 (Order 애그리거트)

public class Order {
    public void changeShippingInfo(ShippingInfo newShippingInfo) {
        verifyNotYetShipped();
        setShippingInfo(newShippingInfo);
        Events.raise(new ShippingInfoChangedEvent(number, newShippingInfo));
    }
}

 

이벤트 핸들러 (변경된 배송지 정보를 물류 서비스에 재전송)

public class ShippingInfoChangedHandler 
    implement EventHandler<ShippingInfoChangedEvent> {
    
    @Override
    public void handle(ShippingInfoChangedEvent evt) {
        shippingInfoSynchronizer.sync(
        	evt.getOrderNumber(),
        	evt.getNewShippingInfo());
    }
}
// 필요에 따라 Repository 에서 데이터를 조회할 수도 있다.
public class ShippingInfoChangedHandler 
    implement EventHandler<ShippingInfoChangedEvent> {
    
    @Override
    public void handle(ShippingInfoChangedEvent evt) {
		Order order = orderRepository.findById(evt.getOrderNo());
        shippingInfoSynchronizer.sync(
        	order.getNumber.getValue(),
            order.getNewShippingInfo());
    }    
}

 

이벤트는 어떨 때 쓰이나?

이벤트 용도에는 2가지가 있다

 

이벤트 장점

 

3. 이벤트, 핸들러, 디스패처 구현

음 그렇구나

 

4. 동기 이벤트 처리 문제

위에서 동기 이벤트를 통하여 도메인 로직을 간결화하는 이득을 얻었다. (문제점3 해결)

하지만 p280 의 시퀀스 다이어그램을 보면 알 수 있듯이 문제점 1, 2는 여전히 남아있다.

 

OrderService.class

@Transacional
public void cancel(OrderNo orderNo) {
    Events.handle(
    	(OrderCanceledEvent evt) -> refundService.refund(evt.getOrderNumber())
    );
    
    Order order = findOrder(orderNo);
    order.cancel();
}

 

 

5. 비동기 이벤트 처리

이메일 인증과 같이 후속 조치를 즉시 해야하는 것이 아니라, 일정 시간 안에만 처리하면 되는 경우가 많다.

ex) 회원가입 신청하면 인증 이메일 보내라

이벤트: 회원가입 신청했음

핸들러: 이메일 전송

 

비동기 이벤트 처리 4가지 방법

이벤트를 비동기로 구현할 수 있는 방법은 매우 다양하다고 한다.

이 책에서는 4가지 방법을 소개함

  1. 로컬 핸들러를 비동기로 실행
  2. 메시지 큐(MQ) 사용
  3. 이벤트 저장소와 이벤트 포워더 사용
  4. 이벤트 저장소와 이벤트 제공 API 사용

각 방법마다 구현방식이 다르며, 장단점이 있다.

 

1) 로컬 핸들러를 비동기로 실행

동기로 이벤트 처리할 때 이벤트는 같은 쓰레드로 실행되며 트랜잭션으로 묶여있었음.

그래서 환불이 완료될 때 까지 주문취소는 기다려야 했으며, 트랜잭션을 롤백할지 커밋할지에 대해 고민했어야 했음.

이번 방법은 이벤트 핸들러가 별도의 스레드에서 동작하는 방법이다.

결과적으로 이벤트 핸들러를 실행시키는 스레드는 Events.raise()를 실행하는 스레드와는 다른 스레드이다.

 

트랜잭션 처리는 어떻게 되는가!?!?!?!?

트랜잭션 범위에 이벤트 핸들러는 묶이지 않는다.

별도의 스레드로 실행되니까 트랜잭션 범위에 묶이지 않는 것임

=> 이벤트를 제외한 것들만 트랜잭션으로 묶일듯

 

 

 

2) MQ 사용

그림 10.8

이벤트가 발생하면 이벤트 디스패처는 이벤트를 MQ에 보낸다.

MQ는 이벤트를 메시지 리스너에 전달하고, 메시지 리스너는 알맞은 이벤트 핸들러를 호출한다.

이 때, 이벤트를 MQ로 보내는 스레드(프로세스) 와 MQ에서 이벤트를 읽고 처리하는 스레드(프로세스) 는 서로 다르다.

 

트랜잭션은 어떻게 묶일까?

MQ에 이벤트를 저장하는 절차까지 한 트랜잭션으로 묶을 수 있다. (단, 글로벌 트랜잭션을 사용해야 함. 내 APP과 RabbitMQ를 한 트랜잭션으로 묶으려면 로컬 트랜잭션이 아닌, 글로벌 트랜잭션이 필요)

트랜잭션 걸어서 MQ에 이벤트를 정상적으로 넣는 것 까지만 보장해주면 됨. 이후에 MQ에서 핸들링에서 실패하더라도 이벤트는 큐에 계속 존재하기 때문에 이벤트 유실되지 않음

 

장점: 글로벌 트랜잭션을 사용하면 안전하게 이벤트를 MQ에 전달할 수 있다. (이벤트 유실 안함)

단점: 글로벌 트랜잭션을 사용하기 때문에 전체 성능은 떨어진다.

 

RabbitMQ: 글로벌 트랜잭션을 지원함. 클러스터와 고가용성을 지원하기 때문에 안정적으로 메시지 전달 가능.

Kafka: 글로벌 트랜잭션을 지원하지 않음. 높은 성능을 보여줌

 

 

3) 이벤트 저장소와 포워더를 이용한 비동기 처리

이벤트가 발생하면 이벤트를 DB에 저장하기!!

이후 별도의 프로그램(포워더)를 이용하여 이벤트 핸들러에 전달

 

포워더는 주기적으로 이벤트 저장소에서 이벤트를 가져와 이벤트 핸들러를 실행 -> 스케쥴링

포워더는 별도의 스레드를 이용하기 때문에 이벤트 발행과 처리가 비동기로 처리됨

 

이것도 트랜잭션 범위는 DB에 이벤트를 저장하는 것 까지일듯

이후에 핸들러가 이벤트 처리에 실패하더라도, 포워더는 다시 이벤트 저장소에서 이벤트를 읽어와 핸들러를 실행하면 된다.

 

 

4) 이벤트 저장소와 이벤트 제공 API 사용

포워더 방식은 포워더가 이벤트를 불러와 이벤트 핸들러를 실행했지만,

이번 방법은 핸들러가 API 서버를 통해 이벤트 목록을 가져온다.

따라서 이벤트 목록을 요구하는 외부 핸들러는 자신이 이벤트를 어디까지 처리했는지를 기억하고 있어야 한다.

 

 

이벤트 저장소 구현

이제부터 3,4번에 대한 구현방식을 살펴본다.

두 방법 모두 이벤트가 발생할 경우 이벤트를 저장소(RDB, NoSQL 등)에 저장해야 한다.

그림 10.11 이벤트 저장소 클래스 다이어그램

 

EventStore

이벤트는 과거에 벌어진 사건이므로 수정이나 삭제가 필요하지 않기 때문에 저장, 조회 기능만 제공하면 됨

구현 코드와 DDL은 책에 있음 (p.294 ~ p.299)

 

 

이벤트 저장을 위한 이벤트 핸들러 구현

이벤트를 저장하기 위한 핸들러가 필요하다.

저장만 하면 되므로 위의 eventStore.save(event); 를 실행하게 한다.

(핸들러를 AOP로 만들어서 사용하면 간단.)

구현 코드는 책 p299 ~ p301

 

 

포워더 구현

3번 방법의 핵심인 포워더를 구현해본다.

일정시간마다 이벤트저장소에서 이벤트들을 읽어서 핸들러를 호출하는 식이다.

 

@Component
public class EventForwarder {
    // ...

    private int sendEvent(List<EventEntry> events) {
    	int processedCount = 0;
        try {
            for (EventEntry entry : events) {
                eventSender.send(entry);
                processedCount++;
            }
        } catch (Exception ex) {
            // 로깅 처리
        }
        return processedCount;
	}

}	

 

책을 읽으면서 헷갈리는게 있었는데 코드를 보면서 깨달은 것이 있다.

100개의 이벤트를 실행한다고 했을 때, 71번째 이벤트에서 에러가 발생할 경우, 다음 배치때는 71번부터 시작해야 한다.

하지만 에러가 날 경우 다시 1번부터 시작할 것 같았는데 아래의 코드를 보고 뜨끔했다.

try-catch 문을 아애 쓰질 않아서 이런 로직을 생각하지 못한 것 같다.

 

 

REST API 구현

 


@RestController
@RequiredArgsConstructor
public class EventApi {
    private final EventStore eventStore;
    
    @GetMapping("/api/events")
    public List<EventEntry> list(
        @RequestParam(name="offset" required=true) Long offset,
        @RequestParam(name="limit" required=true) Long limit) {
        return eventStore.get(offset, limit);
    }
        
}

조회 기능만 제공하면 된다.

 

API를 사용하는 클라이언트의 라이프 사이클

 

이벤트를 실패하면 실패한 이벤트부터 읽어와 이벤트를 재처리할 수 있다.

 

이벤트 핸들링하는 프로젝트 는 api와 별개로 따로 있음.

이벤트 핸들러가 RestAPi를 호출할 때 파라미터(offset)로 어떤 값을 줘야하는지 알고 있어야 함. (어디까지 이벤트를 처리하였는지)

Rest API 는 달라는 대로 줌..

 

 

이벤트 적용 시 추가 고려사항

이벤트를 구현할 때 추가로 고려할 사항이 있다.

 

1. 특정 주체가 발생한 이벤트만 조회해야 한다면?

이벤트를 저장할 때 주체를 함께 저장해야 하고,

이벤트 핸들러에서는 특정 주체일 때 실행하도록 해야 한다.

(뭐 이것저것 할거 많아보이는데, 그냥 이벤트에 source 필드 하나 추가한 것 뿐임)

 

2. 포워더에서 전송 실패를 얼마나 허용할 것이냐?

포워더에서 이벤트가 실패하면 실패한 곳부터 재실행한다.

그런데 이전에 실패했던 것이 같은 문제로 인해 무기한 실패를 할 수도 있을 것이다.

이 경우, 어떻게 처리하는 것이 좋을까?

 

동일 이벤트를 전송하는 데 3회 실패했다면 해당 이벤트는 생략하고 다음 이벤트로 넘어간다는 등의 정책이 필요하다.

사실 이건 REST API 에서도 고려해야 할 상황 아닌가?

 

3. 이벤트 유실

MQ나 이벤트 저장소를 이용할 때 트랜잭션에 성공하면 이벤트가 저장소에 저장된다는 것을 보장할 수 있다.

반면에 로컬 핸들러를 이용해서 이벤트를 비동기로 처리할 경우 이벤트 처리에 실패하면 이벤트를 유실하게 된다.

비즈니스 로직에 사용하지 말자

 

4. 이벤트 순서

MQ를 사용할 경우에 문제가 될 수도 있는 부분이다.

MQ 사용 기술에 따라 이벤트 발생 순서와 메시지 전달 순서가 다를 수가 있다.

하나의 파티션에 대한 소비자를 최대 1개만 연결시키도록 설정할 수 있다. -> 하나의 이벤트 스트림에 대해서는 동시성이 일어나지 않기 때문에 메시지 순서를 보장받을 수 있다.

 

5. 이벤트 재처리

시스템 장애로 인해 이벤트가 중복해서 발생하게 될 경우, 동일한 이벤트를 어떻게 처리하는 것이 좋을까?

 

 

책을 읽기 전에 MQ도 비동기 처리인지에 대한 궁금증이 있었는데, 책에 MQ가 나와서 더 재밌게 읽었던 것 같다.

비동기 처리를 위해 일단 디비에 저장해놨다가 배치 돌린다는 생각은 몇번 해봤는데, 책에 이렇게 나오니까 또 반갑게 느껴졌다.

예상과는 달리 이번 장 너무 재밌게 봤음.

 

같이보면 좋은 영상: 스프링캠프 2017 [Day2 A2] : 이벤트 소싱 소개 (이론부)