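Chapter 26 Architectural Design Patterns
Learning Objectives
After completing this chapter, readers will be able to:
- Understand the core concepts and formal definition of software architecture
- Master classic architecture patterns such as layered architecture, hexagonal architecture, and CQRS
- Implement extensible architectures such as plugin architecture and event-driven architecture
- Identify the scenarios each architecture pattern suits and the trade-offs it involves
- Design software architectures that meet business requirements
26.1 Overview of Software Architecture
26.1.1 Core Definition
Software architecture is the high-level structure of a software system. It defines the system's components, the relationships among them, and the principles that guide its design and evolution.
26.1.2 Formal Definition
Formally, a software architecture can be defined as a 4-tuple: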
$$\mathcal{A} = \langle C, R, V, P \rangle$$
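where:
- $C = \{c_1, c_2, \ldots, c_n\}$ is the set of components
- $R \subseteq C \times C$ is the set of relations between components
- $V$ is the set of views
- $P$ is the set of properties
Component definition:
A component $c$ is a triple:
$$c = \langle I, O, B \rangle$$
where $I$ is the set of interfaces, $O$ is the set of operations, and $B$ is the behavioral specification.
Connector definition:
A connector $l$ defines how components interact:
$$l: I_1 \times I_2 \rightarrow Interaction$$
Architectural style:
An architectural style $S$ defines a set of constraints:
$$S = \langle C_S, R_S, Constraints \rangle$$
where $C_S$ and $R_S$ are the component and relation types the style admits, and $Constraints$ is the set of invariants that must hold.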
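The definitions above can be made concrete with a small sketch. It assumes a hypothetical layered style whose only invariant is that dependencies never point from a lower layer to a higher one. The names `Component`, `Architecture`, `layered_style_violations`, and the `layer` field are illustrative and not part of the formal model.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass(frozen=True)
class Component:
    """A component c, reduced to a name plus a layer index (an assumption)."""
    name: str
    layer: int  # 0 = lowest layer (e.g. data access); higher = closer to the UI


@dataclass
class Architecture:
    """The C and R parts of <C, R, V, P>; V and P are omitted in this sketch."""
    components: Set[Component] = field(default_factory=set)
    relations: Set[Tuple[Component, Component]] = field(default_factory=set)

    def depend(self, source: Component, target: Component) -> None:
        """Record that `source` depends on `target` (one element of R)."""
        self.components.update({source, target})
        self.relations.add((source, target))


def layered_style_violations(arch: Architecture) -> List[Tuple[str, str]]:
    """Return every relation that breaks the invariant 'no upward dependency'."""
    return sorted(
        (src.name, dst.name)
        for src, dst in arch.relations
        if src.layer < dst.layer
    )


ui = Component("UserController", layer=2)
logic = Component("UserService", layer=1)
data = Component("UserRepository", layer=0)

arch = Architecture()
arch.depend(ui, logic)
arch.depend(logic, data)
print(layered_style_violations(arch))  # [] : the style is satisfied

arch.depend(data, ui)                  # a lower layer reaching upward
print(layered_style_violations(arch))  # [('UserRepository', 'UserController')]
```

Encoding the style as a checkable predicate over $R$ is one way to turn an invariant in $Constraints$ into an automated architecture test.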
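Quality attributes:
The quality attributes $Q$ of an architecture can be written as:
$$Q = \{q_1, q_2, \ldots, q_m\}$$
Common quality attributes include:
- Maintainability: $Maintainability = f(Coupling, Cohesion)$
- Scalability: $Scalability = f(Throughput, Latency)$
- Reliability: $Reliability = f(MTBF, MTTR)$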
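The text leaves each $f$ unspecified. For the reliability entry, one widely used concrete instance is steady-state availability, $A = MTBF / (MTBF + MTTR)$. The sketch below assumes that formula; the function name `availability` and the sample figures are illustrative.

```python
def availability(mtbf_hours: float, mttr_hours: float) -> float:
    """Steady-state availability: the fraction of time the system is up."""
    if mtbf_hours <= 0 or mttr_hours < 0:
        raise ValueError("MTBF must be positive and MTTR non-negative")
    return mtbf_hours / (mtbf_hours + mttr_hours)


# Hypothetical service: fails on average every 999 hours, takes 1 hour to repair.
a = availability(mtbf_hours=999, mttr_hours=1)
print(f"{a:.3%}")  # 99.900%

# Halving the repair time raises availability without touching MTBF.
print(availability(999, 0.5) > a)  # True
```

The second call shows why architectural decisions that shorten recovery (for example isolation and fast restart) can matter as much as those that reduce failures.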
26.1.3 Architectural Views
```
                   4+1 Architectural View Model

                    ┌───────────┐
                    │ Scenarios │
                    └─────┬─────┘
                          │
        ┌─────────────────┼─────────────────┐
        ▼                 ▼                 ▼
  ┌───────────┐     ┌───────────┐     ┌───────────┐
  │  Logical  │     │  Process  │     │Development│
  │   View    │     │   View    │     │   View    │
  └─────┬─────┘     └─────┬─────┘     └─────┬─────┘
        └─────────────────┼─────────────────┘
                          ▼
                    ┌───────────┐
                    │ Physical  │
                    │   View    │
                    └───────────┘
```
View descriptions:
- Logical view: functional requirements, aimed at end users
- Process view: concurrency and distribution, aimed at system integrators
- Development view: program organization, aimed at programmers
- Physical view: system topology, aimed at system engineers
- Scenarios: use cases, aimed at all stakeholders
26.1.4 Classification of Architecture Patterns
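| Category | Patterns | Core idea |
|---|---|---|
| Layered architecture | N-tier architecture, Clean Architecture | Separation of concerns |
| Ports and adapters | Hexagonal architecture, Onion architecture | Dependency inversion |
| Distributed architecture | Microservices, SOA | Service decoupling |
| Event-driven | Event sourcing, CQRS | Asynchronous decoupling |
| Extensible architecture | Plugin architecture, Microkernel | Dynamic extension |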
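26.2 Historical Background and Evolution
26.2.1 Historical Development
| Period | Milestone | Description |
|---|---|---|
| 1968 | Dijkstra's layering | The concept of a layered structure is introduced |
| 1970s | MVC | The MVC pattern in Smalltalk |
| 1990s | Three-tier architecture | Evolution of client-server architecture |
| 1995 | 4+1 views | Philippe Kruchten proposes the 4+1 architectural view model |
| 2000s | SOA | Rise of service-oriented architecture |
| 2005 | Hexagonal architecture | Proposed by Alistair Cockburn |
| 2012 | Clean Architecture | Proposed by Robert C. Martin |
| 2014 | Microservices | Martin Fowler popularizes the microservices concept |
| 2010s | Cloud native | Kubernetes, service mesh |
| 2020s | Modular monolith | A return to simpler structures that balances complexity |
26.2.2 Theoretical Foundations
The theoretical foundations of software architecture come from:
- Software engineering: modularity, information hiding, separation of concerns
- Systems theory: system decomposition, hierarchical structure
- Distributed computing: the CAP theorem, consistency models
- Domain-driven design: bounded contexts, aggregates
26.3 Layered Architecture Patterns
26.3.1 Classic Three-Tier Architecture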
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Protocol


# ---- Domain entity ----

@dataclass
class User:
    """User entity."""
    id: int
    name: str
    email: str
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)


# ---- Data access layer ----

class UserRepository(Protocol):
    """User repository protocol: the interface of the data access layer."""

    def find_by_id(self, user_id: int) -> Optional[User]:
        ...

    def find_all(self) -> List[User]:
        ...

    def find_by_email(self, email: str) -> Optional[User]:
        ...

    def save(self, user: User) -> User:
        ...

    def delete(self, user_id: int) -> bool:
        ...


class InMemoryUserRepository:
    """In-memory user repository: a data access layer implementation."""

    def __init__(self):
        self._users: Dict[int, User] = {}
        self._email_index: Dict[str, int] = {}
        self._next_id = 1

    def find_by_id(self, user_id: int) -> Optional[User]:
        return self._users.get(user_id)

    def find_all(self) -> List[User]:
        return list(self._users.values())

    def find_by_email(self, email: str) -> Optional[User]:
        user_id = self._email_index.get(email.lower())
        return self._users.get(user_id) if user_id is not None else None

    def save(self, user: User) -> User:
        if user.id == 0:
            # id == 0 marks a new user: assign the next id.
            user = User(
                id=self._next_id,
                name=user.name,
                email=user.email,
                created_at=datetime.now(),
                updated_at=datetime.now()
            )
            self._next_id += 1
        else:
            # Existing user: drop the old email from the index in case it changed.
            previous = self._users.get(user.id)
            if previous is not None:
                self._email_index.pop(previous.email.lower(), None)
            user = User(
                id=user.id,
                name=user.name,
                email=user.email,
                created_at=user.created_at,
                updated_at=datetime.now()
            )
        self._users[user.id] = user
        self._email_index[user.email.lower()] = user.id
        return user

    def delete(self, user_id: int) -> bool:
        user = self._users.pop(user_id, None)
        if user:
            self._email_index.pop(user.email.lower(), None)
            return True
        return False


# ---- Business logic layer ----

class UserService:
    """User service: the business logic layer."""

    def __init__(self, repository: UserRepository):
        self._repository = repository

    def get_user(self, user_id: int) -> Optional[User]:
        return self._repository.find_by_id(user_id)

    def get_all_users(self) -> List[User]:
        return self._repository.find_all()

    def create_user(self, name: str, email: str) -> User:
        self._validate_name(name)
        self._validate_email(email)
        existing = self._repository.find_by_email(email)
        if existing:
            raise ValueError(f"Email already in use: {email}")
        user = User(id=0, name=name, email=email)
        return self._repository.save(user)

    def update_user(
        self,
        user_id: int,
        name: Optional[str] = None,
        email: Optional[str] = None
    ) -> User:
        user = self._repository.find_by_id(user_id)
        if not user:
            raise ValueError(f"User not found: {user_id}")
        new_name = name if name is not None else user.name
        new_email = email if email is not None else user.email
        if email is not None:
            self._validate_email(email)
            existing = self._repository.find_by_email(email)
            if existing and existing.id != user_id:
                raise ValueError(f"Email already in use: {email}")
        if name is not None:
            self._validate_name(name)
        updated_user = User(
            id=user_id,
            name=new_name,
            email=new_email,
            created_at=user.created_at
        )
        return self._repository.save(updated_user)

    def delete_user(self, user_id: int) -> bool:
        return self._repository.delete(user_id)

    def _validate_email(self, email: str) -> None:
        if not email or '@' not in email:
            raise ValueError("Invalid email address")
        parts = email.split('@')
        if len(parts) != 2 or '.' not in parts[1]:
            raise ValueError("Invalid email address")

    def _validate_name(self, name: str) -> None:
        if not name or len(name.strip()) < 2:
            raise ValueError("Name is too short")


# ---- Presentation layer ----

@dataclass
class ApiResponse:
    """API response."""
    success: bool
    data: Any = None
    error: Optional[str] = None
    status_code: int = 200


class UserController:
    """User controller: the presentation layer."""

    def __init__(self, service: UserService):
        self._service = service

    def get_user(self, user_id: int) -> ApiResponse:
        user = self._service.get_user(user_id)
        if not user:
            return ApiResponse(False, error="User not found", status_code=404)
        return ApiResponse(True, data=self._user_to_dict(user))

    def list_users(self) -> ApiResponse:
        users = self._service.get_all_users()
        return ApiResponse(True, data=[self._user_to_dict(u) for u in users])

    def create_user(self, request: dict) -> ApiResponse:
        try:
            user = self._service.create_user(
                name=request.get('name'),
                email=request.get('email')
            )
            return ApiResponse(True, data=self._user_to_dict(user), status_code=201)
        except ValueError as e:
            return ApiResponse(False, error=str(e), status_code=400)

    def update_user(self, user_id: int, request: dict) -> ApiResponse:
        try:
            user = self._service.update_user(
                user_id=user_id,
                name=request.get('name'),
                email=request.get('email')
            )
            return ApiResponse(True, data=self._user_to_dict(user))
        except ValueError as e:
            return ApiResponse(False, error=str(e), status_code=400)

    def delete_user(self, user_id: int) -> ApiResponse:
        if self._service.delete_user(user_id):
            return ApiResponse(True, status_code=204)
        return ApiResponse(False, error="User not found", status_code=404)

    def _user_to_dict(self, user: User) -> dict:
        return {
            'id': user.id,
            'name': user.name,
            'email': user.email,
            'created_at': user.created_at.isoformat(),
            'updated_at': user.updated_at.isoformat()
        }


# ---- Wiring the layers together by hand ----

repository = InMemoryUserRepository()
service = UserService(repository)
controller = UserController(service)

response = controller.create_user({'name': '张三', 'email': 'zhangsan@example.com'})
print(f"Created user: {response}")
response = controller.create_user({'name': '李四', 'email': 'lisi@example.com'})
print(f"Created user: {response}")
response = controller.list_users()
print(f"User list: {response}")
```
26.3.2 Dependency Injection Container
```python
import inspect
import uuid
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Callable, Dict, Optional, Type

# The demo at the bottom reuses UserRepository, InMemoryUserRepository,
# UserService and UserController from section 26.3.1.


class ServiceLifetime(Enum):
    """Service lifetime."""
    TRANSIENT = auto()
    SINGLETON = auto()
    SCOPED = auto()


@dataclass
class ServiceDescriptor:
    """Service descriptor."""
    service_type: Type
    implementation: Any
    lifetime: ServiceLifetime
    factory: Optional[Callable] = None


class Container:
    """
    Dependency injection container.

    Supported lifetimes:
    - Transient: a new instance is created on every request
    - Singleton: one instance is shared for the lifetime of the application
    - Scoped: one instance is shared within the same scope
    """

    def __init__(self):
        self._descriptors: Dict[Type, ServiceDescriptor] = {}
        self._singletons: Dict[Type, Any] = {}
        self._scopes: Dict[str, Dict[Type, Any]] = {}
        self._current_scope: Optional[str] = None

    def register_transient(
        self,
        service_type: Type,
        implementation: Optional[Type] = None
    ) -> 'Container':
        """Register a transient service."""
        impl = implementation or service_type
        self._descriptors[service_type] = ServiceDescriptor(
            service_type=service_type,
            implementation=impl,
            lifetime=ServiceLifetime.TRANSIENT
        )
        return self

    def register_singleton(
        self,
        service_type: Type,
        implementation: Optional[Type] = None
    ) -> 'Container':
        """Register a singleton service."""
        impl = implementation or service_type
        self._descriptors[service_type] = ServiceDescriptor(
            service_type=service_type,
            implementation=impl,
            lifetime=ServiceLifetime.SINGLETON
        )
        return self

    def register_factory(
        self,
        service_type: Type,
        factory: Callable[['Container'], Any],
        lifetime: ServiceLifetime = ServiceLifetime.TRANSIENT
    ) -> 'Container':
        """Register a factory function."""
        self._descriptors[service_type] = ServiceDescriptor(
            service_type=service_type,
            implementation=None,
            lifetime=lifetime,
            factory=factory
        )
        return self

    def register_instance(self, service_type: Type, instance: Any) -> 'Container':
        """Register an existing instance."""
        self._singletons[service_type] = instance
        return self

    def get(self, service_type: Type) -> Any:
        """Resolve a service instance."""
        if service_type in self._singletons:
            return self._singletons[service_type]
        descriptor = self._descriptors.get(service_type)
        if not descriptor:
            raise ValueError(f"Service not registered: {service_type.__name__}")
        if descriptor.lifetime == ServiceLifetime.SINGLETON:
            self._singletons[service_type] = self._create_instance(descriptor)
            return self._singletons[service_type]
        if descriptor.lifetime == ServiceLifetime.SCOPED and self._current_scope:
            scope = self._scopes[self._current_scope]
            if service_type not in scope:
                scope[service_type] = self._create_instance(descriptor)
            return scope[service_type]
        # Transient, or scoped with no active scope: always build a new instance.
        return self._create_instance(descriptor)

    def _create_instance(self, descriptor: ServiceDescriptor) -> Any:
        """Create a service instance, resolving constructor parameters by annotation."""
        if descriptor.factory:
            return descriptor.factory(self)
        impl = descriptor.implementation
        sig = inspect.signature(impl.__init__)
        kwargs = {}
        for param_name, param in sig.parameters.items():
            if param_name == 'self':
                continue
            if param.annotation != inspect.Parameter.empty:
                try:
                    kwargs[param_name] = self.get(param.annotation)
                except ValueError:
                    # Unregistered dependency: fall back to the default if there is one.
                    if param.default != inspect.Parameter.empty:
                        continue
                    raise
        return impl(**kwargs)

    def create_scope(self, scope_id: Optional[str] = None) -> 'Scope':
        """Create a scope."""
        scope_id = scope_id or str(uuid.uuid4())
        self._scopes[scope_id] = {}
        return Scope(self, scope_id)

    def build(self) -> 'ServiceProvider':
        """Build a service provider."""
        return ServiceProvider(self)


class Scope:
    """Scope."""

    def __init__(self, container: Container, scope_id: str):
        self._container = container
        self._scope_id = scope_id
        container._current_scope = scope_id

    def get(self, service_type: Type) -> Any:
        return self._container.get(service_type)

    def dispose(self) -> None:
        if self._scope_id in self._container._scopes:
            del self._container._scopes[self._scope_id]
        self._container._current_scope = None


class ServiceProvider:
    """Service provider."""

    def __init__(self, container: Container):
        self._container = container

    def get(self, service_type: Type) -> Any:
        return self._container.get(service_type)

    def create_scope(self) -> Scope:
        return self._container.create_scope()


container = Container()
# UserService declares its dependency as the UserRepository protocol, so the
# implementation must be registered under that type for resolution to succeed.
container.register_singleton(UserRepository, InMemoryUserRepository)
container.register_transient(UserService)
container.register_transient(UserController)
provider = container.build()

controller1 = provider.get(UserController)
controller2 = provider.get(UserController)
print(f"Controllers are different instances: {controller1 is not controller2}")
print(f"Repository is the same instance: {controller1._service._repository is controller2._service._repository}")
```
26.3.3 Clean Architecture
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import Dict, List, Optional


# ---- Entities (innermost layer) ----

class OrderStatus(Enum):
    PENDING = auto()
    CONFIRMED = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()


@dataclass
class OrderItem:
    """Order item entity."""
    product_id: int
    product_name: str
    quantity: int
    unit_price: float

    @property
    def total(self) -> float:
        return self.quantity * self.unit_price


@dataclass
class Order:
    """Order entity (core business logic)."""
    id: int
    customer_id: int
    items: List[OrderItem]
    status: OrderStatus = OrderStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)

    @property
    def total(self) -> float:
        return sum(item.total for item in self.items)

    def add_item(self, item: OrderItem) -> None:
        self.items.append(item)

    def remove_item(self, product_id: int) -> bool:
        for i, item in enumerate(self.items):
            if item.product_id == product_id:
                self.items.pop(i)
                return True
        return False

    def confirm(self) -> None:
        if self.status != OrderStatus.PENDING:
            raise ValueError("Only pending orders can be confirmed")
        if not self.items:
            raise ValueError("An order cannot be empty")
        self.status = OrderStatus.CONFIRMED

    def ship(self) -> None:
        if self.status != OrderStatus.CONFIRMED:
            raise ValueError("Only confirmed orders can be shipped")
        self.status = OrderStatus.SHIPPED

    def deliver(self) -> None:
        if self.status != OrderStatus.SHIPPED:
            raise ValueError("Only shipped orders can be marked as delivered")
        self.status = OrderStatus.DELIVERED

    def cancel(self) -> None:
        if self.status in (OrderStatus.SHIPPED, OrderStatus.DELIVERED):
            raise ValueError("Shipped or delivered orders cannot be cancelled")
        self.status = OrderStatus.CANCELLED


# ---- Use cases and the repository interface they depend on (core layer) ----

class OrderRepository(ABC):
    """Order repository interface (core layer)."""

    @abstractmethod
    def find_by_id(self, order_id: int) -> Optional[Order]:
        pass

    @abstractmethod
    def find_by_customer(self, customer_id: int) -> List[Order]:
        pass

    @abstractmethod
    def save(self, order: Order) -> Order:
        pass

    @abstractmethod
    def delete(self, order_id: int) -> bool:
        pass


class OrderUseCases:
    """Order use cases (core layer)."""

    def __init__(self, repository: OrderRepository):
        self._repository = repository

    def create_order(self, customer_id: int, items: List[Dict]) -> Order:
        order_items = [
            OrderItem(
                product_id=item['product_id'],
                product_name=item['product_name'],
                quantity=item['quantity'],
                unit_price=item['unit_price']
            )
            for item in items
        ]
        order = Order(id=0, customer_id=customer_id, items=order_items)
        return self._repository.save(order)

    def get_order(self, order_id: int) -> Optional[Order]:
        return self._repository.find_by_id(order_id)

    def get_customer_orders(self, customer_id: int) -> List[Order]:
        return self._repository.find_by_customer(customer_id)

    def confirm_order(self, order_id: int) -> Order:
        order = self._repository.find_by_id(order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        order.confirm()
        return self._repository.save(order)

    def ship_order(self, order_id: int) -> Order:
        order = self._repository.find_by_id(order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        order.ship()
        return self._repository.save(order)

    def cancel_order(self, order_id: int) -> Order:
        order = self._repository.find_by_id(order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        order.cancel()
        return self._repository.save(order)


# ---- Infrastructure layer ----

class InMemoryOrderRepository(OrderRepository):
    """In-memory order repository (infrastructure layer)."""

    def __init__(self):
        self._orders: Dict[int, Order] = {}
        self._customer_index: Dict[int, List[int]] = {}
        self._next_id = 1

    def find_by_id(self, order_id: int) -> Optional[Order]:
        return self._orders.get(order_id)

    def find_by_customer(self, customer_id: int) -> List[Order]:
        order_ids = self._customer_index.get(customer_id, [])
        return [self._orders[oid] for oid in order_ids if oid in self._orders]

    def save(self, order: Order) -> Order:
        if order.id == 0:
            # id == 0 marks a new order: assign the next id.
            order = Order(
                id=self._next_id,
                customer_id=order.customer_id,
                items=order.items,
                status=order.status,
                created_at=order.created_at
            )
            self._next_id += 1
        self._orders[order.id] = order
        if order.customer_id not in self._customer_index:
            self._customer_index[order.customer_id] = []
        if order.id not in self._customer_index[order.customer_id]:
            self._customer_index[order.customer_id].append(order.id)
        return order

    def delete(self, order_id: int) -> bool:
        order = self._orders.pop(order_id, None)
        if order:
            if order.customer_id in self._customer_index:
                self._customer_index[order.customer_id] = [
                    oid for oid in self._customer_index[order.customer_id]
                    if oid != order_id
                ]
            return True
        return False


# ---- Interface adapter layer ----

class OrderController:
    """Order controller (interface adapter layer)."""

    def __init__(self, use_cases: OrderUseCases):
        self._use_cases = use_cases

    def create_order(self, request: dict) -> dict:
        try:
            order = self._use_cases.create_order(
                customer_id=request['customer_id'],
                items=request['items']
            )
            return {'success': True, 'data': self._order_to_dict(order)}
        except (KeyError, ValueError) as e:
            # KeyError: a required request field is missing.
            return {'success': False, 'error': str(e)}

    def get_order(self, order_id: int) -> dict:
        order = self._use_cases.get_order(order_id)
        if not order:
            return {'success': False, 'error': 'Order not found'}
        return {'success': True, 'data': self._order_to_dict(order)}

    def confirm_order(self, order_id: int) -> dict:
        try:
            order = self._use_cases.confirm_order(order_id)
            return {'success': True, 'data': self._order_to_dict(order)}
        except ValueError as e:
            return {'success': False, 'error': str(e)}

    def _order_to_dict(self, order: Order) -> dict:
        return {
            'id': order.id,
            'customer_id': order.customer_id,
            'items': [
                {
                    'product_id': item.product_id,
                    'product_name': item.product_name,
                    'quantity': item.quantity,
                    'unit_price': item.unit_price,
                    'total': item.total
                }
                for item in order.items
            ],
            'total': order.total,
            'status': order.status.name,
            'created_at': order.created_at.isoformat()
        }


repository = InMemoryOrderRepository()
use_cases = OrderUseCases(repository)
controller = OrderController(use_cases)

result = controller.create_order({
    'customer_id': 1,
    'items': [
        {'product_id': 1, 'product_name': 'Product A', 'quantity': 2, 'unit_price': 100},
        {'product_id': 2, 'product_name': 'Product B', 'quantity': 1, 'unit_price': 200},
    ]
})
print(f"Created order: {result}")
result = controller.confirm_order(1)
print(f"Confirmed order: {result}")
```
26.4 Hexagonal Architecture
26.4.1 Core Concepts
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Protocol


# ---- Application core ----

@dataclass
class Product:
    """Product entity."""
    id: int
    name: str
    description: str
    price: float
    stock: int
    category_id: int
    created_at: datetime = field(default_factory=datetime.now)

    def update_stock(self, quantity: int) -> None:
        if self.stock + quantity < 0:
            raise ValueError("Insufficient stock")
        self.stock += quantity

    def update_price(self, new_price: float) -> None:
        if new_price < 0:
            raise ValueError("Price cannot be negative")
        self.price = new_price


class ProductRepository(Protocol):
    """Product repository port (outbound port: the core calls out through it)."""

    def find_by_id(self, product_id: int) -> Optional[Product]:
        ...

    def find_all(self) -> List[Product]:
        ...

    def find_by_category(self, category_id: int) -> List[Product]:
        ...

    def save(self, product: Product) -> Product:
        ...

    def delete(self, product_id: int) -> bool:
        ...


class ProductUseCases:
    """Product use cases (application core)."""

    def __init__(self, repository: ProductRepository):
        self._repository = repository

    def get_product(self, product_id: int) -> Optional[Product]:
        return self._repository.find_by_id(product_id)

    def list_products(self) -> List[Product]:
        return self._repository.find_all()

    def list_by_category(self, category_id: int) -> List[Product]:
        return self._repository.find_by_category(category_id)

    def create_product(
        self,
        name: str,
        description: str,
        price: float,
        stock: int,
        category_id: int
    ) -> Product:
        self._validate_price(price)
        self._validate_stock(stock)
        product = Product(
            id=0,
            name=name,
            description=description,
            price=price,
            stock=stock,
            category_id=category_id
        )
        return self._repository.save(product)

    def update_product(
        self,
        product_id: int,
        name: Optional[str] = None,
        description: Optional[str] = None,
        price: Optional[float] = None,
        stock: Optional[int] = None
    ) -> Product:
        product = self._repository.find_by_id(product_id)
        if not product:
            raise ValueError(f"Product not found: {product_id}")
        if price is not None:
            self._validate_price(price)
            product.price = price
        if stock is not None:
            self._validate_stock(stock)
            product.stock = stock
        if name is not None:
            product.name = name
        if description is not None:
            product.description = description
        return self._repository.save(product)

    def update_stock(self, product_id: int, quantity: int) -> Product:
        product = self._repository.find_by_id(product_id)
        if not product:
            raise ValueError(f"Product not found: {product_id}")
        product.update_stock(quantity)
        return self._repository.save(product)

    def delete_product(self, product_id: int) -> bool:
        return self._repository.delete(product_id)

    def _validate_price(self, price: float) -> None:
        if price < 0:
            raise ValueError("Price cannot be negative")

    def _validate_stock(self, stock: int) -> None:
        if stock < 0:
            raise ValueError("Stock cannot be negative")


# ---- Outbound adapter ----

class InMemoryProductRepository:
    """In-memory product repository adapter (outbound adapter)."""

    def __init__(self):
        self._products: Dict[int, Product] = {}
        self._category_index: Dict[int, List[int]] = {}
        self._next_id = 1

    def find_by_id(self, product_id: int) -> Optional[Product]:
        return self._products.get(product_id)

    def find_all(self) -> List[Product]:
        return list(self._products.values())

    def find_by_category(self, category_id: int) -> List[Product]:
        product_ids = self._category_index.get(category_id, [])
        return [self._products[pid] for pid in product_ids if pid in self._products]

    def save(self, product: Product) -> Product:
        if product.id == 0:
            # id == 0 marks a new product: assign the next id.
            product = Product(
                id=self._next_id,
                name=product.name,
                description=product.description,
                price=product.price,
                stock=product.stock,
                category_id=product.category_id,
                created_at=product.created_at
            )
            self._next_id += 1
        self._products[product.id] = product
        if product.category_id not in self._category_index:
            self._category_index[product.category_id] = []
        if product.id not in self._category_index[product.category_id]:
            self._category_index[product.category_id].append(product.id)
        return product

    def delete(self, product_id: int) -> bool:
        product = self._products.pop(product_id, None)
        if product:
            if product.category_id in self._category_index:
                self._category_index[product.category_id] = [
                    pid for pid in self._category_index[product.category_id]
                    if pid != product_id
                ]
            return True
        return False


# ---- Inbound adapters ----

class RestAdapter:
    """REST API adapter (inbound adapter)."""

    def __init__(self, use_cases: ProductUseCases):
        self._use_cases = use_cases

    def get_product(self, product_id: int) -> dict:
        product = self._use_cases.get_product(product_id)
        if not product:
            return {'success': False, 'error': 'Product not found', 'status': 404}
        return {'success': True, 'data': self._to_dict(product)}

    def list_products(self, category_id: Optional[int] = None) -> dict:
        if category_id is not None:
            products = self._use_cases.list_by_category(category_id)
        else:
            products = self._use_cases.list_products()
        return {'success': True, 'data': [self._to_dict(p) for p in products]}

    def create_product(self, request: dict) -> dict:
        try:
            product = self._use_cases.create_product(
                name=request['name'],
                description=request.get('description', ''),
                price=request['price'],
                stock=request.get('stock', 0),
                category_id=request['category_id']
            )
            return {'success': True, 'data': self._to_dict(product), 'status': 201}
        except ValueError as e:
            return {'success': False, 'error': str(e), 'status': 400}

    def update_product(self, product_id: int, request: dict) -> dict:
        try:
            product = self._use_cases.update_product(
                product_id=product_id,
                name=request.get('name'),
                description=request.get('description'),
                price=request.get('price'),
                stock=request.get('stock')
            )
            return {'success': True, 'data': self._to_dict(product)}
        except ValueError as e:
            return {'success': False, 'error': str(e), 'status': 400}

    def _to_dict(self, product: Product) -> dict:
        return {
            'id': product.id,
            'name': product.name,
            'description': product.description,
            'price': product.price,
            'stock': product.stock,
            'category_id': product.category_id,
            'created_at': product.created_at.isoformat()
        }


class CliAdapter:
    """Command-line adapter (inbound adapter)."""

    def __init__(self, use_cases: ProductUseCases):
        self._use_cases = use_cases

    def list_products(self) -> str:
        products = self._use_cases.list_products()
        if not products:
            return "No products"
        lines = ["ID\tName\tPrice\tStock"]
        lines.extend(f"{p.id}\t{p.name}\t{p.price}\t{p.stock}" for p in products)
        return "\n".join(lines)

    def show_product(self, product_id: int) -> str:
        product = self._use_cases.get_product(product_id)
        if not product:
            return f"Product not found: {product_id}"
        return "\n".join([
            "Product details",
            "---------------",
            f"ID: {product.id}",
            f"Name: {product.name}",
            f"Description: {product.description}",
            f"Price: ¥{product.price}",
            f"Stock: {product.stock}",
            f"Category: {product.category_id}",
            f"Created at: {product.created_at}",
        ])


# Two inbound adapters drive the same application core.
repository = InMemoryProductRepository()
use_cases = ProductUseCases(repository)
rest_adapter = RestAdapter(use_cases)
cli_adapter = CliAdapter(use_cases)

result = rest_adapter.create_product({
    'name': 'Python Programming',
    'description': 'An introductory book on Python programming',
    'price': 89.0,
    'stock': 100,
    'category_id': 1
})
print(f"Product created via REST: {result}")
print(f"\nProduct list via CLI:\n{cli_adapter.list_products()}")
```
26.5 The CQRS Pattern
26.5.1 Command Query Responsibility Segregation
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Type


# ---- Commands and queries ----

@dataclass
class Command:
    """Base class for commands."""
    # kw_only (Python 3.10+) lets subclasses declare required fields even
    # though this base field has a default.
    timestamp: datetime = field(default_factory=datetime.now, kw_only=True)


@dataclass
class Query:
    """Base class for queries."""
    pass


@dataclass
class CreateOrderCommand(Command):
    """Create-order command."""
    customer_id: int
    items: List[dict]


@dataclass
class UpdateOrderStatusCommand(Command):
    """Update-order-status command."""
    order_id: int
    status: str


@dataclass
class CancelOrderCommand(Command):
    """Cancel-order command."""
    order_id: int
    reason: str


@dataclass
class GetOrderQuery(Query):
    """Get-order query."""
    order_id: int


@dataclass
class ListOrdersQuery(Query):
    """List-orders query."""
    customer_id: Optional[int] = None
    status: Optional[str] = None
    page: int = 1
    page_size: int = 10


# ---- Read-side data shapes ----

@dataclass
class OrderSummary:
    """Order summary (read model)."""
    id: int
    customer_id: int
    customer_name: str
    total: float
    status: str
    item_count: int
    created_at: datetime


@dataclass
class OrderDetail:
    """Order detail (read model)."""
    id: int
    customer_id: int
    customer_name: str
    items: List[dict]
    total: float
    status: str
    created_at: datetime
    updated_at: datetime


# ---- Write side ----

class OrderWriteModel:
    """Order write model."""

    def __init__(self):
        self._orders: Dict[int, dict] = {}
        self._next_id = 1

    def create(self, customer_id: int, items: List[dict]) -> int:
        order_id = self._next_id
        self._next_id += 1
        total = sum(item['price'] * item['quantity'] for item in items)
        self._orders[order_id] = {
            'id': order_id,
            'customer_id': customer_id,
            'items': items,
            'total': total,
            'status': 'pending',
            'created_at': datetime.now(),
            'updated_at': datetime.now()
        }
        return order_id

    def update_status(self, order_id: int, status: str) -> bool:
        if order_id not in self._orders:
            return False
        self._orders[order_id]['status'] = status
        self._orders[order_id]['updated_at'] = datetime.now()
        return True

    def cancel(self, order_id: int) -> bool:
        if order_id not in self._orders:
            return False
        order = self._orders[order_id]
        if order['status'] in ('shipped', 'delivered'):
            return False
        order['status'] = 'cancelled'
        order['updated_at'] = datetime.now()
        return True

    def get(self, order_id: int) -> Optional[dict]:
        return self._orders.get(order_id)


# ---- Read side ----

class OrderReadModel:
    """Order read model."""

    def __init__(self):
        self._summaries: Dict[int, OrderSummary] = {}
        self._details: Dict[int, OrderDetail] = {}
        self._by_customer: Dict[int, List[int]] = {}
        self._by_status: Dict[str, List[int]] = {}

    def index(self, order_data: dict, customer_name: Optional[str] = None) -> None:
        """Build or refresh the denormalized views for one order."""
        order_id = order_data['id']
        display_name = customer_name or f"Customer {order_data['customer_id']}"
        self._summaries[order_id] = OrderSummary(
            id=order_id,
            customer_id=order_data['customer_id'],
            customer_name=display_name,
            total=order_data['total'],
            status=order_data['status'],
            item_count=len(order_data['items']),
            created_at=order_data['created_at']
        )
        self._details[order_id] = OrderDetail(
            id=order_id,
            customer_id=order_data['customer_id'],
            customer_name=display_name,
            items=order_data['items'],
            total=order_data['total'],
            status=order_data['status'],
            created_at=order_data['created_at'],
            updated_at=order_data['updated_at']
        )
        customer_id = order_data['customer_id']
        if customer_id not in self._by_customer:
            self._by_customer[customer_id] = []
        if order_id not in self._by_customer[customer_id]:
            self._by_customer[customer_id].append(order_id)
        # Remove the order from any previous status bucket before re-adding it,
        # otherwise a status change leaves a stale entry behind.
        for ids in self._by_status.values():
            if order_id in ids:
                ids.remove(order_id)
        self._by_status.setdefault(order_data['status'], []).append(order_id)

    def get_summary(self, order_id: int) -> Optional[OrderSummary]:
        return self._summaries.get(order_id)

    def get_detail(self, order_id: int) -> Optional[OrderDetail]:
        return self._details.get(order_id)

    def list_by_customer(self, customer_id: int) -> List[OrderSummary]:
        order_ids = self._by_customer.get(customer_id, [])
        return [self._summaries[oid] for oid in order_ids if oid in self._summaries]

    def list_by_status(self, status: str) -> List[OrderSummary]:
        order_ids = self._by_status.get(status, [])
        return [self._summaries[oid] for oid in order_ids if oid in self._summaries]

    def list_all(self, page: int = 1, page_size: int = 10) -> List[OrderSummary]:
        all_summaries = sorted(
            self._summaries.values(),
            key=lambda s: s.created_at,
            reverse=True
        )
        start = (page - 1) * page_size
        return all_summaries[start:start + page_size]


# ---- Handlers ----

class CommandHandler:
    """Command handler: mutates the write model, then refreshes the read model."""

    def __init__(self, write_model: OrderWriteModel, read_model: OrderReadModel):
        self._write_model = write_model
        self._read_model = read_model
        self._handlers: Dict[Type, Callable] = {
            CreateOrderCommand: self._handle_create,
            UpdateOrderStatusCommand: self._handle_update_status,
            CancelOrderCommand: self._handle_cancel,
        }

    def handle(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"Unknown command type: {type(command).__name__}")
        return handler(command)

    def _handle_create(self, command: CreateOrderCommand) -> int:
        order_id = self._write_model.create(
            customer_id=command.customer_id,
            items=command.items
        )
        order_data = self._write_model.get(order_id)
        self._read_model.index(order_data)
        return order_id

    def _handle_update_status(self, command: UpdateOrderStatusCommand) -> bool:
        success = self._write_model.update_status(
            order_id=command.order_id,
            status=command.status
        )
        if success:
            order_data = self._write_model.get(command.order_id)
            self._read_model.index(order_data)
        return success

    def _handle_cancel(self, command: CancelOrderCommand) -> bool:
        success = self._write_model.cancel(command.order_id)
        if success:
            order_data = self._write_model.get(command.order_id)
            self._read_model.index(order_data)
        return success


class QueryHandler:
    """Query handler: reads only from the read model."""

    def __init__(self, read_model: OrderReadModel):
        self._read_model = read_model
        self._handlers: Dict[Type, Callable] = {
            GetOrderQuery: self._handle_get,
            ListOrdersQuery: self._handle_list,
        }

    def handle(self, query: Query) -> Any:
        handler = self._handlers.get(type(query))
        if not handler:
            raise ValueError(f"Unknown query type: {type(query).__name__}")
        return handler(query)

    def _handle_get(self, query: GetOrderQuery) -> Optional[OrderDetail]:
        return self._read_model.get_detail(query.order_id)

    def _handle_list(self, query: ListOrdersQuery) -> List[OrderSummary]:
        if query.customer_id is not None:
            return self._read_model.list_by_customer(query.customer_id)
        if query.status:
            return self._read_model.list_by_status(query.status)
        return self._read_model.list_all(query.page, query.page_size)


write_model = OrderWriteModel()
read_model = OrderReadModel()
command_handler = CommandHandler(write_model, read_model)
query_handler = QueryHandler(read_model)

order_id = command_handler.handle(CreateOrderCommand(
    customer_id=1,
    items=[
        {'product_id': 1, 'name': 'Product A', 'price': 100, 'quantity': 2},
        {'product_id': 2, 'name': 'Product B', 'price': 50, 'quantity': 1},
    ]
))
print(f"Created order ID: {order_id}")
command_handler.handle(CreateOrderCommand(
    customer_id=1,
    items=[{'product_id': 3, 'name': 'Product C', 'price': 200, 'quantity': 1}]
))
command_handler.handle(UpdateOrderStatusCommand(order_id=order_id, status='confirmed'))

detail = query_handler.handle(GetOrderQuery(order_id=order_id))
print(f"Order detail: ID={detail.id}, status={detail.status}, total={detail.total}")
summaries = query_handler.handle(ListOrdersQuery(customer_id=1))
print(f"Number of customer orders: {len(summaries)}")
```
26.6 Plugin Architecture
26.6.1 Microkernel Architecture
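The plugin manager in this section rejects a plugin whose dependencies have not been registered yet, so callers have to register plugins in dependency order. As a minimal sketch that is not part of the implementation shown here, that order can be derived from the declared dependencies with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The helper name `resolve_load_order` and the sample plugin names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def resolve_load_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return plugin names ordered so that dependencies come first.

    `dependencies` maps a plugin name to the names it depends on.
    Raises ValueError on unknown or circular dependencies.
    """
    for name, deps in dependencies.items():
        missing = [dep for dep in deps if dep not in dependencies]
        if missing:
            raise ValueError(f"{name} depends on unknown plugins: {missing}")
    try:
        # static_order() yields every node after all of its predecessors.
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # args[1] holds the detected cycle as a list of node names.
        raise ValueError(f"circular plugin dependency: {exc.args[1]}") from exc


# Hypothetical plugin set: "dashboard" needs both other plugins.
order = resolve_load_order({
    "metrics": ["logger"],
    "logger": [],
    "dashboard": ["metrics", "logger"],
})
print(order)
```

Registering plugins in the returned order would satisfy the dependency check in `register_plugin`, assuming every plugin declares its dependencies accurately.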
python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional


class PluginState(Enum):
    """Lifecycle state of a plugin."""
    UNLOADED = auto()
    LOADED = auto()
    ACTIVE = auto()
    ERROR = auto()


@dataclass
class PluginInfo:
    """Plugin metadata."""
    name: str
    version: str
    description: str
    author: str
    dependencies: List[str] = field(default_factory=list)


class PluginInterface(ABC):
    """Interface that every plugin must implement."""

    @staticmethod
    @abstractmethod
    def get_info() -> PluginInfo:
        """Return the plugin metadata."""
        pass

    @abstractmethod
    def initialize(self, context: 'PluginContext') -> None:
        """Initialize the plugin with the shared context."""
        pass

    @abstractmethod
    def start(self) -> None:
        """Start the plugin."""
        pass

    @abstractmethod
    def stop(self) -> None:
        """Stop the plugin."""
        pass

    @abstractmethod
    def shutdown(self) -> None:
        """Release the plugin's resources before it is unregistered."""
        pass


class PluginContext:
    """Plugin context: the only surface through which plugins reach the core."""

    def __init__(self):
        self._services: Dict[str, Any] = {}
        self._hooks: Dict[str, List[Callable]] = {}
        self._commands: Dict[str, Callable] = {}
        self._config: Dict[str, Any] = {}

    def register_service(self, name: str, service: Any) -> None:
        self._services[name] = service

    def get_service(self, name: str) -> Optional[Any]:
        return self._services.get(name)

    def register_hook(self, hook_name: str, callback: Callable) -> None:
        if hook_name not in self._hooks:
            self._hooks[hook_name] = []
        self._hooks[hook_name].append(callback)

    def execute_hook(self, hook_name: str, *args, **kwargs) -> List[Any]:
        results = []
        for callback in self._hooks.get(hook_name, []):
            try:
                result = callback(*args, **kwargs)
                results.append(result)
            except Exception as e:
                # One failing callback must not prevent the others from running.
                print(f"Hook execution error: {e}")
        return results

    def register_command(self, command_name: str, handler: Callable) -> None:
        self._commands[command_name] = handler

    def execute_command(self, command_name: str, *args, **kwargs) -> Any:
        handler = self._commands.get(command_name)
        if handler:
            return handler(*args, **kwargs)
        raise ValueError(f"Unknown command: {command_name}")

    def set_config(self, key: str, value: Any) -> None:
        self._config[key] = value

    def get_config(self, key: str, default: Any = None) -> Any:
        return self._config.get(key, default)


@dataclass
class PluginWrapper:
    """Plugin wrapper: pairs a plugin instance with its metadata and state."""
    plugin: PluginInterface
    info: PluginInfo
    state: PluginState = PluginState.UNLOADED
    error_message: str = ""


class PluginManager:
    """Plugin manager: owns the plugin registry and drives plugin lifecycles."""

    def __init__(self):
        self._plugins: Dict[str, PluginWrapper] = {}
        self._context = PluginContext()
        self._load_order: List[str] = []

    def register_core_service(self, name: str, service: Any) -> None:
        """Register a core service that plugins can look up."""
        self._context.register_service(name, service)

    def register_plugin(self, plugin: PluginInterface) -> bool:
        """Register and initialize a plugin; its dependencies must already be registered."""
        info = plugin.get_info()
        if info.name in self._plugins:
            print(f"Plugin already exists: {info.name}")
            return False
        for dep in info.dependencies:
            if dep not in self._plugins:
                print(f"Missing dependency: {dep} (required by {info.name})")
                return False
        wrapper = PluginWrapper(plugin=plugin, info=info)
        self._plugins[info.name] = wrapper
        try:
            plugin.initialize(self._context)
            wrapper.state = PluginState.LOADED
            self._load_order.append(info.name)
            print(f"Plugin registered: {info.name} v{info.version}")
            return True
        except Exception as e:
            wrapper.state = PluginState.ERROR
            wrapper.error_message = str(e)
            print(f"Plugin initialization failed: {info.name} - {e}")
            return False

    def unregister_plugin(self, name: str) -> bool:
        """Unregister a plugin, stopping it first if it is active."""
        if name not in self._plugins:
            return False
        wrapper = self._plugins[name]
        if wrapper.state == PluginState.ACTIVE:
            self.stop_plugin(name)
        try:
            wrapper.plugin.shutdown()
            del self._plugins[name]
            # A plugin whose initialize() failed was never added to the load
            # order, so guard the removal instead of raising ValueError.
            if name in self._load_order:
                self._load_order.remove(name)
            print(f"Plugin unregistered: {name}")
            return True
        except Exception as e:
            print(f"Plugin unregistration failed: {name} - {e}")
            return False

    def start_plugin(self, name: str) -> bool:
        """Start a plugin."""
        if name not in self._plugins:
            return False
        wrapper = self._plugins[name]
        if wrapper.state == PluginState.ACTIVE:
            return True
        try:
            wrapper.plugin.start()
            wrapper.state = PluginState.ACTIVE
            print(f"Plugin started: {name}")
            return True
        except Exception as e:
            wrapper.state = PluginState.ERROR
            wrapper.error_message = str(e)
            print(f"Plugin start failed: {name} - {e}")
            return False

    def stop_plugin(self, name: str) -> bool:
        """Stop a plugin."""
        if name not in self._plugins:
            return False
        wrapper = self._plugins[name]
        if wrapper.state != PluginState.ACTIVE:
            return True
        try:
            wrapper.plugin.stop()
            wrapper.state = PluginState.LOADED
            print(f"Plugin stopped: {name}")
            return True
        except Exception as e:
            wrapper.state = PluginState.ERROR
            wrapper.error_message = str(e)
            print(f"Plugin stop failed: {name} - {e}")
            return False

    def start_all(self) -> None:
        """Start all plugins in registration order."""
        for name in self._load_order:
            self.start_plugin(name)

    def stop_all(self) -> None:
        """Stop all plugins in reverse order, so dependents stop before their dependencies."""
        for name in reversed(self._load_order):
            self.stop_plugin(name)

    def get_plugin(self, name: str) -> Optional[PluginInterface]:
        """Return the plugin instance, or None if it is not registered."""
        wrapper = self._plugins.get(name)
        return wrapper.plugin if wrapper else None

    def list_plugins(self) -> List[Dict]:
        """List all registered plugins."""
        return [
            {
                'name': w.info.name,
                'version': w.info.version,
                'state': w.state.name,
                'description': w.info.description
            }
            for w in self._plugins.values()
        ]

    def execute_hook(self, hook_name: str, *args, **kwargs) -> List[Any]:
        """Run every callback registered for a hook."""
        return self._context.execute_hook(hook_name, *args, **kwargs)

    def execute_command(self, command_name: str, *args, **kwargs) -> Any:
        """Run a registered command."""
        return self._context.execute_command(command_name, *args, **kwargs)


class LoggerPlugin(PluginInterface):
    """Logging plugin."""

    @staticmethod
    def get_info() -> PluginInfo:
        return PluginInfo(
            name="logger",
            version="1.0.0",
            description="Logging plugin",
            author="System"
        )

    def initialize(self, context: PluginContext) -> None:
        self._context = context
        self._logs: List[str] = []
        context.register_hook("on_log", self._log)
        context.register_command("get_logs", self._get_logs)

    def start(self) -> None:
        print("Logger plugin started")

    def stop(self) -> None:
        print("Logger plugin stopped")

    def shutdown(self) -> None:
        self._logs.clear()

    def _log(self, message: str, level: str = "INFO") -> None:
        from datetime import datetime
        entry = f"[{datetime.now().isoformat()}] [{level}] {message}"
        self._logs.append(entry)
        print(entry)

    def _get_logs(self) -> List[str]:
        return self._logs.copy()


class MetricsPlugin(PluginInterface):
    """Metrics plugin."""

    @staticmethod
    def get_info() -> PluginInfo:
        return PluginInfo(
            name="metrics",
            version="1.0.0",
            description="Performance metrics plugin",
            author="System",
            dependencies=["logger"]
        )

    def initialize(self, context: PluginContext) -> None:
        self._context = context
        self._counters: Dict[str, int] = {}
        self._timers: Dict[str, List[float]] = {}
        context.register_hook("on_metric", self._record_metric)
        context.register_command("get_metrics", self._get_metrics)

    def start(self) -> None:
        print("Metrics plugin started")

    def stop(self) -> None:
        print("Metrics plugin stopped")

    def shutdown(self) -> None:
        self._counters.clear()
        self._timers.clear()

    def _record_metric(self, name: str, value: float, metric_type: str = "counter") -> None:
        if metric_type == "counter":
            self._counters[name] = self._counters.get(name, 0) + int(value)
        elif metric_type == "timer":
            if name not in self._timers:
                self._timers[name] = []
            self._timers[name].append(value)

    def _get_metrics(self) -> Dict:
        # Timers are summarized as count/avg/min/max; empty series report zeros.
        return {
            'counters': self._counters.copy(),
            'timers': {
                name: {
                    'count': len(values),
                    'avg': sum(values) / len(values) if values else 0,
                    'min': min(values) if values else 0,
                    'max': max(values) if values else 0
                }
                for name, values in self._timers.items()
            }
        }


# Usage example: "logger" is registered first because "metrics" depends on it.
manager = PluginManager()
manager.register_plugin(LoggerPlugin())
manager.register_plugin(MetricsPlugin())
manager.start_all()

manager.execute_hook("on_log", "System started", "INFO")
manager.execute_hook("on_metric", "requests", 1, "counter")
manager.execute_hook("on_metric", "response_time", 0.15, "timer")

print(f"\nPlugins: {manager.list_plugins()}")
print(f"Logs: {manager.execute_command('get_logs')}")
print(f"Metrics: {manager.execute_command('get_metrics')}")
manager.stop_all()

26.7 Event-Driven Architecture
26.7.1 Event Bus and Event Sourcing
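The order aggregate in this section rebuilds its state by replaying stored events. Stripped of the bus and the store, event sourcing reduces to a left fold of a pure transition function over the event history. The following is a minimal sketch of that idea only; `OrderState`, `apply`, and `replay` are hypothetical names, and events are simplified to `(event_type, payload)` tuples rather than the dataclasses used in the full example.

```python
from dataclasses import dataclass, replace
from functools import reduce
from typing import Iterable, Tuple


@dataclass(frozen=True)
class OrderState:
    """Immutable snapshot of an order; never stored, always derived."""
    status: str = "new"
    version: int = 0


def apply(state: OrderState, event: Tuple[str, str]) -> OrderState:
    """Pure transition function: (state, event) -> next state."""
    event_type, payload = event
    if event_type == "order_created":
        return replace(state, status="pending", version=state.version + 1)
    if event_type == "order_status_changed":
        return replace(state, status=payload, version=state.version + 1)
    return state  # Unknown event types are ignored in this sketch.


def replay(events: Iterable[Tuple[str, str]]) -> OrderState:
    """Rebuild the current state by folding the history over the initial state."""
    return reduce(apply, events, OrderState())


history = [
    ("order_created", ""),
    ("order_status_changed", "confirmed"),
    ("order_status_changed", "shipped"),
]
state = replay(history)
print(state)  # OrderState(status='shipped', version=3)
```

Replaying a prefix of the history yields the state as of that point in time, which is what makes the event log usable for auditing and debugging.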
python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
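

# kw_only=True (Python 3.10+) is required here: the base classes declare fields
# with defaults, and subclasses add fields without defaults. Without it, the
# dataclass decorator raises "non-default argument follows default argument".
@dataclass(kw_only=True)
class Event:
    """Base class for all events."""
    event_id: str
    event_type: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass(kw_only=True)
class DomainEvent(Event):
    """Domain event tied to a specific aggregate."""
    aggregate_id: str
    aggregate_type: str
    version: int = 1


@dataclass(kw_only=True)
class OrderCreatedEvent(DomainEvent):
    """Raised when an order is created."""
    customer_id: int
    items: List[dict]
    total: float


@dataclass(kw_only=True)
class OrderStatusChangedEvent(DomainEvent):
    """Raised when an order's status changes."""
    old_status: str
    new_status: str


@dataclass(kw_only=True)
class PaymentProcessedEvent(DomainEvent):
    """Raised when a payment is processed."""
    amount: float
    payment_method: str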


class EventHandler(ABC):
    """Event handler interface."""

    @abstractmethod
    def handle(self, event: Event) -> None:
        pass


class EventBus:
    """Event bus with synchronous and asynchronous delivery."""

    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}
        self._async_handlers: Dict[str, List[Callable]] = {}
        self._middleware: List[Callable] = []
        self._dead_letter_queue: List[Event] = []

    def subscribe(self, event_type: str, handler: Callable) -> None:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    def subscribe_async(self, event_type: str, handler: Callable) -> None:
        if event_type not in self._async_handlers:
            self._async_handlers[event_type] = []
        self._async_handlers[event_type].append(handler)

    def add_middleware(self, middleware: Callable) -> None:
        self._middleware.append(middleware)

    def publish(self, event: Event) -> None:
        # A failing middleware aborts delivery: no handler sees the event.
        for middleware in self._middleware:
            try:
                middleware(event)
            except Exception as e:
                print(f"Middleware error: {e}")
                return
        handlers = self._handlers.get(event.event_type, [])
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                # A failing handler does not stop the others; the event is
                # parked in the dead-letter queue for later inspection.
                print(f"Handler error: {e}")
                self._dead_letter_queue.append(event)

    async def publish_async(self, event: Event) -> None:
        for middleware in self._middleware:
            try:
                if asyncio.iscoroutinefunction(middleware):
                    await middleware(event)
                else:
                    middleware(event)
            except Exception as e:
                print(f"Middleware error: {e}")
                return
        handlers = self._handlers.get(event.event_type, [])
        async_handlers = self._async_handlers.get(event.event_type, [])
        # Synchronous handlers run first, then asynchronous ones in sequence.
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Handler error: {e}")
                self._dead_letter_queue.append(event)
        for handler in async_handlers:
            try:
                await handler(event)
            except Exception as e:
                print(f"Async handler error: {e}")
                self._dead_letter_queue.append(event)

    def get_dead_letter_queue(self) -> List[Event]:
        return self._dead_letter_queue.copy()


class EventStore:
    """Append-only, in-memory event store indexed by aggregate and by type."""

    def __init__(self):
        self._events: List[Event] = []
        self._by_aggregate: Dict[str, List[Event]] = {}
        self._by_type: Dict[str, List[Event]] = {}

    def append(self, event: Event) -> None:
        self._events.append(event)
        if isinstance(event, DomainEvent):
            if event.aggregate_id not in self._by_aggregate:
                self._by_aggregate[event.aggregate_id] = []
            self._by_aggregate[event.aggregate_id].append(event)
        if event.event_type not in self._by_type:
            self._by_type[event.event_type] = []
        self._by_type[event.event_type].append(event)

    def get_events(self, aggregate_id: str) -> List[Event]:
        return self._by_aggregate.get(aggregate_id, []).copy()

    def get_events_by_type(self, event_type: str) -> List[Event]:
        return self._by_type.get(event_type, []).copy()

    def get_all_events(self) -> List[Event]:
        return self._events.copy()


class OrderAggregate:
    """Order aggregate root (event-sourced)."""

    def __init__(self, order_id: str):
        self.order_id = order_id
        self.customer_id: Optional[int] = None
        self.items: List[dict] = []
        self.total: float = 0
        self.status: str = "pending"
        self.version: int = 0
        self._uncommitted_events: List[Event] = []

    def create(self, customer_id: int, items: List[dict]) -> None:
        if self.version > 0:
            raise ValueError("Order already exists")
        total = sum(item['price'] * item['quantity'] for item in items)
        event = OrderCreatedEvent(
            event_id=f"evt-{self.order_id}-{self.version + 1}",
            event_type="order_created",
            aggregate_id=self.order_id,
            aggregate_type="order",
            version=self.version + 1,
            customer_id=customer_id,
            items=items,
            total=total
        )
        self._apply_event(event)

    def change_status(self, new_status: str) -> None:
        if self.status == new_status:
            return
        # Allowed transitions of the order state machine.
        valid_transitions = {
            "pending": ["confirmed", "cancelled"],
            "confirmed": ["shipped", "cancelled"],
            "shipped": ["delivered"],
            "delivered": [],
            "cancelled": []
        }
        if new_status not in valid_transitions.get(self.status, []):
            raise ValueError(f"Invalid status transition: {self.status} -> {new_status}")
        event = OrderStatusChangedEvent(
            event_id=f"evt-{self.order_id}-{self.version + 1}",
            event_type="order_status_changed",
            aggregate_id=self.order_id,
            aggregate_type="order",
            version=self.version + 1,
            old_status=self.status,
            new_status=new_status
        )
        self._apply_event(event)

    def _apply_event(self, event: Event) -> None:
        # State changes happen only here, so replaying history and handling
        # new commands go through exactly the same code path.
        if isinstance(event, OrderCreatedEvent):
            self.customer_id = event.customer_id
            self.items = event.items
            self.total = event.total
            self.status = "pending"
        elif isinstance(event, OrderStatusChangedEvent):
            self.status = event.new_status
        self.version = event.version
        self._uncommitted_events.append(event)

    def load_from_history(self, events: List[Event]) -> None:
        for event in events:
            self._apply_event(event)
        # Replayed events are already persisted, so they are not uncommitted.
        self._uncommitted_events.clear()

    def get_uncommitted_events(self) -> List[Event]:
        return self._uncommitted_events.copy()

    def mark_events_as_committed(self) -> None:
        self._uncommitted_events.clear()


class OrderService:
    """Order service: persists new events, then publishes them."""

    def __init__(self, event_bus: EventBus, event_store: EventStore):
        self._event_bus = event_bus
        self._event_store = event_store

    def create_order(self, order_id: str, customer_id: int, items: List[dict]) -> OrderAggregate:
        order = OrderAggregate(order_id)
        order.create(customer_id, items)
        for event in order.get_uncommitted_events():
            self._event_store.append(event)
            self._event_bus.publish(event)
        order.mark_events_as_committed()
        return order

    def update_status(self, order_id: str, new_status: str) -> OrderAggregate:
        order = self._get_order(order_id)
        order.change_status(new_status)
        for event in order.get_uncommitted_events():
            self._event_store.append(event)
            self._event_bus.publish(event)
        order.mark_events_as_committed()
        return order

    def _get_order(self, order_id: str) -> OrderAggregate:
        # Rebuild the aggregate by replaying its stored event history.
        order = OrderAggregate(order_id)
        events = self._event_store.get_events(order_id)
        if events:
            order.load_from_history(events)
        return order


class InventoryProjection:
    """Inventory projection (read model)."""

    def __init__(self, event_bus: EventBus):
        self._inventory: Dict[str, int] = {}
        # order_id -> {product_id: reserved quantity}
        self._reservations: Dict[str, Dict[str, int]] = {}
        event_bus.subscribe("order_created", self._on_order_created)
        event_bus.subscribe("order_status_changed", self._on_status_changed)

    def _on_order_created(self, event: OrderCreatedEvent) -> None:
        reserved = self._reservations.setdefault(event.aggregate_id, {})
        for item in event.items:
            product_id = str(item['product_id'])
            quantity = item['quantity']
            # Demo assumption: an unseen product starts with 100 units in stock.
            if product_id not in self._inventory:
                self._inventory[product_id] = 100
            self._inventory[product_id] -= quantity
            reserved[product_id] = reserved.get(product_id, 0) + quantity

    def _on_status_changed(self, event: OrderStatusChangedEvent) -> None:
        if event.new_status == "cancelled":
            # Return the reserved quantities to stock, then drop the reservation.
            reserved = self._reservations.pop(event.aggregate_id, {})
            for product_id, quantity in reserved.items():
                self._inventory[product_id] += quantity

    def get_stock(self, product_id: str) -> int:
        return self._inventory.get(product_id, 0)


class NotificationProjection:
    """Notification projection."""

    def __init__(self, event_bus: EventBus):
        self._notifications: List[dict] = []
        event_bus.subscribe("order_created", self._on_order_created)
        event_bus.subscribe("order_status_changed", self._on_status_changed)

    def _on_order_created(self, event: OrderCreatedEvent) -> None:
        self._notifications.append({
            'type': 'order_confirmation',
            'order_id': event.aggregate_id,
            'customer_id': event.customer_id,
            'message': f"Order {event.aggregate_id} created successfully"
        })

    def _on_status_changed(self, event: OrderStatusChangedEvent) -> None:
        self._notifications.append({
            'type': 'status_update',
            'order_id': event.aggregate_id,
            'message': f"Order status changed from {event.old_status} to {event.new_status}"
        })

    def get_notifications(self) -> List[dict]:
        return self._notifications.copy()


# Usage example: projections subscribe before any event is published.
event_bus = EventBus()
event_store = EventStore()
inventory_projection = InventoryProjection(event_bus)
notification_projection = NotificationProjection(event_bus)
order_service = OrderService(event_bus, event_store)

order = order_service.create_order(
    order_id="ORD-001",
    customer_id=1,
    items=[
        {'product_id': 1, 'name': 'Product A', 'price': 100, 'quantity': 2},
        {'product_id': 2, 'name': 'Product B', 'price': 50, 'quantity': 1},
    ]
)
print(f"Order created: {order.order_id}, total: {order.total}")

order = order_service.update_status("ORD-001", "confirmed")
print(f"Order status: {order.status}")
print(f"\nStock: {inventory_projection.get_stock('1')}")
print(f"Notifications: {notification_projection.get_notifications()}")

26.8 Architecture Decision Guide
26.8.1 Architecture Selection Decision Tree
┌─────────────────────────────────────────────────────────────────┐
│              Architecture pattern selection guide               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  How complex is the system?                                     │
│  │                                                              │
│  ├── Simple ──→ Monolith                                        │
│  │                                                              │
│  └── Complex ──┐                                                │
│                ▼                                                │
│      How large is the team?                                     │
│      │                                                          │
│      ├── Small ──→ Modular monolith                             │
│      │                                                          │
│      └── Large ──┐                                              │
│                  ▼                                              │
│          Are the business domains independent?                  │
│          │                                                      │
│          ├── Yes ──→ Microservices                              │
│          │                                                      │
│          └── No ──┐                                             │
│                   ▼                                             │
│              Do read and write loads differ widely?             │
│              │                                                  │
│              ├── Yes ──→ CQRS                                   │
│              │                                                  │
│              └── No ──→ Layered architecture                    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

26.8.2 Architecture Pattern Comparison
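| Characteristic | Layered | Hexagonal | Microservices | Event-driven |
|---|---|---|---|---|
| Complexity | Low | Medium | High | High |
| Testability | Medium | High | High | High |
| Scalability | Low | Medium | High | High |
| Deployment complexity | Low | Low | High | Medium |
| Learning curve | Gentle | Moderate | Steep | Steep |
| Team collaboration | Medium | Medium | High | High |
| Suitable system size | Small to medium | Medium | Large | Large |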
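
The decision tree in 26.8.1 can also be read as a chain of early returns, one per question. The sketch below encodes it that way so the order of the questions is explicit. `SystemProfile` and `recommend_architecture` are hypothetical names, and reducing each question to a boolean is a simplifying assumption; real decisions weigh more factors than these four.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SystemProfile:
    """Answers to the four questions of the decision tree (simplified to booleans)."""
    complex_system: bool
    large_team: bool
    independent_domains: bool
    read_write_skew: bool


def recommend_architecture(profile: SystemProfile) -> str:
    """Walk the decision tree from the top; the first matching branch wins."""
    if not profile.complex_system:
        return "monolith"
    if not profile.large_team:
        return "modular monolith"
    if profile.independent_domains:
        return "microservices"
    if profile.read_write_skew:
        return "CQRS"
    return "layered architecture"


# A complex system built by a large team whose domains are tightly coupled
# and whose reads far outnumber writes.
profile = SystemProfile(
    complex_system=True,
    large_team=True,
    independent_domains=False,
    read_write_skew=True,
)
print(recommend_architecture(profile))  # CQRS
```

Because earlier questions short-circuit later ones, a simple system is steered to a monolith regardless of team size or load profile, which matches the tree's top-down reading.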
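26.9 Summary
Architectural patterns describe the high-level structure of a software system: how it is organized, how its components interact, and in which direction it can evolve.
Key Points
- Layered architecture: separation of concerns; suits traditional enterprise applications
- Hexagonal architecture: dependency inversion; easy to test and to adapt to new technology
- CQRS: separate read and write models; suits systems whose read and write loads differ widely
- Plugin architecture: dynamic extension; suits systems that need flexible customization
- Event-driven architecture: asynchronous decoupling; suits complex business processes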
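Practical Advice
- Choose an architecture based on business complexity and team size
- Start with a simple architecture and evolve it as needs arise
- Pay attention to architectural boundaries and the direction of dependencies
- Keep the architecture testable
- Re-evaluate regularly whether the architecture still fits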
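
The advice about boundaries and dependency direction can be enforced mechanically rather than by review alone. As a minimal sketch, the check below parses a module's source with the standard `ast` module and reports imports that cross a forbidden boundary. The layer names (`domain`, `application`, `infrastructure`), the `FORBIDDEN` rule table, and both helper functions are hypothetical, and the sketch assumes each layer is a top-level package.

```python
import ast
from typing import Dict, List, Set

# Hypothetical layering rule: each layer lists the layers it must not import.
FORBIDDEN: Dict[str, Set[str]] = {
    "domain": {"application", "infrastructure"},
    "application": {"infrastructure"},
}


def imported_top_level_packages(source: str) -> Set[str]:
    """Collect the top-level package names imported by a module's source."""
    packages: Set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            packages.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            # level == 0 means an absolute import; relative imports are skipped.
            packages.add(node.module.split(".")[0])
    return packages


def find_violations(layer: str, source: str) -> List[str]:
    """Return the forbidden layers imported by `source`, a module living in `layer`."""
    return sorted(imported_top_level_packages(source) & FORBIDDEN.get(layer, set()))


good = "from dataclasses import dataclass\nfrom domain.order import Order\n"
bad = "import infrastructure.db\nfrom application import services\n"
print(find_violations("domain", good))  # []
print(find_violations("domain", bad))   # ['application', 'infrastructure']
```

Run as part of the test suite over every source file, a check of this kind turns the dependency rule into a failing test instead of a convention.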
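Next Chapter Preview
The next chapter introduces domain-driven design and shows how strategic design and tactical design help build complex business systems.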