Post

Cosmic Python 제2부: 커맨드와 외부 이벤트 통합(Commands & External Events)

커맨드는 의도, 이벤트는 사실. 이 둘을 구분하고 외부 시스템과 느슨하게 통합하는 방법.

Cosmic Python 제2부: 커맨드와 외부 이벤트 통합(Commands & External Events)

Chapter 10: 커맨드와 커맨드 핸들러(Commands and Command Handlers)

Chapter 8-9에서 우리는 모든 유스케이스를 이벤트 핸들러로 통합했다. 그런데 한 가지 어색한 점이 있다.

BatchCreated, AllocationRequired 같은 이벤트는 이미 일어난 사실을 나타내는 건가, 아니면 “이걸 해줘”라는 요청인가?

이 둘은 본질적으로 다르다. 그래서 구분해야 한다.


커맨드(Command)와 이벤트(Event)의 차이

구분커맨드(Command)이벤트(Event)
문법명령형 (“~해라”)과거형 (“~되었다”)
의미의도(intent)를 표현한다사실(fact)을 기록한다
수신자딱 하나의 핸들러여러 핸들러가 받을 수 있다
실패 처리예외를 즉시 터뜨린다 (fail noisily)실패해도 계속 진행한다 (fail independently)

한마디로 정리하면 이렇다.

커맨드는 “이걸 해줘”라는 소원이고, 이벤트는 “이런 일이 있었어”라는 알림이다.

커맨드의 발신자는 결과에 관심이 있다. 실패하면 알아야 한다. 이벤트의 발신자는 수신자가 누구인지조차 모른다. 실패해도 자기 일은 이미 끝났다.


코드로 분리하기

커맨드 클래스 정의

기존에 이벤트로 표현했던 것 중, “요청”에 해당하는 것들을 커맨드로 분리한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Command:
    pass

@dataclass
class Allocate(Command):
    orderid: str
    sku: str
    qty: int

@dataclass
class CreateBatch(Command):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

@dataclass
class ChangeBatchQuantity(Command):
    ref: str
    qty: int

이벤트는 도메인에서 발생한 사실만 남긴다.

1
2
3
4
5
6
7
8
9
10
@dataclass
class OutOfStock(Event):
    sku: str

@dataclass
class Allocated(Event):
    orderid: str
    sku: str
    qty: int
    batchref: str

이벤트 이름이 과거형(OutOfStock, Allocated)이고, 커맨드 이름은 명령형(Allocate, CreateBatch)인 것에 주목하자.


메시지 버스의 변화: 커맨드와 이벤트를 다르게 처리

이제 메시지 버스가 메시지 타입에 따라 다른 전략을 적용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
Message = Union[commands.Command, events.Event]

def handle(message: Message, uow: unit_of_work.AbstractUnitOfWork):
    results = []
    queue = [message]
    while queue:
        message = queue.pop(0)
        if isinstance(message, events.Event):
            handle_event(message, queue, uow)
        elif isinstance(message, commands.Command):
            cmd_result = handle_command(message, queue, uow)
            results.append(cmd_result)
    return results

이벤트 핸들러: 실패해도 계속 진행

1
2
3
4
5
6
7
8
def handle_event(event: events.Event, queue: List[Message], uow):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception("Exception handling event %s", event)
            continue  # 실패해도 다음 핸들러로 넘어간다

이벤트 핸들러는 하나가 실패해도 나머지는 계속 실행된다. 이메일 발송이 실패했다고 해서 주문 처리까지 멈추면 안 되니까.

커맨드 핸들러: 실패하면 즉시 터뜨린다

1
2
3
4
5
6
7
8
9
def handle_command(command: commands.Command, queue: List[Message], uow):
    try:
        handler = COMMAND_HANDLERS[type(command)]
        result = handler(command, uow=uow)
        queue.extend(uow.collect_new_events())
        return result
    except Exception:
        logger.exception("Exception handling command %s", command)
        raise  # 예외를 그대로 다시 던진다

커맨드 핸들러는 실패하면 예외를 즉시 전파한다. “주문을 할당해줘”라는 요청이 실패했다면, 발신자가 그 결과를 알아야 하기 때문이다.


핸들러 등록도 분리

1
2
3
4
5
6
7
8
9
EVENT_HANDLERS = {
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}

COMMAND_HANDLERS = {
    commands.Allocate: handlers.allocate,
    commands.CreateBatch: handlers.add_batch,
    commands.ChangeBatchQuantity: handlers.change_batch_quantity,
}

