Post

Cosmic Python 제2부: 이벤트 기반 아키텍처(Event-Driven Architecture)

객체보다 중요한 건 메시징이다. 도메인 이벤트와 메시지 버스로 Aggregate 간 통신을 설계하는 이벤트 기반 아키텍처의 핵심.

Cosmic Python 제2부: 이벤트 기반 아키텍처(Event-Driven Architecture)

제2부: 이벤트 기반 아키텍처(Event-Driven Architecture)

“오래전에 이 주제에 대해 ‘객체(objects)’라는 용어를 만든 것이 유감입니다. 그 단어 때문에 많은 사람들이 덜 중요한 개념에 집중하게 되었거든요. 진짜 중요한 건 ‘메시징(messaging)’입니다.…위대하고 성장 가능한 시스템을 만드는 핵심은, 모듈의 내부 속성이나 동작이 아니라 모듈 간의 통신을 어떻게 설계하느냐에 달려 있습니다.” — Alan Kay


Part 1에서 Part 2로, 왜 넘어가야 할까?

Part 1에서 우리는 단일 도메인 모델을 중심으로 아키텍처 패턴들을 살펴봤다.

  • Repository 패턴으로 영속성을 추상화하고
  • Service Layer로 유스케이스를 관리하고
  • Unit of Work로 트랜잭션을 제어하고
  • Aggregate로 일관성 경계를 설정했다.

그런데 현실의 소프트웨어는 단일 도메인 모델 하나로 끝나지 않는다.

회사에서 만드는 시스템은 다른 시스템과 정보를 주고받아야 하고, 조직 내에서 여러 비즈니스 프로세스를 동시에 관리해야 한다. 결국, “하나의 모델을 넘어서 여러 모델이 서로 협력하는 구조”를 고민해야 하는 시점이 온다.


마이크로서비스? 그냥 하면 큰일 난다

많은 팀들이 이 문제를 해결하기 위해 마이크로서비스를 도입하고, HTTP API로 서비스 간 통신을 구현한다.

그런데 이걸 신중하게 설계하지 않으면 어떻게 될까?

“분산된 거대한 진흙 덩어리(distributed big ball of mud)”가 탄생한다.

모노리스에서의 엉킨 코드보다 더 나쁜 상황이 벌어질 수 있다는 뜻이다. 서비스 간의 의존성이 복잡해지고, 장애가 전파되고, 데이터 정합성이 깨지기 시작하면… 그때부터는 정말 손쓰기 어려워진다.

그래서 Part 2에서는 이 문제를 제대로 다루는 방법을 알아본다.


Part 2에서 다루는 핵심 패턴들

Part 1에서 배운 패턴들이 분산 시스템으로 자연스럽게 확장된다. 작은 컴포넌트들이 비동기 메시지 전달(asynchronous message passing)로 소통하는 시스템을 구성하는 방법을 배우게 된다.

1. 도메인 이벤트(Domain Events)

“주문이 생성되었다”, “결제가 완료되었다” 같은 사건을 이벤트로 표현하는 패턴이다.

일관성 경계(consistency boundary)를 넘나드는 워크플로우를 트리거할 때 사용한다. 예를 들어, 주문 서비스에서 “주문 완료” 이벤트가 발생하면 → 배송 서비스가 이를 받아서 배송 준비를 시작하는 식이다.

개념설명
도메인 이벤트도메인에서 발생한 중요한 사건을 나타내는 객체
발행(Publish)이벤트를 시스템에 알리는 행위
구독(Subscribe)특정 이벤트에 반응하여 처리하는 행위

2. 메시지 버스(Message Bus)

유스케이스를 어떤 진입점에서든 통일된 방식으로 호출할 수 있게 해주는 패턴이다.

웹 API에서 호출하든, 이벤트 핸들러에서 호출하든, CLI에서 호출하든 동일한 방식으로 처리할 수 있다. Service Layer와 Unit of Work 패턴 덕분에 애플리케이션이 비동기 메시지 처리기(asynchronous message processor)로 동작할 수 있게 된다.

3. CQRS (Command Query Responsibility Segregation)

읽기(Query)와 쓰기(Command)를 분리하는 패턴이다.

하나의 모델로 읽기와 쓰기를 모두 처리하면, 어느 쪽도 완벽하게 최적화하기 어렵다. CQRS는 이 둘을 분리해서 각각에 맞는 최적의 구조를 가져갈 수 있게 해준다.

