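Chapter 27 Domain-Driven Design

Learning Objectives

After completing this chapter, readers will be able to:

  • Understand the core concepts and theoretical foundations of domain-driven design
  • Master the mathematical definitions and design principles of entities, value objects, and aggregates
  • Implement a layered architecture with domain services and application services
  • Apply the repository and unit-of-work patterns to abstract persistence
  • Design domain events for communication across aggregates
  • Use strategic design tools to delimit bounded contexts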

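27.1 Theoretical Foundations and Formal Definitions

27.1.1 Formal Definition of Domain-Driven Design

Definition 27.1 (Domain Model): A domain model $\mathcal{M}$ is a 4-tuple:

$$\mathcal{M} = \langle \mathcal{E}, \mathcal{V}, \mathcal{R}, \mathcal{I} \rangle$$

where:

  • $\mathcal{E}$: the set of entities, domain objects with a unique identity
  • $\mathcal{V}$: the set of value objects, immutable objects defined by their attribute values
  • $\mathcal{R}$: the set of relations, describing associations between objects
  • $\mathcal{I}$: the set of invariants, business rules that keep the model consistent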

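Definition 27.2 (Aggregate): An aggregate $\mathcal{A}$ is a pair:

$$\mathcal{A} = \langle r, \{e_1, e_2, \ldots, e_n\} \rangle$$

where $r$ is the aggregate root and $\{e_1, e_2, \ldots, e_n\}$ is the set of internal entities, such that:

  • External objects can reach the inside of the aggregate only through $r$
  • Transactional consistency is guaranteed within the aggregate
  • Aggregates refer to one another by identity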

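Definition 27.3 (Bounded Context): A bounded context $\mathcal{B}$ is defined as:

$$\mathcal{B} = \langle \mathcal{M}, \mathcal{L}, \mathcal{U} \rangle$$

where:

  • $\mathcal{M}$: the domain model within the context
  • $\mathcal{L}$: the vocabulary of the ubiquitous language
  • $\mathcal{U}$: the context-mapping relationships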

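27.1.2 Historical Background and Development

| Period | Milestone | Key figure / work | Core contribution |
|--------|-----------|-------------------|-------------------|
| 1995 | Analysis patterns | Martin Fowler | Domain modeling practice |
| 2003 | Foundation of DDD | Eric Evans | Publication of "Domain-Driven Design" |
| 2006 | Domain-specific languages | Eric Evans, Martin Fowler | Combining DSLs with domain modeling |
| 2013 | DDD strategic design | Vaughn Vernon | "Implementing Domain-Driven Design" |
| 2014 | Microservice architecture | Sam Newman | Mapping bounded contexts to microservices |
| 2016 | Event Storming | Alberto Brandolini | Visual domain-exploration method |
| 2018 | Functional DDD | Scott Wlaschin | Combining functional programming with DDD |
| 2020+ | Domain-driven microservices | Community practice | Cloud-native DDD architecture |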

27.1.3 DDD Layered Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│ User Interface (UI) Layer                                               │
│   Controllers, view models, API endpoints                               │
├─────────────────────────────────────────────────────────────────────────┤
│ Application Layer                                                       │
│   Application services, command handlers, event handlers                │
├─────────────────────────────────────────────────────────────────────────┤
│ Domain Layer                                                            │
│   Entities, value objects, aggregates, domain services, domain events   │
├─────────────────────────────────────────────────────────────────────────┤
│ Infrastructure Layer                                                    │
│   Repository implementations, message queues, external service adapters │
└─────────────────────────────────────────────────────────────────────────┘
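
The way the four layers cooperate can be sketched in a few lines. This is a minimal illustration, not the chapter's implementation: it assumes the common dependency-inversion variant in which the domain layer owns the repository interface and the infrastructure layer implements it. All names (`Account`, `AccountRepository`, `InMemoryAccountRepository`, `DepositService`) are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Protocol


# --- Domain layer: business rules plus the repository interface ---
@dataclass
class Account:
    account_id: int
    balance: int = 0  # smallest currency unit, to keep the sketch simple

    def deposit(self, amount: int) -> None:
        if amount <= 0:
            raise ValueError("deposit amount must be positive")
        self.balance += amount


class AccountRepository(Protocol):
    def get(self, account_id: int) -> Optional[Account]: ...
    def save(self, account: Account) -> None: ...


# --- Infrastructure layer: one possible implementation of the interface ---
class InMemoryAccountRepository:
    def __init__(self) -> None:
        self._rows: Dict[int, Account] = {}

    def get(self, account_id: int) -> Optional[Account]:
        return self._rows.get(account_id)

    def save(self, account: Account) -> None:
        self._rows[account.account_id] = account


# --- Application layer: orchestrates a use case, holds no business rules ---
class DepositService:
    def __init__(self, accounts: AccountRepository) -> None:
        self._accounts = accounts

    def deposit(self, account_id: int, amount: int) -> int:
        account = self._accounts.get(account_id)
        if account is None:
            raise LookupError(f"account {account_id} not found")
        account.deposit(amount)  # the rule is enforced by the domain object
        self._accounts.save(account)
        return account.balance


# --- Stand-in for the UI layer: wires the pieces together ---
repo = InMemoryAccountRepository()
repo.save(Account(account_id=1))
service = DepositService(repo)
new_balance = service.deposit(1, 250)
```

Because `DepositService` depends only on the `AccountRepository` protocol, the in-memory store could be swapped for a database-backed one without touching the domain or application code.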

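27.2 Entities and Value Objects

27.2.1 Formal Definition of an Entity

Definition 27.4 (Entity): An entity $e \in \mathcal{E}$ is an object with a unique identity, satisfying:

$$\forall e_1, e_2 \in \mathcal{E}: e_1 = e_2 \Leftrightarrow id(e_1) = id(e_2)$$

Entity equality is determined by identity, not by attribute values.

27.2.2 Formal Definition of a Value Object

Definition 27.5 (Value Object): A value object $v \in \mathcal{V}$ is an immutable object, satisfying:

$$\forall v_1, v_2 \in \mathcal{V}: v_1 = v_2 \Leftrightarrow \forall a \in Attr(v): v_1.a = v_2.a$$

Value-object equality is determined by all attribute values.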
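
The two notions of equality can be contrasted in a short sketch. `User` and `Point` are hypothetical names chosen for illustration; the frozen dataclass relies on the `__eq__` that `dataclasses` generates from the declared fields.

```python
from dataclasses import dataclass


class User:
    """Entity: equality follows the identifier only."""

    def __init__(self, user_id: int, name: str) -> None:
        self.user_id = user_id
        self.name = name

    def __eq__(self, other: object) -> bool:
        return isinstance(other, User) and self.user_id == other.user_id

    def __hash__(self) -> int:
        return hash(self.user_id)


@dataclass(frozen=True)
class Point:
    """Value object: the generated __eq__ compares every field."""
    x: int
    y: int


same_identity = User(1, "Alice") == User(1, "Alice Smith")  # same id, different name
different_identity = User(1, "Alice") == User(2, "Alice")   # different id, same name
same_value = Point(1, 2) == Point(1, 2)
different_value = Point(1, 2) == Point(1, 3)

print(same_identity, different_identity)  # True False
print(same_value, different_value)        # True False
```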

27.2.3 UML Class Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                           <<abstract>>                                   │
│                           Entity                                         │
├─────────────────────────────────────────────────────────────────────────┤
│ - _id: int                                                              │
├─────────────────────────────────────────────────────────────────────────┤
│ + id: int <<property>>                                                  │
│ + __eq__(other): bool                                                   │
│ + __hash__(): int                                                       │
└─────────────────────────────────────────────────────────────────────────┘


                                    │ extends

┌─────────────────────────────────────────────────────────────────────────┐
│                           Customer                                       │
├─────────────────────────────────────────────────────────────────────────┤
│ - _name: str                                                            │
│ - _email: Email                                                         │
│ - _address: Address                                                     │
├─────────────────────────────────────────────────────────────────────────┤
│ + name: str <<property>>                                                │
│ + email: Email <<property>>                                             │
│ + address: Address <<property>>                                         │
│ + change_address(new_address: Address): void                            │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<ValueObject>> Money                                 │
├─────────────────────────────────────────────────────────────────────────┤
│ + amount: float <<frozen>>                                              │
│ + currency: str <<frozen>>                                              │
├─────────────────────────────────────────────────────────────────────────┤
│ + add(other: Money): Money                                              │
│ + multiply(factor: float): Money                                        │
│ + subtract(other: Money): Money                                         │
│ + __eq__(other): bool                                                   │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<ValueObject>> Email                                 │
├─────────────────────────────────────────────────────────────────────────┤
│ + value: str <<frozen>>                                                 │
├─────────────────────────────────────────────────────────────────────────┤
│ + __post_init__(): void                                                 │
│ + __str__(): str                                                        │
│ + domain(): str                                                         │
└─────────────────────────────────────────────────────────────────────────┘

27.2.4 Complete Implementation

python
from dataclasses import dataclass, field
from typing import Optional, List, TypeVar, Generic, Dict, Any
from datetime import datetime
from abc import ABC, abstractmethod
from enum import Enum
import re
from functools import total_ordering

class Entity(ABC):
    def __init__(self, id: int):
        self._id = id
    
    @property
    def id(self) -> int:
        return self._id
    
    def __eq__(self, other) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return self._id == other._id
    
    def __hash__(self) -> int:
        return hash(self._id)
    
    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(id={self._id})"

@total_ordering
@dataclass(frozen=True)
class Money:
    # float keeps the example short; decimal.Decimal avoids binary rounding error
    amount: float
    currency: str
    
    def __post_init__(self):
        # Invariants: non-negative amount, three-letter currency code
        if self.amount < 0:
            raise ValueError(f"Amount cannot be negative: {self.amount}")
        if not self.currency or len(self.currency) != 3:
            raise ValueError(f"Invalid currency code: {self.currency}")
    
    def add(self, other: 'Money') -> 'Money':
        if other.currency != self.currency:
            raise ValueError(f"Currency mismatch: {self.currency} vs {other.currency}")
        return Money(round(self.amount + other.amount, 2), self.currency)
    
    def subtract(self, other: 'Money') -> 'Money':
        if other.currency != self.currency:
            raise ValueError(f"Currency mismatch: {self.currency} vs {other.currency}")
        result = round(self.amount - other.amount, 2)
        if result < 0:
            raise ValueError("Subtraction result cannot be negative")
        return Money(result, self.currency)
    
    def multiply(self, factor: float) -> 'Money':
        if factor < 0:
            raise ValueError("Factor cannot be negative")
        return Money(round(self.amount * factor, 2), self.currency)
    
    def __lt__(self, other: 'Money') -> bool:
        # total_ordering derives <=, > and >= from __lt__ and __eq__
        if self.currency != other.currency:
            raise ValueError("Cannot compare different currencies")
        return self.amount < other.amount
    
    def __str__(self) -> str:
        return f"{self.amount:.2f} {self.currency}"
    
    @classmethod
    def zero(cls, currency: str = "CNY") -> 'Money':
        return cls(0.0, currency)

@dataclass(frozen=True)
class Email:
    value: str
    
    EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    
    def __post_init__(self):
        if not self.EMAIL_PATTERN.match(self.value):
            raise ValueError(f"Invalid email address: {self.value}")
    
    def __str__(self) -> str:
        return self.value
    
    @property
    def domain(self) -> str:
        return self.value.split('@')[1]
    
    @property
    def local_part(self) -> str:
        return self.value.split('@')[0]

@dataclass(frozen=True)
class Address:
    street: str
    city: str
    postal_code: str
    country: str
    
    def __post_init__(self):
        if not all([self.street, self.city, self.country]):
            raise ValueError("Address fields cannot be empty")
    
    def __str__(self) -> str:
        return f"{self.street}, {self.city}, {self.postal_code}, {self.country}"
    
    def with_street(self, new_street: str) -> 'Address':
        return Address(new_street, self.city, self.postal_code, self.country)
    
    def with_city(self, new_city: str) -> 'Address':
        return Address(self.street, new_city, self.postal_code, self.country)

@dataclass(frozen=True)
class PhoneNumber:
    value: str
    country_code: str = "+86"
    
    def __post_init__(self):
        digits = re.sub(r'\D', '', self.value)
        if len(digits) < 10 or len(digits) > 15:
            raise ValueError(f"Invalid phone number: {self.value}")
    
    def __str__(self) -> str:
        return f"{self.country_code} {self.value}"

class Customer(Entity):
    def __init__(
        self, 
        id: int, 
        name: str, 
        email: Email, 
        address: Address,
        phone: Optional[PhoneNumber] = None
    ):
        super().__init__(id)
        if not name or len(name.strip()) == 0:
            raise ValueError("Customer name cannot be empty")
        self._name = name.strip()
        self._email = email
        self._address = address
        self._phone = phone
        self._created_at = datetime.now()
        self._version = 1
    
    @property
    def name(self) -> str:
        return self._name
    
    @property
    def email(self) -> Email:
        return self._email
    
    @property
    def address(self) -> Address:
        return self._address
    
    @property
    def phone(self) -> Optional[PhoneNumber]:
        return self._phone
    
    @property
    def version(self) -> int:
        return self._version
    
    def change_address(self, new_address: Address) -> None:
        self._address = new_address
        self._increment_version()
    
    def change_email(self, new_email: Email) -> None:
        self._email = new_email
        self._increment_version()
    
    def change_phone(self, new_phone: PhoneNumber) -> None:
        self._phone = new_phone
        self._increment_version()
    
    def _increment_version(self) -> None:
        self._version += 1
    
    def __repr__(self) -> str:
        return f"Customer(id={self._id}, name='{self._name}', email={self._email})"

email = Email("zhangsan@example.com")
address = Address("100 Renmin Road", "Beijing", "100000", "China")
phone = PhoneNumber("13800138000")
customer = Customer(1, "Zhang San", email, address, phone)

print(f"Customer: {customer.name}")
print(f"Email: {customer.email}")
print(f"Address: {customer.address}")
print(f"Version: {customer.version}")

# Replacing the immutable Address value bumps the entity's version
customer.change_address(Address("200 Zhongshan Road", "Shanghai", "200000", "China"))
print(f"New address: {customer.address}")
print(f"New version: {customer.version}")
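
The `Money` class above stores `amount` as a `float`, and binary floating point cannot represent most decimal fractions exactly. One common alternative, shown here as a sketch under assumptions rather than as part of the chapter's implementation, is `decimal.Decimal` with explicit rounding. `DecimalMoney` and `CENT` are hypothetical names, and the fixed two decimal places are an assumption that does not hold for every currency.

```python
from dataclasses import dataclass
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")  # assumption: two decimal places


@dataclass(frozen=True)
class DecimalMoney:
    amount: Decimal
    currency: str

    def __post_init__(self) -> None:
        # Normalise to two decimal places; a frozen dataclass needs
        # object.__setattr__ to store the normalised value.
        quantized = Decimal(str(self.amount)).quantize(CENT, rounding=ROUND_HALF_UP)
        if quantized < 0:
            raise ValueError(f"amount cannot be negative: {quantized}")
        object.__setattr__(self, "amount", quantized)

    def add(self, other: "DecimalMoney") -> "DecimalMoney":
        if other.currency != self.currency:
            raise ValueError("currency mismatch")
        return DecimalMoney(self.amount + other.amount, self.currency)


float_sum = 0.1 + 0.2  # binary floating point drifts
decimal_sum = DecimalMoney(Decimal("0.10"), "CNY").add(DecimalMoney(Decimal("0.20"), "CNY"))

print(float_sum == 0.3)                       # False
print(decimal_sum.amount == Decimal("0.30"))  # True
```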

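27.3 Aggregates and Aggregate Roots

27.3.1 Aggregate Design Principles

Principle 27.1 (Aggregate Boundaries): Aggregate boundaries are determined by transactional consistency requirements:

  • Within an aggregate: strong consistency (ACID transactions)
  • Between aggregates: eventual consistency (domain events)

Principle 27.2 (Aggregate Size): Aggregates should be as small as possible, following these rules:

  1. Reference other aggregates by identity
  2. Modify only one aggregate per transaction
  3. Use eventual consistency to maintain constraints between aggregates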
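
The three rules can be sketched with an in-memory event list: the first step changes only the order and records an event, and a separate handler later updates the stock aggregate. This is a simplified illustration with hypothetical names (`SimpleOrder`, `Stock`, `OrderPlaced`, `dispatch`, `reserve_stock`); a real system would typically place a message broker or an outbox table where `dispatch` stands.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass(frozen=True)
class OrderPlaced:
    order_id: int
    product_id: int  # rule 1: the other aggregate is referenced by ID only
    quantity: int


@dataclass
class Stock:
    """A separate aggregate, keyed by product_id."""
    product_id: int
    available: int

    def reserve(self, quantity: int) -> None:
        if quantity > self.available:
            raise ValueError("insufficient stock")
        self.available -= quantity


@dataclass
class SimpleOrder:
    order_id: int
    product_id: int
    quantity: int
    events: List[OrderPlaced] = field(default_factory=list)

    def place(self) -> None:
        # Rule 2: this step touches only the order and records an event.
        self.events.append(OrderPlaced(self.order_id, self.product_id, self.quantity))


def dispatch(events: List[OrderPlaced],
             handlers: List[Callable[[OrderPlaced], None]]) -> None:
    # Stand-in for a message broker: deliver each event to each handler.
    for event in events:
        for handler in handlers:
            handler(event)


stocks: Dict[int, Stock] = {7: Stock(product_id=7, available=10)}


def reserve_stock(event: OrderPlaced) -> None:
    # Rule 3: a second, separate step touches only the Stock aggregate.
    stocks[event.product_id].reserve(event.quantity)


order = SimpleOrder(order_id=1, product_id=7, quantity=3)
order.place()
stock_before_dispatch = stocks[7].available  # 10: the aggregates disagree for now
dispatch(order.events, [reserve_stock])
stock_after_dispatch = stocks[7].available   # 7: consistent once the handler has run
```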

27.3.2 Aggregate UML Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                        <<AggregateRoot>>                                 │
│                           Order                                          │
├─────────────────────────────────────────────────────────────────────────┤
│ - _order_id: OrderId                                                    │
│ - _customer_id: CustomerId                                              │
│ - _items: List[OrderItem]                                               │
│ - _status: OrderStatus                                                  │
│ - _created_at: datetime                                                 │
│ - _events: List[DomainEvent]                                            │
├─────────────────────────────────────────────────────────────────────────┤
│ + order_id: OrderId <<property>>                                        │
│ + customer_id: CustomerId <<property>>                                  │
│ + status: OrderStatus <<property>>                                      │
│ + items: List[OrderItem] <<property>>                                   │
│ + add_item(...): void                                                   │
│ + remove_item(product_id: ProductId): void                              │
│ + confirm(): void                                                       │
│ + pay(): void                                                           │
│ + ship(tracking_number: str): void                                      │
│ + deliver(): void                                                       │
│ + cancel(reason: str): void                                             │
│ + calculate_total(): Money                                              │
│ + pull_domain_events(): List[DomainEvent]                               │
└─────────────────────────────────────────────────────────────────────────┘

                                    │ contains


┌─────────────────────────────────────────────────────────────────────────┐
│                        <<Entity>> OrderItem                              │
├─────────────────────────────────────────────────────────────────────────┤
│ - _id: OrderItemId                                                      │
│ - _product_id: ProductId                                                │
│ - _product_name: str                                                    │
│ - _unit_price: Money                                                    │
│ - _quantity: int                                                        │
├─────────────────────────────────────────────────────────────────────────┤
│ + id: OrderItemId <<property>>                                          │
│ + product_id: ProductId <<property>>                                    │
│ + product_name: str <<property>>                                        │
│ + unit_price: Money <<property>>                                        │
│ + quantity: int <<property>>                                            │
│ + calculate_total(): Money                                              │
│ + update_quantity(new_quantity: int): void                              │
└─────────────────────────────────────────────────────────────────────────┘

        ┌──────────────┐                    ┌──────────────┐
        │   Customer   │                    │   Product    │
        │  (Aggregate) │                    │  (Aggregate) │
        └──────────────┘                    └──────────────┘
              ↑                                   ↑
              │ reference by ID                   │ reference by ID
              │                                   │
        ┌─────┴───────────────────────────────────┴─────┐
        │                   Order                        │
        │                (Aggregate)                     │
        └────────────────────────────────────────────────┘

27.3.3 Complete Aggregate Implementation

python
import uuid
from abc import ABC
from dataclasses import dataclass, field
from typing import List, Optional, Dict, TypeVar, Generic
from datetime import datetime
from enum import Enum
from copy import deepcopy

class OrderStatus(Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
    
    def can_transition_to(self, target: 'OrderStatus') -> bool:
        transitions = {
            OrderStatus.PENDING: {OrderStatus.CONFIRMED, OrderStatus.CANCELLED},
            OrderStatus.CONFIRMED: {OrderStatus.PAID, OrderStatus.CANCELLED},
            OrderStatus.PAID: {OrderStatus.SHIPPED, OrderStatus.CANCELLED},
            OrderStatus.SHIPPED: {OrderStatus.DELIVERED},
            OrderStatus.DELIVERED: set(),
            OrderStatus.CANCELLED: set(),
        }
        return target in transitions.get(self, set())

@dataclass(frozen=True)
class OrderItemId:
    value: int
    
    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Order item ID must be a positive integer")

@dataclass(frozen=True)
class ProductId:
    value: int
    
    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Product ID must be a positive integer")

@dataclass(frozen=True)
class OrderId:
    value: int
    
    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Order ID must be a positive integer")

@dataclass(frozen=True)
class CustomerId:
    value: int
    
    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Customer ID must be a positive integer")

class OrderItem(Entity):
    def __init__(
        self, 
        id: OrderItemId, 
        product_id: ProductId, 
        product_name: str, 
        unit_price: Money, 
        quantity: int
    ):
        super().__init__(id.value)
        if quantity <= 0:
            raise ValueError("Quantity must be greater than 0")
        if not product_name or len(product_name.strip()) == 0:
            raise ValueError("Product name cannot be empty")
        
        self._id = id
        self._product_id = product_id
        self._product_name = product_name.strip()
        self._unit_price = unit_price
        self._quantity = quantity
    
    @property
    def order_item_id(self) -> OrderItemId:
        return self._id
    
    @property
    def product_id(self) -> ProductId:
        return self._product_id
    
    @property
    def product_name(self) -> str:
        return self._product_name
    
    @property
    def unit_price(self) -> Money:
        return self._unit_price
    
    @property
    def quantity(self) -> int:
        return self._quantity
    
    def calculate_total(self) -> Money:
        return self._unit_price.multiply(self._quantity)
    
    def update_quantity(self, new_quantity: int) -> None:
        if new_quantity <= 0:
            raise ValueError("Quantity must be greater than 0")
        self._quantity = new_quantity
    
    def __repr__(self) -> str:
        return f"OrderItem(product={self._product_name}, qty={self._quantity}, price={self._unit_price})"

class DomainEvent(ABC):
    def __init__(self):
        self.occurred_at = datetime.now()
        self.event_id = str(uuid.uuid4())

class OrderCreatedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, customer_id: CustomerId):
        super().__init__()
        self.order_id = order_id
        self.customer_id = customer_id

class OrderConfirmedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, total: Money):
        super().__init__()
        self.order_id = order_id
        self.total = total

class OrderShippedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, tracking_number: str):
        super().__init__()
        self.order_id = order_id
        self.tracking_number = tracking_number

class OrderCancelledEvent(DomainEvent):
    def __init__(self, order_id: OrderId, reason: str):
        super().__init__()
        self.order_id = order_id
        self.reason = reason

class OrderItemAddedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, product_id: ProductId, quantity: int):
        super().__init__()
        self.order_id = order_id
        self.product_id = product_id
        self.quantity = quantity

import uuid

class Order(Entity):
    MAX_ITEMS = 50
    
    def __init__(self, id: OrderId, customer_id: CustomerId):
        super().__init__(id.value)
        self._order_id = id
        self._customer_id = customer_id
        self._items: List[OrderItem] = []
        self._status = OrderStatus.PENDING
        self._created_at = datetime.now()
        self._updated_at = datetime.now()
        self._next_item_id = 1
        self._events: List[DomainEvent] = []
        self._version = 1
        
        self._add_event(OrderCreatedEvent(id, customer_id))
    
    @property
    def order_id(self) -> OrderId:
        return self._order_id
    
    @property
    def customer_id(self) -> CustomerId:
        return self._customer_id
    
    @property
    def status(self) -> OrderStatus:
        return self._status
    
    @property
    def items(self) -> List[OrderItem]:
        return self._items.copy()
    
    @property
    def created_at(self) -> datetime:
        return self._created_at
    
    @property
    def updated_at(self) -> datetime:
        return self._updated_at
    
    @property
    def version(self) -> int:
        return self._version
    
    def add_item(
        self, 
        product_id: ProductId, 
        product_name: str, 
        unit_price: Money, 
        quantity: int
    ) -> None:
        self._ensure_status(OrderStatus.PENDING, "Only pending orders can be modified")
        
        if quantity <= 0:
            raise ValueError("Quantity must be greater than 0")
        
        existing_item = self._find_item_by_product(product_id)
        if existing_item:
            # Same product again: merge into the existing line
            existing_item.update_quantity(existing_item.quantity + quantity)
        else:
            # The limit counts distinct lines, so it only applies to new ones
            if len(self._items) >= self.MAX_ITEMS:
                raise ValueError(f"Order item limit exceeded: {self.MAX_ITEMS}")
            item = OrderItem(
                OrderItemId(self._next_item_id),
                product_id,
                product_name,
                unit_price,
                quantity
            )
            self._items.append(item)
            self._next_item_id += 1
        
        self._add_event(OrderItemAddedEvent(self._order_id, product_id, quantity))
        self._touch()
    
    def remove_item(self, product_id: ProductId) -> None:
        self._ensure_status(OrderStatus.PENDING, "Only pending orders can be modified")
        
        original_count = len(self._items)
        self._items = [
            item for item in self._items 
            if item.product_id != product_id
        ]
        
        if len(self._items) == original_count:
            raise ValueError(f"Product not in order: {product_id.value}")
        
        self._touch()
    
    def confirm(self) -> None:
        self._ensure_status(OrderStatus.PENDING, "Only pending orders can be confirmed")
        
        if not self._items:
            raise ValueError("Order cannot be empty")
        
        self._status = OrderStatus.CONFIRMED
        self._add_event(OrderConfirmedEvent(self._order_id, self.calculate_total()))
        self._touch()
    
    def pay(self) -> None:
        self._ensure_status(OrderStatus.CONFIRMED, "Only confirmed orders can be paid")
        self._status = OrderStatus.PAID
        self._touch()
    
    def ship(self, tracking_number: str) -> None:
        self._ensure_status(OrderStatus.PAID, "Only paid orders can be shipped")
        
        if not tracking_number or len(tracking_number.strip()) == 0:
            raise ValueError("Tracking number cannot be empty")
        
        self._status = OrderStatus.SHIPPED
        self._add_event(OrderShippedEvent(self._order_id, tracking_number))
        self._touch()
    
    def deliver(self) -> None:
        self._ensure_status(OrderStatus.SHIPPED, "Only shipped orders can be delivered")
        self._status = OrderStatus.DELIVERED
        self._touch()
    
    def cancel(self, reason: str = "") -> None:
        # Reuse the transition table so that shipped, delivered, and
        # already cancelled orders are all rejected
        if not self._status.can_transition_to(OrderStatus.CANCELLED):
            raise ValueError(
                f"Cannot cancel an order in status: {self._status.value}"
            )
        
        self._status = OrderStatus.CANCELLED
        self._add_event(OrderCancelledEvent(self._order_id, reason))
        self._touch()
    
    def calculate_total(self) -> Money:
        if not self._items:
            return Money.zero()
        
        currency = self._items[0].unit_price.currency
        total = Money.zero(currency)
        for item in self._items:
            total = total.add(item.calculate_total())
        return total
    
    def item_count(self) -> int:
        return sum(item.quantity for item in self._items)
    
    def pull_domain_events(self) -> List[DomainEvent]:
        events = self._events.copy()
        self._events.clear()
        return events
    
    def _find_item_by_product(self, product_id: ProductId) -> Optional[OrderItem]:
        for item in self._items:
            if item.product_id == product_id:
                return item
        return None
    
    def _ensure_status(self, expected: OrderStatus, message: str) -> None:
        if self._status != expected:
            raise ValueError(f"{message}, current status: {self._status.value}")
    
    def _add_event(self, event: DomainEvent) -> None:
        self._events.append(event)
    
    def _touch(self) -> None:
        self._updated_at = datetime.now()
        self._version += 1
    
    def __repr__(self) -> str:
        return f"Order(id={self._order_id.value}, status={self._status.value}, items={len(self._items)})"

order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 2)
order.add_item(ProductId(2), "Product B", Money(50, "CNY"), 3)

print(f"Order total: {order.calculate_total()}")
print(f"Item count: {order.item_count()}")

order.confirm()
print(f"Order status: {order.status.value}")
# pull_domain_events returns the pending events and clears the internal list
print(f"Domain event count: {len(order.pull_domain_events())}")
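
The transition table in `OrderStatus.can_transition_to` can also drive a single generic guard instead of one check per method. The standalone sketch below mirrors that table; `Status`, `ALLOWED`, and `transition` are hypothetical names introduced only for this illustration.

```python
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"


# Same transitions as the table in OrderStatus.can_transition_to
ALLOWED = {
    Status.PENDING: {Status.CONFIRMED, Status.CANCELLED},
    Status.CONFIRMED: {Status.PAID, Status.CANCELLED},
    Status.PAID: {Status.SHIPPED, Status.CANCELLED},
    Status.SHIPPED: {Status.DELIVERED},
    Status.DELIVERED: set(),
    Status.CANCELLED: set(),
}


def transition(current: Status, target: Status) -> Status:
    """Return the new status, or raise if the table forbids the move."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


status = Status.PENDING
for step in (Status.CONFIRMED, Status.PAID, Status.SHIPPED):
    status = transition(status, step)

try:
    transition(status, Status.CANCELLED)  # a shipped order can no longer be cancelled
    rejected = False
except ValueError:
    rejected = True
```

Keeping the rules in one table means a new status only requires a new row, at the cost of less specific error messages than hand-written checks.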

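27.4 Domain Services

27.4.1 Definition of a Domain Service

Definition 27.6 (Domain Service): A domain service $s$ is an operation that:

  • does not naturally belong to any entity or value object
  • has an interface defined in terms of domain concepts
  • is stateless

Formally: $$s: \mathcal{E}^n \times \mathcal{V}^m \rightarrow \mathcal{E}^p \times \mathcal{V}^q$$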
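
As a concrete reading of this signature, a transfer takes two entities and one value and yields the two updated entities. The sketch below is an assumption-laden illustration, not the chapter's implementation: `Account` and `FundsTransfer` are hypothetical names, and amounts are integer cents to keep the arithmetic exact.

```python
from dataclasses import dataclass


@dataclass
class Account:
    account_id: int
    balance_cents: int

    def withdraw(self, amount_cents: int) -> None:
        if amount_cents > self.balance_cents:
            raise ValueError("insufficient funds")
        self.balance_cents -= amount_cents

    def deposit(self, amount_cents: int) -> None:
        self.balance_cents += amount_cents


class FundsTransfer:
    """Stateless: no fields, everything it needs arrives as arguments."""

    def transfer(self, source: Account, target: Account, amount_cents: int) -> None:
        if amount_cents <= 0:
            raise ValueError("transfer amount must be positive")
        if source.account_id == target.account_id:
            raise ValueError("source and target must differ")
        source.withdraw(amount_cents)  # raises before anything has changed
        target.deposit(amount_cents)


a, b = Account(1, 1_000), Account(2, 0)
FundsTransfer().transfer(a, b, 400)
print(a.balance_cents, b.balance_cents)  # 600 400
```

The operation belongs to neither account on its own, which is the first criterion in Definition 27.6 for placing it in a service.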

27.4.2 Domain Service UML Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<DomainService>>                                     │
│                    PricingService                                        │
├─────────────────────────────────────────────────────────────────────────┤
│ - _discount_rules: List[DiscountRule]                                   │
│ - _tax_rate: float                                                      │
├─────────────────────────────────────────────────────────────────────────┤
│ + calculate_discount(total: Money): Money                               │
│ + calculate_tax(subtotal: Money): Money                                 │
│ + calculate_final_price(order: Order): Money                            │
│ + apply_promotion(order: Order, code: str): Money                       │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<DomainService>>                                     │
│                    InventoryService                                      │
├─────────────────────────────────────────────────────────────────────────┤
│ - _stock: Dict[ProductId, int]                                          │
│ - _reservations: Dict[OrderId, List[Reservation]]                       │
├─────────────────────────────────────────────────────────────────────────┤
│ + check_availability(product_id: ProductId, qty: int): bool             │
│ + reserve(product_id: ProductId, qty: int, order_id: OrderId): bool     │
│ + release(product_id: ProductId, qty: int, order_id: OrderId): void     │
│ + commit_reservation(order_id: OrderId): void                           │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<DomainService>>                                     │
│                    TransferService                                       │
├─────────────────────────────────────────────────────────────────────────┤
│ + transfer(from: Account, to: Account, amount: Money): void             │
│ + validate_transfer(from: Account, to: Account, amount: Money): bool    │
└─────────────────────────────────────────────────────────────────────────┘

27.4.3 Domain Service Implementation

python
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Protocol
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

@dataclass(frozen=True)
class DiscountRule:
    min_amount: float
    discount_percent: float
    name: str = ""
    
    def applies_to(self, amount: float) -> bool:
        return amount >= self.min_amount
    
    def calculate_discount(self, amount: float) -> float:
        return amount * (self.discount_percent / 100)

@dataclass
class PromotionCode:
    code: str
    discount_percent: float
    min_order_amount: float
    expires_at: datetime
    max_uses: int
    current_uses: int = 0
    
    def is_valid(self) -> bool:
        return (
            datetime.now() < self.expires_at and
            self.current_uses < self.max_uses
        )
    
    def use(self) -> None:
        self.current_uses += 1

class PricingService:
    def __init__(
        self, 
        discount_rules: List[DiscountRule],
        tax_rate: float = 0.0,
        promotions: Optional[Dict[str, PromotionCode]] = None
    ):
        self._discount_rules = sorted(
            discount_rules, 
            key=lambda r: r.min_amount, 
            reverse=True
        )
        self._tax_rate = tax_rate
        self._promotions = promotions or {}
    
    def calculate_discount(self, total: Money) -> Money:
        for rule in self._discount_rules:
            if rule.applies_to(total.amount):
                discount = rule.calculate_discount(total.amount)
                return Money(round(discount, 2), total.currency)
        return Money.zero(total.currency)
    
    def calculate_tax(self, subtotal: Money) -> Money:
        tax = subtotal.amount * self._tax_rate
        return Money(round(tax, 2), subtotal.currency)
    
    def calculate_final_price(self, order: Order) -> Money:
        subtotal = order.calculate_total()
        discount = self.calculate_discount(subtotal)
        after_discount = subtotal.subtract(discount)
        tax = self.calculate_tax(after_discount)
        return after_discount.add(tax)
    
    def apply_promotion(self, order: Order, code: str) -> Money:
        if code not in self._promotions:
            raise ValueError(f"Invalid promotion code: {code}")
        
        promotion = self._promotions[code]
        if not promotion.is_valid():
            raise ValueError(f"Promotion code expired or usage limit reached: {code}")
        
        subtotal = order.calculate_total()
        if subtotal.amount < promotion.min_order_amount:
            raise ValueError(
                f"Order amount too low, minimum required: {promotion.min_order_amount}"
            )
        
        discount = subtotal.amount * (promotion.discount_percent / 100)
        # Side effect: records one use on the shared PromotionCode object
        promotion.use()
        
        return Money(round(discount, 2), subtotal.currency)
    
    def get_applicable_discount(self, amount: Money) -> Optional[DiscountRule]:
        for rule in self._discount_rules:
            if rule.applies_to(amount.amount):
                return rule
        return None

@dataclass
class Reservation:
    product_id: ProductId
    quantity: int
    order_id: OrderId
    reserved_at: datetime
    expires_at: datetime
    
    def is_expired(self) -> bool:
        return datetime.now() > self.expires_at

class InventoryService:
    RESERVATION_TIMEOUT_MINUTES = 30
    
    def __init__(self):
        self._stock: Dict[ProductId, int] = {}
        self._reservations: Dict[OrderId, List[Reservation]] = {}
    
    def set_stock(self, product_id: ProductId, quantity: int) -> None:
        if quantity < 0:
            raise ValueError("Stock cannot be negative")
        self._stock[product_id] = quantity
    
    def get_stock(self, product_id: ProductId) -> int:
        return self._stock.get(product_id, 0)
    
    def get_available_stock(self, product_id: ProductId) -> int:
        total = self._stock.get(product_id, 0)
        reserved = sum(
            r.quantity 
            for reservations in self._reservations.values()
            for r in reservations
            if r.product_id == product_id and not r.is_expired()
        )
        return total - reserved
    
    def check_availability(self, product_id: ProductId, quantity: int) -> bool:
        return self.get_available_stock(product_id) >= quantity
    
    def reserve(
        self, 
        product_id: ProductId, 
        quantity: int, 
        order_id: OrderId
    ) -> bool:
        if not self.check_availability(product_id, quantity):
            return False
        
        now = datetime.now()
        expires_at = datetime.fromtimestamp(
            now.timestamp() + self.RESERVATION_TIMEOUT_MINUTES * 60
        )
        
        reservation = Reservation(
            product_id=product_id,
            quantity=quantity,
            order_id=order_id,
            reserved_at=now,
            expires_at=expires_at
        )
        
        if order_id not in self._reservations:
            self._reservations[order_id] = []
        self._reservations[order_id].append(reservation)
        
        return True
    
    def release(self, product_id: ProductId, quantity: int, order_id: OrderId) -> None:
        if order_id not in self._reservations:
            return
        
        self._reservations[order_id] = [
            r for r in self._reservations[order_id]
            if not (r.product_id == product_id and r.quantity == quantity)
        ]
    
    def commit_reservation(self, order_id: OrderId) -> None:
        if order_id not in self._reservations:
            return
        
        for reservation in self._reservations[order_id]:
            current_stock = self._stock.get(reservation.product_id, 0)
            self._stock[reservation.product_id] = current_stock - reservation.quantity
        
        del self._reservations[order_id]
    
    def cancel_reservation(self, order_id: OrderId) -> None:
        if order_id in self._reservations:
            del self._reservations[order_id]
    
    def cleanup_expired_reservations(self) -> int:
        expired_count = 0
        for order_id in list(self._reservations.keys()):
            current = self._reservations[order_id]
            active = [r for r in current if not r.is_expired()]
            expired_count += len(current) - len(active)
            if active:
                self._reservations[order_id] = active
            else:
                # Drop the key so empty reservation lists do not accumulate
                del self._reservations[order_id]
        return expired_count

class TransferService:
    def transfer(
        self, 
        from_account: 'Account', 
        to_account: 'Account', 
        amount: Money
    ) -> None:
        self.validate_transfer(from_account, to_account, amount)
        
        from_account.debit(amount)
        to_account.credit(amount)
    
    def validate_transfer(
        self, 
        from_account: 'Account', 
        to_account: 'Account', 
        amount: Money
    ) -> bool:
        if amount.amount <= 0:
            raise ValueError("Transfer amount must be greater than 0")
        
        if from_account.id == to_account.id:
            raise ValueError("Cannot transfer to the same account")
        
        if from_account.balance < amount:
            raise ValueError("Insufficient account balance")
        
        if from_account.currency != to_account.currency:
            raise ValueError("Account currencies do not match")
        
        return True

pricing_service = PricingService([
    DiscountRule(500, 5, "Standard discount"),
    DiscountRule(1000, 10, "Premium discount"),
    DiscountRule(2000, 15, "VIP discount"),
], tax_rate=0.13)

order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(600, "CNY"), 2)
order.add_item(ProductId(2), "Product B", Money(400, "CNY"), 1)

subtotal = order.calculate_total()
final_price = pricing_service.calculate_final_price(order)
discount_rule = pricing_service.get_applicable_discount(subtotal)

print(f"Subtotal: {subtotal}")
print(f"Applicable discount: {discount_rule.name if discount_rule else 'none'}")
print(f"Final price (tax included): {final_price}")

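27.5 Repository Pattern

27.5.1 Formal Definition of a Repository

Definition 27.7 (Repository): A repository $R$ is a collection-like abstraction over aggregates that provides the following operations:

$$R: \mathcal{A} \rightarrow \mathcal{P}(\mathcal{A})$$

where $\mathcal{P}(\mathcal{A})$ denotes the power set of aggregates. The basic operations are:

  • $find(id): \mathcal{A} \cup \{\bot\}$: look up an aggregate by identity, yielding $\bot$ when none exists
  • $save(a): \mathcal{A}$: persist an aggregate
  • $delete(a): \emptyset$: remove an aggregate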
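The three basic operations can be sketched with a dictionary-backed repository. This is only a minimal illustration under stated assumptions: `Cart` and `InMemoryRepository` are hypothetical names that do not appear elsewhere in this chapter, and `None` stands in for $\bot$.

```python
from dataclasses import dataclass
from typing import Dict, Generic, Optional, TypeVar

A = TypeVar("A")


@dataclass
class Cart:
    """Hypothetical aggregate, used only for this illustration."""
    id: int
    owner: str


class InMemoryRepository(Generic[A]):
    """Dictionary-backed sketch of find / save / delete."""

    def __init__(self) -> None:
        self._items: Dict[int, A] = {}

    def find(self, id: int) -> Optional[A]:
        # None plays the role of the bottom element: no aggregate has this id
        return self._items.get(id)

    def save(self, aggregate: A) -> A:
        # Saving again under the same id overwrites the stored aggregate
        self._items[aggregate.id] = aggregate
        return aggregate

    def delete(self, aggregate: A) -> None:
        # Deleting an unknown aggregate is a no-op
        self._items.pop(aggregate.id, None)


repo: InMemoryRepository[Cart] = InMemoryRepository()
repo.save(Cart(1, "alice"))
found = repo.find(1)          # the stored aggregate
missing = repo.find(2)        # None, i.e. the bottom element
repo.delete(Cart(1, "alice"))
after_delete = repo.find(1)   # None again once the aggregate is removed
```

Callers see the repository as a collection keyed by identity; how and where aggregates are stored stays hidden behind these three operations.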

27.5.2 Repository UML Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<interface>>                                         │
│                    Repository<T>                                         │
├─────────────────────────────────────────────────────────────────────────┤
│ + find_by_id(id: int): Optional<T>                                      │
│ + find_all(): List<T>                                                   │
│ + save(entity: T): T                                                    │
│ + delete(entity: T): void                                               │
└─────────────────────────────────────────────────────────────────────────┘


                                    │ implements

┌─────────────────────────────────────────────────────────────────────────┐
│                    OrderRepository                                       │
├─────────────────────────────────────────────────────────────────────────┤
│ - _orders: Dict[int, Order]                                             │
│ - _specifications: Dict[str, Specification]                             │
├─────────────────────────────────────────────────────────────────────────┤
│ + find_by_id(id: int): Optional[Order]                                  │
│ + find_by_customer(customer_id: CustomerId): List[Order]                │
│ + find_by_status(status: OrderStatus): List[Order]                      │
│ + find_by_specification(spec: Specification): List[Order]               │
│ + find_all(): List[Order]                                               │
│ + save(order: Order): Order                                             │
│ + delete(order: Order): void                                            │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<interface>>                                         │
│                    Specification<T>                                      │
├─────────────────────────────────────────────────────────────────────────┤
│ + is_satisfied_by(entity: T): bool                                      │
│ + and(other: Specification): Specification                              │
│ + or(other: Specification): Specification                               │
│ + not(): Specification                                                  │
└─────────────────────────────────────────────────────────────────────────┘

27.5.3 Repository Implementation

python
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, TypeVar, Generic, Callable, Any
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
import threading

T = TypeVar('T')

class Specification(ABC, Generic[T]):
    @abstractmethod
    def is_satisfied_by(self, entity: T) -> bool:
        pass
    
    def __and__(self, other: 'Specification[T]') -> 'Specification[T]':
        return AndSpecification(self, other)
    
    def __or__(self, other: 'Specification[T]') -> 'Specification[T]':
        return OrSpecification(self, other)
    
    def __invert__(self) -> 'Specification[T]':
        return NotSpecification(self)

class AndSpecification(Specification[T]):
    def __init__(self, left: Specification[T], right: Specification[T]):
        self._left = left
        self._right = right
    
    def is_satisfied_by(self, entity: T) -> bool:
        return self._left.is_satisfied_by(entity) and self._right.is_satisfied_by(entity)

class OrSpecification(Specification[T]):
    def __init__(self, left: Specification[T], right: Specification[T]):
        self._left = left
        self._right = right
    
    def is_satisfied_by(self, entity: T) -> bool:
        return self._left.is_satisfied_by(entity) or self._right.is_satisfied_by(entity)

class NotSpecification(Specification[T]):
    def __init__(self, spec: Specification[T]):
        self._spec = spec
    
    def is_satisfied_by(self, entity: T) -> bool:
        return not self._spec.is_satisfied_by(entity)

class OrderStatusSpecification(Specification[Order]):
    def __init__(self, status: OrderStatus):
        self._status = status
    
    def is_satisfied_by(self, order: Order) -> bool:
        return order.status == self._status

class CustomerSpecification(Specification[Order]):
    def __init__(self, customer_id: CustomerId):
        self._customer_id = customer_id
    
    def is_satisfied_by(self, order: Order) -> bool:
        return order.customer_id == self._customer_id

class DateRangeSpecification(Specification[Order]):
    def __init__(self, start: datetime, end: datetime):
        self._start = start
        self._end = end
    
    def is_satisfied_by(self, order: Order) -> bool:
        return self._start <= order.created_at <= self._end

class Repository(ABC, Generic[T]):
    @abstractmethod
    def find_by_id(self, id: int) -> Optional[T]:
        pass
    
    @abstractmethod
    def find_all(self) -> List[T]:
        pass
    
    @abstractmethod
    def save(self, entity: T) -> T:
        pass
    
    @abstractmethod
    def delete(self, entity: T) -> None:
        pass
    
    def find_by_specification(self, spec: Specification[T]) -> List[T]:
        return [e for e in self.find_all() if spec.is_satisfied_by(e)]

class OrderRepository(Repository[Order]):
    def __init__(self):
        self._orders: Dict[int, Order] = {}
        self._lock = threading.RLock()
    
    def find_by_id(self, id: int) -> Optional[Order]:
        with self._lock:
            return self._orders.get(id)
    
    def find_by_customer(self, customer_id: CustomerId) -> List[Order]:
        spec = CustomerSpecification(customer_id)
        return self.find_by_specification(spec)
    
    def find_by_status(self, status: OrderStatus) -> List[Order]:
        spec = OrderStatusSpecification(status)
        return self.find_by_specification(spec)
    
    def find_by_date_range(self, start: datetime, end: datetime) -> List[Order]:
        spec = DateRangeSpecification(start, end)
        return self.find_by_specification(spec)
    
    def find_pending_orders(self) -> List[Order]:
        return self.find_by_status(OrderStatus.PENDING)
    
    def find_all(self) -> List[Order]:
        with self._lock:
            return list(self._orders.values())
    
    def save(self, order: Order) -> Order:
        with self._lock:
            self._orders[order.id] = order
            return order
    
    def delete(self, order: Order) -> None:
        with self._lock:
            self._orders.pop(order.id, None)
    
    def count(self) -> int:
        with self._lock:
            return len(self._orders)
    
    def exists(self, id: int) -> bool:
        with self._lock:
            return id in self._orders

class UnitOfWork(ABC):
    @abstractmethod
    def commit(self) -> None:
        pass
    
    @abstractmethod
    def rollback(self) -> None:
        pass
    
    @abstractmethod
    def register_new(self, entity: Any) -> None:
        pass
    
    @abstractmethod
    def register_modified(self, entity: Any) -> None:
        pass
    
    @abstractmethod
    def register_deleted(self, entity: Any) -> None:
        pass

class OrderUnitOfWork(UnitOfWork):
    def __init__(self, order_repository: OrderRepository):
        self.order_repository = order_repository
        self._new_objects: List[Order] = []
        self._modified_objects: List[Order] = []
        self._deleted_objects: List[Order] = []
        self._snapshots: Dict[int, Order] = {}
        self._lock = threading.RLock()
    
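    def register_new(self, order: Order) -> None:
        with self._lock:
            # Also ask the repository: _snapshots alone cannot tell whether
            # an order with this id has already been persisted
            if order.id in self._snapshots or self.order_repository.exists(order.id):
                raise ValueError(f"Order already exists: {order.id}")
            self._new_objects.append(order)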
    
    def register_modified(self, order: Order) -> None:
        with self._lock:
            if order not in self._new_objects:
                self._modified_objects.append(order)
    
    def register_deleted(self, order: Order) -> None:
        with self._lock:
            self._deleted_objects.append(order)
    
    def commit(self) -> None:
        with self._lock:
            try:
                for order in self._new_objects:
                    self.order_repository.save(order)
                
                for order in self._modified_objects:
                    self.order_repository.save(order)
                
                for order in self._deleted_objects:
                    self.order_repository.delete(order)
                
                self._clear()
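            except Exception:
                self.rollback()
                raise
    
    def rollback(self) -> None:
        with self._lock:
            # NOTE: nothing in this sample records snapshots, so as written
            # rollback only discards the pending registrations
            for snapshot in self._snapshots.values():
                self.order_repository.save(snapshot)
            self._clear()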
    
    def _clear(self) -> None:
        self._new_objects.clear()
        self._modified_objects.clear()
        self._deleted_objects.clear()
        self._snapshots.clear()
    
    @contextmanager
    def transaction(self):
        try:
            yield self
            self.commit()
        except Exception:
            self.rollback()
            raise

order_repo = OrderRepository()
uow = OrderUnitOfWork(order_repo)

with uow.transaction():
    order = Order(OrderId(1), CustomerId(100))
    order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 2)
    uow.register_new(order)

print(f"Orders in repository: {order_repo.count()}")
print(f"Order: {order_repo.find_by_id(1)}")

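27.6 Application Services

27.6.1 Responsibilities of Application Services

An application service is responsible for:

  1. Use-case orchestration: coordinating domain objects to carry out a business use case
  2. Transaction management: ensuring that operations are atomic
  3. Access control: verifying user permissions
  4. DTO conversion: isolating the domain model from its external representation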
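The four responsibilities can be seen side by side in a small sketch. Everything here is an assumption made for illustration: `Wallet`, `WalletDTO`, and `WalletTransferService` are hypothetical names, and the "transaction" is simulated by restoring in-memory balances rather than by a real unit of work.

```python
from dataclasses import dataclass
from typing import Dict, Set, Tuple


@dataclass
class Wallet:
    """Hypothetical aggregate used only in this sketch."""
    id: int
    balance: int
    closed: bool = False

    def debit(self, amount: int) -> None:
        if amount <= 0 or amount > self.balance:
            raise ValueError("invalid debit amount")
        self.balance -= amount

    def credit(self, amount: int) -> None:
        if self.closed:
            raise ValueError("wallet is closed")
        self.balance += amount


@dataclass(frozen=True)
class WalletDTO:
    """Flat, read-only view handed to callers instead of the aggregate."""
    id: int
    balance: int


class WalletTransferService:
    def __init__(self, wallets: Dict[int, Wallet], authorized_users: Set[str]) -> None:
        self._wallets = wallets
        self._authorized_users = authorized_users

    def transfer(
        self, user: str, source_id: int, target_id: int, amount: int
    ) -> Tuple[WalletDTO, WalletDTO]:
        # Access control: reject callers without permission
        if user not in self._authorized_users:
            raise PermissionError(f"{user} is not allowed to transfer")
        # Orchestration: load the aggregates and let them enforce their rules
        source = self._wallets[source_id]
        target = self._wallets[target_id]
        # Transaction management: restore both balances if any step fails
        saved = (source.balance, target.balance)
        try:
            source.debit(amount)
            target.credit(amount)
        except Exception:
            source.balance, target.balance = saved
            raise
        # DTO conversion: expose plain data, not domain objects
        return WalletDTO(source.id, source.balance), WalletDTO(target.id, target.balance)


wallets = {1: Wallet(1, 100), 2: Wallet(2, 0), 3: Wallet(3, 0, closed=True)}
service = WalletTransferService(wallets, authorized_users={"alice"})
source_dto, target_dto = service.transfer("alice", 1, 2, 30)
```

Note that the service itself contains no business rule: the debit and credit checks live in the aggregate, and the service only sequences the calls and guards the boundary.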

27.6.2 Application Service UML Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<ApplicationService>>                                │
│                    OrderApplicationService                               │
├─────────────────────────────────────────────────────────────────────────┤
│ - _order_repository: OrderRepository                                    │
│ - _pricing_service: PricingService                                      │
│ - _inventory_service: InventoryService                                  │
│ - _event_publisher: DomainEventPublisher                                │
│ - _unit_of_work: UnitOfWork                                             │
├─────────────────────────────────────────────────────────────────────────┤
│ + create_order(cmd: CreateOrderCommand): OrderDTO                       │
│ + add_item(cmd: AddItemCommand): OrderDTO                               │
│ + remove_item(cmd: RemoveItemCommand): OrderDTO                         │
│ + confirm_order(order_id: int): OrderDTO                                │
│ + cancel_order(cmd: CancelOrderCommand): OrderDTO                       │
│ + get_order(order_id: int): Optional[OrderDTO]                          │
│ + get_customer_orders(customer_id: int): List[OrderDTO]                 │
└─────────────────────────────────────────────────────────────────────────┘

                                    │ uses

        ┌───────────────────────────┼───────────────────────────┐
        │                           │                           │
        ▼                           ▼                           ▼
┌───────────────┐          ┌───────────────┐          ┌───────────────┐
│   Repository  │          │ DomainService │          │ EventPublisher│
└───────────────┘          └───────────────┘          └───────────────┘

27.6.3 Application Service Implementation

python
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Callable
from functools import wraps

def transactional(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        with self._unit_of_work.transaction():
            return func(self, *args, **kwargs)
    return wrapper

@dataclass
class CreateOrderCommand:
    customer_id: int
    items: List[Dict[str, Any]]

@dataclass
class AddItemCommand:
    order_id: int
    product_id: int
    product_name: str
    unit_price: float
    quantity: int

@dataclass
class RemoveItemCommand:
    order_id: int
    product_id: int

@dataclass
class ConfirmOrderCommand:
    order_id: int

@dataclass
class CancelOrderCommand:
    order_id: int
    reason: str

@dataclass
class ShipOrderCommand:
    order_id: int
    tracking_number: str

@dataclass
class OrderDTO:
    id: int
    customer_id: int
    status: str
    total: float
    tax: float
    items: List[Dict[str, Any]]
    created_at: str
    version: int
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "customer_id": self.customer_id,
            "status": self.status,
            "total": self.total,
            "tax": self.tax,
            "items": self.items,
            "created_at": self.created_at,
            "version": self.version
        }

class ApplicationError(Exception):
    def __init__(self, message: str, code: str = "APPLICATION_ERROR"):
        super().__init__(message)
        self.code = code

class OrderNotFoundError(ApplicationError):
    def __init__(self, order_id: int):
        super().__init__(f"Order not found: {order_id}", "ORDER_NOT_FOUND")
        self.order_id = order_id

class OrderValidationError(ApplicationError):
    def __init__(self, message: str):
        super().__init__(message, "ORDER_VALIDATION_ERROR")

class OrderApplicationService:
    def __init__(
        self, 
        order_repository: OrderRepository,
        pricing_service: PricingService,
        inventory_service: InventoryService,
        event_publisher: 'DomainEventPublisher',
        unit_of_work: UnitOfWork
    ):
        self._order_repository = order_repository
        self._pricing_service = pricing_service
        self._inventory_service = inventory_service
        self._event_publisher = event_publisher
        self._unit_of_work = unit_of_work
        self._next_order_id = 1
    
    def create_order(self, command: CreateOrderCommand) -> OrderDTO:
        order = Order(
            OrderId(self._next_order_id),
            CustomerId(command.customer_id)
        )
        self._next_order_id += 1
        
        for item in command.items:
            product_id = ProductId(item["product_id"])
            quantity = item["quantity"]
            
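            # reserve() re-checks availability and reports failure via its return value
            if not self._inventory_service.reserve(product_id, quantity, order.order_id):
                # Release anything already reserved for this order before failing
                self._inventory_service.cancel_reservation(order.order_id)
                raise OrderValidationError(
                    f"Insufficient stock for product {item['product_name']}"
                )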
            
            order.add_item(
                product_id,
                item["product_name"],
                Money(item["unit_price"], "CNY"),
                quantity
            )
        
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        
        return self._to_dto(saved_order)
    
    def add_item(self, command: AddItemCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        
        product_id = ProductId(command.product_id)
        # reserve() re-checks availability and reports failure via its return value
        if not self._inventory_service.reserve(product_id, command.quantity, order.order_id):
            raise OrderValidationError(f"Insufficient stock for product {command.product_name}")
        
        order.add_item(
            product_id,
            command.product_name,
            Money(command.unit_price, "CNY"),
            command.quantity
        )
        
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        
        return self._to_dto(saved_order)
    
    def remove_item(self, command: RemoveItemCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        
        product_id = ProductId(command.product_id)
        for item in order.items:
            if item.product_id == product_id:
                self._inventory_service.release(
                    product_id, item.quantity, order.order_id
                )
                break
        
        order.remove_item(product_id)
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        
        return self._to_dto(saved_order)
    
    def confirm_order(self, order_id: int) -> OrderDTO:
        order = self._get_order_or_fail(order_id)
        order.confirm()
        
        self._inventory_service.commit_reservation(order.order_id)
        
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        
        return self._to_dto(saved_order)
    
    def cancel_order(self, command: CancelOrderCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        order.cancel(command.reason)
        
        self._inventory_service.cancel_reservation(order.order_id)
        
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        
        return self._to_dto(saved_order)
    
    def ship_order(self, command: ShipOrderCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        order.ship(command.tracking_number)
        
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        
        return self._to_dto(saved_order)
    
    def get_order(self, order_id: int) -> Optional[OrderDTO]:
        order = self._order_repository.find_by_id(order_id)
        return self._to_dto(order) if order else None
    
    def get_customer_orders(self, customer_id: int) -> List[OrderDTO]:
        orders = self._order_repository.find_by_customer(CustomerId(customer_id))
        return [self._to_dto(order) for order in orders]
    
    def get_orders_by_status(self, status: str) -> List[OrderDTO]:
        try:
            order_status = OrderStatus(status)
        except ValueError as exc:
            raise ApplicationError(f"Invalid order status: {status}") from exc
        
        orders = self._order_repository.find_by_status(order_status)
        return [self._to_dto(order) for order in orders]
    
    def _get_order_or_fail(self, order_id: int) -> Order:
        order = self._order_repository.find_by_id(order_id)
        if not order:
            raise OrderNotFoundError(order_id)
        return order
    
    def _publish_events(self, order: Order) -> None:
        events = order.pull_domain_events()
        for event in events:
            self._event_publisher.publish(event)
    
    def _to_dto(self, order: Order) -> OrderDTO:
        subtotal = order.calculate_total()
        tax = self._pricing_service.calculate_tax(subtotal)
        
        return OrderDTO(
            id=order.id,
            customer_id=order.customer_id.value,
            status=order.status.value,
            total=subtotal.amount,
            tax=tax.amount,
            items=[
                {
                    "product_id": item.product_id.value,
                    "product_name": item.product_name,
                    "quantity": item.quantity,
                    "unit_price": item.unit_price.amount,
                    "total": item.calculate_total().amount
                }
                for item in order.items
            ],
            created_at=order.created_at.isoformat(),
            version=order.version
        )

class DomainEventPublisher:
    def __init__(self):
        self._handlers: Dict[type, List[Callable]] = {}
    
    def subscribe(self, event_type: type, handler: Callable) -> None:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)
    
    def publish(self, event: DomainEvent) -> None:
        handlers = self._handlers.get(type(event), [])
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                # A failing handler must not block the remaining handlers
                print(f"Event handler failed: {e}")

event_publisher = DomainEventPublisher()
event_publisher.subscribe(
    OrderConfirmedEvent,
    lambda e: print(f"[Notice] Order {e.order_id.value} confirmed, total: {e.total}")
)
event_publisher.subscribe(
    OrderShippedEvent,
    lambda e: print(f"[Notice] Order {e.order_id.value} shipped, tracking number: {e.tracking_number}")
)

order_repo = OrderRepository()
pricing_svc = PricingService([DiscountRule(1000, 10)], tax_rate=0.13)
inventory_svc = InventoryService()
uow = OrderUnitOfWork(order_repo)

inventory_svc.set_stock(ProductId(1), 100)
inventory_svc.set_stock(ProductId(2), 50)

app_service = OrderApplicationService(
    order_repo, pricing_svc, inventory_svc, event_publisher, uow
)

order_dto = app_service.create_order(CreateOrderCommand(
    customer_id=100,
    items=[
        {"product_id": 1, "product_name": "Product A", "unit_price": 500, "quantity": 2},
        {"product_id": 2, "product_name": "Product B", "unit_price": 300, "quantity": 1}
    ]
))

print(f"Order ID: {order_dto.id}")
print(f"Status: {order_dto.status}")
print(f"Total: {order_dto.total}")
print(f"Tax: {order_dto.tax}")

confirmed_dto = app_service.confirm_order(order_dto.id)
print(f"Status after confirmation: {confirmed_dto.status}")

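27.7 Domain Events

27.7.1 Formal Definition of a Domain Event

Definition 27.8 (Domain Event): A domain event $e$ is a fact with business significance that has occurred within the domain:

$$e = \langle \mathit{type}, \mathit{data}, \mathit{timestamp}, \mathit{causation\_id}, \mathit{correlation\_id} \rangle$$

where:

  • $\mathit{type}$: the event type
  • $\mathit{data}$: the event payload
  • $\mathit{timestamp}$: the time at which the event occurred
  • $\mathit{causation\_id}$: the ID of the event that caused this event
  • $\mathit{correlation\_id}$: a correlation ID used to trace the entire process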
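The difference between the two IDs is easiest to see on a short chain of events. The following is a minimal sketch under stated assumptions: the `Event` dataclass and its `caused` helper are hypothetical and separate from the `DomainEvent` classes used in this chapter. Each follow-up event points at its direct parent through `causation_id`, while every event in the chain shares one `correlation_id`.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional
import uuid


@dataclass(frozen=True)
class Event:
    """Hypothetical event record mirroring the five-tuple of Definition 27.8."""
    type: str
    data: Dict[str, Any]
    causation_id: Optional[str] = None
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def caused(self, type: str, data: Dict[str, Any]) -> "Event":
        """Create a follow-up event triggered by this one."""
        return Event(
            type=type,
            data=data,
            causation_id=self.event_id,          # direct parent
            correlation_id=self.correlation_id,  # shared by the whole flow
        )


created = Event("order.created", {"order_id": 1})
confirmed = created.caused("order.confirmed", {"order_id": 1})
shipped = confirmed.caused("order.shipped", {"order_id": 1, "tracking_number": "T-001"})
```

Following `causation_id` backwards reconstructs the chain step by step, whereas filtering on `correlation_id` retrieves every event of the flow in one pass.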

27.7.2 Domain Event UML Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<abstract>>                                          │
│                    DomainEvent                                           │
├─────────────────────────────────────────────────────────────────────────┤
│ + event_id: str                                                          │
│ + occurred_at: datetime                                                  │
│ + causation_id: Optional[str]                                            │
│ + correlation_id: Optional[str]                                          │
├─────────────────────────────────────────────────────────────────────────┤
│ + to_dict(): Dict[str, Any]                                              │
│ + event_type(): str                                                      │
└─────────────────────────────────────────────────────────────────────────┘


            ┌───────────────────────┼───────────────────────┐
            │                       │                       │
            ▼                       ▼                       ▼
┌───────────────────┐   ┌───────────────────┐   ┌───────────────────┐
│ OrderCreatedEvent │   │OrderConfirmedEvent│   │ OrderShippedEvent │
├───────────────────┤   ├───────────────────┤   ├───────────────────┤
│ + order_id        │   │ + order_id        │   │ + order_id        │
│ + customer_id     │   │ + total           │   │ + tracking_number │
└───────────────────┘   └───────────────────┘   └───────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    DomainEventPublisher                                  │
├─────────────────────────────────────────────────────────────────────────┤
│ - _handlers: Dict[type, List[Handler]]                                  │
│ - _middleware: List[Middleware]                                         │
├─────────────────────────────────────────────────────────────────────────┤
│ + subscribe(event_type: type, handler: Callable): void                  │
│ + publish(event: DomainEvent): void                                     │
│ + add_middleware(middleware: Middleware): void                          │
└─────────────────────────────────────────────────────────────────────────┘

27.7.3 Complete Domain Event Implementation

python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Callable, Dict, Any, Optional
from abc import ABC, abstractmethod
import uuid
import json

class DomainEvent(ABC):
    def __init__(
        self,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        self.event_id = str(uuid.uuid4())
        self.occurred_at = datetime.now()
        self.causation_id = causation_id
        self.correlation_id = correlation_id or str(uuid.uuid4())
    
    @property
    @abstractmethod
    def event_type(self) -> str:
        pass
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "event_id": self.event_id,
            "event_type": self.event_type,
            "occurred_at": self.occurred_at.isoformat(),
            "causation_id": self.causation_id,
            "correlation_id": self.correlation_id
        }

class OrderCreatedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.created"
    
    def __init__(
        self, 
        order_id: OrderId, 
        customer_id: CustomerId,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.customer_id = customer_id
    
    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "customer_id": self.customer_id.value
        })
        return d

class OrderConfirmedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.confirmed"
    
    def __init__(
        self, 
        order_id: OrderId, 
        total: Money,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.total = total
    
    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "total": self.total.amount,
            "currency": self.total.currency
        })
        return d

class OrderShippedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.shipped"
    
    def __init__(
        self, 
        order_id: OrderId, 
        tracking_number: str,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.tracking_number = tracking_number
    
    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "tracking_number": self.tracking_number
        })
        return d

class OrderCancelledEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.cancelled"
    
    def __init__(
        self, 
        order_id: OrderId, 
        reason: str,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.reason = reason
    
    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "reason": self.reason
        })
        return d

class PaymentReceivedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "payment.received"
    
    def __init__(
        self, 
        order_id: OrderId, 
        amount: Money,
        payment_method: str,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
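        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.amount = amount
        self.payment_method = payment_method
    
    def to_dict(self) -> Dict[str, Any]:
        # Serialize the payment fields too, mirroring the other event classes
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "amount": self.amount.amount,
            "currency": self.amount.currency,
            "payment_method": self.payment_method
        })
        return d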

class Middleware(ABC):
    @abstractmethod
    def before_publish(self, event: DomainEvent) -> DomainEvent:
        pass
    
    @abstractmethod
    def after_publish(self, event: DomainEvent) -> None:
        pass
    
    def on_error(self, event: DomainEvent, error: Exception) -> None:
        pass

class LoggingMiddleware(Middleware):
    def before_publish(self, event: DomainEvent) -> DomainEvent:
        print(f"[LOG] Publishing event: {event.event_type} [{event.event_id}]")
        return event
    
    def after_publish(self, event: DomainEvent) -> None:
        print(f"[LOG] Event handled: {event.event_type}")
    
    def on_error(self, event: DomainEvent, error: Exception) -> None:
        print(f"[ERROR] Event handling failed: {event.event_type}, error: {error}")

class EventStoreMiddleware(Middleware):
    def __init__(self):
        self._events: List[Dict[str, Any]] = []
    
    def before_publish(self, event: DomainEvent) -> DomainEvent:
        return event
    
    def after_publish(self, event: DomainEvent) -> None:
        self._events.append(event.to_dict())
    
    def get_events(self) -> List[Dict[str, Any]]:
        return self._events.copy()
    
    def get_events_by_type(self, event_type: str) -> List[Dict[str, Any]]:
        return [e for e in self._events if e["event_type"] == event_type]

class DomainEventPublisher:
    def __init__(self):
        self._handlers: Dict[type, List[Callable]] = {}
        self._middleware: List[Middleware] = []
        self._dead_letter_queue: List[Dict[str, Any]] = []
    
    def subscribe(self, event_type: type, handler: Callable) -> None:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)
    
    def unsubscribe(self, event_type: type, handler: Callable) -> None:
        if event_type in self._handlers:
            self._handlers[event_type].remove(handler)
    
    def add_middleware(self, middleware: Middleware) -> None:
        self._middleware.append(middleware)
    
    def publish(self, event: DomainEvent) -> None:
        processed_event = event
        for middleware in self._middleware:
            processed_event = middleware.before_publish(processed_event)
        
        # Handlers are looked up by the event's exact class
        # (type(x) and x.__class__ are the same object, so one lookup suffices).
        handlers = self._handlers.get(type(processed_event), [])
        
        for handler in handlers:
            try:
                handler(processed_event)
            except Exception as e:
                for middleware in self._middleware:
                    middleware.on_error(processed_event, e)
                self._dead_letter_queue.append({
                    "event": processed_event.to_dict(),
                    "error": str(e),
                    "failed_at": datetime.now().isoformat()
                })
        
        for middleware in self._middleware:
            middleware.after_publish(processed_event)
    
    def publish_all(self, events: List[DomainEvent]) -> None:
        for event in events:
            self.publish(event)
    
    def get_dead_letter_queue(self) -> List[Dict[str, Any]]:
        return self._dead_letter_queue.copy()

class EventHandler:
    def __init__(self, name: str):
        self.name = name
        self._processed_events: List[str] = []
    
    def handle(self, event: DomainEvent) -> None:
        self._processed_events.append(event.event_id)
        print(f"[{self.name}] Handling event: {event.event_type}")
    
    def has_processed(self, event_id: str) -> bool:
        return event_id in self._processed_events

event_publisher = DomainEventPublisher()
logging_middleware = LoggingMiddleware()
event_store = EventStoreMiddleware()

event_publisher.add_middleware(logging_middleware)
event_publisher.add_middleware(event_store)

notification_handler = EventHandler("Notification service")
inventory_handler = EventHandler("Inventory service")
analytics_handler = EventHandler("Analytics service")

event_publisher.subscribe(OrderConfirmedEvent, notification_handler.handle)
event_publisher.subscribe(OrderConfirmedEvent, analytics_handler.handle)
event_publisher.subscribe(OrderShippedEvent, notification_handler.handle)
event_publisher.subscribe(OrderCancelledEvent, inventory_handler.handle)

order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 1)
order.confirm()

events = order.pull_domain_events()
event_publisher.publish_all(events)

print(f"\nEvents in the event store: {len(event_store.get_events())}")
for e in event_store.get_events():
    print(f"  - {e['event_type']}: {e['event_id'][:8]}...")
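`EventHandler.handle` above records each `event_id` but never consults `has_processed`, so an event that is delivered twice is handled twice. As a minimal sketch, assuming redelivery can happen (for example when dead-letter entries are replayed), a handler can check the ID before doing any work. `SampleEvent` and `IdempotentEventHandler` are hypothetical names introduced for illustration and are not part of the chapter's code.

```python
import uuid
from dataclasses import dataclass, field
from typing import List, Set


@dataclass(frozen=True)
class SampleEvent:
    """Stand-in for a domain event; carries only the fields the handler needs."""
    event_type: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))


class IdempotentEventHandler:
    """Skips any event whose ID has already been processed."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._processed_ids: Set[str] = set()
        self.handled_types: List[str] = []

    def handle(self, event: SampleEvent) -> bool:
        # Return False without side effects when the event is a duplicate.
        if event.event_id in self._processed_ids:
            return False
        self._processed_ids.add(event.event_id)
        self.handled_types.append(event.event_type)
        return True


handler = IdempotentEventHandler("notification")
event = SampleEvent("order.confirmed")

first = handler.handle(event)    # processed
second = handler.handle(event)   # same event_id, skipped
```

In a real deployment the set of processed IDs would need to live in durable storage; the in-memory set here only illustrates the check.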

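27.8 Strategic Design: Bounded Contexts

27.8.1 Formal Definition of Bounded Contexts

Definition 27.9 (Context map): A context map $\mathcal{M}$ assigns a relationship to each ordered pair of bounded contexts:

$$\mathcal{M}: \mathcal{B}_1 \times \mathcal{B}_2 \rightarrow Relationship$$

Relationship types include the following. In this list, $\mathcal{M}$ with or without a subscript denotes a domain model in the sense of Definition 27.1, not the map itself:

  • Shared kernel: $SharedKernel \subset \mathcal{M}_1 \cap \mathcal{M}_2$
  • Customer-supplier: $\mathcal{B}_1 \xrightarrow{supply} \mathcal{B}_2$
  • Anti-corruption layer: $ACL: \mathcal{M}_{external} \rightarrow \mathcal{M}$
  • Open host service: $OHS: \mathcal{B} \rightarrow API$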

27.8.2 Bounded Context Map

┌─────────────────────────────────────────────────────────────────────────┐
│                     Sales Context                                        │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                      │
│  │   Order     │  │  Customer   │  │   Product   │                      │
│  │  Aggregate  │  │  Aggregate  │  │  Aggregate  │                      │
│  └─────────────┘  └─────────────┘  └─────────────┘                      │
└─────────────────────────────────────────────────────────────────────────┘
         │                    │                    │
         │ OHS/PL             │ ACL                │ ACL
         ▼                    ▼                    ▼
┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│ Shipping Context│  │ Payment Context │  │Inventory Context│
│ ┌─────────────┐ │  │ ┌─────────────┐ │  │ ┌─────────────┐ │
│ │  Shipment   │ │  │ │  Payment    │ │  │ │  Stock      │ │
│ │  Aggregate  │ │  │ │  Aggregate  │ │  │ │  Aggregate  │ │
│ └─────────────┘ │  │ └─────────────┘ │  │ └─────────────┘ │
└─────────────────┘  └─────────────────┘  └─────────────────┘
         │                    │
         │ CF                 │ CF
         ▼                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                     Notification Context                                 │
│  ┌─────────────┐  ┌─────────────┐                                       │
│  │   Email     │  │    SMS      │                                       │
│  │   Service   │  │   Service   │                                       │
│  └─────────────┘  └─────────────┘                                       │
└─────────────────────────────────────────────────────────────────────────┘

Legend:
OHS/PL = Open Host Service / Published Language
ACL    = Anti-Corruption Layer
CF     = Conformist
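The map in Definition 27.9 can also be recorded as data, which makes the relationships queryable. The following is a minimal sketch under stated assumptions: `Relationship`, `ContextMap`, and the lowercase context names are hypothetical, and the direction of each pair simply follows the arrows in the diagram.

```python
from enum import Enum
from typing import Dict, List, Optional, Tuple


class Relationship(Enum):
    SHARED_KERNEL = "shared_kernel"
    CUSTOMER_SUPPLIER = "customer_supplier"
    ANTI_CORRUPTION_LAYER = "anti_corruption_layer"
    OPEN_HOST_SERVICE = "open_host_service"
    CONFORMIST = "conformist"


class ContextMap:
    """Stores one relationship per ordered (source, target) pair of contexts."""

    def __init__(self) -> None:
        self._relations: Dict[Tuple[str, str], Relationship] = {}

    def relate(self, source: str, target: str, relationship: Relationship) -> None:
        if source == target:
            raise ValueError("a context cannot be related to itself")
        self._relations[(source, target)] = relationship

    def relationship(self, source: str, target: str) -> Optional[Relationship]:
        # Pairs are ordered: (a, b) and (b, a) are different entries.
        return self._relations.get((source, target))

    def targets_of(self, source: str) -> List[str]:
        return sorted(t for (s, t) in self._relations if s == source)


# Edges taken from the arrows in the diagram (assumed direction: top to bottom).
context_map = ContextMap()
context_map.relate("sales", "shipping", Relationship.OPEN_HOST_SERVICE)
context_map.relate("sales", "payment", Relationship.ANTI_CORRUPTION_LAYER)
context_map.relate("sales", "inventory", Relationship.ANTI_CORRUPTION_LAYER)
context_map.relate("shipping", "notification", Relationship.CONFORMIST)
context_map.relate("payment", "notification", Relationship.CONFORMIST)
```

Keying on an ordered pair mirrors the domain $\mathcal{B}_1 \times \mathcal{B}_2$ of the map, so a lookup in the reverse direction returns `None` unless that edge was recorded separately.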

27.8.3 Implementing the Context Map

python
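import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Protocol, Dict, Any, List, Optional
from abc import ABC, abstractmethod

# Order, OrderId, CustomerId, ProductId, Money and OrderRepository are assumed
# to be the definitions from the earlier sections of this chapter.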

@dataclass(frozen=True)
class OrderIdDTO:
    value: int

@dataclass(frozen=True)
class CustomerIdDTO:
    value: int

@dataclass(frozen=True)
class ProductIdDTO:
    value: int

@dataclass
class OrderSummaryDTO:
    order_id: OrderIdDTO
    customer_id: CustomerIdDTO
    total_amount: float
    status: str
    items: List[Dict[str, Any]]

class OrderRepositoryInterface(Protocol):
    def find_by_id(self, id: int) -> Optional['Order']:
        ...
    
    def save(self, order: 'Order') -> 'Order':
        ...

class AntiCorruptionLayer(ABC):
    @abstractmethod
    def to_local(self, external_data: Any) -> Any:
        pass
    
    @abstractmethod
    def to_external(self, local_data: Any) -> Any:
        pass

class ShippingAntiCorruptionLayer(AntiCorruptionLayer):
    def to_local(self, external_data: Dict[str, Any]) -> OrderSummaryDTO:
        return OrderSummaryDTO(
            order_id=OrderIdDTO(external_data["order_id"]),
            customer_id=CustomerIdDTO(external_data["customer_id"]),
            total_amount=external_data["total"],
            status=external_data["status"],
            items=external_data.get("items", [])
        )
    
    def to_external(self, local_data: Order) -> Dict[str, Any]:
        return {
            "order_id": local_data.order_id.value,
            "customer_id": local_data.customer_id.value,
            "total": local_data.calculate_total().amount,
            "status": local_data.status.value,
            "items": [
                {
                    "product_id": item.product_id.value,
                    "product_name": item.product_name,
                    "quantity": item.quantity
                }
                for item in local_data.items
            ]
        }

class PaymentAntiCorruptionLayer(AntiCorruptionLayer):
    def to_local(self, external_data: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "payment_id": external_data.get("payment_id"),
            "order_id": OrderIdDTO(external_data["order_id"]),
            "amount": Money(
                external_data["amount"],
                external_data.get("currency", "CNY")
            ),
            "status": external_data.get("status", "pending")
        }
    
    def to_external(self, local_data: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "order_id": local_data["order_id"].value,
            "amount": local_data["amount"].amount,
            "currency": local_data["amount"].currency,
            "customer_id": local_data.get("customer_id", 0)
        }

class OpenHostService(ABC):
    @abstractmethod
    def get_order(self, order_id: int) -> Optional[Dict[str, Any]]:
        pass
    
    @abstractmethod
    def get_customer_orders(self, customer_id: int) -> List[Dict[str, Any]]:
        pass

class OrderOpenHostService(OpenHostService):
    def __init__(self, order_repository: OrderRepository):
        self._order_repository = order_repository
    
    def get_order(self, order_id: int) -> Optional[Dict[str, Any]]:
        order = self._order_repository.find_by_id(order_id)
        if not order:
            return None
        
        return {
            "order_id": order.order_id.value,
            "customer_id": order.customer_id.value,
            "status": order.status.value,
            "total": order.calculate_total().amount,
            "items": [
                {
                    "product_id": item.product_id.value,
                    "product_name": item.product_name,
                    "quantity": item.quantity,
                    "unit_price": item.unit_price.amount
                }
                for item in order.items
            ],
            "created_at": order.created_at.isoformat()
        }
    
    def get_customer_orders(self, customer_id: int) -> List[Dict[str, Any]]:
        orders = self._order_repository.find_by_customer(CustomerId(customer_id))
        return [
            {
                "order_id": o.order_id.value,
                "status": o.status.value,
                "total": o.calculate_total().amount,
                "item_count": len(o.items)
            }
            for o in orders
        ]

class PublishedLanguage:
    EVENT_SCHEMAS = {
        "order.created": {
            "order_id": "integer",
            "customer_id": "integer",
            "occurred_at": "datetime"
        },
        "order.confirmed": {
            "order_id": "integer",
            "total": "number",
            "currency": "string"
        },
        "order.shipped": {
            "order_id": "integer",
            "tracking_number": "string"
        }
    }
    
    @classmethod
    def validate_event(cls, event_type: str, data: Dict[str, Any]) -> bool:
        """Check that data contains every field the schema lists."""
        schema = cls.EVENT_SCHEMAS.get(event_type)
        if not schema:
            return False
        
        # Only field presence is enforced; the declared type names are informational.
        for field_name in schema:
            if field_name not in data:
                return False
        
        return True

class SharedKernel:
    @dataclass(frozen=True)
    class Money:
        amount: float
        currency: str
    
    @dataclass(frozen=True)
    class OrderId:
        value: int
    
    @dataclass(frozen=True)
    class CustomerId:
        value: int
    
    @dataclass(frozen=True)
    class ProductId:
        value: int

class ShippingContext:
    def __init__(self, order_service: OpenHostService, acl: AntiCorruptionLayer):
        self._order_service = order_service
        self._acl = acl
    
    def create_shipment(self, order_id: int) -> Dict[str, Any]:
        order_data = self._order_service.get_order(order_id)
        if not order_data:
            raise ValueError(f"Order not found: {order_id}")
        
        order_dto = self._acl.to_local(order_data)
        
        shipment = {
            "shipment_id": str(uuid.uuid4()),
            "order_id": order_dto.order_id.value,
            "customer_id": order_dto.customer_id.value,
            "status": "preparing",
            "items": order_dto.items,
            "created_at": datetime.now().isoformat()
        }
        
        return shipment

class PaymentContext:
    def __init__(self, acl: AntiCorruptionLayer):
        self._acl = acl
        self._payments: Dict[int, Dict[str, Any]] = {}
    
    def create_payment(
        self, 
        order_id: int, 
        amount: Money, 
        customer_id: int
    ) -> Dict[str, Any]:
        payment_data = {
            "payment_id": str(uuid.uuid4()),
            "order_id": OrderIdDTO(order_id),
            "amount": amount,
            "customer_id": customer_id,
            "status": "pending"
        }
        
        external_data = self._acl.to_external(payment_data)
        
        self._payments[order_id] = {
            **external_data,
            "payment_id": payment_data["payment_id"]
        }
        
        return self._payments[order_id]
    
    def confirm_payment(self, order_id: int) -> Dict[str, Any]:
        if order_id not in self._payments:
            raise ValueError(f"Payment record not found: {order_id}")
        
        self._payments[order_id]["status"] = "confirmed"
        return self._payments[order_id]

order_repo = OrderRepository()
order_service = OrderOpenHostService(order_repo)

order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 2)
order_repo.save(order)

shipping_acl = ShippingAntiCorruptionLayer()
shipping_ctx = ShippingContext(order_service, shipping_acl)

shipment = shipping_ctx.create_shipment(1)
print(f"Created shipment: {shipment['shipment_id'][:8]}...")

payment_acl = PaymentAntiCorruptionLayer()
payment_ctx = PaymentContext(payment_acl)

payment = payment_ctx.create_payment(1, Money(200, "CNY"), 100)
print(f"Created payment: {payment['payment_id'][:8]}...")

confirmed = payment_ctx.confirm_payment(1)
print(f"Payment status: {confirmed['status']}")

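27.9 Anti-Patterns and Best Practices

27.9.1 Common Anti-Patterns

| Anti-pattern | Description | Consequence | Remedy |
|---|---|---|---|
| Anemic model | Entities expose only getters/setters; logic lives in the service layer | Breaks encapsulation, scatters logic | Move business logic into the entities |
| Bloated aggregate | An aggregate contains too many entities | Performance problems, concurrency conflicts | Split the aggregate; reference by identity |
| God service | One service handles all logic | Unclear responsibilities, hard to maintain | Split services along domain lines |
| Ignoring the ubiquitous language | Terms in code differ from business terms | Communication barriers | Maintain a shared glossary |
| Over-engineering | Simple problems made complex | Low development efficiency | Choose the approach to match the complexity |
| Direct references between aggregates | An aggregate holds instances of other aggregates | Blurred transaction boundaries | Reference by ID |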
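The last row of the table, direct references between aggregates, can be illustrated with a short sketch. The classes below are hypothetical, stripped-down stand-ins (including a local `CustomerId`, redefined here only so the snippet runs on its own); they contrast an order that holds a `Customer` instance with one that stores only the customer's identity and loads the customer through a repository when needed.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class CustomerId:
    value: int


@dataclass
class Customer:
    customer_id: CustomerId
    name: str


@dataclass
class OrderHoldingCustomer:
    # Anti-pattern: saving this order would also have to save the customer,
    # so the two aggregates share one transaction boundary.
    order_id: int
    customer: Customer


@dataclass
class OrderReferencingCustomer:
    # Preferred: only the identity of the other aggregate is stored.
    order_id: int
    customer_id: CustomerId


class InMemoryCustomerRepository:
    def __init__(self) -> None:
        self._customers: Dict[CustomerId, Customer] = {}

    def save(self, customer: Customer) -> None:
        self._customers[customer.customer_id] = customer

    def find_by_id(self, customer_id: CustomerId) -> Optional[Customer]:
        return self._customers.get(customer_id)


customers = InMemoryCustomerRepository()
customers.save(Customer(CustomerId(100), "Alice"))

order = OrderReferencingCustomer(order_id=1, customer_id=CustomerId(100))
# The customer is loaded on demand, outside the order's own boundary.
customer = customers.find_by_id(order.customer_id)
```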

27.9.2 Anemic Model: Example and Fix

python
# Anti-pattern: a data-only order; every business rule lives in AnemicOrderService.
class AnemicOrder:
    def __init__(self, id: int, customer_id: int):
        self.id = id
        self.customer_id = customer_id
        self.items: List[Dict] = []
        self.status = "pending"
    
class AnemicOrderService:
    def add_item(
        self, 
        order: AnemicOrder, 
        product_id: int, 
        name: str, 
        price: float, 
        qty: int
    ) -> None:
        if order.status != "pending":
            raise ValueError("Only pending orders can be modified")
        order.items.append({
            "product_id": product_id,
            "name": name,
            "price": price,
            "quantity": qty
        })
    
    def confirm(self, order: AnemicOrder) -> None:
        if order.status != "pending":
            raise ValueError("Only pending orders can be confirmed")
        if not order.items:
            raise ValueError("An order cannot be empty")
        order.status = "confirmed"

# Fix: the order enforces its own rules, so callers cannot bypass them.
class RichOrder:
    def __init__(self, id: OrderId, customer_id: CustomerId):
        self._order_id = id
        self._customer_id = customer_id
        self._items: List[OrderItem] = []
        self._status = OrderStatus.PENDING
    
    @property
    def order_id(self) -> OrderId:
        return self._order_id
    
    @property
    def status(self) -> OrderStatus:
        return self._status
    
    def add_item(
        self, 
        product_id: ProductId, 
        name: str, 
        price: Money, 
        qty: int
    ) -> None:
        self._ensure_modifiable()
        self._validate_item(product_id, name, price, qty)
        
        existing = self._find_item(product_id)
        if existing:
            existing.update_quantity(existing.quantity + qty)
        else:
            self._items.append(OrderItem(
                OrderItemId(len(self._items) + 1),
                product_id, name, price, qty
            ))
    
    def confirm(self) -> None:
        self._ensure_status(OrderStatus.PENDING)
        if not self._items:
            raise ValueError("An order cannot be empty")
        self._status = OrderStatus.CONFIRMED
    
    def _ensure_modifiable(self) -> None:
        if self._status != OrderStatus.PENDING:
            raise ValueError("Only pending orders can be modified")
    
    def _ensure_status(self, expected: OrderStatus) -> None:
        if self._status != expected:
            raise ValueError(f"Invalid status: expected {expected.value}, got {self._status.value}")
    
    def _find_item(self, product_id: ProductId) -> Optional[OrderItem]:
        return next((i for i in self._items if i.product_id == product_id), None)
    
    def _validate_item(self, product_id: ProductId, name: str, price: Money, qty: int) -> None:
        if qty <= 0:
            raise ValueError("Quantity must be greater than 0")
        if not name:
            raise ValueError("Product name must not be empty")

27.9.3 Best Practices Checklist

python
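class DDDBestPractices:
    ENTITY_RULES = [
        "Entity equality is defined by identity, not by attributes",
        "An entity's identity never changes during its lifecycle",
        "Entities are mutable, but changes are encapsulated in methods",
        "Entities validate their own invariants"
    ]
    
    VALUE_OBJECT_RULES = [
        "Value objects are immutable",
        "Value object equality is defined by attribute values",
        "Value objects are freely replaceable",
        "Value objects validate themselves"
    ]
    
    AGGREGATE_RULES = [
        "The aggregate root is the only entry point",
        "Transactional consistency holds inside the aggregate boundary",
        "Aggregates reference each other by ID",
        "Keep aggregates as small as possible",
        "Use eventual consistency to maintain constraints across aggregates"
    ]
    
    SERVICE_RULES = [
        "Domain services are stateless",
        "A service operation does not belong to any entity or value object",
        "Service interfaces use the domain language",
        "Service names reflect domain concepts"
    ]
    
    REPOSITORY_RULES = [
        "Repositories exist only for aggregate roots",
        "Repositories mimic an in-memory collection",
        "Repositories hide persistence details",
        "Repositories contain no business logic"
    ]
    
    @classmethod
    def validate_aggregate(cls, aggregate: Any) -> List[str]:
        """Heuristic check; returns the rule violations found (empty if none)."""
        violations = []
        
        # Report this violation once, however many internal items trigger it.
        items = getattr(aggregate, 'items', [])
        if any(hasattr(item, 'id') for item in items) and not hasattr(aggregate, '_find_item_by_id'):
            violations.append("Internal entities should be accessed through the aggregate root")
        
        if hasattr(aggregate, 'get_other_aggregate'):
            violations.append("An aggregate should not hold instances of other aggregates")
        
        return violations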
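The entity and value object rules in the checklist can be observed directly in code. The sketch below uses two hypothetical classes, `Email` (a value object) and `User` (an entity), which are illustrative assumptions rather than classes from this chapter: the value object is immutable, self-validating and compared by value, while the entity is compared by identity only.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class Email:
    """Value object: immutable, self-validating, equal by attribute values."""
    address: str

    def __post_init__(self) -> None:
        if "@" not in self.address:
            raise ValueError(f"invalid email address: {self.address!r}")


class User:
    """Entity: mutable through methods, equal by identity only."""

    def __init__(self, user_id: int, email: Email) -> None:
        self._user_id = user_id
        self._email = email

    @property
    def email(self) -> Email:
        return self._email

    def change_email(self, new_email: Email) -> None:
        # Value objects are replaced as a whole, never edited in place.
        self._email = new_email

    def __eq__(self, other: object) -> bool:
        return isinstance(other, User) and other._user_id == self._user_id

    def __hash__(self) -> int:
        return hash(self._user_id)


same_value = Email("a@example.com") == Email("a@example.com")
same_identity = User(1, Email("a@example.com")) == User(1, Email("b@example.com"))

try:
    Email("a@example.com").address = "c@example.com"
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```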

27.10 Decision Guide

27.10.1 Assessing Whether DDD Applies

        ┌─────────────────────────────────────┐
        │     Assess business complexity      │
        └─────────────────────────────────────┘
                           │
            ┌──────────────┴──────────────┐
            │                             │
            ▼                             ▼
┌───────────────────────┐     ┌───────────────────────┐
│      Simple CRUD      │     │   Complex business    │
│    Low complexity     │     │    High complexity    │
└───────────────────────┘     └───────────────────────┘
            │                             │
            ▼                             ▼
┌───────────────────────┐     ┌───────────────────────┐
│  DDD not recommended  │     │    DDD recommended    │
│  Transaction Script   │     │     Domain Model      │
└───────────────────────┘     └───────────────────────┘
                                          │
                           ┌──────────────┴──────────────┐
                           │                             │
                           ▼                             ▼
               ┌───────────────────────┐     ┌───────────────────────┐
               │  One bounded context  │     │ Many bounded contexts │
               │    Tactical focus     │     │    Strategic focus    │
               └───────────────────────┘     └───────────────────────┘

27.10.2 Aggregate Design Decision Tree

        ┌─────────────────────────────────────┐
        │  Transactional consistency needed?  │
        └─────────────────────────────────────┘
                           │
            ┌──────────────┴──────────────┐
            │                             │
            ▼                             ▼
┌───────────────────────┐     ┌───────────────────────┐
│          Yes          │     │          No           │
└───────────────────────┘     └───────────────────────┘
            │                             │
            ▼                             ▼
┌───────────────────────┐     ┌───────────────────────┐
│    Same aggregate     │     │  Separate aggregates  │
└───────────────────────┘     │ Eventual consistency  │
                              └───────────────────────┘
                                          │
                           ┌──────────────┴──────────────┐
                           │                             │
                           ▼                             ▼
               ┌───────────────────────┐     ┌───────────────────────┐
               │     Domain events     │     │     Message queue     │
               │     Sync handling     │     │    Async handling     │
               └───────────────────────┘     └───────────────────────┘
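The right-hand branch of the tree (separate aggregates kept in step through eventual consistency) can be sketched as follows. This is a simplified illustration under stated assumptions: the `Order`, `Stock`, and `OrderConfirmed` classes are cut-down hypothetical stand-ins, not the chapter's full implementations, and the event is dispatched in-process rather than through a message queue.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: int
    product_id: int
    quantity: int


class Order:
    def __init__(self, order_id: int, product_id: int, quantity: int) -> None:
        self.order_id = order_id
        self.product_id = product_id
        self.quantity = quantity
        self.status = "pending"
        self._events: List[OrderConfirmed] = []

    def confirm(self) -> None:
        if self.status != "pending":
            raise ValueError("only pending orders can be confirmed")
        self.status = "confirmed"
        self._events.append(OrderConfirmed(self.order_id, self.product_id, self.quantity))

    def pull_events(self) -> List[OrderConfirmed]:
        events, self._events = self._events, []
        return events


class Stock:
    def __init__(self, product_id: int, available: int) -> None:
        self.product_id = product_id
        self.available = available
        self.reserved = 0

    def reserve(self, quantity: int) -> None:
        if quantity > self.available:
            raise ValueError("insufficient stock")
        self.available -= quantity
        self.reserved += quantity


def on_order_confirmed(event: OrderConfirmed, stock: Stock) -> None:
    # Runs as its own step, after the order change has been made.
    stock.reserve(event.quantity)


order = Order(order_id=1, product_id=7, quantity=2)
stock = Stock(product_id=7, available=10)

order.confirm()                      # step 1: only the Order aggregate changes
for event in order.pull_events():    # step 2: the Stock aggregate catches up
    on_order_confirmed(event, stock)
```

Between the two steps the order is confirmed while the stock is not yet reserved; accepting that window is what distinguishes eventual consistency from placing both objects in one aggregate.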

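27.10.3 Pattern Selection Reference

| Scenario | Recommended pattern | Reason |
|---|---|---|
| User registration | Entity + value object | Needs a unique identity; email and address are value objects |
| Order management | Aggregate + domain events | Order items are strongly consistent with the order; confirmation triggers downstream work |
| Price calculation | Domain service | Discount rules do not belong to any entity |
| Inventory reservation | Domain service + repository | Operation spans aggregates |
| Payment processing | Anti-corruption layer | Integration with an external payment system |
| Order queries | Application service + DTO | Read model separated from write model |
| Status changes | Entity method + event | Encapsulates the state transition logic |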
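As an illustration of the price calculation row, the sketch below places a discount rule in a stateless domain service instead of on an order or product. The names `LineItem`, `DiscountPolicy`, and `PricingService`, and the threshold-based rule itself, are assumptions made for this example only.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Iterable


@dataclass(frozen=True)
class LineItem:
    unit_price: Decimal
    quantity: int


@dataclass(frozen=True)
class DiscountPolicy:
    threshold: Decimal   # subtotal at which the discount starts to apply
    rate: Decimal        # fraction taken off, e.g. Decimal("0.10") for 10%


class PricingService:
    """Stateless domain service: the rule needs items and a policy, not an entity."""

    def total(self, items: Iterable[LineItem], policy: DiscountPolicy) -> Decimal:
        subtotal = sum((i.unit_price * i.quantity for i in items), Decimal("0"))
        if subtotal >= policy.threshold:
            return subtotal * (Decimal("1") - policy.rate)
        return subtotal


items = [LineItem(Decimal("100"), 2), LineItem(Decimal("50"), 1)]
policy = DiscountPolicy(threshold=Decimal("200"), rate=Decimal("0.10"))

# Subtotal is 250, which reaches the threshold, so 10% is taken off.
total = PricingService().total(items, policy)
```

`Decimal` is used instead of `float` so that monetary arithmetic stays exact.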

27.11 Quick Reference Cards

27.11.1 DDD Core Concepts at a Glance

┌───────────────────────────────────────────────────────────────────────────┐
│                     DDD Core Concepts Quick Reference                     │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│  Strategic design                                                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                        │
│  │   Bounded   │  │   Context   │  │ Ubiquitous  │                        │
│  │   Context   │  │   Mapping   │  │  Language   │                        │
│  └─────────────┘  └─────────────┘  └─────────────┘                        │
│                                                                           │
│  Tactical design                                                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │
│  │   Entity    │  │Value Object │  │  Aggregate  │  │ Domain Svc  │       │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘       │
│                                                                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                        │
│  │ Repository  │  │Unit of Work │  │Domain Event │                        │
│  └─────────────┘  └─────────────┘  └─────────────┘                        │
│                                                                           │
├───────────────────────────────────────────────────────────────────────────┤
│  Entity vs. value object                                                  │
│  ┌─────────────────────────────────────────────────────────────────────┐  │
│  │ Entity: has identity, mutable, has a lifecycle, equal by ID         │  │
│  │ Value object: no identity, immutable, no lifecycle, equal by value  │  │
│  └─────────────────────────────────────────────────────────────────────┘  │
│                                                                           │
│  Aggregate rules                                                          │
│  ┌─────────────────────────────────────────────────────────────────────┐  │
│  │ 1. Only the aggregate root has a global identity                    │  │
│  │ 2. Outside code reaches the inside only through the root            │  │
│  │ 3. Transactional consistency holds inside the boundary              │  │
│  │ 4. Aggregates reference each other by ID                            │  │
│  │ 5. Keep aggregates as small as possible                             │  │
│  └─────────────────────────────────────────────────────────────────────┘  │
│                                                                           │
└───────────────────────────────────────────────────────────────────────────┘

27.11.2 Code Template Quick Reference

python
# Value object template
@dataclass(frozen=True)
class MyValueObject:
    value: str
    
    def __post_init__(self):
        if not self._validate():
            raise ValueError("Validation failed")
    
    def _validate(self) -> bool:
        return True

# Entity template
class MyEntity(Entity):
    def __init__(self, id: int, name: str):
        super().__init__(id)
        self._name = name
    
    @property
    def name(self) -> str:
        return self._name
    
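    def change_name(self, new_name: str) -> None:
        self._validate_name(new_name)
        self._name = new_name
    
    def _validate_name(self, name: str) -> None:
        # Placeholder rule; replace it with the entity's real invariant.
        if not name:
            raise ValueError("Name must not be empty")

# Aggregate root template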
class MyAggregateRoot(Entity):
    def __init__(self, id: int):
        super().__init__(id)
        self._events: List[DomainEvent] = []
    
    def pull_domain_events(self) -> List[DomainEvent]:
        events = self._events.copy()
        self._events.clear()
        return events
    
    def _add_event(self, event: DomainEvent) -> None:
        self._events.append(event)

# Repository template
class MyRepository(Repository[MyAggregateRoot]):
    def find_by_id(self, id: int) -> Optional[MyAggregateRoot]:
        pass
    
    def save(self, entity: MyAggregateRoot) -> MyAggregateRoot:
        pass

# Domain service template
class MyDomainService:
    def perform_operation(self, entity: MyEntity) -> Result:
        pass

# Application service template
class MyApplicationService:
    def __init__(self, repository: MyRepository, uow: UnitOfWork):
        self._repository = repository
        self._uow = uow
    
    def handle_command(self, command: Command) -> DTO:
        with self._uow.transaction():
            entity = self._repository.find_by_id(command.id)
            entity.perform_action()
            self._repository.save(entity)
            return self._to_dto(entity)

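27.12 Summary

27.12.1 Key Points

  1. Ubiquitous language: domain terminology shared by the team, with code matching the business language
  2. Bounded context: makes model boundaries explicit and avoids conceptual confusion
  3. Entities and value objects: distinguish identity from attributes to model domain concepts correctly
  4. Aggregates: guarantee transactional consistency and keep boundaries small
  5. Domain services: encapsulate domain logic that does not belong to any entity
  6. Repositories: an abstraction over persistence that mimics an in-memory collection
  7. Domain events: enable communication across aggregates and support eventual consistency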

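27.12.2 When DDD Applies

| Good fit | Poor fit |
|---|---|
| Core domains with complex business logic | Simple CRUD applications |
| Systems that need long-term maintenance | Short-lived projects |
| Teams with a deep understanding of the domain | Unstable domain knowledge |
| Work that requires collaboration with business experts | Technology-driven projects |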

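27.12.3 Implementation Advice

  1. Start small: practice within a single bounded context first
  2. Establish a ubiquitous language: maintain the glossary together with domain experts
  3. Evolve iteratively: let the model change as understanding deepens
  4. Keep it simple: avoid over-engineering and choose the approach to match the complexity
  5. Refactor continuously: distill the domain model through refactoring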

Python Technical Book Series - 江苏省宿城中等专业学校