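Chapter 27 Domain-Driven Design
Learning objectives
After completing this chapter, readers will be able to:
- Understand the core concepts and theoretical foundations of domain-driven design (DDD)
- Master the formal definitions and design principles of entities, value objects, and aggregates
- Implement a layered architecture with domain services and application services
- Apply the repository and unit of work patterns to abstract persistence
- Design domain events for communication across aggregates
- Use strategic design tools to delineate bounded contexts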
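27.1 Theoretical Foundations and Formal Definitions
27.1.1 Formal definition of domain-driven design
Definition 27.1 (Domain model): A domain model $\mathcal{M}$ is a 4-tuple:
$$\mathcal{M} = \langle \mathcal{E}, \mathcal{V}, \mathcal{R}, \mathcal{I} \rangle$$
where:
- $\mathcal{E}$: the set of entities, domain objects that have a unique identity
- $\mathcal{V}$: the set of value objects, immutable objects defined by their attribute values
- $\mathcal{R}$: the set of relationships, describing associations between objects
- $\mathcal{I}$: the set of invariants, business rules that keep the model consistent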
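Definition 27.2 (Aggregate): An aggregate $\mathcal{A}$ is a pair:
$$\mathcal{A} = \langle r, \{e_1, e_2, \ldots, e_n\} \rangle$$
where $r$ is the aggregate root and $\{e_1, e_2, \ldots, e_n\}$ is the set of internal entities, such that:
- External objects can reach the aggregate's internals only through $r$
- Transactional consistency is guaranteed within the aggregate
- Aggregates reference one another by identity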
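The three constraints can be illustrated with a minimal sketch. The `Cart` and `CartLine` classes and the `MAX_PER_SKU` limit are hypothetical names chosen for illustration and do not appear elsewhere in this chapter; the point is only that every change passes through the root, which checks the invariant, and that callers receive copies instead of the internal entities.

```python
from dataclasses import dataclass, replace
from typing import Dict, Tuple


@dataclass
class CartLine:
    """Internal entity: created and mutated only by the aggregate root."""
    sku: str
    quantity: int


class Cart:
    """Aggregate root (hypothetical example): the single entry point."""

    MAX_PER_SKU = 10  # assumed business invariant, for illustration only

    def __init__(self, cart_id: int, owner_id: int) -> None:
        self.cart_id = cart_id
        self.owner_id = owner_id  # another aggregate, referenced by identity
        self._lines: Dict[str, CartLine] = {}

    def add(self, sku: str, quantity: int) -> None:
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        line = self._lines.get(sku)
        new_quantity = quantity + (line.quantity if line else 0)
        # The root enforces the invariant before any state changes.
        if new_quantity > self.MAX_PER_SKU:
            raise ValueError(f"at most {self.MAX_PER_SKU} units per SKU")
        if line:
            line.quantity = new_quantity
        else:
            self._lines[sku] = CartLine(sku, new_quantity)

    @property
    def lines(self) -> Tuple[CartLine, ...]:
        # Hand out copies so callers cannot bypass the root.
        return tuple(replace(line) for line in self._lines.values())


cart = Cart(cart_id=1, owner_id=42)
cart.add("SKU-1", 3)
cart.add("SKU-1", 4)
leaked = cart.lines[0]
leaked.quantity = 99           # changes only the copy
print(cart.lines[0].quantity)  # 7
```

Returning copies is one of several ways to protect the boundary; read-only views or immutable internal objects would serve the same purpose.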
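Definition 27.3 (Bounded context): A bounded context $\mathcal{B}$ is defined as:
$$\mathcal{B} = \langle \mathcal{M}, \mathcal{L}, \mathcal{U} \rangle$$
where:
- $\mathcal{M}$: the domain model inside the context
- $\mathcal{L}$: the vocabulary of the ubiquitous language
- $\mathcal{U}$: the context-mapping relationships to other contexts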
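As a rough illustration, the same real-world customer can be modeled differently in two contexts, with an explicit translation standing in for one entry of the context map. The `SalesCustomer` and `ShippingRecipient` types and the `to_recipient` function are hypothetical and are not part of the model developed later in this chapter.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SalesCustomer:
    """'Customer' as the sales context understands the term."""
    customer_id: int
    name: str
    credit_limit: float


@dataclass(frozen=True)
class ShippingRecipient:
    """The same person as the shipping context understands them."""
    customer_id: int
    name: str
    postal_code: str


def to_recipient(customer: SalesCustomer, postal_code: str) -> ShippingRecipient:
    """Translate between the two models; only the identity and name cross the boundary."""
    return ShippingRecipient(customer.customer_id, customer.name, postal_code)


buyer = SalesCustomer(customer_id=100, name="Zhang San", credit_limit=5000.0)
recipient = to_recipient(buyer, "100000")
print(recipient)
```

Each context keeps its own vocabulary: the credit limit means nothing to shipping, so it never leaves the sales model.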
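27.1.2 Historical background and development
| Period | Milestone | Key figure / work | Core contribution |
|---|---|---|---|
| 1995 | Analysis patterns | Martin Fowler | Domain modeling practice |
| 2003 | Foundation of DDD | Eric Evans | Publication of *Domain-Driven Design* |
| 2006 | Domain-specific languages | Eric Evans, Martin Fowler | Combining DSLs with domain modeling |
| 2013 | DDD strategic design | Vaughn Vernon | *Implementing Domain-Driven Design* |
| 2014 | Microservice architecture | Sam Newman | Mapping bounded contexts to microservices |
| 2016 | Event storming | Alberto Brandolini | A visual method for exploring a domain |
| 2018 | Functional DDD | Scott Wlaschin | Combining functional programming with DDD |
| 2020+ | Domain-driven microservices | Community practice | Cloud-native DDD architecture |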
27.1.3 DDD layered architecture
┌─────────────────────────────────────────────────────────────────────────┐
│  User Interface Layer                                                   │
│    Controllers, view models, API endpoints                              │
├─────────────────────────────────────────────────────────────────────────┤
│  Application Layer                                                      │
│    Application services, command handlers, event handlers               │
├─────────────────────────────────────────────────────────────────────────┤
│  Domain Layer                                                           │
│    Entities, value objects, aggregates, domain services, domain events  │
├─────────────────────────────────────────────────────────────────────────┤
│  Infrastructure Layer                                                   │
│    Repository implementations, message queues, external service adapters│
└─────────────────────────────────────────────────────────────────────────┘
27.2 Entities and Value Objects
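27.2.1 Formal definition of an entity
Definition 27.4 (Entity): An entity $e \in \mathcal{E}$ is an object with a unique identity, such that:
$$\forall e_1, e_2 \in \mathcal{E}: e_1 = e_2 \Leftrightarrow id(e_1) = id(e_2)$$
Entity equality is determined by identity, not by attribute values.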
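A minimal sketch of Definition 27.4, using a hypothetical `Account` class that is independent of the listings later in this chapter: two objects with the same identifier compare equal even when their attributes differ.

```python
class Account:
    """Hypothetical entity: equality and hashing depend only on account_id."""

    def __init__(self, account_id: int, owner: str) -> None:
        self.account_id = account_id
        self.owner = owner

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Account):
            return NotImplemented
        return self.account_id == other.account_id

    def __hash__(self) -> int:
        return hash(self.account_id)


a = Account(1, "Alice")
b = Account(1, "Alice Smith")  # same identity, different attribute value
c = Account(2, "Alice")        # same attribute value, different identity

print(a == b)          # True
print(a == c)          # False
print(len({a, b, c}))  # 2
```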
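27.2.2 Formal definition of a value object
Definition 27.5 (Value object): A value object $v \in \mathcal{V}$ is an immutable object, such that:
$$\forall v_1, v_2 \in \mathcal{V}: v_1 = v_2 \Leftrightarrow \forall a \in Attr(v): v_1.a = v_2.a$$
Value object equality is determined by the values of all attributes.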
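Definition 27.5 can be sketched with a frozen dataclass. The `Coordinate` type below is a hypothetical example: it gets attribute-based equality from `@dataclass`, rejects mutation because of `frozen=True`, and is "changed" by building a new value with `dataclasses.replace`.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Coordinate:
    """Hypothetical value object: defined entirely by its attribute values."""
    latitude: float
    longitude: float


p1 = Coordinate(39.9, 116.4)
p2 = Coordinate(39.9, 116.4)

print(p1 == p2)  # True: all attributes match
print(p1 is p2)  # False: two distinct objects

try:
    p1.latitude = 0.0
except FrozenInstanceError:
    print("immutable")

# A modified copy is a new value; the original stays unchanged.
moved = replace(p1, longitude=121.5)
print(moved == p1)  # False
```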
27.2.3 UML class diagram
┌─────────────────────────────────────────────────────────────────────────┐
│ <<abstract>> │
│ Entity │
├─────────────────────────────────────────────────────────────────────────┤
│ - _id: int │
├─────────────────────────────────────────────────────────────────────────┤
│ + id: int <<property>> │
│ + __eq__(other): bool │
│ + __hash__(): int │
└─────────────────────────────────────────────────────────────────────────┘
△
│
│ extends
│
┌─────────────────────────────────────────────────────────────────────────┐
│ Customer │
├─────────────────────────────────────────────────────────────────────────┤
│ - _name: str │
│ - _email: Email │
│ - _address: Address │
├─────────────────────────────────────────────────────────────────────────┤
│ + name: str <<property>> │
│ + email: Email <<property>> │
│ + address: Address <<property>> │
│ + change_address(new_address: Address): void │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ <<ValueObject>> Money │
├─────────────────────────────────────────────────────────────────────────┤
│ + amount: float <<frozen>> │
│ + currency: str <<frozen>> │
├─────────────────────────────────────────────────────────────────────────┤
│ + add(other: Money): Money │
│ + multiply(factor: float): Money │
│ + subtract(other: Money): Money │
│ + __eq__(other): bool │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ <<ValueObject>> Email │
├─────────────────────────────────────────────────────────────────────────┤
│ + value: str <<frozen>> │
├─────────────────────────────────────────────────────────────────────────┤
│ + __post_init__(): void │
│ + __str__(): str │
│ + domain(): str │
└─────────────────────────────────────────────────────────────────────────┘
27.2.4 Complete implementation
from dataclasses import dataclass, field
from typing import Optional, List, TypeVar, Generic, Dict, Any
from datetime import datetime
from abc import ABC, abstractmethod
from enum import Enum
import re
from functools import total_ordering


class Entity(ABC):
    """Base class for entities: equality and hashing depend only on identity."""

    def __init__(self, id: int):
        self._id = id

    @property
    def id(self) -> int:
        return self._id

    def __eq__(self, other) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return self._id == other._id

    def __hash__(self) -> int:
        return hash(self._id)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(id={self._id})"


@total_ordering
@dataclass(frozen=True)
class Money:
    """Value object for an amount in a single currency.

    A float amount keeps the example short; production code usually
    prefers decimal.Decimal to avoid binary rounding error.
    """
    amount: float
    currency: str

    def __post_init__(self):
        if self.amount < 0:
            raise ValueError(f"Amount cannot be negative: {self.amount}")
        if not self.currency or len(self.currency) != 3:
            raise ValueError(f"Invalid currency code: {self.currency}")

    def add(self, other: 'Money') -> 'Money':
        if other.currency != self.currency:
            raise ValueError(f"Currency mismatch: {self.currency} vs {other.currency}")
        return Money(round(self.amount + other.amount, 2), self.currency)

    def subtract(self, other: 'Money') -> 'Money':
        if other.currency != self.currency:
            raise ValueError(f"Currency mismatch: {self.currency} vs {other.currency}")
        result = round(self.amount - other.amount, 2)
        if result < 0:
            raise ValueError("Subtraction result cannot be negative")
        return Money(result, self.currency)

    def multiply(self, factor: float) -> 'Money':
        if factor < 0:
            raise ValueError("Multiplier cannot be negative")
        return Money(round(self.amount * factor, 2), self.currency)

    def __lt__(self, other: 'Money') -> bool:
        # total_ordering derives <=, > and >= from __lt__ and the dataclass __eq__.
        if self.currency != other.currency:
            raise ValueError("Cannot compare amounts in different currencies")
        return self.amount < other.amount

    def __str__(self) -> str:
        return f"{self.amount:.2f} {self.currency}"

    @classmethod
    def zero(cls, currency: str = "CNY") -> 'Money':
        return cls(0.0, currency)


@dataclass(frozen=True)
class Email:
    value: str
    # Unannotated, so this is a class attribute and not a dataclass field.
    EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

    def __post_init__(self):
        if not self.EMAIL_PATTERN.match(self.value):
            raise ValueError(f"Invalid email address: {self.value}")

    def __str__(self) -> str:
        return self.value

    @property
    def domain(self) -> str:
        return self.value.split('@')[1]

    @property
    def local_part(self) -> str:
        return self.value.split('@')[0]


@dataclass(frozen=True)
class Address:
    street: str
    city: str
    postal_code: str
    country: str

    def __post_init__(self):
        if not all([self.street, self.city, self.country]):
            raise ValueError("Address fields cannot be empty")

    def __str__(self) -> str:
        return f"{self.street}, {self.city}, {self.postal_code}, {self.country}"

    def with_street(self, new_street: str) -> 'Address':
        # Value objects are immutable, so a change returns a new instance.
        return Address(new_street, self.city, self.postal_code, self.country)

    def with_city(self, new_city: str) -> 'Address':
        return Address(self.street, new_city, self.postal_code, self.country)


@dataclass(frozen=True)
class PhoneNumber:
    value: str
    country_code: str = "+86"

    def __post_init__(self):
        digits = re.sub(r'\D', '', self.value)
        if len(digits) < 10 or len(digits) > 15:
            raise ValueError(f"Invalid phone number: {self.value}")

    def __str__(self) -> str:
        return f"{self.country_code} {self.value}"


class Customer(Entity):
    def __init__(
        self,
        id: int,
        name: str,
        email: Email,
        address: Address,
        phone: Optional[PhoneNumber] = None
    ):
        super().__init__(id)
        if not name or len(name.strip()) == 0:
            raise ValueError("Customer name cannot be empty")
        self._name = name.strip()
        self._email = email
        self._address = address
        self._phone = phone
        self._created_at = datetime.now()
        self._version = 1

    @property
    def name(self) -> str:
        return self._name

    @property
    def email(self) -> Email:
        return self._email

    @property
    def address(self) -> Address:
        return self._address

    @property
    def phone(self) -> Optional[PhoneNumber]:
        return self._phone

    @property
    def version(self) -> int:
        return self._version

    def change_address(self, new_address: Address) -> None:
        self._address = new_address
        self._increment_version()

    def change_email(self, new_email: Email) -> None:
        self._email = new_email
        self._increment_version()

    def change_phone(self, new_phone: PhoneNumber) -> None:
        self._phone = new_phone
        self._increment_version()

    def _increment_version(self) -> None:
        # Every state change bumps the version of the entity.
        self._version += 1

    def __repr__(self) -> str:
        return f"Customer(id={self._id}, name='{self._name}', email={self._email})"


# Usage example
email = Email("zhangsan@example.com")
address = Address("100 Renmin Road", "Beijing", "100000", "China")
phone = PhoneNumber("13800138000")
customer = Customer(1, "Zhang San", email, address, phone)
print(f"Customer: {customer.name}")
print(f"Email: {customer.email}")
print(f"Address: {customer.address}")
print(f"Version: {customer.version}")
customer.change_address(Address("200 Zhongshan Road", "Shanghai", "200000", "China"))
print(f"New address: {customer.address}")
print(f"New version: {customer.version}")

27.3 Aggregates and Aggregate Roots
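27.3.1 Aggregate design principles
Principle 27.1 (Aggregate boundaries): Aggregate boundaries are determined by transactional consistency requirements:
- Within an aggregate: strong consistency (ACID transactions)
- Between aggregates: eventual consistency (domain events)
Principle 27.2 (Aggregate size): Aggregates should be as small as possible and follow these rules:
- Reference other aggregates by identity
- Modify only one aggregate per transaction
- Use eventual consistency to maintain constraints that span aggregates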
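The rules above can be sketched with a small in-process event queue. Everything here (`OrderPlaced`, `StockLevel`, `EventBus`) is a hypothetical illustration and not the event mechanism implemented later in this chapter; a real system would typically deliver events through a message broker or an outbox table.

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Any, Callable, DefaultDict, Deque, Dict, List


@dataclass(frozen=True)
class OrderPlaced:
    """Domain event emitted by a (hypothetical) order aggregate."""
    order_id: int
    product_id: int  # the stock aggregate is referenced by identity only
    quantity: int


class StockLevel:
    """A separate aggregate with its own consistency boundary."""

    def __init__(self, product_id: int, on_hand: int) -> None:
        self.product_id = product_id
        self.on_hand = on_hand

    def decrease(self, quantity: int) -> None:
        if quantity > self.on_hand:
            raise ValueError("insufficient stock")
        self.on_hand -= quantity


class EventBus:
    """Queues events so handlers run after the publishing transaction ends."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable[[Any], None]]] = defaultdict(list)
        self._queue: Deque[Any] = deque()

    def subscribe(self, event_type: type, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: Any) -> None:
        self._queue.append(event)

    def drain(self) -> int:
        """Deliver all queued events and return how many were processed."""
        processed = 0
        while self._queue:
            event = self._queue.popleft()
            for handler in self._handlers[type(event)]:
                handler(event)
            processed += 1
        return processed


stock: Dict[int, StockLevel] = {7: StockLevel(product_id=7, on_hand=10)}
bus = EventBus()
bus.subscribe(OrderPlaced, lambda e: stock[e.product_id].decrease(e.quantity))

# Transaction 1 touches only the order aggregate and records the event.
bus.publish(OrderPlaced(order_id=1, product_id=7, quantity=3))
stock_before = stock[7].on_hand  # still 10: the stock aggregate is untouched

# Transaction 2 runs later and brings the stock aggregate up to date.
bus.drain()
stock_after = stock[7].on_hand   # 7
```

Between the two steps the system is temporarily inconsistent, which is the trade-off that eventual consistency accepts in exchange for small aggregates and short transactions.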
27.3.2 Aggregate UML diagram
┌─────────────────────────────────────────────────────────────────────────┐
│ <<AggregateRoot>> │
│ Order │
├─────────────────────────────────────────────────────────────────────────┤
│ - _order_id: OrderId │
│ - _customer_id: CustomerId │
│ - _items: List[OrderItem] │
│ - _status: OrderStatus │
│ - _created_at: datetime │
│ - _events: List[DomainEvent] │
├─────────────────────────────────────────────────────────────────────────┤
│ + order_id: OrderId <<property>> │
│ + customer_id: CustomerId <<property>> │
│ + status: OrderStatus <<property>> │
│ + items: List[OrderItem] <<property>> │
│ + add_item(...): void │
│ + remove_item(product_id: ProductId): void │
│ + confirm(): void                                                       │
│ + pay(): void                                                           │
│ + ship(tracking_number: str): void                                      │
│ + deliver(): void                                                       │
│ + cancel(reason: str): void                                             │
│ + calculate_total(): Money │
│ + pull_domain_events(): List[DomainEvent] │
└─────────────────────────────────────────────────────────────────────────┘
│
│ contains
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ <<Entity>> OrderItem │
├─────────────────────────────────────────────────────────────────────────┤
│ - _id: OrderItemId │
│ - _product_id: ProductId │
│ - _product_name: str │
│ - _unit_price: Money │
│ - _quantity: int │
├─────────────────────────────────────────────────────────────────────────┤
│ + id: OrderItemId <<property>> │
│ + product_id: ProductId <<property>> │
│ + product_name: str <<property>> │
│ + unit_price: Money <<property>> │
│ + quantity: int <<property>> │
│ + calculate_total(): Money │
│ + update_quantity(new_quantity: int): void │
└─────────────────────────────────────────────────────────────────────────┘
┌──────────────┐ ┌──────────────┐
│ Customer │ │ Product │
│ (Aggregate) │ │ (Aggregate) │
└──────────────┘ └──────────────┘
↑ ↑
│ reference by ID │ reference by ID
│ │
┌─────┴───────────────────────────────────┴─────┐
│ Order │
│ (Aggregate) │
 └────────────────────────────────────────────────┘
27.3.3 Complete aggregate implementation
from abc import ABC
from dataclasses import dataclass, field
from typing import List, Optional, Dict, TypeVar, Generic
from datetime import datetime
from enum import Enum
from copy import deepcopy
import uuid

# Entity and Money are defined in the listing of section 27.2.4.


class OrderStatus(Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

    def can_transition_to(self, target: 'OrderStatus') -> bool:
        # Allowed transitions of the order life cycle.
        transitions = {
            OrderStatus.PENDING: {OrderStatus.CONFIRMED, OrderStatus.CANCELLED},
            OrderStatus.CONFIRMED: {OrderStatus.PAID, OrderStatus.CANCELLED},
            OrderStatus.PAID: {OrderStatus.SHIPPED, OrderStatus.CANCELLED},
            OrderStatus.SHIPPED: {OrderStatus.DELIVERED},
            OrderStatus.DELIVERED: set(),
            OrderStatus.CANCELLED: set(),
        }
        return target in transitions.get(self, set())


@dataclass(frozen=True)
class OrderItemId:
    value: int

    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Order item ID must be a positive integer")


@dataclass(frozen=True)
class ProductId:
    value: int

    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Product ID must be a positive integer")


@dataclass(frozen=True)
class OrderId:
    value: int

    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Order ID must be a positive integer")


@dataclass(frozen=True)
class CustomerId:
    value: int

    def __post_init__(self):
        if self.value <= 0:
            raise ValueError("Customer ID must be a positive integer")


class OrderItem(Entity):
    def __init__(
        self,
        id: OrderItemId,
        product_id: ProductId,
        product_name: str,
        unit_price: Money,
        quantity: int
    ):
        super().__init__(id.value)
        if quantity <= 0:
            raise ValueError("Quantity must be greater than 0")
        if not product_name or len(product_name.strip()) == 0:
            raise ValueError("Product name cannot be empty")
        # Keep the typed ID in its own field so Entity.id stays an int.
        self._order_item_id = id
        self._product_id = product_id
        self._product_name = product_name.strip()
        self._unit_price = unit_price
        self._quantity = quantity

    @property
    def order_item_id(self) -> OrderItemId:
        return self._order_item_id

    @property
    def product_id(self) -> ProductId:
        return self._product_id

    @property
    def product_name(self) -> str:
        return self._product_name

    @property
    def unit_price(self) -> Money:
        return self._unit_price

    @property
    def quantity(self) -> int:
        return self._quantity

    def calculate_total(self) -> Money:
        return self._unit_price.multiply(self._quantity)

    def update_quantity(self, new_quantity: int) -> None:
        if new_quantity <= 0:
            raise ValueError("Quantity must be greater than 0")
        self._quantity = new_quantity

    def __repr__(self) -> str:
        return f"OrderItem(product={self._product_name}, qty={self._quantity}, price={self._unit_price})"


class DomainEvent(ABC):
    def __init__(self):
        self.occurred_at = datetime.now()
        self.event_id = str(uuid.uuid4())


class OrderCreatedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, customer_id: CustomerId):
        super().__init__()
        self.order_id = order_id
        self.customer_id = customer_id


class OrderConfirmedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, total: Money):
        super().__init__()
        self.order_id = order_id
        self.total = total


class OrderShippedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, tracking_number: str):
        super().__init__()
        self.order_id = order_id
        self.tracking_number = tracking_number


class OrderCancelledEvent(DomainEvent):
    def __init__(self, order_id: OrderId, reason: str):
        super().__init__()
        self.order_id = order_id
        self.reason = reason


class OrderItemAddedEvent(DomainEvent):
    def __init__(self, order_id: OrderId, product_id: ProductId, quantity: int):
        super().__init__()
        self.order_id = order_id
        self.product_id = product_id
        self.quantity = quantity


class Order(Entity):
    """Aggregate root: all changes to order items go through this class."""

    MAX_ITEMS = 50

    def __init__(self, id: OrderId, customer_id: CustomerId):
        super().__init__(id.value)
        self._order_id = id
        self._customer_id = customer_id
        self._items: List[OrderItem] = []
        self._status = OrderStatus.PENDING
        self._created_at = datetime.now()
        self._updated_at = datetime.now()
        self._next_item_id = 1
        self._events: List[DomainEvent] = []
        self._version = 1
        self._add_event(OrderCreatedEvent(id, customer_id))

    @property
    def order_id(self) -> OrderId:
        return self._order_id

    @property
    def customer_id(self) -> CustomerId:
        return self._customer_id

    @property
    def status(self) -> OrderStatus:
        return self._status

    @property
    def items(self) -> List[OrderItem]:
        # Return a copy of the list so callers cannot add or remove items directly.
        return self._items.copy()

    @property
    def created_at(self) -> datetime:
        return self._created_at

    @property
    def updated_at(self) -> datetime:
        return self._updated_at

    @property
    def version(self) -> int:
        return self._version

    def add_item(
        self,
        product_id: ProductId,
        product_name: str,
        unit_price: Money,
        quantity: int
    ) -> None:
        self._ensure_status(OrderStatus.PENDING, "Only pending orders can be modified")
        if len(self._items) >= self.MAX_ITEMS:
            raise ValueError(f"Number of order items exceeds the limit: {self.MAX_ITEMS}")
        existing_item = self._find_item_by_product(product_id)
        if existing_item:
            # The same product again: increase the quantity of the existing line.
            existing_item.update_quantity(existing_item.quantity + quantity)
        else:
            item = OrderItem(
                OrderItemId(self._next_item_id),
                product_id,
                product_name,
                unit_price,
                quantity
            )
            self._items.append(item)
            self._next_item_id += 1
        self._add_event(OrderItemAddedEvent(self._order_id, product_id, quantity))
        self._touch()

    def remove_item(self, product_id: ProductId) -> None:
        self._ensure_status(OrderStatus.PENDING, "Only pending orders can be modified")
        original_count = len(self._items)
        self._items = [
            item for item in self._items
            if item.product_id != product_id
        ]
        if len(self._items) == original_count:
            raise ValueError(f"Product is not in the order: {product_id}")
        self._touch()

    def confirm(self) -> None:
        self._ensure_status(OrderStatus.PENDING, "Only pending orders can be confirmed")
        if not self._items:
            raise ValueError("Order cannot be empty")
        self._status = OrderStatus.CONFIRMED
        self._add_event(OrderConfirmedEvent(self._order_id, self.calculate_total()))
        self._touch()

    def pay(self) -> None:
        self._ensure_status(OrderStatus.CONFIRMED, "Only confirmed orders can be paid")
        self._status = OrderStatus.PAID
        self._touch()

    def ship(self, tracking_number: str) -> None:
        self._ensure_status(OrderStatus.PAID, "Only paid orders can be shipped")
        if not tracking_number or len(tracking_number.strip()) == 0:
            raise ValueError("Tracking number cannot be empty")
        self._status = OrderStatus.SHIPPED
        self._add_event(OrderShippedEvent(self._order_id, tracking_number))
        self._touch()

    def deliver(self) -> None:
        self._ensure_status(OrderStatus.SHIPPED, "Only shipped orders can be delivered")
        self._status = OrderStatus.DELIVERED
        self._touch()

    def cancel(self, reason: str = "") -> None:
        # Use the transition table so shipped, delivered, and already
        # cancelled orders are all rejected.
        if not self._status.can_transition_to(OrderStatus.CANCELLED):
            raise ValueError(f"Cannot cancel an order in status: {self._status.value}")
        self._status = OrderStatus.CANCELLED
        self._add_event(OrderCancelledEvent(self._order_id, reason))
        self._touch()

    def calculate_total(self) -> Money:
        if not self._items:
            return Money.zero()
        currency = self._items[0].unit_price.currency
        total = Money.zero(currency)
        for item in self._items:
            total = total.add(item.calculate_total())
        return total

    def item_count(self) -> int:
        return sum(item.quantity for item in self._items)

    def pull_domain_events(self) -> List[DomainEvent]:
        # Hand over the pending events and clear the internal buffer.
        events = self._events.copy()
        self._events.clear()
        return events

    def _find_item_by_product(self, product_id: ProductId) -> Optional[OrderItem]:
        for item in self._items:
            if item.product_id == product_id:
                return item
        return None

    def _ensure_status(self, expected: OrderStatus, message: str) -> None:
        if self._status != expected:
            raise ValueError(f"{message}; current status: {self._status.value}")

    def _add_event(self, event: DomainEvent) -> None:
        self._events.append(event)

    def _touch(self) -> None:
        self._updated_at = datetime.now()
        self._version += 1

    def __repr__(self) -> str:
        return f"Order(id={self._order_id.value}, status={self._status.value}, items={len(self._items)})"


# Usage example
order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 2)
order.add_item(ProductId(2), "Product B", Money(50, "CNY"), 3)
print(f"Order total: {order.calculate_total()}")
print(f"Item count: {order.item_count()}")
order.confirm()
print(f"Order status: {order.status.value}")
print(f"Number of domain events: {len(order.pull_domain_events())}")

27.4 Domain Services
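27.4.1 Definition of a domain service
Definition 27.6 (Domain service): A domain service $s$ is an operation that:
- Does not naturally belong to any entity or value object
- Defines its interface in terms of domain concepts
- Is stateless
Formally: $$s: \mathcal{E}^n \times \mathcal{V}^m \rightarrow \mathcal{E}^p \times \mathcal{V}^q$$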
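A minimal sketch of a service in this sense, assuming a hypothetical shipping-fee rule: the `ShippingFeeService`, `Weight`, and `Fee` names and the two rate constants are invented for illustration. The service keeps no mutable state, so its result depends only on the value objects passed in.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Weight:
    kilograms: Decimal


@dataclass(frozen=True)
class Fee:
    amount: Decimal
    currency: str


class ShippingFeeService:
    """Stateless domain service: maps value objects to a value object."""

    BASE_FEE = Decimal("8.00")    # assumed flat fee, for illustration only
    FEE_PER_KG = Decimal("2.50")  # assumed rate, for illustration only

    def quote(self, weight: Weight, currency: str = "CNY") -> Fee:
        if weight.kilograms <= 0:
            raise ValueError("weight must be positive")
        amount = self.BASE_FEE + self.FEE_PER_KG * weight.kilograms
        return Fee(amount.quantize(Decimal("0.01")), currency)


service = ShippingFeeService()
fee = service.quote(Weight(Decimal("3")))
print(fee)  # Fee(amount=Decimal('15.50'), currency='CNY')
```

The rule belongs neither to a parcel nor to an order, which is why it is expressed as a service instead of a method on an entity.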
27.4.2 Domain service UML diagram
┌─────────────────────────────────────────────────────────────────────────┐
│ <<DomainService>> │
│ PricingService │
├─────────────────────────────────────────────────────────────────────────┤
│ - _discount_rules: List[DiscountRule] │
│ - _tax_rate: float │
├─────────────────────────────────────────────────────────────────────────┤
│ + calculate_discount(total: Money): Money │
│ + calculate_tax(subtotal: Money): Money │
│ + calculate_final_price(order: Order): Money │
│ + apply_promotion(order: Order, code: str): Money │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ <<DomainService>> │
│ InventoryService │
├─────────────────────────────────────────────────────────────────────────┤
│ - _stock: Dict[ProductId, int] │
│ - _reservations: Dict[OrderId, List[Reservation]]                       │
├─────────────────────────────────────────────────────────────────────────┤
│ + check_availability(product_id: ProductId, qty: int): bool │
│ + reserve(product_id: ProductId, qty: int, order_id: OrderId): bool │
│ + release(product_id: ProductId, qty: int, order_id: OrderId): void │
│ + commit_reservation(order_id: OrderId): void │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ <<DomainService>> │
│ TransferService │
├─────────────────────────────────────────────────────────────────────────┤
│ + transfer(from: Account, to: Account, amount: Money): void │
│ + validate_transfer(from: Account, to: Account, amount: Money): bool │
└─────────────────────────────────────────────────────────────────────────┘
27.4.3 Domain service implementation
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Protocol
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

# Money, Order, OrderId and ProductId come from the earlier listings.


@dataclass(frozen=True)
class DiscountRule:
    min_amount: float
    discount_percent: float
    name: str = ""

    def applies_to(self, amount: float) -> bool:
        return amount >= self.min_amount

    def calculate_discount(self, amount: float) -> float:
        return amount * (self.discount_percent / 100)


@dataclass
class PromotionCode:
    code: str
    discount_percent: float
    min_order_amount: float
    expires_at: datetime
    max_uses: int
    current_uses: int = 0

    def is_valid(self) -> bool:
        return (
            datetime.now() < self.expires_at and
            self.current_uses < self.max_uses
        )

    def use(self) -> None:
        self.current_uses += 1


class PricingService:
    def __init__(
        self,
        discount_rules: List[DiscountRule],
        tax_rate: float = 0.0,
        promotions: Optional[Dict[str, PromotionCode]] = None
    ):
        # Sort by threshold, highest first, so the first matching rule
        # is the one with the largest threshold the amount reaches.
        self._discount_rules = sorted(
            discount_rules,
            key=lambda r: r.min_amount,
            reverse=True
        )
        self._tax_rate = tax_rate
        self._promotions = promotions or {}

    def calculate_discount(self, total: Money) -> Money:
        for rule in self._discount_rules:
            if rule.applies_to(total.amount):
                discount = rule.calculate_discount(total.amount)
                return Money(round(discount, 2), total.currency)
        return Money.zero(total.currency)

    def calculate_tax(self, subtotal: Money) -> Money:
        tax = subtotal.amount * self._tax_rate
        return Money(round(tax, 2), subtotal.currency)

    def calculate_final_price(self, order: Order) -> Money:
        # Discount first, then tax on the discounted amount.
        subtotal = order.calculate_total()
        discount = self.calculate_discount(subtotal)
        after_discount = subtotal.subtract(discount)
        tax = self.calculate_tax(after_discount)
        return after_discount.add(tax)

    def apply_promotion(self, order: Order, code: str) -> Money:
        if code not in self._promotions:
            raise ValueError(f"Invalid promotion code: {code}")
        promotion = self._promotions[code]
        if not promotion.is_valid():
            raise ValueError(f"Promotion code has expired or reached its usage limit: {code}")
        subtotal = order.calculate_total()
        if subtotal.amount < promotion.min_order_amount:
            raise ValueError(
                f"Order amount too low; minimum required: {promotion.min_order_amount}"
            )
        discount = subtotal.amount * (promotion.discount_percent / 100)
        promotion.use()
        return Money(round(discount, 2), subtotal.currency)

    def get_applicable_discount(self, amount: Money) -> Optional[DiscountRule]:
        for rule in self._discount_rules:
            if rule.applies_to(amount.amount):
                return rule
        return None


@dataclass
class Reservation:
    product_id: ProductId
    quantity: int
    order_id: OrderId
    reserved_at: datetime
    expires_at: datetime

    def is_expired(self) -> bool:
        return datetime.now() > self.expires_at


class InventoryService:
    RESERVATION_TIMEOUT_MINUTES = 30

    def __init__(self):
        self._stock: Dict[ProductId, int] = {}
        # Reservations are grouped by the order that holds them.
        self._reservations: Dict[OrderId, List[Reservation]] = {}

    def set_stock(self, product_id: ProductId, quantity: int) -> None:
        if quantity < 0:
            raise ValueError("Stock cannot be negative")
        self._stock[product_id] = quantity

    def get_stock(self, product_id: ProductId) -> int:
        return self._stock.get(product_id, 0)

    def get_available_stock(self, product_id: ProductId) -> int:
        # Available stock = stock on hand minus unexpired reservations.
        total = self._stock.get(product_id, 0)
        reserved = sum(
            r.quantity
            for reservations in self._reservations.values()
            for r in reservations
            if r.product_id == product_id and not r.is_expired()
        )
        return total - reserved

    def check_availability(self, product_id: ProductId, quantity: int) -> bool:
        return self.get_available_stock(product_id) >= quantity

    def reserve(
        self,
        product_id: ProductId,
        quantity: int,
        order_id: OrderId
    ) -> bool:
        if not self.check_availability(product_id, quantity):
            return False
        now = datetime.now()
        expires_at = datetime.fromtimestamp(
            now.timestamp() + self.RESERVATION_TIMEOUT_MINUTES * 60
        )
        reservation = Reservation(
            product_id=product_id,
            quantity=quantity,
            order_id=order_id,
            reserved_at=now,
            expires_at=expires_at
        )
        if order_id not in self._reservations:
            self._reservations[order_id] = []
        self._reservations[order_id].append(reservation)
        return True

    def release(self, product_id: ProductId, quantity: int, order_id: OrderId) -> None:
        if order_id not in self._reservations:
            return
        self._reservations[order_id] = [
            r for r in self._reservations[order_id]
            if not (r.product_id == product_id and r.quantity == quantity)
        ]

    def commit_reservation(self, order_id: OrderId) -> None:
        # Turn the reservations of an order into actual stock deductions.
        if order_id not in self._reservations:
            return
        for reservation in self._reservations[order_id]:
            current_stock = self._stock.get(reservation.product_id, 0)
            self._stock[reservation.product_id] = current_stock - reservation.quantity
        del self._reservations[order_id]

    def cancel_reservation(self, order_id: OrderId) -> None:
        if order_id in self._reservations:
            del self._reservations[order_id]

    def cleanup_expired_reservations(self) -> int:
        # Drop expired reservations and return how many were removed.
        expired_count = 0
        for order_id in list(self._reservations.keys()):
            original_len = len(self._reservations[order_id])
            self._reservations[order_id] = [
                r for r in self._reservations[order_id]
                if not r.is_expired()
            ]
            expired_count += original_len - len(self._reservations[order_id])
        return expired_count


class TransferService:
    # Account is not defined in this chapter; it is assumed to be an
    # aggregate exposing id, balance (Money), currency, debit() and credit().

    def transfer(
        self,
        from_account: 'Account',
        to_account: 'Account',
        amount: Money
    ) -> None:
        self.validate_transfer(from_account, to_account, amount)
        from_account.debit(amount)
        to_account.credit(amount)

    def validate_transfer(
        self,
        from_account: 'Account',
        to_account: 'Account',
        amount: Money
    ) -> bool:
        if amount.amount <= 0:
            raise ValueError("Transfer amount must be greater than 0")
        if from_account.id == to_account.id:
            raise ValueError("Cannot transfer to the same account")
        if from_account.balance < amount:
            raise ValueError("Insufficient account balance")
        if from_account.currency != to_account.currency:
            raise ValueError("Account currencies do not match")
        return True


# Usage example
pricing_service = PricingService([
    DiscountRule(500, 5, "Standard discount"),
    DiscountRule(1000, 10, "Premium discount"),
    DiscountRule(2000, 15, "VIP discount"),
], tax_rate=0.13)
order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(600, "CNY"), 2)
order.add_item(ProductId(2), "Product B", Money(400, "CNY"), 1)
subtotal = order.calculate_total()
final_price = pricing_service.calculate_final_price(order)
discount_rule = pricing_service.get_applicable_discount(subtotal)
print(f"Subtotal: {subtotal}")
print(f"Applicable discount: {discount_rule.name if discount_rule else 'none'}")
print(f"Final price (tax included): {final_price}")

27.5 Repository Pattern
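27.5.1 Formal definition of a repository
Definition 27.7 (Repository): A repository $R$ is a collection-like abstraction over aggregates that provides the following operations:
$$R: \mathcal{A} \rightarrow \mathcal{P}(\mathcal{A})$$
where $\mathcal{P}(\mathcal{A})$ denotes the power set of aggregates. Basic operations:
- $find(id): \mathcal{A} \cup \{\bot\}$, which looks up an aggregate by identity and yields $\bot$ when none exists
- $save(a): \mathcal{A}$, which persists an aggregate
- $delete(a): \emptyset$, which removes an aggregate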
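The three operations can be sketched as a structural interface with an in-memory implementation. The `Invoice` aggregate, the `InvoiceRepository` protocol, and the `total_due` helper are hypothetical names used only for this illustration; `None` plays the role of $\bot$.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Protocol


@dataclass
class Invoice:
    """Hypothetical aggregate used only for this sketch."""
    invoice_id: int
    amount_cents: int


class InvoiceRepository(Protocol):
    """What domain code depends on: find, save, delete."""

    def find(self, invoice_id: int) -> Optional[Invoice]: ...
    def save(self, invoice: Invoice) -> Invoice: ...
    def delete(self, invoice: Invoice) -> None: ...


class InMemoryInvoiceRepository:
    """One possible implementation; a database-backed one would expose the same methods."""

    def __init__(self) -> None:
        self._items: Dict[int, Invoice] = {}

    def find(self, invoice_id: int) -> Optional[Invoice]:
        return self._items.get(invoice_id)  # None when no such aggregate exists

    def save(self, invoice: Invoice) -> Invoice:
        self._items[invoice.invoice_id] = invoice
        return invoice

    def delete(self, invoice: Invoice) -> None:
        self._items.pop(invoice.invoice_id, None)


def total_due(repo: InvoiceRepository, invoice_ids: Iterable[int]) -> int:
    """Domain-side code sees only the abstraction, never the storage."""
    found = (repo.find(i) for i in invoice_ids)
    return sum(inv.amount_cents for inv in found if inv is not None)


repo = InMemoryInvoiceRepository()
repo.save(Invoice(1, 1200))
repo.save(Invoice(2, 800))
print(total_due(repo, [1, 2, 3]))  # 2000
repo.delete(Invoice(2, 800))
print(repo.find(2))                # None
```

Because `total_due` is typed against the protocol, swapping the in-memory class for another implementation requires no change to domain code.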
27.5.2 Repository UML diagram
┌─────────────────────────────────────────────────────────────────────────┐
│ <<interface>> │
│ Repository<T> │
├─────────────────────────────────────────────────────────────────────────┤
│ + find_by_id(id: int): Optional<T> │
│ + find_all(): List<T> │
│ + save(entity: T): T │
│ + delete(entity: T): void │
└─────────────────────────────────────────────────────────────────────────┘
△
│
│ implements
│
┌─────────────────────────────────────────────────────────────────────────┐
│ OrderRepository │
├─────────────────────────────────────────────────────────────────────────┤
│ - _orders: Dict[int, Order] │
│ - _specifications: Dict[str, Specification] │
├─────────────────────────────────────────────────────────────────────────┤
│ + find_by_id(id: int): Optional[Order] │
│ + find_by_customer(customer_id: CustomerId): List[Order] │
│ + find_by_status(status: OrderStatus): List[Order] │
│ + find_by_specification(spec: Specification): List[Order] │
│ + find_all(): List[Order] │
│ + save(order: Order): Order │
│ + delete(order: Order): void │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ <<interface>> │
│ Specification<T> │
├─────────────────────────────────────────────────────────────────────────┤
│ + is_satisfied_by(entity: T): bool │
│ + and(other: Specification): Specification │
│ + or(other: Specification): Specification │
│ + not(): Specification │
└─────────────────────────────────────────────────────────────────────────┘
27.5.3 Repository implementation
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, TypeVar, Generic, Callable, Any
from contextlib import contextmanager
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime
import threading

# Order, OrderStatus, OrderId, CustomerId, ProductId and Money come from
# the earlier listings.

T = TypeVar('T')


class Specification(ABC, Generic[T]):
    @abstractmethod
    def is_satisfied_by(self, entity: T) -> bool:
        pass

    # Operator overloads let specifications be combined as a & b, a | b, ~a.
    def __and__(self, other: 'Specification[T]') -> 'Specification[T]':
        return AndSpecification(self, other)

    def __or__(self, other: 'Specification[T]') -> 'Specification[T]':
        return OrSpecification(self, other)

    def __invert__(self) -> 'Specification[T]':
        return NotSpecification(self)


class AndSpecification(Specification[T]):
    def __init__(self, left: Specification[T], right: Specification[T]):
        self._left = left
        self._right = right

    def is_satisfied_by(self, entity: T) -> bool:
        return self._left.is_satisfied_by(entity) and self._right.is_satisfied_by(entity)


class OrSpecification(Specification[T]):
    def __init__(self, left: Specification[T], right: Specification[T]):
        self._left = left
        self._right = right

    def is_satisfied_by(self, entity: T) -> bool:
        return self._left.is_satisfied_by(entity) or self._right.is_satisfied_by(entity)


class NotSpecification(Specification[T]):
    def __init__(self, spec: Specification[T]):
        self._spec = spec

    def is_satisfied_by(self, entity: T) -> bool:
        return not self._spec.is_satisfied_by(entity)


class OrderStatusSpecification(Specification[Order]):
    def __init__(self, status: OrderStatus):
        self._status = status

    def is_satisfied_by(self, order: Order) -> bool:
        return order.status == self._status


class CustomerSpecification(Specification[Order]):
    def __init__(self, customer_id: CustomerId):
        self._customer_id = customer_id

    def is_satisfied_by(self, order: Order) -> bool:
        return order.customer_id == self._customer_id


class DateRangeSpecification(Specification[Order]):
    def __init__(self, start: datetime, end: datetime):
        self._start = start
        self._end = end

    def is_satisfied_by(self, order: Order) -> bool:
        return self._start <= order.created_at <= self._end


class Repository(ABC, Generic[T]):
    @abstractmethod
    def find_by_id(self, id: int) -> Optional[T]:
        pass

    @abstractmethod
    def find_all(self) -> List[T]:
        pass

    @abstractmethod
    def save(self, entity: T) -> T:
        pass

    @abstractmethod
    def delete(self, entity: T) -> None:
        pass

    def find_by_specification(self, spec: Specification[T]) -> List[T]:
        return [e for e in self.find_all() if spec.is_satisfied_by(e)]


class OrderRepository(Repository[Order]):
    """In-memory repository; a re-entrant lock guards the dictionary."""

    def __init__(self):
        self._orders: Dict[int, Order] = {}
        self._lock = threading.RLock()

    def find_by_id(self, id: int) -> Optional[Order]:
        with self._lock:
            return self._orders.get(id)

    def find_by_customer(self, customer_id: CustomerId) -> List[Order]:
        spec = CustomerSpecification(customer_id)
        return self.find_by_specification(spec)

    def find_by_status(self, status: OrderStatus) -> List[Order]:
        spec = OrderStatusSpecification(status)
        return self.find_by_specification(spec)

    def find_by_date_range(self, start: datetime, end: datetime) -> List[Order]:
        spec = DateRangeSpecification(start, end)
        return self.find_by_specification(spec)

    def find_pending_orders(self) -> List[Order]:
        return self.find_by_status(OrderStatus.PENDING)

    def find_all(self) -> List[Order]:
        with self._lock:
            return list(self._orders.values())

    def save(self, order: Order) -> Order:
        with self._lock:
            self._orders[order.id] = order
            return order

    def delete(self, order: Order) -> None:
        with self._lock:
            self._orders.pop(order.id, None)

    def count(self) -> int:
        with self._lock:
            return len(self._orders)

    def exists(self, id: int) -> bool:
        with self._lock:
            return id in self._orders
class UnitOfWork(ABC):
    @abstractmethod
    def commit(self) -> None:
        pass

    @abstractmethod
    def rollback(self) -> None:
        pass

    @abstractmethod
    def register_new(self, entity: Any) -> None:
        pass

    @abstractmethod
    def register_modified(self, entity: Any) -> None:
        pass

    @abstractmethod
    def register_deleted(self, entity: Any) -> None:
        pass
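import copy


class OrderUnitOfWork(UnitOfWork):
    """Collects order changes and applies them to the repository on commit."""

    def __init__(self, order_repository: OrderRepository):
        self.order_repository = order_repository
        self._new_objects: List[Order] = []
        self._modified_objects: List[Order] = []
        self._deleted_objects: List[Order] = []
        # Stored state of existing orders, captured when they are registered.
        self._snapshots: Dict[int, Order] = {}
        self._lock = threading.RLock()

    def register_new(self, order: Order) -> None:
        with self._lock:
            if self.order_repository.exists(order.id):
                raise ValueError(f"Order already exists: {order.id}")
            self._new_objects.append(order)

    def register_modified(self, order: Order) -> None:
        # Register before mutating the order so the snapshot holds the original state.
        with self._lock:
            if order not in self._new_objects:
                self._take_snapshot(order)
                self._modified_objects.append(order)

    def register_deleted(self, order: Order) -> None:
        with self._lock:
            self._take_snapshot(order)
            self._deleted_objects.append(order)

    def _take_snapshot(self, order: Order) -> None:
        # Keep only the first snapshot per order within one unit of work.
        stored = self.order_repository.find_by_id(order.id)
        if stored is not None and order.id not in self._snapshots:
            self._snapshots[order.id] = copy.deepcopy(stored)

    def commit(self) -> None:
        with self._lock:
            try:
                for order in self._new_objects:
                    self.order_repository.save(order)
                for order in self._modified_objects:
                    self.order_repository.save(order)
                for order in self._deleted_objects:
                    self.order_repository.delete(order)
                self._clear()
            except Exception:
                self.rollback()
                raise

    def rollback(self) -> None:
        with self._lock:
            # Undo inserts that may already have been applied, then restore snapshots.
            for order in self._new_objects:
                self.order_repository.delete(order)
            for snapshot in self._snapshots.values():
                self.order_repository.save(snapshot)
            self._clear()

    def _clear(self) -> None:
        self._new_objects.clear()
        self._modified_objects.clear()
        self._deleted_objects.clear()
        self._snapshots.clear()

    @contextmanager
    def transaction(self):
        # Requires `from contextlib import contextmanager`.
        try:
            yield self
        except Exception:
            self.rollback()
            raise
        else:
            # commit() rolls back by itself if applying the changes fails.
            self.commit()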
order_repo = OrderRepository()
uow = OrderUnitOfWork(order_repo)

# The order is written to the repository when the block exits without an error.
with uow.transaction():
    order = Order(OrderId(1), CustomerId(100))
    order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 2)
    uow.register_new(order)

print(f"Orders in repository: {order_repo.count()}")
print(f"Order: {order_repo.find_by_id(1)}")
27.6 Application Services
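27.6.1 Responsibilities of an Application Service
An application service is responsible for:
- Use-case orchestration: coordinating domain objects to carry out a business use case
- Transaction management: ensuring that each operation is atomic
- Access control: verifying the caller's permissions
- DTO conversion: isolating the domain model from its external representation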
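The four responsibilities can be seen side by side in a deliberately small sketch. Everything here is hypothetical and self-contained: `SketchOrder`, `SketchOrderDTO`, `SketchUnitOfWork`, and `ConfirmOrderService` are illustrative names, not the chapter's classes, and the "transaction" is only an in-memory status snapshot.

```python
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, Iterator


@dataclass
class SketchOrder:
    """Hypothetical, stripped-down aggregate; the business rule lives here."""
    id: int
    owner: str
    status: str = "pending"

    def confirm(self) -> None:
        if self.status != "pending":
            raise ValueError("only pending orders can be confirmed")
        self.status = "confirmed"


@dataclass(frozen=True)
class SketchOrderDTO:
    """What callers receive instead of the domain object."""
    id: int
    status: str


class SketchUnitOfWork:
    """Toy transaction boundary: restores order statuses if the use case fails."""

    def __init__(self, orders: Dict[int, SketchOrder]):
        self.orders = orders

    @contextmanager
    def transaction(self) -> Iterator[None]:
        saved = {order_id: o.status for order_id, o in self.orders.items()}
        try:
            yield
        except Exception:
            for order_id, status in saved.items():
                self.orders[order_id].status = status
            raise


class ConfirmOrderService:
    def __init__(self, uow: SketchUnitOfWork):
        self._uow = uow

    def confirm_order(self, user: str, order_id: int) -> SketchOrderDTO:
        with self._uow.transaction():                  # transaction management
            order = self._uow.orders[order_id]
            if order.owner != user:                    # access control
                raise PermissionError(f"{user} may not confirm order {order_id}")
            order.confirm()                            # orchestration: the rule stays in the aggregate
        return SketchOrderDTO(order.id, order.status)  # DTO conversion


orders = {1: SketchOrder(id=1, owner="alice")}
service = ConfirmOrderService(SketchUnitOfWork(orders))
dto = service.confirm_order("alice", 1)
print(dto)  # SketchOrderDTO(id=1, status='confirmed')
```

Note that the service itself contains no business rule: the status check lives in `SketchOrder.confirm`, and the service only decides who may call it and inside which transaction.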
27.6.2 Application Service UML Diagram
┌───────────────────────────────────────────────────────────┐
│                  <<ApplicationService>>                   │
│                  OrderApplicationService                  │
├───────────────────────────────────────────────────────────┤
│ - _order_repository: OrderRepository                      │
│ - _pricing_service: PricingService                        │
│ - _inventory_service: InventoryService                    │
│ - _event_publisher: DomainEventPublisher                  │
│ - _unit_of_work: UnitOfWork                               │
├───────────────────────────────────────────────────────────┤
│ + create_order(cmd: CreateOrderCommand): OrderDTO         │
│ + add_item(cmd: AddItemCommand): OrderDTO                 │
│ + remove_item(cmd: RemoveItemCommand): OrderDTO           │
│ + confirm_order(order_id: int): OrderDTO                  │
│ + cancel_order(cmd: CancelOrderCommand): OrderDTO         │
│ + get_order(order_id: int): Optional[OrderDTO]            │
│ + get_customer_orders(customer_id: int): List[OrderDTO]   │
└───────────────────────────────────────────────────────────┘
                              │
                              │ uses
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
        ▼                     ▼                     ▼
┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│  Repository   │     │ DomainService │     │ EventPublisher│
└───────────────┘     └───────────────┘     └───────────────┘
27.6.3 Application Service Implementation
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Callable
from functools import wraps


def transactional(func):
    """Run the decorated service method inside the service's unit of work."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        with self._unit_of_work.transaction():
            return func(self, *args, **kwargs)
    return wrapper
@dataclass
class CreateOrderCommand:
    customer_id: int
    items: List[Dict[str, Any]]


@dataclass
class AddItemCommand:
    order_id: int
    product_id: int
    product_name: str
    unit_price: float
    quantity: int


@dataclass
class RemoveItemCommand:
    order_id: int
    product_id: int


@dataclass
class ConfirmOrderCommand:
    order_id: int


@dataclass
class CancelOrderCommand:
    order_id: int
    reason: str


@dataclass
class ShipOrderCommand:
    order_id: int
    tracking_number: str


@dataclass
class OrderDTO:
    id: int
    customer_id: int
    status: str
    total: float
    tax: float
    items: List[Dict[str, Any]]
    created_at: str
    version: int

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "customer_id": self.customer_id,
            "status": self.status,
            "total": self.total,
            "tax": self.tax,
            "items": self.items,
            "created_at": self.created_at,
            "version": self.version
        }
class ApplicationError(Exception):
    def __init__(self, message: str, code: str = "APPLICATION_ERROR"):
        super().__init__(message)
        self.code = code


class OrderNotFoundError(ApplicationError):
    def __init__(self, order_id: int):
        super().__init__(f"Order not found: {order_id}", "ORDER_NOT_FOUND")
        self.order_id = order_id


class OrderValidationError(ApplicationError):
    def __init__(self, message: str):
        super().__init__(message, "ORDER_VALIDATION_ERROR")
class OrderApplicationService:
    def __init__(
        self,
        order_repository: OrderRepository,
        pricing_service: PricingService,
        inventory_service: InventoryService,
        event_publisher: 'DomainEventPublisher',
        unit_of_work: UnitOfWork
    ):
        self._order_repository = order_repository
        self._pricing_service = pricing_service
        self._inventory_service = inventory_service
        self._event_publisher = event_publisher
        self._unit_of_work = unit_of_work
        self._next_order_id = 1

    def create_order(self, command: CreateOrderCommand) -> OrderDTO:
        order = Order(
            OrderId(self._next_order_id),
            CustomerId(command.customer_id)
        )
        self._next_order_id += 1
        # Check every line before reserving anything, so that a shortage on a
        # later item does not leave reservations behind for earlier ones.
        for item in command.items:
            product_id = ProductId(item["product_id"])
            if not self._inventory_service.check_availability(product_id, item["quantity"]):
                raise OrderValidationError(
                    f"Insufficient stock: product {item['product_name']}"
                )
        for item in command.items:
            product_id = ProductId(item["product_id"])
            quantity = item["quantity"]
            self._inventory_service.reserve(product_id, quantity, order.order_id)
            order.add_item(
                product_id,
                item["product_name"],
                Money(item["unit_price"], "CNY"),
                quantity
            )
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        return self._to_dto(saved_order)
    def add_item(self, command: AddItemCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        product_id = ProductId(command.product_id)
        if not self._inventory_service.check_availability(product_id, command.quantity):
            raise OrderValidationError(f"Insufficient stock: product {command.product_name}")
        self._inventory_service.reserve(product_id, command.quantity, order.order_id)
        order.add_item(
            product_id,
            command.product_name,
            Money(command.unit_price, "CNY"),
            command.quantity
        )
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        return self._to_dto(saved_order)

    def remove_item(self, command: RemoveItemCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        product_id = ProductId(command.product_id)
        # Release the stock reserved for the line that is being removed.
        for item in order.items:
            if item.product_id == product_id:
                self._inventory_service.release(
                    product_id, item.quantity, order.order_id
                )
                break
        order.remove_item(product_id)
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        return self._to_dto(saved_order)

    def confirm_order(self, order_id: int) -> OrderDTO:
        order = self._get_order_or_fail(order_id)
        order.confirm()
        self._inventory_service.commit_reservation(order.order_id)
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        return self._to_dto(saved_order)

    def cancel_order(self, command: CancelOrderCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        order.cancel(command.reason)
        self._inventory_service.cancel_reservation(order.order_id)
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        return self._to_dto(saved_order)

    def ship_order(self, command: ShipOrderCommand) -> OrderDTO:
        order = self._get_order_or_fail(command.order_id)
        order.ship(command.tracking_number)
        saved_order = self._order_repository.save(order)
        self._publish_events(saved_order)
        return self._to_dto(saved_order)
    def get_order(self, order_id: int) -> Optional[OrderDTO]:
        order = self._order_repository.find_by_id(order_id)
        return self._to_dto(order) if order else None

    def get_customer_orders(self, customer_id: int) -> List[OrderDTO]:
        orders = self._order_repository.find_by_customer(CustomerId(customer_id))
        return [self._to_dto(order) for order in orders]

    def get_orders_by_status(self, status: str) -> List[OrderDTO]:
        try:
            order_status = OrderStatus(status)
        except ValueError as exc:
            raise ApplicationError(f"Invalid order status: {status}") from exc
        orders = self._order_repository.find_by_status(order_status)
        return [self._to_dto(order) for order in orders]

    def _get_order_or_fail(self, order_id: int) -> Order:
        order = self._order_repository.find_by_id(order_id)
        if not order:
            raise OrderNotFoundError(order_id)
        return order

    def _publish_events(self, order: Order) -> None:
        events = order.pull_domain_events()
        for event in events:
            self._event_publisher.publish(event)

    def _to_dto(self, order: Order) -> OrderDTO:
        subtotal = order.calculate_total()
        tax = self._pricing_service.calculate_tax(subtotal)
        return OrderDTO(
            id=order.id,
            customer_id=order.customer_id.value,
            status=order.status.value,
            total=subtotal.amount,
            tax=tax.amount,
            items=[
                {
                    "product_id": item.product_id.value,
                    "product_name": item.product_name,
                    "quantity": item.quantity,
                    "unit_price": item.unit_price.amount,
                    "total": item.calculate_total().amount
                }
                for item in order.items
            ],
            created_at=order.created_at.isoformat(),
            version=order.version
        )
class DomainEventPublisher:
    """Minimal in-process publisher; section 27.7.3 extends it with middleware."""

    def __init__(self):
        self._handlers: Dict[type, List[Callable]] = {}

    def subscribe(self, event_type: type, handler: Callable) -> None:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    def publish(self, event: DomainEvent) -> None:
        handlers = self._handlers.get(type(event), [])
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                # A failing handler must not prevent the remaining handlers from running.
                print(f"Event handling failed: {e}")
event_publisher = DomainEventPublisher()
event_publisher.subscribe(
    OrderConfirmedEvent,
    lambda e: print(f"[Notification] Order {e.order_id.value} confirmed, total: {e.total}")
)
event_publisher.subscribe(
    OrderShippedEvent,
    lambda e: print(f"[Notification] Order {e.order_id.value} shipped, tracking number: {e.tracking_number}")
)

order_repo = OrderRepository()
pricing_svc = PricingService([DiscountRule(1000, 10)], tax_rate=0.13)
inventory_svc = InventoryService()
uow = OrderUnitOfWork(order_repo)
inventory_svc.set_stock(ProductId(1), 100)
inventory_svc.set_stock(ProductId(2), 50)

app_service = OrderApplicationService(
    order_repo, pricing_svc, inventory_svc, event_publisher, uow
)
order_dto = app_service.create_order(CreateOrderCommand(
    customer_id=100,
    items=[
        {"product_id": 1, "product_name": "Product A", "unit_price": 500, "quantity": 2},
        {"product_id": 2, "product_name": "Product B", "unit_price": 300, "quantity": 1}
    ]
))
print(f"Order ID: {order_dto.id}")
print(f"Status: {order_dto.status}")
print(f"Total: {order_dto.total}")
print(f"Tax: {order_dto.tax}")

confirmed_dto = app_service.confirm_order(order_dto.id)
print(f"Status after confirmation: {confirmed_dto.status}")
27.7 Domain Events
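27.7.1 Formal Definition of a Domain Event
Definition 27.8 (Domain event): a domain event $e$ is a fact with business meaning that has occurred within the domain:
$$e = \langle type, data, timestamp, causation\_id, correlation\_id \rangle$$
where:
- $type$: the event type
- $data$: the event payload
- $timestamp$: the time at which the event occurred
- $causation\_id$: the ID of the event that caused this event
- $correlation\_id$: the correlation ID, used to trace the whole flow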
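The difference between the two identifiers is easiest to see on a short chain of events. The sketch below is an illustration only: `EventRecord` and `caused_by` are hypothetical names, and the convention that a root event uses its own ID as correlation ID is an assumption, not something the definition prescribes.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class EventRecord:
    """The five-tuple from the definition, plus an event_id to refer to."""
    type: str
    data: Dict[str, Any]
    causation_id: Optional[str] = None
    correlation_id: Optional[str] = None
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def __post_init__(self) -> None:
        # Assumed convention: a root event starts a new flow and
        # correlates with itself.
        if self.correlation_id is None:
            object.__setattr__(self, "correlation_id", self.event_id)


def caused_by(parent: EventRecord, type: str, data: Dict[str, Any]) -> EventRecord:
    """Create a follow-up event: causation points at the direct parent,
    correlation is inherited unchanged from the start of the flow."""
    return EventRecord(
        type=type,
        data=data,
        causation_id=parent.event_id,
        correlation_id=parent.correlation_id,
    )


created = EventRecord("order.created", {"order_id": 1})
confirmed = caused_by(created, "order.confirmed", {"order_id": 1})
shipped = caused_by(confirmed, "order.shipped", {"order_id": 1})

# causation_id changes at every hop; correlation_id stays the same.
print(shipped.causation_id == confirmed.event_id)   # True
print(shipped.correlation_id == created.event_id)   # True
```

Filtering a log by `correlation_id` returns the whole flow, while following `causation_id` links backwards reconstructs the order in which the events triggered each other.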
27.7.2 Domain Event UML Diagram
              ┌───────────────────────────────────┐
              │           <<abstract>>            │
              │            DomainEvent            │
              ├───────────────────────────────────┤
              │ + event_id: str                   │
              │ + occurred_at: datetime           │
              │ + causation_id: Optional[str]     │
              │ + correlation_id: Optional[str]   │
              ├───────────────────────────────────┤
              │ + to_dict(): Dict[str, Any]       │
              │ + event_type(): str               │
              └───────────────────────────────────┘
                                △
                                │
          ┌─────────────────────┼─────────────────────┐
          │                     │                     │
          ▼                     ▼                     ▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ OrderCreatedEvent │ │OrderConfirmedEvent│ │ OrderShippedEvent │
├───────────────────┤ ├───────────────────┤ ├───────────────────┤
│ + order_id        │ │ + order_id        │ │ + order_id        │
│ + customer_id     │ │ + total           │ │ + tracking_number │
└───────────────────┘ └───────────────────┘ └───────────────────┘
┌───────────────────────────────────────────────────────────────┐
│                     DomainEventPublisher                      │
├───────────────────────────────────────────────────────────────┤
│ - _handlers: Dict[type, List[Handler]]                        │
│ - _middleware: List[Middleware]                               │
├───────────────────────────────────────────────────────────────┤
│ + subscribe(event_type: type, handler: Callable): void        │
│ + publish(event: DomainEvent): void                           │
│ + add_middleware(middleware: Middleware): void                │
└───────────────────────────────────────────────────────────────┘
27.7.3 Complete Domain Event Implementation
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Callable, Dict, Any, Optional
from abc import ABC, abstractmethod
import uuid
import json


class DomainEvent(ABC):
    def __init__(
        self,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        self.event_id = str(uuid.uuid4())
        self.occurred_at = datetime.now()
        self.causation_id = causation_id
        self.correlation_id = correlation_id or str(uuid.uuid4())

    @property
    @abstractmethod
    def event_type(self) -> str:
        pass

    def to_dict(self) -> Dict[str, Any]:
        return {
            "event_id": self.event_id,
            "event_type": self.event_type,
            "occurred_at": self.occurred_at.isoformat(),
            "causation_id": self.causation_id,
            "correlation_id": self.correlation_id
        }
class OrderCreatedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.created"

    def __init__(
        self,
        order_id: OrderId,
        customer_id: CustomerId,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.customer_id = customer_id

    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "customer_id": self.customer_id.value
        })
        return d


class OrderConfirmedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.confirmed"

    def __init__(
        self,
        order_id: OrderId,
        total: Money,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.total = total

    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "total": self.total.amount,
            "currency": self.total.currency
        })
        return d
class OrderShippedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.shipped"

    def __init__(
        self,
        order_id: OrderId,
        tracking_number: str,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.tracking_number = tracking_number

    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "tracking_number": self.tracking_number
        })
        return d


class OrderCancelledEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "order.cancelled"

    def __init__(
        self,
        order_id: OrderId,
        reason: str,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.reason = reason

    def to_dict(self) -> Dict[str, Any]:
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "reason": self.reason
        })
        return d


class PaymentReceivedEvent(DomainEvent):
    @property
    def event_type(self) -> str:
        return "payment.received"

    def __init__(
        self,
        order_id: OrderId,
        amount: Money,
        payment_method: str,
        causation_id: Optional[str] = None,
        correlation_id: Optional[str] = None
    ):
        super().__init__(causation_id, correlation_id)
        self.order_id = order_id
        self.amount = amount
        self.payment_method = payment_method

    def to_dict(self) -> Dict[str, Any]:
        # Added for consistency with the other events, so that stored
        # payment events keep their payload.
        d = super().to_dict()
        d.update({
            "order_id": self.order_id.value,
            "amount": self.amount.amount,
            "currency": self.amount.currency,
            "payment_method": self.payment_method
        })
        return d
class Middleware(ABC):
    @abstractmethod
    def before_publish(self, event: DomainEvent) -> DomainEvent:
        pass

    @abstractmethod
    def after_publish(self, event: DomainEvent) -> None:
        pass

    def on_error(self, event: DomainEvent, error: Exception) -> None:
        pass


class LoggingMiddleware(Middleware):
    def before_publish(self, event: DomainEvent) -> DomainEvent:
        print(f"[LOG] Publishing event: {event.event_type} [{event.event_id}]")
        return event

    def after_publish(self, event: DomainEvent) -> None:
        print(f"[LOG] Event handled: {event.event_type}")

    def on_error(self, event: DomainEvent, error: Exception) -> None:
        print(f"[ERROR] Event handling failed: {event.event_type}, error: {error}")


class EventStoreMiddleware(Middleware):
    """Keeps a dictionary copy of every published event in memory."""

    def __init__(self):
        self._events: List[Dict[str, Any]] = []

    def before_publish(self, event: DomainEvent) -> DomainEvent:
        return event

    def after_publish(self, event: DomainEvent) -> None:
        self._events.append(event.to_dict())

    def get_events(self) -> List[Dict[str, Any]]:
        return self._events.copy()

    def get_events_by_type(self, event_type: str) -> List[Dict[str, Any]]:
        return [e for e in self._events if e["event_type"] == event_type]
class DomainEventPublisher:
    def __init__(self):
        self._handlers: Dict[type, List[Callable]] = {}
        self._middleware: List[Middleware] = []
        self._dead_letter_queue: List[Dict[str, Any]] = []

    def subscribe(self, event_type: type, handler: Callable) -> None:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    def unsubscribe(self, event_type: type, handler: Callable) -> None:
        # Ignore handlers that were never subscribed instead of raising ValueError.
        if handler in self._handlers.get(event_type, []):
            self._handlers[event_type].remove(handler)

    def add_middleware(self, middleware: Middleware) -> None:
        self._middleware.append(middleware)

    def publish(self, event: DomainEvent) -> None:
        processed_event = event
        for middleware in self._middleware:
            processed_event = middleware.before_publish(processed_event)
        # Handlers are looked up by the exact class of the event.
        handlers = self._handlers.get(type(processed_event), [])
        for handler in handlers:
            try:
                handler(processed_event)
            except Exception as e:
                # A failing handler does not stop the others; the failure is
                # reported to the middleware and kept in the dead-letter queue.
                for middleware in self._middleware:
                    middleware.on_error(processed_event, e)
                self._dead_letter_queue.append({
                    "event": processed_event.to_dict(),
                    "error": str(e),
                    "failed_at": datetime.now().isoformat()
                })
        for middleware in self._middleware:
            middleware.after_publish(processed_event)

    def publish_all(self, events: List[DomainEvent]) -> None:
        for event in events:
            self.publish(event)

    def get_dead_letter_queue(self) -> List[Dict[str, Any]]:
        return self._dead_letter_queue.copy()
class EventHandler:
    def __init__(self, name: str):
        self.name = name
        self._processed_events: List[str] = []

    def handle(self, event: DomainEvent) -> None:
        self._processed_events.append(event.event_id)
        print(f"[{self.name}] Handling event: {event.event_type}")

    def has_processed(self, event_id: str) -> bool:
        return event_id in self._processed_events


event_publisher = DomainEventPublisher()
logging_middleware = LoggingMiddleware()
event_store = EventStoreMiddleware()
event_publisher.add_middleware(logging_middleware)
event_publisher.add_middleware(event_store)

notification_handler = EventHandler("Notification service")
inventory_handler = EventHandler("Inventory service")
analytics_handler = EventHandler("Analytics service")
event_publisher.subscribe(OrderConfirmedEvent, notification_handler.handle)
event_publisher.subscribe(OrderConfirmedEvent, analytics_handler.handle)
event_publisher.subscribe(OrderShippedEvent, notification_handler.handle)
event_publisher.subscribe(OrderCancelledEvent, inventory_handler.handle)

order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 1)
order.confirm()
events = order.pull_domain_events()
event_publisher.publish_all(events)

print(f"\nEvents in the event store: {len(event_store.get_events())}")
for e in event_store.get_events():
    print(f" - {e['event_type']}: {e['event_id'][:8]}...")
27.8 Strategic Design: Bounded Contexts
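27.8.1 Formal Definition of Bounded Contexts
Definition 27.9 (Context map): the context map $\mathcal{U}$ assigns a relationship to each pair of bounded contexts:
$$\mathcal{U}: \mathcal{B}_1 \times \mathcal{B}_2 \rightarrow Relationship$$
Here $\mathcal{M}_i$ denotes the domain model of context $\mathcal{B}_i$. Relationship types include:
- Shared kernel: $SharedKernel \subset \mathcal{M}_1 \cap \mathcal{M}_2$
- Customer-supplier: $\mathcal{B}_1 \xrightarrow{supply} \mathcal{B}_2$
- Anti-corruption layer: $ACL: \mathcal{M}_{external} \rightarrow \mathcal{M}$
- Open host service: $OHS: \mathcal{B} \rightarrow API$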
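Read literally, the definition describes a lookup from ordered pairs of contexts to a relationship type. A minimal sketch of that reading follows; `Relationship`, `ContextMap`, and the context names are hypothetical and chosen only for illustration.

```python
from enum import Enum
from typing import Dict, List, Optional, Tuple


class Relationship(Enum):
    SHARED_KERNEL = "shared kernel"
    CUSTOMER_SUPPLIER = "customer-supplier"
    ANTI_CORRUPTION_LAYER = "anti-corruption layer"
    OPEN_HOST_SERVICE = "open host service"


class ContextMap:
    """Partial function from ordered pairs of context names to a relationship."""

    def __init__(self) -> None:
        self._relations: Dict[Tuple[str, str], Relationship] = {}

    def relate(self, source: str, target: str, relationship: Relationship) -> None:
        if source == target:
            raise ValueError("a context cannot be mapped to itself")
        self._relations[(source, target)] = relationship

    def relationship(self, source: str, target: str) -> Optional[Relationship]:
        # None means that the two contexts are not directly related.
        return self._relations.get((source, target))

    def targets_of(self, source: str) -> List[str]:
        return sorted(t for (s, t) in self._relations if s == source)


context_map = ContextMap()
context_map.relate("sales", "shipping", Relationship.OPEN_HOST_SERVICE)
context_map.relate("sales", "payment", Relationship.ANTI_CORRUPTION_LAYER)
context_map.relate("sales", "inventory", Relationship.ANTI_CORRUPTION_LAYER)

print(context_map.relationship("sales", "payment").value)  # anti-corruption layer
print(context_map.targets_of("sales"))  # ['inventory', 'payment', 'shipping']
```

Keeping the map as explicit data is one way to make integration decisions reviewable; the pairs are ordered because most relationship types are directional.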
27.8.2 Bounded Context Map
┌───────────────────────────────────────────────────────────────┐
│                         Sales Context                         │
│ ┌─────────────┐        ┌─────────────┐        ┌─────────────┐ │
│ │    Order    │        │  Customer   │        │   Product   │ │
│ │  Aggregate  │        │  Aggregate  │        │  Aggregate  │ │
│ └─────────────┘        └─────────────┘        └─────────────┘ │
└───────────────────────────────────────────────────────────────┘
         │                      │                      │
         │ OHS/PL               │ ACL                  │ ACL
         ▼                      ▼                      ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│Shipping Context │    │ Payment Context │    │Inventory Context│
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │  Shipment   │ │    │ │   Payment   │ │    │ │    Stock    │ │
│ │  Aggregate  │ │    │ │  Aggregate  │ │    │ │  Aggregate  │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │
         │ CF                   │ CF
         ▼                      ▼
┌───────────────────────────────────────────────────────────────┐
│                     Notification Context                      │
│            ┌─────────────┐        ┌─────────────┐             │
│            │    Email    │        │     SMS     │             │
│            │   Service   │        │   Service   │             │
│            └─────────────┘        └─────────────┘             │
└───────────────────────────────────────────────────────────────┘
Legend:
OHS/PL = Open Host Service / Published Language
ACL = Anti-Corruption Layer
CF = Conformist
27.8.3 Context Mapping Implementation
from dataclasses import dataclass
from datetime import datetime
from typing import Protocol, Dict, Any, List, Optional
from abc import ABC, abstractmethod
import uuid


@dataclass(frozen=True)
class OrderIdDTO:
    value: int


@dataclass(frozen=True)
class CustomerIdDTO:
    value: int


@dataclass(frozen=True)
class ProductIdDTO:
    value: int


@dataclass
class OrderSummaryDTO:
    order_id: OrderIdDTO
    customer_id: CustomerIdDTO
    total_amount: float
    status: str
    items: List[Dict[str, Any]]


class OrderRepositoryInterface(Protocol):
    def find_by_id(self, id: int) -> Optional['Order']:
        ...

    def save(self, order: 'Order') -> 'Order':
        ...
class AntiCorruptionLayer(ABC):
    """Translates between an external model and the local model."""

    @abstractmethod
    def to_local(self, external_data: Any) -> Any:
        pass

    @abstractmethod
    def to_external(self, local_data: Any) -> Any:
        pass


class ShippingAntiCorruptionLayer(AntiCorruptionLayer):
    def to_local(self, external_data: Dict[str, Any]) -> OrderSummaryDTO:
        return OrderSummaryDTO(
            order_id=OrderIdDTO(external_data["order_id"]),
            customer_id=CustomerIdDTO(external_data["customer_id"]),
            total_amount=external_data["total"],
            status=external_data["status"],
            items=external_data.get("items", [])
        )

    def to_external(self, local_data: Order) -> Dict[str, Any]:
        return {
            "order_id": local_data.order_id.value,
            "customer_id": local_data.customer_id.value,
            "total": local_data.calculate_total().amount,
            "status": local_data.status.value,
            "items": [
                {
                    "product_id": item.product_id.value,
                    "product_name": item.product_name,
                    "quantity": item.quantity
                }
                for item in local_data.items
            ]
        }


class PaymentAntiCorruptionLayer(AntiCorruptionLayer):
    def to_local(self, external_data: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "payment_id": external_data.get("payment_id"),
            "order_id": OrderIdDTO(external_data["order_id"]),
            "amount": Money(
                external_data["amount"],
                external_data.get("currency", "CNY")
            ),
            "status": external_data.get("status", "pending")
        }

    def to_external(self, local_data: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "order_id": local_data["order_id"].value,
            "amount": local_data["amount"].amount,
            "currency": local_data["amount"].currency,
            "customer_id": local_data.get("customer_id", 0)
        }
class OpenHostService(ABC):
    @abstractmethod
    def get_order(self, order_id: int) -> Optional[Dict[str, Any]]:
        pass

    @abstractmethod
    def get_customer_orders(self, customer_id: int) -> List[Dict[str, Any]]:
        pass


class OrderOpenHostService(OpenHostService):
    def __init__(self, order_repository: OrderRepository):
        self._order_repository = order_repository

    def get_order(self, order_id: int) -> Optional[Dict[str, Any]]:
        order = self._order_repository.find_by_id(order_id)
        if not order:
            return None
        return {
            "order_id": order.order_id.value,
            "customer_id": order.customer_id.value,
            "status": order.status.value,
            "total": order.calculate_total().amount,
            "items": [
                {
                    "product_id": item.product_id.value,
                    "product_name": item.product_name,
                    "quantity": item.quantity,
                    "unit_price": item.unit_price.amount
                }
                for item in order.items
            ],
            "created_at": order.created_at.isoformat()
        }

    def get_customer_orders(self, customer_id: int) -> List[Dict[str, Any]]:
        orders = self._order_repository.find_by_customer(CustomerId(customer_id))
        return [
            {
                "order_id": o.order_id.value,
                "status": o.status.value,
                "total": o.calculate_total().amount,
                "item_count": len(o.items)
            }
            for o in orders
        ]
class PublishedLanguage:
    EVENT_SCHEMAS = {
        "order.created": {
            "order_id": "integer",
            "customer_id": "integer",
            "occurred_at": "datetime"
        },
        "order.confirmed": {
            "order_id": "integer",
            "total": "number",
            "currency": "string"
        },
        "order.shipped": {
            "order_id": "integer",
            "tracking_number": "string"
        }
    }

    @classmethod
    def validate_event(cls, event_type: str, data: Dict[str, Any]) -> bool:
        schema = cls.EVENT_SCHEMAS.get(event_type)
        if not schema:
            return False
        # Only the presence of each field is checked; the declared types are not enforced.
        for field_name in schema:
            if field_name not in data:
                return False
        return True


class SharedKernel:
    """Value types shared by all contexts that take part in the kernel."""

    @dataclass(frozen=True)
    class Money:
        amount: float
        currency: str

    @dataclass(frozen=True)
    class OrderId:
        value: int

    @dataclass(frozen=True)
    class CustomerId:
        value: int

    @dataclass(frozen=True)
    class ProductId:
        value: int
class ShippingContext:
    def __init__(self, order_service: OpenHostService, acl: AntiCorruptionLayer):
        self._order_service = order_service
        self._acl = acl

    def create_shipment(self, order_id: int) -> Dict[str, Any]:
        order_data = self._order_service.get_order(order_id)
        if not order_data:
            raise ValueError(f"Order not found: {order_id}")
        # Translate the sales context's representation into the local one.
        order_dto = self._acl.to_local(order_data)
        shipment = {
            "shipment_id": str(uuid.uuid4()),
            "order_id": order_dto.order_id.value,
            "customer_id": order_dto.customer_id.value,
            "status": "preparing",
            "items": order_dto.items,
            "created_at": datetime.now().isoformat()
        }
        return shipment


class PaymentContext:
    def __init__(self, acl: AntiCorruptionLayer):
        self._acl = acl
        self._payments: Dict[int, Dict[str, Any]] = {}

    def create_payment(
        self,
        order_id: int,
        amount: Money,
        customer_id: int
    ) -> Dict[str, Any]:
        payment_data = {
            "payment_id": str(uuid.uuid4()),
            "order_id": OrderIdDTO(order_id),
            "amount": amount,
            "customer_id": customer_id,
            "status": "pending"
        }
        external_data = self._acl.to_external(payment_data)
        self._payments[order_id] = {
            **external_data,
            "payment_id": payment_data["payment_id"]
        }
        return self._payments[order_id]

    def confirm_payment(self, order_id: int) -> Dict[str, Any]:
        if order_id not in self._payments:
            raise ValueError(f"Payment record not found: {order_id}")
        self._payments[order_id]["status"] = "confirmed"
        return self._payments[order_id]
order_repo = OrderRepository()
order_service = OrderOpenHostService(order_repo)
order = Order(OrderId(1), CustomerId(100))
order.add_item(ProductId(1), "Product A", Money(100, "CNY"), 2)
order_repo.save(order)

shipping_acl = ShippingAntiCorruptionLayer()
shipping_ctx = ShippingContext(order_service, shipping_acl)
shipment = shipping_ctx.create_shipment(1)
print(f"Shipment created: {shipment['shipment_id'][:8]}...")

payment_acl = PaymentAntiCorruptionLayer()
payment_ctx = PaymentContext(payment_acl)
payment = payment_ctx.create_payment(1, Money(200, "CNY"), 100)
print(f"Payment created: {payment['payment_id'][:8]}...")
confirmed = payment_ctx.confirm_payment(1)
print(f"Payment status: {confirmed['status']}")
27.9 Anti-Patterns and Best Practices
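27.9.1 Common Anti-Patterns
| Anti-pattern | Description | Consequence | Remedy |
|---|---|---|---|
| Anemic model | Entities expose only getters/setters; the logic lives in the service layer | Breaks encapsulation and scatters logic | Move business logic into the entities |
| Bloated aggregate | An aggregate contains too many entities | Performance problems and concurrency conflicts | Split the aggregate and reference by identity |
| God service | One service handles all the logic | Unclear responsibilities, hard to maintain | Split services along domain lines |
| Ignoring the ubiquitous language | Terms in the code differ from business terms | Communication barriers | Maintain a shared glossary |
| Over-engineering | Simple problems are made complicated | Low development efficiency | Choose the approach according to complexity |
| Direct references between aggregates | An aggregate holds instances of other aggregates | Blurred transaction boundaries | Reference by ID |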
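The last row of the table can be made concrete with a small sketch. The classes below are hypothetical, simplified stand-ins and not the chapter's `Order` or `Customer`: the order stores only a `CustomerId`, and code that needs the customer loads it through that aggregate's own repository.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class CustomerId:
    value: int


@dataclass
class Customer:
    """Root of its own aggregate."""
    id: CustomerId
    credit_limit: int


@dataclass
class Order:
    """Holds the customer's identity, not the Customer object itself."""
    id: int
    customer_id: CustomerId
    total: int


class InMemoryCustomerRepository:
    def __init__(self) -> None:
        self._customers: Dict[CustomerId, Customer] = {}

    def save(self, customer: Customer) -> None:
        self._customers[customer.id] = customer

    def find_by_id(self, customer_id: CustomerId) -> Optional[Customer]:
        return self._customers.get(customer_id)


def within_credit_limit(order: Order, customers: InMemoryCustomerRepository) -> bool:
    # The cross-aggregate rule resolves the reference explicitly, so it is
    # visible that two aggregates (and two transaction boundaries) are involved.
    customer = customers.find_by_id(order.customer_id)
    return customer is not None and order.total <= customer.credit_limit


customers = InMemoryCustomerRepository()
customers.save(Customer(CustomerId(100), credit_limit=1000))
print(within_credit_limit(Order(1, CustomerId(100), total=800), customers))   # True
print(within_credit_limit(Order(2, CustomerId(100), total=1200), customers))  # False
```

Because `Order` never holds a `Customer`, saving an order cannot accidentally persist or lock customer data.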
27.9.2 Anemic Model: Example and Fix
# Anti-pattern: the order is a bare data holder ...
class AnemicOrder:
    def __init__(self, id: int, customer_id: int):
        self.id = id
        self.customer_id = customer_id
        self.items: List[Dict] = []
        self.status = "pending"


# ... and every business rule lives in a separate service.
class AnemicOrderService:
    def add_item(
        self,
        order: AnemicOrder,
        product_id: int,
        name: str,
        price: float,
        qty: int
    ) -> None:
        if order.status != "pending":
            raise ValueError("Only pending orders can be modified")
        order.items.append({
            "product_id": product_id,
            "name": name,
            "price": price,
            "quantity": qty
        })

    def confirm(self, order: AnemicOrder) -> None:
        if order.status != "pending":
            raise ValueError("Only pending orders can be confirmed")
        if not order.items:
            raise ValueError("Order must not be empty")
        order.status = "confirmed"
# Fix: state and the rules that guard it live together in the entity.
class RichOrder:
    def __init__(self, id: OrderId, customer_id: CustomerId):
        self._order_id = id
        self._customer_id = customer_id
        self._items: List[OrderItem] = []
        self._status = OrderStatus.PENDING

    @property
    def order_id(self) -> OrderId:
        return self._order_id

    @property
    def status(self) -> OrderStatus:
        return self._status

    def add_item(
        self,
        product_id: ProductId,
        name: str,
        price: Money,
        qty: int
    ) -> None:
        self._ensure_modifiable()
        self._validate_item(product_id, name, price, qty)
        existing = self._find_item(product_id)
        if existing:
            existing.update_quantity(existing.quantity + qty)
        else:
            self._items.append(OrderItem(
                OrderItemId(len(self._items) + 1),
                product_id, name, price, qty
            ))

    def confirm(self) -> None:
        self._ensure_status(OrderStatus.PENDING)
        if not self._items:
            raise ValueError("Order must not be empty")
        self._status = OrderStatus.CONFIRMED

    def _ensure_modifiable(self) -> None:
        if self._status != OrderStatus.PENDING:
            raise ValueError("Only pending orders can be modified")

    def _ensure_status(self, expected: OrderStatus) -> None:
        if self._status != expected:
            raise ValueError(f"Invalid status: expected {expected.value}, got {self._status.value}")

    def _find_item(self, product_id: ProductId) -> Optional[OrderItem]:
        return next((i for i in self._items if i.product_id == product_id), None)

    def _validate_item(self, product_id: ProductId, name: str, price: Money, qty: int) -> None:
        if qty <= 0:
            raise ValueError("Quantity must be greater than 0")
        if not name:
            raise ValueError("Product name must not be empty")
27.9.3 Best Practice Checklist
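class DDDBestPractices:
    ENTITY_RULES = [
        "Entities define equality by identity, not by attributes",
        "An entity's identity never changes during its lifecycle",
        "Entities are mutable, but changes are encapsulated in methods",
        "Entities validate their own invariants"
    ]
    VALUE_OBJECT_RULES = [
        "Value objects are immutable",
        "Value objects define equality by attribute values",
        "Value objects can be freely replaced",
        "Value objects validate themselves"
    ]
    AGGREGATE_RULES = [
        "The aggregate root is the only entry point",
        "Transactional consistency is guaranteed within the aggregate boundary",
        "Aggregates reference each other by ID",
        "Aggregates are kept as small as possible",
        "Constraints between aggregates are maintained with eventual consistency"
    ]
    SERVICE_RULES = [
        "Domain services are stateless",
        "Service operations do not belong to any entity or value object",
        "Service interfaces use the domain language",
        "Service names reflect domain concepts"
    ]
    REPOSITORY_RULES = [
        "Repositories exist only for aggregate roots",
        "Repositories emulate an in-memory collection",
        "Repositories hide persistence details",
        "Repositories contain no business logic"
    ]

    @classmethod
    def validate_aggregate(cls, aggregate: Any) -> List[str]:
        # Heuristic check based on attribute names only.
        violations = []
        for item in getattr(aggregate, 'items', []):
            if hasattr(item, 'id') and not hasattr(aggregate, '_find_item_by_id'):
                violations.append("Internal entities should be accessed through the aggregate root")
        if hasattr(aggregate, 'get_other_aggregate'):
            violations.append("An aggregate should not hold instances of other aggregates")
        return violations
27.10 Decision Guide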
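27.10.1 Assessing DDD Applicability
             ┌─────────────────────────────────┐
             │ Business complexity assessment  │
             └─────────────────────────────────┘
                              │
             ┌────────────────┴────────────────┐
             │                                 │
             ▼                                 ▼
┌─────────────────────────┐       ┌─────────────────────────┐
│       Simple CRUD       │       │    Complex business     │
│     Low complexity      │       │     High complexity     │
└─────────────────────────┘       └─────────────────────────┘
             │                                 │
             ▼                                 ▼
┌─────────────────────────┐       ┌─────────────────────────┐
│   DDD not recommended   │       │     DDD recommended     │
│ Use Transaction Script  │       │   Use a domain model    │
└─────────────────────────┘       └─────────────────────────┘
                                               │
             ┌─────────────────────────────────┤
             │                                 │
             ▼                                 ▼
┌─────────────────────────┐       ┌─────────────────────────┐
│ Single bounded context  │       │Multiple bounded contexts│
│ Mostly tactical design  │       │ Mostly strategic design │
└─────────────────────────┘       └─────────────────────────┘
27.10.2 Aggregate Design Decision Tree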
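         ┌─────────────────────────────────────────┐
         │ Is transactional consistency required?  │
         └─────────────────────────────────────────┘
                              │
             ┌────────────────┴────────────────┐
             │                                 │
             ▼                                 ▼
┌─────────────────────────┐       ┌─────────────────────────┐
│           Yes           │       │           No            │
└─────────────────────────┘       └─────────────────────────┘
             │                                 │
             ▼                                 ▼
┌─────────────────────────┐       ┌─────────────────────────┐
│Put in the same aggregate│       │   Separate aggregates   │
└─────────────────────────┘       │  Eventual consistency   │
                                  └─────────────────────────┘
                                               │
             ┌─────────────────────────────────┤
             │                                 │
             ▼                                 ▼
┌─────────────────────────┐       ┌─────────────────────────┐
│      Domain events      │       │      Message queue      │
│  Handled synchronously  │       │ Handled asynchronously  │
└─────────────────────────┘       └─────────────────────────┘
27.10.3 Pattern Selection Reference Table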
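| Scenario | Recommended pattern | Rationale |
|---|---|---|
| User registration | Entity + value objects | Needs a unique identity; email and address are value objects |
| Order management | Aggregate + domain events | Order lines must stay strongly consistent with the order; confirming an order triggers downstream processing |
| Price calculation | Domain service | Discount rules do not belong to any single entity |
| Inventory reservation | Domain service + repository | The operation spans aggregates |
| Payment processing | Anti-corruption layer | Integrates an external payment system |
| Order queries | Application service + DTO | Separates the read model from the write model |
| State changes | Entity methods + events | Encapsulates state-transition logic |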
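
The "Price calculation" row can be illustrated with a minimal sketch. All names here (`Money`, `LineItem`, `PricingService`) and the threshold-discount rule are hypothetical, chosen only for illustration; they are not the chapter's own classes. The point is that the discount rule sits in a stateless service because it belongs to neither the order nor the product.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Iterable


@dataclass(frozen=True)
class Money:
    """Hypothetical value object: immutable, compared by attribute values."""
    amount: Decimal
    currency: str = "CNY"

    def __post_init__(self) -> None:
        if self.amount < 0:
            raise ValueError("amount must not be negative")

    def add(self, other: "Money") -> "Money":
        if other.currency != self.currency:
            raise ValueError("currency mismatch")
        return Money(self.amount + other.amount, self.currency)

    def multiply(self, factor: Decimal) -> "Money":
        return Money(self.amount * factor, self.currency)


@dataclass(frozen=True)
class LineItem:
    """Hypothetical order line: a unit price and a quantity."""
    unit_price: Money
    quantity: int


class PricingService:
    """Domain service sketch: holds only fixed configuration, no mutable state."""

    def __init__(self, threshold: Money, discount_rate: Decimal) -> None:
        self._threshold = threshold
        self._discount_rate = discount_rate

    def total(self, items: Iterable[LineItem]) -> Money:
        # Sum the line totals, then apply the discount once the threshold is met.
        subtotal = Money(Decimal("0"), self._threshold.currency)
        for item in items:
            subtotal = subtotal.add(item.unit_price.multiply(Decimal(item.quantity)))
        if subtotal.amount >= self._threshold.amount:
            return subtotal.multiply(Decimal("1") - self._discount_rate)
        return subtotal


service = PricingService(Money(Decimal("100")), Decimal("0.10"))
print(service.total([LineItem(Money(Decimal("60")), 2)]).amount)  # 108.00
```

Keeping the rule in a service means a new discount policy can be introduced without touching the order aggregate.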
27.11 Quick reference card
27.11.1 DDD core concepts
┌─────────────────────────────────────────────────────────────────────────┐
│                    DDD core concepts quick reference                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Strategic design                                                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                   │
│  │   Bounded    │  │   Context    │  │  Ubiquitous  │                   │
│  │   Context    │  │   Mapping    │  │   Language   │                   │
│  └──────────────┘  └──────────────┘  └──────────────┘                   │
│                                                                         │
│  Tactical design                                                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐ │
│  │    Entity    │  │ Value Object │  │  Aggregate   │  │Domain Service│ │
│  └──────────────┘  └──────────────┘  └──────────────┘  └──────────────┘ │
│                                                                         │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                   │
│  │  Repository  │  │ Unit of Work │  │ Domain Event │                   │
│  └──────────────┘  └──────────────┘  └──────────────┘                   │
│                                                                         │
├─────────────────────────────────────────────────────────────────────────┤
│  Entity vs. value object                                                │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │ Entity:       has identity, mutable, has a lifecycle,           │    │
│  │               compared by ID                                    │    │
│  │ Value object: no identity, immutable, no lifecycle,             │    │
│  │               compared by attributes                            │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                                                         │
│  Aggregate rules                                                        │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │ 1. Only the aggregate root has a global identity                │    │
│  │ 2. Outside code reaches the interior only through the root      │    │
│  │ 3. Transactional consistency holds inside the boundary          │    │
│  │ 4. Aggregates reference each other by ID                        │    │
│  │ 5. Keep aggregates as small as possible                         │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

27.11.2 Code templates
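from dataclasses import dataclass
from typing import List, Optional

# Entity, DomainEvent, Repository, UnitOfWork, Result, Command and DTO are
# assumed to be the base types defined earlier in this chapter.


# Value object template
@dataclass(frozen=True)
class MyValueObject:
    value: str

    def __post_init__(self):
        if not self._validate():
            raise ValueError("Validation failed")

    def _validate(self) -> bool:
        # Replace with the real validation rule.
        return True


# Entity template
class MyEntity(Entity):
    def __init__(self, id: int, name: str):
        super().__init__(id)
        self._name = name

    @property
    def name(self) -> str:
        return self._name

    def change_name(self, new_name: str) -> None:
        # Validate first so the entity never enters an invalid state.
        self._validate_name(new_name)
        self._name = new_name

    def _validate_name(self, name: str) -> None:
        if not name:
            raise ValueError("Name must not be empty")


# Aggregate root template
class MyAggregateRoot(Entity):
    def __init__(self, id: int):
        super().__init__(id)
        self._events: List[DomainEvent] = []

    def pull_domain_events(self) -> List[DomainEvent]:
        # Hand the pending events over and clear the internal buffer.
        events = self._events.copy()
        self._events.clear()
        return events

    def _add_event(self, event: DomainEvent) -> None:
        self._events.append(event)


# Repository template
class MyRepository(Repository[MyAggregateRoot]):
    def find_by_id(self, id: int) -> Optional[MyAggregateRoot]:
        raise NotImplementedError

    def save(self, entity: MyAggregateRoot) -> MyAggregateRoot:
        raise NotImplementedError


# Domain service template
class MyDomainService:
    def perform_operation(self, entity: MyEntity) -> Result:
        raise NotImplementedError


# Application service template
class MyApplicationService:
    def __init__(self, repository: MyRepository, uow: UnitOfWork):
        self._repository = repository
        self._uow = uow

    def handle_command(self, command: Command) -> DTO:
        with self._uow.transaction():
            entity = self._repository.find_by_id(command.id)
            if entity is None:
                raise ValueError(f"Aggregate {command.id} not found")
            entity.perform_action()
            self._repository.save(entity)
            return self._to_dto(entity)

27.12 Summary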
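27.12.1 Key points
- Ubiquitous language: domain terminology shared by the whole team, with code using the same terms as the business
- Bounded context: makes model boundaries explicit and keeps concepts from being conflated
- Entities and value objects: distinguish identity from attributes so that domain concepts are modeled correctly
- Aggregates: guarantee transactional consistency and keep boundaries small
- Domain services: encapsulate domain logic that does not belong to any entity
- Repositories: an abstraction over persistence that emulates an in-memory collection
- Domain events: enable communication across aggregates and support eventual consistency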
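
The last point can be made concrete with a small sketch. `Order`, `Stock`, `OrderConfirmed` and the in-process `EventBus` are hypothetical names used only for illustration. The bus here dispatches synchronously; a real system would more likely publish after the transaction commits, often through a message queue, to get eventual consistency.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Type


@dataclass(frozen=True)
class DomainEvent:
    """Hypothetical base class for immutable domain events."""


@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
    order_id: int
    product_id: int
    quantity: int


class Order:
    """Aggregate root: records events instead of calling other aggregates."""

    def __init__(self, order_id: int, product_id: int, quantity: int) -> None:
        self.id = order_id
        self._product_id = product_id  # reference by ID, not by object
        self._quantity = quantity
        self.status = "PENDING"
        self._events: List[DomainEvent] = []

    def confirm(self) -> None:
        if self.status != "PENDING":
            raise ValueError("only a pending order can be confirmed")
        self.status = "CONFIRMED"
        self._events.append(OrderConfirmed(self.id, self._product_id, self._quantity))

    def pull_domain_events(self) -> List[DomainEvent]:
        events, self._events = self._events, []
        return events


class Stock:
    """A separate aggregate that reacts to the event."""

    def __init__(self, product_id: int, available: int) -> None:
        self.product_id = product_id
        self.available = available

    def reserve(self, quantity: int) -> None:
        if quantity > self.available:
            raise ValueError("insufficient stock")
        self.available -= quantity


class EventBus:
    """Minimal in-process dispatcher keyed by event type."""

    def __init__(self) -> None:
        self._handlers: Dict[Type[DomainEvent], List[Callable[[DomainEvent], None]]] = {}

    def subscribe(self, event_type: Type[DomainEvent],
                  handler: Callable[[DomainEvent], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, events: List[DomainEvent]) -> None:
        for event in events:
            for handler in self._handlers.get(type(event), []):
                handler(event)


stocks = {7: Stock(product_id=7, available=10)}
bus = EventBus()
bus.subscribe(OrderConfirmed, lambda e: stocks[e.product_id].reserve(e.quantity))

order = Order(order_id=1, product_id=7, quantity=3)
order.confirm()
bus.publish(order.pull_domain_events())
print(order.status, stocks[7].available)  # CONFIRMED 7
```

The order never touches a `Stock` instance. It only states what happened, and the subscriber decides how the other aggregate reacts.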
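27.12.2 When DDD applies
| Good fit | Poor fit |
|---|---|
| Core domains with complex business logic | Simple CRUD applications |
| Systems that need long-term maintenance | Short-lived projects |
| Teams with a deep understanding of the domain | Unstable domain knowledge |
| Projects that require collaboration with business experts | Technology-driven projects |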
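27.12.3 Implementation advice
- Start small: practice in a single bounded context first
- Build a ubiquitous language: maintain the glossary together with domain experts
- Evolve iteratively: let the model change as understanding deepens
- Keep it simple: avoid over-engineering and choose the strategy that matches the complexity
- Refactor continuously: distill the domain model through refactoring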