구분Command (쓰기)Query (읽기)
목적상태 변경데이터 조회
모델도메인 모델 (복잡한 비즈니스 로직)읽기 전용 모델 (단순, 빠름)
최적화정합성, 트랜잭션성능, 캐싱

4. 의존성 주입(Dependency Injection)

Part 2에서는 의존성 주입 프레임워크도 함께 다룬다. 이벤트 기반 아키텍처와 직접적인 관련은 없지만, 아키텍처가 복잡해질수록 의존성 관리가 중요해지기 때문이다.

부트스트래핑(bootstrapping) 과정에서 의존성을 깔끔하게 주입하면, 테스트도 쉬워지고 컴포넌트 간 결합도도 낮출 수 있다.


Part 2의 큰 그림

정리하면, Part 2는 이런 흐름으로 진행된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Part 1의 패턴들 (Repository, Service Layer, UoW, Aggregate)
    │
    ▼
도메인 이벤트로 Aggregate 간 통신
    │
    ▼
메시지 버스로 유스케이스 통합
    │
    ▼
이벤트 기반으로 마이크로서비스 간 통합
    │
    ▼
CQRS로 읽기/쓰기 최적화
    │
    ▼
의존성 주입으로 전체 구조 정리

결국 핵심은 “모듈 간 통신을 어떻게 설계할 것인가”이다. Alan Kay의 말처럼, 객체의 내부 구현보다 메시징이 더 중요하다.


이걸 왜 알아야 할까?

실무에서 서비스가 커지다 보면, 반드시 이런 고민을 하게 된다.

  • “이 서비스에서 저 서비스로 데이터를 어떻게 전달하지?”
  • “동기 호출로 하면 장애가 전파되는데, 비동기로 바꿔야 하나?”
  • “이벤트를 쓰면 순서 보장은 어떻게 하지?”

Part 2는 이런 질문들에 대한 체계적인 답을 제시한다. 단순히 “메시지 큐 쓰면 됩니다”가 아니라, 도메인 모델과 아키텍처 패턴을 기반으로 설계하는 방법을 알려준다.

Part 1에서 단단한 기초를 쌓았다면, Part 2에서는 그 기초 위에 확장 가능한 시스템을 설계하는 방법을 배우는 것이다.

그럼 이제 본격적으로 Chapter 8과 Chapter 9를 살펴보자.


Chapter 8: 이벤트와 메시지 버스(Events and the Message Bus)

문제 상황: “이메일도 보내야 하는데 어디에 넣지?”

지금까지 구축한 아키텍처가 진짜 가치를 발휘하는 순간은, 명확한 기능 구현이 아니라 “기능 주변의 잡다한 것들(the goop around the edge)”을 처리할 때다. 보고서, 권한 체크, 알림 발송 같은 부수적인 요구사항 말이다.

구체적인 예시를 들어보자. 재고 할당(allocation)에 실패하면, 구매팀에게 재고 부족 이메일을 보내야 한다.

그런데 이 이메일 로직을 도대체 어디에 넣어야 할까?


안티패턴 1: 웹 컨트롤러에 넣기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
    line = model.OrderLine(
        request.json["orderid"],
        request.json["sku"],
        request.json["qty"],
    )
    try:
        uow = unit_of_work.SqlAlchemyUnitOfWork()
        batchref = services.allocate(line, uow)
    except (model.OutOfStock, services.InvalidSku) as e:
        send_mail(
            "out of stock",
            "stock_admin@made.com",
            f"{line.orderid} - {line.sku}"
        )
        return {"message": str(e)}, 400
    return {"batchref": batchref}, 201

Flask 엔드포인트에 이메일 로직을 넣으면? 관심사 분리가 완전히 깨진다. 웹 계층이 비즈니스 로직과 인프라 로직을 모두 알고 있어야 하고, 단위 테스트도 어려워진다.

안티패턴 2: 도메인 모델에 넣기

1
2
3
4
5
6
def allocate(self, line: OrderLine) -> str:
    try:
        batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
    except StopIteration:
        email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
        raise OutOfStock(f"Out of stock for sku {line.sku}")