차이가 보이는가?

  • 이벤트: 핸들러가 리스트다 → 여러 핸들러가 하나의 이벤트에 반응할 수 있다
  • 커맨드: 핸들러가 하나다 → 하나의 커맨드에 정확히 하나의 핸들러만 매핑된다

실패 복구 전략

이벤트 핸들러가 실패할 수 있다면, 재시도 전략도 필요하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from tenacity import Retrying, RetryError, stop_after_attempt, wait_exponential

def handle_event(event: events.Event, queue, uow):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            for attempt in Retrying(
                stop=stop_after_attempt(3),
                wait=wait_exponential()
            ):
                with attempt:
                    handler(event, uow=uow)
                    queue.extend(uow.collect_new_events())
        except RetryError as retry_failure:
            logger.error(
                "Failed after %s attempts handling event %s",
                retry_failure.last_attempt.attempt_number, event
            )
            continue

tenacity 라이브러리를 사용하여 지수 백오프(exponential backoff) 기반의 재시도를 적용한다. 3번까지 시도하고, 그래도 실패하면 로그를 남기고 넘어간다.

또한 이벤트가 dataclass이기 때문에, 로그에 찍힌 데이터를 그대로 복사해서 Python 셸에서 재현할 수 있다. 이것이 디버깅 시 큰 이점이 된다.


왜 이 구분이 중요한가?

비즈니스 관점에서 보면 이렇다.

커맨드 CreateOrder가 성공하면 → 이벤트 OrderCreated가 발생한다. 이벤트 OrderCreated는 → 이력 기록, VIP 전환, 이메일 발송 등의 후속 작업을 트리거한다. 이메일 발송이 실패해도, 주문 생성 자체는 성공이다.

만약 이 구분 없이 전부 동일하게 처리했다면? 이메일 서버가 다운되는 순간 주문 생성까지 실패하게 된다. 커맨드와 이벤트의 분리는 “어디까지가 반드시 성공해야 하고, 어디부터는 실패해도 되는가”를 명확히 한다.


Chapter 10 트레이드오프 정리

장점단점
반드시 성공해야 하는 작업과 부수 효과를 명확히 구분한다커맨드와 이벤트의 경계에 대한 의미론적 논쟁이 생길 수 있다
이벤트 핸들러 하나가 실패해도 나머지는 계속 동작한다이벤트 핸들러의 “실패해도 OK” 전략은 강력한 모니터링을 요구한다
핸들러 이름이 비즈니스 의도를 명시적으로 표현한다커맨드/이벤트/핸들러 간의 매핑이 많아지면서 시스템 전체 흐름 파악이 어려워진다
실패 도메인을 작고 격리된 단위로 관리할 수 있다전체 시스템 복잡성이 증가한다

Chapter 11: 외부 이벤트를 활용한 마이크로서비스 통합(Using Events to Integrate Microservices)

지금까지의 이벤트는 하나의 서비스 내부에서 흐르는 것이었다. Chapter 11에서는 드디어 서비스의 경계를 넘어서 이벤트를 주고받는다.

서비스 하나가 아무리 잘 설계되어 있어도, 다른 서비스와 어떻게 통신하느냐에 따라 전체 시스템의 운명이 갈린다.


문제: 명사 기반 서비스 분리의 함정

마이크로서비스를 설계할 때 흔히 하는 실수가 있다. 데이터베이스 테이블 이름으로 서비스를 나누는 것이다.

1
Orders 서비스 → Batches 서비스 → Warehouse 서비스

이렇게 나누면 어떤 문제가 생길까?

  • 실행 순서 결합(Connascence of Execution): 창고에서 재고 손상이 발생하면, Warehouse가 Batches를 호출하고, Batches가 Orders를 호출해야 한다
  • 시간 결합(Connascence of Timing): 모든 서비스가 동시에 동작해야 한다
  • 장애 전파: 하나가 다운되면 전체 체인이 멈춘다

결국 “분산된 거대한 진흙 덩어리(distributed big ball of mud)”가 된다.

이 문제의 근본 원인은 동기식 RPC(원격 프로시저 호출)로 서비스를 엮었기 때문이다.


해결: 동사 기반, 이벤트 기반 설계

명사(noun) 중심이 아닌 동사(verb) 중심으로 생각을 바꿔야 한다.