도메인 모델에 email.send_mail이 들어가는 순간, 도메인 모델이 인프라에 의존하게 된다. 도메인 모델은 비즈니스 로직만 담고 있어야 한다.

안티패턴 3: 서비스 레이어에 넣기

1
2
3
4
5
6
7
8
9
10
11
12
13
def allocate(orderid, sku, qty, uow):
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        except model.OutOfStock:
            email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
            raise

서비스 레이어에 넣으면 좀 낫지 않을까? 여전히 문제다. 할당 로직과 이메일 발송이 하나의 함수에 섞여 있다. 예외를 잡아서 이메일을 보내고 다시 던지는 패턴은 코드 냄새(code smell)다.


핵심 원칙: 단일 책임 원칙(SRP)

여기서 저자들이 강조하는 원칙이 있다.

“함수를 설명할 때 ‘그리고(and)’ 또는 ‘그 다음에(then)’를 사용해야 한다면, SRP를 위반하고 있는 것이다.”

allocate() 함수는 할당만 담당해야 한다. 나중에 이메일 대신 SMS로 바꾼다고 해서 allocate() 함수를 수정해야 한다면? 그건 잘못된 설계다.


해결책: 도메인 이벤트 + 메시지 버스

이벤트를 값 객체로 정의하기

이벤트는 “도메인에서 발생한 사실”을 나타내는 단순한 데이터 구조다.

1
2
3
4
5
6
7
8
from dataclasses import dataclass

class Event:
    pass

@dataclass
class OutOfStock(Event):
    sku: str

행동(behavior)은 없고, 데이터만 있다. “무슨 일이 일어났는가”를 기록할 뿐이다.

도메인 모델이 이벤트를 기록하게 하기

예외를 던지는 대신, Aggregate가 이벤트를 내부 리스트에 기록한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Product:
    def __init__(self, sku, batches, version_number=0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # 이벤트를 쌓아두는 리스트

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            batch.allocate(line)
            self.version_number += 1
            return batch.reference
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))
            return None

테스트 코드로 확인해보면 이렇다.

1
2
3
4
5
6
7
8
def test_records_out_of_stock_event_if_cannot_allocate():
    batch = Batch("batch1", "SMALL-FORK", 10, eta=today)
    product = Product(sku="SMALL-FORK", batches=[batch])
    product.allocate(OrderLine("order1", "SMALL-FORK", 10))

    allocation = product.allocate(OrderLine("order2", "SMALL-FORK", 1))
    assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK")
    assert allocation is None

예외를 흐름 제어에 사용하던 방식에서, 이벤트 기록 방식으로 바뀐 것이다.

메시지 버스: 이벤트를 핸들러에 라우팅

메시지 버스는 본질적으로 이벤트 타입 → 핸들러 함수 매핑이다. 매우 단순하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)

def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        "stock@made.com",
        f"Out of stock for {event.sku}",
    )

HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],
}

메시지 버스는 멍청한 인프라다. 이벤트의 의미를 모르고, 그저 라우팅만 한다. 그리고 이 구현은 동기적(synchronous)이다. 핸들러가 같은 스레드에서 순차적으로 실행된다.


이벤트를 누가 발행할 것인가? 세 가지 옵션

옵션 1: 서비스 레이어가 명시적으로 발행

1
2
3
4
5
6
7
8
def allocate(orderid, sku, qty, uow):
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        batchref = product.allocate(line)
        uow.commit()
        return batchref
    # finally 블록에서 messagebus.handle(product.events) 호출

단순하지만, 서비스 레이어가 이벤트를 알아야 한다.

옵션 2: 서비스 레이어가 자체 이벤트를 생성

서비스 레이어가 결과를 보고 직접 이벤트를 만드는 방식이다. 도메인 모델은 단순해지지만, 이벤트 로직이 서비스 레이어로 이동한다.

옵션 3: Unit of Work가 이벤트를 수집하여 발행 (권장)

가장 깔끔한 방식이다. UoW가 commit할 때 자동으로 이벤트를 수집하고 발행한다.

1
2
3
4
5
6
7
8
9
10
class AbstractUnitOfWork(abc.ABC):
    def commit(self):
        self._commit()
        self.publish_events()

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

이걸 위해 Repository에 seen 추적 기능을 추가한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
class AbstractRepository(abc.ABC):
    def __init__(self):
        self.seen = set()

    def add(self, product):
        self._add(product)
        self.seen.add(product)

    def get(self, sku):
        product = self._get(sku)
        if product:
            self.seen.add(product)
        return product

Repository가 add()get()으로 접근한 Aggregate를 seen 집합에 기록하고, UoW가 커밋 시점에 seen에 있는 모든 Aggregate의 이벤트를 수집하는 구조다.

이렇게 하면 서비스 레이어는 다시 깔끔해진다.

1
2
3
4
5
6
7
8
9
def allocate(orderid, sku, qty, uow):
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = product.allocate(line)
        uow.commit()
        return batchref

이메일 발송 로직은 어디에도 보이지 않는다. 커밋하면 UoW가 알아서 이벤트를 수집하고, 메시지 버스가 핸들러를 실행한다.


Chapter 8 트레이드오프 정리

장점단점
할당 로직과 알림 로직이 완전히 분리된다 (SRP)메시지 버스라는 새로운 개념이 추가되어 복잡도가 올라간다
이벤트와 핸들러를 독립적으로 테스트할 수 있다commit() 호출 시 이메일이 발송되는 건 직관적이지 않다
이메일 → SMS 변경 시 핸들러만 바꾸면 된다이벤트 기반 워크플로우는 한눈에 흐름을 파악하기 어렵다
이벤트가 도메인 용어로 표현되어 비즈니스와 소통이 쉽다이벤트 핸들러 간 순환 의존이 생길 수 있다

Chapter 9: 메시지 버스로 전면 전환(Going to Town on the Message Bus)

Chapter 8에서는 메시지 버스를 부수적인 효과(side effect)를 처리하는 용도로 도입했다. Chapter 9에서는 한 발 더 나아간다. 모든 것을 메시지 버스를 통해 처리하는 구조로 전환한다.

“이벤트가 선택적 부수효과”인 아키텍처에서, “모든 것이 메시지 버스를 통해 흐르는” 아키텍처로 바뀌는 것이다.


새로운 요구사항이 아키텍처를 이끈다

현실의 소프트웨어는 상황 기반 소프트웨어(situated software)다. 실제 프로세스를 장기간 관리하다 보면, 예상치 못한 상황이 끊임없이 발생한다. 재고가 손상되거나, 통관이 지연되거나, 공장에서 제조 수량이 줄어들거나…

이번 장의 구체적인 요구사항은 이렇다.

배치(Batch)의 수량이 변경되면, 이미 할당된 주문이 초과된 경우 할당을 해제하고 재할당해야 한다.

예를 들어보자.

  • Batch에 50개가 있고, 20개씩 2건의 주문이 할당되어 있다 (남은 수량: 10개)
  • 그런데 Batch 수량이 25개로 줄어들었다!
  • 이미 할당된 40개 중 초과분이 생겨서, 한 건을 할당 해제(deallocate)하고 다른 Batch로 재할당(reallocate)해야 한다

이런 연쇄적인 처리를 어떻게 우아하게 구현할 수 있을까?


핵심 아이디어: 모든 것을 이벤트 핸들러로

저자들의 제안은 이렇다.

API가 호출하는 서비스 함수와, 내부 이벤트 핸들러를 구분하지 말자. 모든 것을 이벤트 핸들러로 만들자.

기존의 서비스 함수들이 이벤트 핸들러로 바뀐다.

기존 (서비스 함수)변경 후 (이벤트 핸들러)
services.add_batch(ref, sku, qty, eta)handlers.add_batch(BatchCreated 이벤트)
services.allocate(orderid, sku, qty)handlers.allocate(AllocationRequired 이벤트)
(신규)handlers.change_batch_quantity(BatchQuantityChanged 이벤트)

이벤트가 시스템의 입력이 된다

API 입력을 표현하는 이벤트를 새로 정의한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int

@dataclass
class BatchQuantityChanged(Event):
    ref: str
    qty: int

서비스 함수의 시그니처가 개별 인자(primitives)에서 이벤트 객체로 바뀐다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Before
def add_batch(ref: str, sku: str, qty: int, eta: Optional[date], uow):
    ...

# After
def add_batch(event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork):
    with uow:
        product = uow.products.get(sku=event.sku)
        if product is None:
            product = model.Product(sku=event.sku, batches=[])
            uow.products.add(product)
        product.batches.append(
            model.Batch(event.ref, event.sku, event.qty, event.eta)
        )
        uow.commit()