“주문 서비스”, “배치 서비스”가 아니라, “주문하기(ordering)”, “할당하기(allocating)” 같은 워크플로우로 서비스를 설계한다.

각 서비스는 일관성 경계(consistency boundary)가 되어, 커맨드를 받아 처리하고 이벤트를 발행한다.

1
2
3
4
5
6
7
8
9
10
Before (동기식 RPC - 단일 장애 체인):
Orders → Batches → Warehouse → CRM
(하나가 죽으면 전부 멈춘다)

After (이벤트 기반 - 독립적인 서비스):
Orders: "OrderPlaced" 이벤트 발행
    ├─ Batches가 구독 → "Allocated" 이벤트 발행
    ├─ Warehouse가 독립적으로 구독
    └─ CRM이 독립적으로 구독
(각 서비스가 독립적으로 동작한다)

서비스 간의 결합이 “이벤트 이름과 필드에 대한 합의”로만 한정된다. 이를 이름 결합(Connascence of Name)이라 하는데, 이것은 가장 약한(=좋은) 형태의 결합이다.


Redis Pub/Sub로 외부 이벤트 연동하기

이 책에서는 Redis Pub/Sub를 메시지 브로커로 사용한다. (프로덕션에서는 Kafka, RabbitMQ 등을 쓰겠지만, 개념을 이해하기엔 Redis로 충분하다.)

외부 이벤트 수신: Redis → 내부 커맨드

외부에서 오는 메시지를 받아서 내부 커맨드로 변환하는 어댑터를 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
r = redis.Redis(**config.get_redis_host_and_port())

def main():
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe("change_batch_quantity")

    for m in pubsub.listen():
        handle_change_batch_quantity(m)

def handle_change_batch_quantity(m):
    data = json.loads(m["data"])
    cmd = commands.ChangeBatchQuantity(ref=data["batchref"], qty=data["qty"])
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())

구조가 낯이 익지 않은가? Flask 엔드포인트와 같은 패턴이다.

진입점역할
Flask 엔드포인트HTTP 요청 → 커맨드 → 메시지 버스
Redis 컨슈머Redis 메시지 → 커맨드 → 메시지 버스

외부 인프라가 무엇이든 상관없이, 내부적으로는 동일한 메시지 버스를 통해 처리된다. 이것이 바로 포트와 어댑터(Ports and Adapters) 아키텍처의 진가다.

내부 이벤트 발행: 도메인 → Redis

할당이 성공하면, 외부로 알려야 한다. 도메인 모델이 Allocated 이벤트를 발생시키고, 이를 Redis로 발행한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 도메인 모델에서 이벤트 발생
class Product:
    def allocate(self, line: OrderLine) -> str:
        # ... 할당 로직 ...
        batch.allocate(line)
        self.version_number += 1
        self.events.append(
            events.Allocated(
                orderid=line.orderid,
                sku=line.sku,
                qty=line.qty,
                batchref=batch.reference,
            )
        )
        return batch.reference
1
2
3
4
5
6
7
8
# Redis로 발행하는 핸들러
def publish_allocated_event(event: events.Allocated, uow):
    redis_eventpublisher.publish("line_allocated", event)

# 실제 발행 로직
def publish(channel, event: events.Event):
    logging.debug("publishing: channel=%s, event=%s", channel, event)
    r.publish(channel, json.dumps(asdict(event)))
1
2
3
4
5
# 메시지 버스 핸들러 등록
EVENT_HANDLERS = {
    events.Allocated: [handlers.publish_allocated_event],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}

전체 흐름: 외부 이벤트 → 내부 처리 → 외부 발행

이벤트의 전체 흐름을 정리하면 이렇다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1. 외부 시스템이 Redis "change_batch_quantity" 채널에 메시지 발행
    │
    ▼
2. Redis 컨슈머가 메시지를 수신하고 ChangeBatchQuantity 커맨드로 변환
    │
    ▼
3. 메시지 버스가 change_batch_quantity 핸들러를 실행
    │ (수량 초과 시 deallocate → AllocationRequired 이벤트 발행)
    ▼
4. allocate 핸들러가 재할당을 수행
    │ (성공 시 Allocated 이벤트 발행)
    ▼
5. publish_allocated_event 핸들러가 Redis "line_allocated" 채널로 발행
    │
    ▼
6. 다른 외부 서비스가 "line_allocated" 채널을 구독하여 후속 처리