이벤트 객체를 통해 인터페이스가 통일된다. 모든 핸들러는 (event, uow) 형태의 동일한 시그니처를 갖는다.


메시지 버스의 진화: UoW에서 이벤트를 수집

Chapter 8에서는 UoW가 메시지 버스에 이벤트를 푸시(push)했다. Chapter 9에서는 메시지 버스가 UoW에서 이벤트를 풀(pull)하는 방식으로 바뀐다.

이유는? 순환 의존을 제거하기 위해서다.

1
2
3
4
5
6
7
8
9
def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
    results = []
    queue = [event]
    while queue:
        event = queue.pop(0)
        for handler in HANDLERS[type(event)]:
            results.append(handler(event, uow=uow))
            queue.extend(uow.collect_new_events())
    return results

핵심 포인트를 정리하면 이렇다.

  • 메시지 버스가 이벤트 큐를 내부적으로 관리한다
  • 핸들러가 실행된 후, UoW에서 새로 발생한 이벤트를 수집한다
  • 수집된 이벤트가 큐에 추가되어 연쇄적으로 처리된다
  • FIFO 순서로 처리된다

UoW의 publish_events() 메서드는 collect_new_events()로 바뀌어, 능동적으로 발행하는 게 아니라 수동적으로 수집당하는 구조가 된다.


테스트도 이벤트 기반으로

테스트 코드도 서비스 함수를 직접 호출하는 대신, 이벤트를 생성하고 메시지 버스에 전달하는 방식으로 바뀐다.

1
2
3
4
5
6
7
8
class TestAddBatch:
    def test_for_new_product(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
        )
        assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
        assert uow.committed

Flask 엔드포인트도 마찬가지다.

1
2
3
4
5
6
7
8
9
10
11
12
# Before
batchref = services.allocate(
    request.json["orderid"], request.json["sku"],
    request.json["qty"], unit_of_work.SqlAlchemyUnitOfWork(),
)

# After
event = events.AllocationRequired(
    request.json["orderid"], request.json["sku"], request.json["qty"]
)
results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork())
batchref = results.pop(0)

새로운 요구사항 구현: 배치 수량 변경

이제 메시지 버스 중심 아키텍처에서 새 요구사항을 구현해보자. TDD로 진행한다.

테스트 케이스 1: 단순 수량 변경

1
2
3
4
5
6
7
8
9
10
11
class TestChangeBatchQuantity:
    def test_changes_available_quantity(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None), uow
        )
        [batch] = uow.products.get("ADORABLE-SETTEE").batches
        assert batch.available_quantity == 100

        messagebus.handle(events.BatchQuantityChanged("batch1", 50), uow)
        assert batch.available_quantity == 50

테스트 케이스 2: 수량 감소로 재할당이 필요한 경우

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def test_reallocates_if_necessary(self):
    uow = FakeUnitOfWork()
    event_history = [
        events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
        events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
        events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
        events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
    ]
    for e in event_history:
        messagebus.handle(e, uow)

    [batch1, batch2] = uow.products.get("INDIFFERENT-TABLE").batches
    assert batch1.available_quantity == 10
    assert batch2.available_quantity == 50

    messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)

    # batch1: 25 - 20 = 5 (order1만 남음)
    # batch2: 50 - 20 = 30 (order2가 재할당됨)
    assert batch1.available_quantity == 5
    assert batch2.available_quantity == 30

테스트가 이벤트 히스토리를 먼저 쌓고, 수량 변경 이벤트를 보내는 방식으로 작성된다. 순수하게 이벤트와 상태 변화만으로 테스트할 수 있어서 매우 깔끔하다.

핸들러 구현

1
2
3
4
5
6
7
8
def change_batch_quantity(
    event: events.BatchQuantityChanged,
    uow: unit_of_work.AbstractUnitOfWork,
):
    with uow:
        product = uow.products.get_by_batchref(batchref=event.ref)
        product.change_batch_quantity(ref=event.ref, qty=event.qty)
        uow.commit()

도메인 모델의 변화

1
2
3
4
5
6
7
8
9
class Product:
    def change_batch_quantity(self, ref: str, qty: int):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        while batch.available_quantity < 0:
            line = batch.deallocate_one()
            self.events.append(
                events.AllocationRequired(line.orderid, line.sku, line.qty)
            )