하나의 외부 메시지가 내부 커맨드 → 도메인 처리 → 이벤트 체이닝 → 외부 발행까지 자연스럽게 흘러간다.


외부 이벤트의 E2E 테스트

비동기 시스템의 테스트는 재시도 루프를 사용해야 한다. 메시지 전달이 즉시 이루어지지 않기 때문이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def test_change_batch_quantity_leading_to_reallocation():
    orderid, sku = random_orderid(), random_sku()
    earlier_batch, later_batch = random_batchref("old"), random_batchref("newer")

    api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta="2011-01-01")
    api_client.post_to_add_batch(later_batch, sku, qty=10, eta="2011-01-02")
    response = api_client.post_to_allocate(orderid, sku, 10)
    assert response.json()["batchref"] == earlier_batch

    subscription = redis_client.subscribe_to("line_allocated")

    # 외부 이벤트 발행
    redis_client.publish_message(
        "change_batch_quantity",
        {"batchref": earlier_batch, "qty": 5},
    )

    # 재할당 결과가 외부로 발행되었는지 확인 (재시도 루프)
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                data = json.loads(message["data"])
                assert data["orderid"] == orderid
                assert data["batchref"] == later_batch

이 테스트가 검증하는 것은 이렇다.

  1. HTTP API로 배치와 주문을 생성한다
  2. Redis를 통해 배치 수량 변경 외부 이벤트를 발행한다
  3. 내부에서 재할당이 이루어지고, 결과가 Redis로 다시 발행되는지 확인한다

HTTP 진입점과 Redis 진입점이 하나의 테스트에서 함께 검증된다.


내부 이벤트 vs 외부 이벤트

한 가지 중요한 설계 원칙이 있다.

내부 이벤트와 외부 이벤트를 구분하라.

모든 내부 이벤트를 외부에 그대로 발행하면 안 된다. 내부 이벤트 중 일부는 구현 세부사항이기 때문이다.

구분내부 이벤트외부 이벤트
범위하나의 서비스 내부서비스 경계를 넘어 전파
예시AllocationRequiredAllocated
목적내부 이벤트 체이닝다른 서비스에 사실 전달
변경 영향내부 코드만 수정외부 소비자까지 영향
검증단위 테스트계약 테스트(contract test) 필요

외부로 나가는 이벤트는 계약(contract)이다. 한번 정의하면 쉽게 바꿀 수 없으므로, 스키마 검증과 버전 관리가 필요하다.


Chapter 11 트레이드오프 정리

장점단점
서비스 간 결합도가 극적으로 낮아진다전체 정보 흐름이 암묵적이 되어 파악하기 어렵다
한 서비스가 실패해도 다른 서비스에 영향이 없다최종 일관성(eventual consistency) 패러다임을 팀 전체가 이해해야 한다
새로운 서비스를 기존 이벤트에 독립적으로 구독시킬 수 있다메시지 신뢰성, 순서 보장, 멱등성 처리를 설계해야 한다
포트와 어댑터 패턴 덕분에 인프라 교체가 용이하다인과 관계 추적(tracing)과 디버깅이 어려워진다

Martin Fowler의 말을 빌리면 이렇다.

“이벤트 알림은 낮은 결합도를 의미하지만, 논리적 흐름이 코드에 명시적으로 드러나지 않아 파악하기 어렵다.”


Chapter 10-11 핵심 정리

두 장을 관통하는 핵심 메시지를 정리하면 이렇다.

Chapter 10: 메시지의 성격을 구분하라.

  • 커맨드 = “해줘” → 실패하면 바로 터뜨려라
  • 이벤트 = “일어났어” → 실패해도 계속 진행하라
  • 이 구분이 장애 격리복원력의 기반이 된다

Chapter 11: 이벤트로 서비스 경계를 넘어라.

  • 동기식 RPC 대신 비동기 이벤트로 통신하라
  • 내부 이벤트와 외부 이벤트를 명확히 구분하라
  • Flask든 Redis든, 진입점이 달라도 내부 처리는 동일하다
1
2
3
4
5
6
7
외부 세계                    내부 세계
──────────────────    ──────────────────────────────
HTTP 요청 ─────┐
               ├──→ 커맨드 → 메시지 버스 → 핸들러
Redis 메시지 ──┘         │
                         ├── 도메인 이벤트 → 내부 핸들러 체이닝
                         └── 외부 이벤트 → Redis 발행 → 다른 서비스

참고 자료

This post is licensed under CC BY 4.0 by the author.