여기서 이벤트 체이닝이 일어난다.

  1. BatchQuantityChanged 이벤트 → change_batch_quantity 핸들러 실행
  2. 도메인 모델이 초과된 주문을 해제하면서 AllocationRequired 이벤트를 발생
  3. 메시지 버스가 새 이벤트를 수집하여 allocate 핸들러 실행
  4. 재할당 불가 시 OutOfStock 이벤트 → 알림 핸들러 실행

하나의 이벤트가 연쇄적으로 다른 이벤트를 트리거하는 구조다.

메시지 버스 핸들러 등록

1
2
3
4
5
6
HANDLERS = {
    events.BatchCreated: [handlers.add_batch],
    events.BatchQuantityChanged: [handlers.change_batch_quantity],
    events.AllocationRequired: [handlers.allocate],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}

새로운 요구사항을 구현하는 데 아키텍처를 변경할 필요가 없었다. 이벤트와 핸들러만 추가하면 됐다.


격리된 핸들러 테스트 (선택)

이벤트 체이닝이 복잡해지면, 개별 핸들러만 격리해서 테스트하고 싶을 수 있다. 이 경우 FakeUnitOfWork를 확장하여 발행된 이벤트를 캡처하는 방식을 사용한다.

1
2
3
4
5
6
7
8
9
class FakeUnitOfWorkWithFakeMessageBus(FakeUnitOfWork):
    def __init__(self):
        super().__init__()
        self.events_published = []

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                self.events_published.append(product.events.pop(0))
1
2
3
4
5
6
7
8
9
def test_reallocates_if_necessary_isolated():
    uow = FakeUnitOfWorkWithFakeMessageBus()
    # ... 이벤트 히스토리 설정 ...
    messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)

    # 연쇄 실행 결과가 아닌, 발행된 이벤트만 검증
    [reallocation_event] = uow.events_published
    assert isinstance(reallocation_event, events.AllocationRequired)
    assert reallocation_event.orderid in {"order1", "order2"}

전체 체인을 실행하지 않고, “이 핸들러가 올바른 이벤트를 발행하는가?”만 검증할 수 있다.


Chapter 9 트레이드오프 정리

장점단점
핸들러와 서비스 함수가 하나의 개념으로 통합된다메시지 버스는 웹 관점에서 예측하기 어렵다 (언제 끝나는지 모른다)
이벤트가 시스템 입력을 깔끔하게 표현한다도메인 객체와 이벤트 간 필드 중복이 생긴다
새 요구사항에 아키텍처 변경 없이 이벤트/핸들러만 추가하면 된다필드 추가 시 여러 관련 구조를 함께 수정해야 한다
이벤트 체이닝으로 복잡한 워크플로우를 자연스럽게 처리한다서로 다른 트랜잭션에서 일부만 성공하면 정합성 문제가 생길 수 있다

핵심 정리: 복잡성 증가 속도를 늦추는 것

Chapter 8과 9를 관통하는 핵심 메시지는 이것이다.

“애플리케이션 크기에 비해 복잡성이 더 빠르게 증가하지 않도록 하는 것”

배치 수량 변경 → 할당 해제 → 재할당 → 알림 발송이라는 복잡한 요구사항이 들어왔지만, 기존 아키텍처 패턴을 그대로 사용하여 이벤트와 핸들러만 추가하는 것으로 해결했다.

1
2
3
4
5
6
7
8
9
10
BatchQuantityChanged 이벤트
    │
    ▼
change_batch_quantity 핸들러
    │ (수량 초과 시 deallocate → AllocationRequired 이벤트 발행)
    ▼
allocate 핸들러
    │ (재고 부족 시 OutOfStock 이벤트 발행)
    ▼
send_out_of_stock_notification 핸들러

새로운 아키텍처 카테고리를 도입할 필요 없이, 동일한 패턴의 반복으로 복잡한 비즈니스 요구를 처리할 수 있게 된 것이다.

다음 장에서는 커맨드(Command)와 이벤트(Event)의 구분을 다룬다. “수행해야 할 작업”과 “이미 발생한 사실”을 왜 구분해야 하는지 알아보게 된다.


참고 자료

This post is licensed under CC BY 4.0 by the author.