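Chapter 26 Architectural Design Patterns

Learning Objectives

After completing this chapter, readers will be able to:

  • Understand the core concepts and formal definition of software architecture
  • Apply classic architecture patterns such as layered architecture, hexagonal architecture, and CQRS
  • Implement extensible architectures such as plugin and event-driven architectures
  • Identify the scenarios each architecture pattern suits and the trade-offs involved
  • Design software architectures that meet business requirements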

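26.1 Software Architecture Overview

26.1.1 Core Definition

Software architecture is the high-level structure of a software system: it defines the system's components, the relationships among them, and the principles that guide its design and evolution.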

26.1.2 Formal Definition

Formally, a software architecture can be defined as a four-tuple:

$$\mathcal{A} = \langle C, R, V, P \rangle$$

where:

  • $C = \{c_1, c_2, \ldots, c_n\}$ is the set of components
  • $R \subseteq C \times C$ is the set of relations between components
  • $V$ is the set of views
  • $P$ is the set of properties

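Component definition

A component $c$ is a triple:

$$c = \langle I, O, B \rangle$$

where $I$ is the set of interfaces, $O$ is the set of operations, and $B$ is the behavioral specification.

Connector definition

A connector $l$ defines how components interact by mapping a pair of interfaces to an interaction:

$$l: I_1 \times I_2 \rightarrow \mathit{Interaction}$$

Architectural style

An architectural style $S$ defines a set of constraints:

$$S = \langle C_S, R_S, \mathit{Constraints} \rangle$$

where $C_S$ and $R_S$ are the components and relations the style permits, and $\mathit{Constraints}$ is the set of invariants that must hold.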
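
To make the tuple definitions concrete, the following minimal sketch models an architecture as a set of named components plus a relation set, and checks one style invariant: in a strictly layered style, a component may depend only on the layer directly below it. The names `Architecture`, `depends_on`, `violations`, and the three component names are illustrative assumptions, not part of the formal definition.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class Architecture:
    """Toy model of <C, R>: components mapped to a layer index, plus relations."""
    layers: Dict[str, int]                      # component name -> layer (0 = top)
    relations: Set[Tuple[str, str]] = field(default_factory=set)  # (source, target)

    def depends_on(self, source: str, target: str) -> None:
        # Both endpoints must be known components, so R stays a subset of C x C.
        if source not in self.layers or target not in self.layers:
            raise KeyError("unknown component")
        self.relations.add((source, target))


def violations(arch: Architecture) -> List[Tuple[str, str]]:
    """Return the relations that break the strict-layering invariant."""
    return sorted(
        (s, t) for s, t in arch.relations
        if arch.layers[t] - arch.layers[s] != 1   # target must sit exactly one layer down
    )


arch = Architecture(layers={"controller": 0, "service": 1, "repository": 2})
arch.depends_on("controller", "service")
arch.depends_on("service", "repository")
print(violations(arch))        # []

arch.depends_on("repository", "controller")   # an upward dependency
print(violations(arch))        # [('repository', 'controller')]
```

A relaxed layered style, in which a layer may skip downward past its neighbor, would replace the `!= 1` test with `< 1`.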

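Quality attributes

The quality attributes $Q$ of an architecture can be written as:

$$Q = \{q_1, q_2, \ldots, q_m\}$$

Common quality attributes include:

  • Maintainability: $\mathit{Maintainability} = f(\mathit{Coupling}, \mathit{Cohesion})$
  • Scalability: $\mathit{Scalability} = f(\mathit{Throughput}, \mathit{Latency})$
  • Reliability: $\mathit{Reliability} = f(\mathit{MTBF}, \mathit{MTTR})$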
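
The functions $f$ above are left unspecified. As one concrete instance, a widely used measure built from MTBF (mean time between failures) and MTTR (mean time to repair) is steady-state availability, $\mathit{MTBF} / (\mathit{MTBF} + \mathit{MTTR})$. The sketch below computes it. Note that this is availability rather than a full reliability model, and that the function name and sample figures are assumptions chosen for illustration.

```python
def availability(mtbf_hours: float, mttr_hours: float) -> float:
    """Steady-state availability: the fraction of time the system is operational."""
    if mtbf_hours <= 0 or mttr_hours < 0:
        raise ValueError("MTBF must be positive and MTTR non-negative")
    return mtbf_hours / (mtbf_hours + mttr_hours)


# Hypothetical figures: one failure every 999 hours, one hour to recover.
a = availability(999, 1)
print(f"{a:.3f}")                        # 0.999

# Expected downtime over a 30-day month (720 hours), in minutes.
print(round((1 - a) * 720 * 60, 1))      # 43.2
```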

26.1.3 Architecture Views

┌─────────────────────────────────────────────────────────────────┐
│                  4+1 Architectural View Model                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│                         ┌─────────┐                             │
│                         │Scenarios│                             │
│                         └────┬────┘                             │
│                              │                                  │
│              ┌───────────────┼───────────────┐                  │
│              │               │               │                  │
│              ▼               ▼               ▼                  │
│        ┌───────────┐   ┌───────────┐   ┌───────────┐            │
│        │  Logical  │   │  Process  │   │Development│            │
│        │   View    │   │   View    │   │   View    │            │
│        └───────────┘   └───────────┘   └───────────┘            │
│              │               │               │                  │
│              └───────────────┼───────────────┘                  │
│                              │                                  │
│                              ▼                                  │
│                        ┌───────────┐                            │
│                        │ Physical  │                            │
│                        │   View    │                            │
│                        └───────────┘                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

View descriptions:

  • Logical view: functional requirements, aimed at end users
  • Process view: concurrency and distribution, aimed at system integrators
  • Development view: software organization, aimed at programmers
  • Physical view: system topology, aimed at system engineers
  • Scenarios: use cases, aimed at all stakeholders

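26.1.4 Classification of Architecture Patterns

| Category | Patterns | Core idea |
| --- | --- | --- |
| Layered architecture | N-tier architecture, Clean Architecture | Separation of concerns |
| Ports and adapters | Hexagonal architecture, Onion Architecture | Dependency inversion |
| Distributed architecture | Microservices, SOA | Service decoupling |
| Event-driven | Event sourcing, CQRS | Asynchronous decoupling |
| Extensible architecture | Plugin architecture, microkernel | Dynamic extension |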

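26.2 Historical Background and Evolution

26.2.1 Historical Development

| Period | Milestone | Description |
| --- | --- | --- |
| 1968 | Dijkstra's layering | The concept of a layered structure is introduced |
| 1970s | MVC | The MVC pattern in Smalltalk |
| 1990s | Three-tier architecture | Evolution of client-server architecture |
| 1995 | 4+1 views | Philippe Kruchten publishes the architectural view model |
| 2000s | SOA | Rise of service-oriented architecture |
| 2005 | Hexagonal architecture | Proposed by Alistair Cockburn |
| 2012 | Clean Architecture | Proposed by Robert C. Martin |
| 2014 | Microservices | Martin Fowler popularizes the microservices concept |
| 2010s | Cloud native | Kubernetes, service mesh |
| 2020s | Modular monolith | A return to simpler deployments that balances complexity |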

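26.2.2 Theoretical Foundations

The theoretical foundations of software architecture draw on:

  1. Software engineering: modularity, information hiding, separation of concerns
  2. Systems theory: system decomposition, hierarchical structure
  3. Distributed computing: the CAP theorem, consistency models
  4. Domain-driven design: bounded contexts, aggregates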

26.3 Layered Architecture Pattern

26.3.1 Classic Three-Tier Architecture

python
from dataclasses import dataclass, field
from typing import List, Optional, Protocol, Dict, Any
from abc import ABC, abstractmethod
from datetime import datetime
import json


@dataclass
class User:
    """User entity."""
    id: int
    name: str
    email: str
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)


class UserRepository(Protocol):
    """User repository protocol: the interface of the data access layer."""
    
    def find_by_id(self, user_id: int) -> Optional[User]:
        ...
    
    def find_all(self) -> List[User]:
        ...
    
    def find_by_email(self, email: str) -> Optional[User]:
        ...
    
    def save(self, user: User) -> User:
        ...
    
    def delete(self, user_id: int) -> bool:
        ...


class InMemoryUserRepository:
    """In-memory user repository: an implementation of the data access layer."""
    
    def __init__(self):
        self._users: Dict[int, User] = {}
        self._email_index: Dict[str, int] = {}
        self._next_id = 1
    
    def find_by_id(self, user_id: int) -> Optional[User]:
        return self._users.get(user_id)
    
    def find_all(self) -> List[User]:
        return list(self._users.values())
    
    def find_by_email(self, email: str) -> Optional[User]:
        user_id = self._email_index.get(email.lower())
        return self._users.get(user_id) if user_id else None
    
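    def save(self, user: User) -> User:
        if user.id == 0:
            # New user: assign the next id and fresh timestamps.
            user = User(
                id=self._next_id,
                name=user.name,
                email=user.email,
                created_at=datetime.now(),
                updated_at=datetime.now()
            )
            self._next_id += 1
        else:
            # Existing user: drop the old email key first, so that a changed
            # address does not linger in the index and block later sign-ups.
            previous = self._users.get(user.id)
            if previous is not None:
                self._email_index.pop(previous.email.lower(), None)
            user = User(
                id=user.id,
                name=user.name,
                email=user.email,
                created_at=user.created_at,
                updated_at=datetime.now()
            )

        self._users[user.id] = user
        self._email_index[user.email.lower()] = user.id
        return user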
    
    def delete(self, user_id: int) -> bool:
        user = self._users.pop(user_id, None)
        if user:
            self._email_index.pop(user.email.lower(), None)
            return True
        return False


class UserService:
    """User service: the business logic layer."""
    
    def __init__(self, repository: UserRepository):
        self._repository = repository
    
    def get_user(self, user_id: int) -> Optional[User]:
        return self._repository.find_by_id(user_id)
    
    def get_all_users(self) -> List[User]:
        return self._repository.find_all()
    
    def create_user(self, name: str, email: str) -> User:
        self._validate_name(name)
        self._validate_email(email)
        
        existing = self._repository.find_by_email(email)
        if existing:
            raise ValueError(f"Email already in use: {email}")
        
        user = User(id=0, name=name, email=email)
        return self._repository.save(user)
    
    def update_user(
        self, user_id: int, name: Optional[str] = None, email: Optional[str] = None
    ) -> User:
        user = self._repository.find_by_id(user_id)
        if not user:
            raise ValueError(f"User not found: {user_id}")
        
        new_name = name if name is not None else user.name
        new_email = email if email is not None else user.email
        
        if email is not None:
            self._validate_email(email)
            existing = self._repository.find_by_email(email)
            if existing and existing.id != user_id:
                raise ValueError(f"Email already in use: {email}")
        
        if name is not None:
            self._validate_name(name)
        
        updated_user = User(
            id=user_id,
            name=new_name,
            email=new_email,
            created_at=user.created_at
        )
        return self._repository.save(updated_user)
    
    def delete_user(self, user_id: int) -> bool:
        return self._repository.delete(user_id)
    
    def _validate_email(self, email: str) -> None:
        if not email or '@' not in email:
            raise ValueError("Invalid email address")
        parts = email.split('@')
        if len(parts) != 2 or '.' not in parts[1]:
            raise ValueError("Invalid email address")

    def _validate_name(self, name: str) -> None:
        if not name or len(name.strip()) < 2:
            raise ValueError("Name is too short")


@dataclass
class ApiResponse:
    """API response."""
    success: bool
    data: Any = None
    error: Optional[str] = None
    status_code: int = 200


class UserController:
    """User controller: the presentation layer."""
    
    def __init__(self, service: UserService):
        self._service = service
    
    def get_user(self, user_id: int) -> ApiResponse:
        user = self._service.get_user(user_id)
        if not user:
            return ApiResponse(False, error="User not found", status_code=404)
        return ApiResponse(True, data=self._user_to_dict(user))
    
    def list_users(self) -> ApiResponse:
        users = self._service.get_all_users()
        return ApiResponse(True, data=[self._user_to_dict(u) for u in users])
    
    def create_user(self, request: dict) -> ApiResponse:
        try:
            user = self._service.create_user(
                name=request.get('name'),
                email=request.get('email')
            )
            return ApiResponse(True, data=self._user_to_dict(user), status_code=201)
        except ValueError as e:
            return ApiResponse(False, error=str(e), status_code=400)
    
    def update_user(self, user_id: int, request: dict) -> ApiResponse:
        try:
            user = self._service.update_user(
                user_id=user_id,
                name=request.get('name'),
                email=request.get('email')
            )
            return ApiResponse(True, data=self._user_to_dict(user))
        except ValueError as e:
            return ApiResponse(False, error=str(e), status_code=400)
    
    def delete_user(self, user_id: int) -> ApiResponse:
        if self._service.delete_user(user_id):
            return ApiResponse(True, status_code=204)
        return ApiResponse(False, error="User not found", status_code=404)
    
    def _user_to_dict(self, user: User) -> dict:
        return {
            'id': user.id,
            'name': user.name,
            'email': user.email,
            'created_at': user.created_at.isoformat(),
            'updated_at': user.updated_at.isoformat()
        }


repository = InMemoryUserRepository()
service = UserService(repository)
controller = UserController(service)

response = controller.create_user({'name': '张三', 'email': 'zhangsan@example.com'})
print(f"Created user: {response}")

response = controller.create_user({'name': '李四', 'email': 'lisi@example.com'})
print(f"Created user: {response}")

response = controller.list_users()
print(f"User list: {response}")
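
Because `UserService` depends only on the repository protocol, the business layer can be exercised without any real storage. The following self-contained sketch illustrates the idea with a deliberately tiny protocol and a hand-written stub. `NameSource`, `Greeter`, and `StubNames` are hypothetical names introduced for the example and are not part of the listing above.

```python
from typing import Optional, Protocol


class NameSource(Protocol):
    """Data-access port: anything with a matching lookup() satisfies it."""

    def lookup(self, user_id: int) -> Optional[str]:
        ...


class Greeter:
    """Business layer: knows the port, not the storage behind it."""

    def __init__(self, source: NameSource):
        self._source = source

    def greet(self, user_id: int) -> str:
        name = self._source.lookup(user_id)
        if name is None:
            raise ValueError(f"User not found: {user_id}")
        return f"Hello, {name}"


class StubNames:
    """Test double: no inheritance needed, structural typing is enough."""

    def lookup(self, user_id: int) -> Optional[str]:
        return {1: "Ada"}.get(user_id)


greeter = Greeter(StubNames())
print(greeter.greet(1))      # Hello, Ada
```

The same substitution is what allows an in-memory repository to be swapped for a database-backed one without touching the service or the controller.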

26.3.2 Dependency Injection Container

python
from typing import Dict, Type, Callable, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum, auto
import inspect


class ServiceLifetime(Enum):
    """Service lifetime."""
    TRANSIENT = auto()
    SINGLETON = auto()
    SCOPED = auto()


@dataclass
class ServiceDescriptor:
    """Service descriptor."""
    service_type: Type
    implementation: Any
    lifetime: ServiceLifetime
    factory: Optional[Callable] = None


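class Container:
    """
    Dependency injection container.

    Supported lifetimes:
    - Transient: a new instance is created on every request
    - Singleton: one instance is shared for the lifetime of the application
    - Scoped: one instance is shared within the same scope
    """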
    
    def __init__(self):
        self._descriptors: Dict[Type, ServiceDescriptor] = {}
        self._singletons: Dict[Type, Any] = {}
        self._scopes: Dict[str, Dict[Type, Any]] = {}
        self._current_scope: Optional[str] = None
    
    def register_transient(
        self,
        service_type: Type,
        implementation: Optional[Type] = None
    ) -> 'Container':
        """Register a transient service."""
        impl = implementation or service_type
        self._descriptors[service_type] = ServiceDescriptor(
            service_type=service_type,
            implementation=impl,
            lifetime=ServiceLifetime.TRANSIENT
        )
        return self
    
    def register_singleton(
        self,
        service_type: Type,
        implementation: Optional[Type] = None
    ) -> 'Container':
        """Register a singleton service."""
        impl = implementation or service_type
        self._descriptors[service_type] = ServiceDescriptor(
            service_type=service_type,
            implementation=impl,
            lifetime=ServiceLifetime.SINGLETON
        )
        return self
    
    def register_factory(
        self,
        service_type: Type,
        factory: Callable[['Container'], Any],
        lifetime: ServiceLifetime = ServiceLifetime.TRANSIENT
    ) -> 'Container':
        """Register a factory function."""
        self._descriptors[service_type] = ServiceDescriptor(
            service_type=service_type,
            implementation=None,
            lifetime=lifetime,
            factory=factory
        )
        return self
    
    def register_instance(self, service_type: Type, instance: Any) -> 'Container':
        """Register an existing instance."""
        self._singletons[service_type] = instance
        return self
    
    def get(self, service_type: Type) -> Any:
        """Resolve a service instance."""
        if service_type in self._singletons:
            return self._singletons[service_type]
        
        descriptor = self._descriptors.get(service_type)
        if not descriptor:
            raise ValueError(f"Service not registered: {service_type.__name__}")
        
        if descriptor.lifetime == ServiceLifetime.SINGLETON:
            if service_type not in self._singletons:
                self._singletons[service_type] = self._create_instance(descriptor)
            return self._singletons[service_type]
        
        if descriptor.lifetime == ServiceLifetime.SCOPED and self._current_scope:
            scope = self._scopes[self._current_scope]
            if service_type not in scope:
                scope[service_type] = self._create_instance(descriptor)
            return scope[service_type]
        
        return self._create_instance(descriptor)
    
    def _create_instance(self, descriptor: ServiceDescriptor) -> Any:
        """Create a service instance, resolving constructor parameters by type annotation."""
        if descriptor.factory:
            return descriptor.factory(self)
        
        impl = descriptor.implementation
        sig = inspect.signature(impl.__init__)
        kwargs = {}
        
        for param_name, param in sig.parameters.items():
            if param_name == 'self':
                continue
            if param.annotation != inspect.Parameter.empty:
                try:
                    kwargs[param_name] = self.get(param.annotation)
                except ValueError:
                    if param.default != inspect.Parameter.empty:
                        continue
                    raise
        
        return impl(**kwargs)
    
    def create_scope(self, scope_id: Optional[str] = None) -> 'Scope':
        """Create a scope."""
        import uuid
        scope_id = scope_id or str(uuid.uuid4())
        self._scopes[scope_id] = {}
        return Scope(self, scope_id)
    
    def build(self) -> 'ServiceProvider':
        """Build the service provider."""
        return ServiceProvider(self)


class Scope:
    """Scope."""
    
    def __init__(self, container: Container, scope_id: str):
        self._container = container
        self._scope_id = scope_id
        container._current_scope = scope_id
    
    def get(self, service_type: Type) -> Any:
        return self._container.get(service_type)
    
    def dispose(self) -> None:
        if self._scope_id in self._container._scopes:
            del self._container._scopes[self._scope_id]
        self._container._current_scope = None


class ServiceProvider:
    """Service provider."""
    
    def __init__(self, container: Container):
        self._container = container
    
    def get(self, service_type: Type) -> Any:
        return self._container.get(service_type)
    
    def create_scope(self) -> Scope:
        return self._container.create_scope()


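# This example reuses UserRepository, InMemoryUserRepository, UserService,
# and UserController from section 26.3.1.
container = Container()

# Register the implementation under the UserRepository protocol:
# UserService.__init__ annotates its dependency with that type, and the
# container resolves constructor parameters by annotation.
container.register_singleton(UserRepository, InMemoryUserRepository)
container.register_transient(UserService)
container.register_transient(UserController)

provider = container.build()

controller1 = provider.get(UserController)
controller2 = provider.get(UserController)

print(f"Controllers are different instances: {controller1 is not controller2}")
print(f"Repositories are the same instance: {controller1._service._repository is controller2._service._repository}")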
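
The listing above demonstrates transient and singleton lifetimes but not scopes. As a rough, self-contained illustration of what "shared within the same scope" means, the sketch below uses a context manager to bound a per-scope cache. `MiniContainer`, `register_scoped`, `scope`, and `RequestContext` are hypothetical names, and this is a simplification rather than a drop-in replacement for the `Container` class.

```python
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, Optional, Type


class MiniContainer:
    """Resolves scoped services: one instance per active scope."""

    def __init__(self) -> None:
        self._factories: Dict[Type, Callable[[], Any]] = {}
        self._active: Optional[Dict[Type, Any]] = None   # cache of the current scope

    def register_scoped(self, service_type: Type) -> None:
        # The class itself serves as a zero-argument factory.
        self._factories[service_type] = service_type

    @contextmanager
    def scope(self) -> Iterator["MiniContainer"]:
        previous, self._active = self._active, {}
        try:
            yield self
        finally:
            self._active = previous      # restore the outer scope, if any

    def get(self, service_type: Type) -> Any:
        if self._active is None:
            raise RuntimeError("scoped service requested outside a scope")
        if service_type not in self._active:
            self._active[service_type] = self._factories[service_type]()
        return self._active[service_type]


class RequestContext:
    """Stand-in for per-request state such as a unit of work."""


container = MiniContainer()
container.register_scoped(RequestContext)

with container.scope() as s:
    first = s.get(RequestContext)
    same_scope = s.get(RequestContext) is first

with container.scope() as s:
    other_scope = s.get(RequestContext) is first

print(same_scope, other_scope)   # True False
```

Using `with` guarantees that the scope's cache is discarded even if the body raises, which an explicit `dispose()` call cannot ensure on its own.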

26.3.3 Clean Architecture

python
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto


class OrderStatus(Enum):
    PENDING = auto()
    CONFIRMED = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()


@dataclass
class OrderItem:
    """Order item entity."""
    product_id: int
    product_name: str
    quantity: int
    unit_price: float
    
    @property
    def total(self) -> float:
        return self.quantity * self.unit_price


@dataclass
class Order:
    """Order entity (core business logic)."""
    id: int
    customer_id: int
    items: List[OrderItem]
    status: OrderStatus = OrderStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    
    @property
    def total(self) -> float:
        return sum(item.total for item in self.items)
    
    def add_item(self, item: OrderItem) -> None:
        self.items.append(item)
    
    def remove_item(self, product_id: int) -> bool:
        for i, item in enumerate(self.items):
            if item.product_id == product_id:
                self.items.pop(i)
                return True
        return False
    
    def confirm(self) -> None:
        if self.status != OrderStatus.PENDING:
            raise ValueError("Only pending orders can be confirmed")
        if not self.items:
            raise ValueError("An order cannot be empty")
        self.status = OrderStatus.CONFIRMED
    
    def ship(self) -> None:
        if self.status != OrderStatus.CONFIRMED:
            raise ValueError("Only confirmed orders can be shipped")
        self.status = OrderStatus.SHIPPED
    
    def deliver(self) -> None:
        if self.status != OrderStatus.SHIPPED:
            raise ValueError("Only shipped orders can be marked as delivered")
        self.status = OrderStatus.DELIVERED
    
    def cancel(self) -> None:
        if self.status in (OrderStatus.SHIPPED, OrderStatus.DELIVERED):
            raise ValueError("Shipped or delivered orders cannot be cancelled")
        self.status = OrderStatus.CANCELLED


class OrderRepository(ABC):
    """Order repository interface (core layer)."""
    
    @abstractmethod
    def find_by_id(self, order_id: int) -> Optional[Order]:
        pass
    
    @abstractmethod
    def find_by_customer(self, customer_id: int) -> List[Order]:
        pass
    
    @abstractmethod
    def save(self, order: Order) -> Order:
        pass
    
    @abstractmethod
    def delete(self, order_id: int) -> bool:
        pass


class OrderUseCases:
    """Order use cases (core layer)."""
    
    def __init__(self, repository: OrderRepository):
        self._repository = repository
    
    def create_order(self, customer_id: int, items: List[Dict]) -> Order:
        order_items = [
            OrderItem(
                product_id=item['product_id'],
                product_name=item['product_name'],
                quantity=item['quantity'],
                unit_price=item['unit_price']
            )
            for item in items
        ]
        order = Order(id=0, customer_id=customer_id, items=order_items)
        return self._repository.save(order)
    
    def get_order(self, order_id: int) -> Optional[Order]:
        return self._repository.find_by_id(order_id)
    
    def get_customer_orders(self, customer_id: int) -> List[Order]:
        return self._repository.find_by_customer(customer_id)
    
    def confirm_order(self, order_id: int) -> Order:
        order = self._repository.find_by_id(order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        order.confirm()
        return self._repository.save(order)
    
    def ship_order(self, order_id: int) -> Order:
        order = self._repository.find_by_id(order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        order.ship()
        return self._repository.save(order)
    
    def cancel_order(self, order_id: int) -> Order:
        order = self._repository.find_by_id(order_id)
        if not order:
            raise ValueError(f"Order not found: {order_id}")
        order.cancel()
        return self._repository.save(order)


class InMemoryOrderRepository(OrderRepository):
    """In-memory order repository (infrastructure layer)."""
    
    def __init__(self):
        self._orders: Dict[int, Order] = {}
        self._customer_index: Dict[int, List[int]] = {}
        self._next_id = 1
    
    def find_by_id(self, order_id: int) -> Optional[Order]:
        return self._orders.get(order_id)
    
    def find_by_customer(self, customer_id: int) -> List[Order]:
        order_ids = self._customer_index.get(customer_id, [])
        return [self._orders[oid] for oid in order_ids if oid in self._orders]
    
    def save(self, order: Order) -> Order:
        if order.id == 0:
            order = Order(
                id=self._next_id,
                customer_id=order.customer_id,
                items=order.items,
                status=order.status,
                created_at=order.created_at
            )
            self._next_id += 1
        
        self._orders[order.id] = order
        
        if order.customer_id not in self._customer_index:
            self._customer_index[order.customer_id] = []
        if order.id not in self._customer_index[order.customer_id]:
            self._customer_index[order.customer_id].append(order.id)
        
        return order
    
    def delete(self, order_id: int) -> bool:
        order = self._orders.pop(order_id, None)
        if order:
            if order.customer_id in self._customer_index:
                self._customer_index[order.customer_id] = [
                    oid for oid in self._customer_index[order.customer_id]
                    if oid != order_id
                ]
            return True
        return False


class OrderController:
    """Order controller (interface adapter layer)."""
    
    def __init__(self, use_cases: OrderUseCases):
        self._use_cases = use_cases
    
    def create_order(self, request: dict) -> dict:
        try:
            order = self._use_cases.create_order(
                customer_id=request['customer_id'],
                items=request['items']
            )
            return {'success': True, 'data': self._order_to_dict(order)}
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def get_order(self, order_id: int) -> dict:
        order = self._use_cases.get_order(order_id)
        if not order:
            return {'success': False, 'error': 'Order not found'}
        return {'success': True, 'data': self._order_to_dict(order)}
    
    def confirm_order(self, order_id: int) -> dict:
        try:
            order = self._use_cases.confirm_order(order_id)
            return {'success': True, 'data': self._order_to_dict(order)}
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
    def _order_to_dict(self, order: Order) -> dict:
        return {
            'id': order.id,
            'customer_id': order.customer_id,
            'items': [
                {
                    'product_id': item.product_id,
                    'product_name': item.product_name,
                    'quantity': item.quantity,
                    'unit_price': item.unit_price,
                    'total': item.total
                }
                for item in order.items
            ],
            'total': order.total,
            'status': order.status.name,
            'created_at': order.created_at.isoformat()
        }


repository = InMemoryOrderRepository()
use_cases = OrderUseCases(repository)
controller = OrderController(use_cases)

result = controller.create_order({
    'customer_id': 1,
    'items': [
        {'product_id': 1, 'product_name': 'Product A', 'quantity': 2, 'unit_price': 100},
        {'product_id': 2, 'product_name': 'Product B', 'quantity': 1, 'unit_price': 200},
    ]
})
print(f"Created order: {result}")

result = controller.confirm_order(1)
print(f"Confirmed order: {result}")
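
The `Order` entity encodes its lifecycle as guard checks inside each method. Another way to view the same rules is as an explicit transition table, which makes the permitted moves easy to audit. The sketch below is one possible encoding under that assumption. `ALLOWED` and `transition` are hypothetical names, and the table mirrors the status guards in the listing above, including the fact that `cancel()` accepts an order that is already cancelled.

```python
from enum import Enum, auto
from typing import Dict, Set


class OrderStatus(Enum):
    PENDING = auto()
    CONFIRMED = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()


# Each status maps to the set of statuses it may move to.
ALLOWED: Dict[OrderStatus, Set[OrderStatus]] = {
    OrderStatus.PENDING: {OrderStatus.CONFIRMED, OrderStatus.CANCELLED},
    OrderStatus.CONFIRMED: {OrderStatus.SHIPPED, OrderStatus.CANCELLED},
    OrderStatus.SHIPPED: {OrderStatus.DELIVERED},
    OrderStatus.DELIVERED: set(),
    OrderStatus.CANCELLED: {OrderStatus.CANCELLED},
}


def transition(current: OrderStatus, target: OrderStatus) -> OrderStatus:
    """Return the new status, or raise if the move is not in the table."""
    if target not in ALLOWED[current]:
        raise ValueError(f"Illegal transition: {current.name} -> {target.name}")
    return target


status = transition(OrderStatus.PENDING, OrderStatus.CONFIRMED)
status = transition(status, OrderStatus.SHIPPED)
print(status.name)           # SHIPPED
```

The table covers status changes only. Guards that depend on other state, such as the rule that an empty order cannot be confirmed, still belong in the entity.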

26.4 Hexagonal Architecture

26.4.1 Core Concepts

python
from abc import ABC, abstractmethod
from typing import Protocol, List, Optional, Dict, Any, Type
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto


@dataclass
class Product:
    """Product entity."""
    id: int
    name: str
    description: str
    price: float
    stock: int
    category_id: int
    created_at: datetime = field(default_factory=datetime.now)
    
    def update_stock(self, quantity: int) -> None:
        if self.stock + quantity < 0:
            raise ValueError("Insufficient stock")
        self.stock += quantity

    def update_price(self, new_price: float) -> None:
        if new_price < 0:
            raise ValueError("Price cannot be negative")
        self.price = new_price


class ProductRepository(Protocol):
    """Product repository port (outbound port: the core calls out through it)."""
    
    def find_by_id(self, product_id: int) -> Optional[Product]:
        ...
    
    def find_all(self) -> List[Product]:
        ...
    
    def find_by_category(self, category_id: int) -> List[Product]:
        ...
    
    def save(self, product: Product) -> Product:
        ...
    
    def delete(self, product_id: int) -> bool:
        ...


class ProductUseCases:
    """Product use cases (application core)."""
    
    def __init__(self, repository: ProductRepository):
        self._repository = repository
    
    def get_product(self, product_id: int) -> Optional[Product]:
        return self._repository.find_by_id(product_id)
    
    def list_products(self) -> List[Product]:
        return self._repository.find_all()
    
    def list_by_category(self, category_id: int) -> List[Product]:
        return self._repository.find_by_category(category_id)
    
    def create_product(
        self, 
        name: str, 
        description: str, 
        price: float, 
        stock: int, 
        category_id: int
    ) -> Product:
        self._validate_price(price)
        self._validate_stock(stock)
        
        product = Product(
            id=0,
            name=name,
            description=description,
            price=price,
            stock=stock,
            category_id=category_id
        )
        return self._repository.save(product)
    
    def update_product(
        self, 
        product_id: int, 
        name: str = None, 
        description: str = None, 
        price: float = None, 
        stock: int = None
    ) -> Product:
        product = self._repository.find_by_id(product_id)
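        if not product:
            raise ValueError(f"Product not found: {product_id}")

        if price is not None:
            self._validate_price(price)
            product.price = price
        if stock is not None:
            # Apply the same non-negative check that create_product uses.
            self._validate_stock(stock)
            product.stock = stock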
        if name is not None:
            product.name = name
        if description is not None:
            product.description = description
        
        return self._repository.save(product)
    
    def update_stock(self, product_id: int, quantity: int) -> Product:
        product = self._repository.find_by_id(product_id)
        if not product:
            raise ValueError(f"Product not found: {product_id}")
        product.update_stock(quantity)
        return self._repository.save(product)
    
    def delete_product(self, product_id: int) -> bool:
        return self._repository.delete(product_id)
    
    def _validate_price(self, price: float) -> None:
        if price < 0:
            raise ValueError("Price cannot be negative")

    def _validate_stock(self, stock: int) -> None:
        if stock < 0:
            raise ValueError("Stock cannot be negative")


class InMemoryProductRepository:
    """In-memory product repository adapter (outbound adapter)."""
    
    def __init__(self):
        self._products: Dict[int, Product] = {}
        self._category_index: Dict[int, List[int]] = {}
        self._next_id = 1
    
    def find_by_id(self, product_id: int) -> Optional[Product]:
        return self._products.get(product_id)
    
    def find_all(self) -> List[Product]:
        return list(self._products.values())
    
    def find_by_category(self, category_id: int) -> List[Product]:
        product_ids = self._category_index.get(category_id, [])
        return [self._products[pid] for pid in product_ids if pid in self._products]
    
    def save(self, product: Product) -> Product:
        if product.id == 0:
            product = Product(
                id=self._next_id,
                name=product.name,
                description=product.description,
                price=product.price,
                stock=product.stock,
                category_id=product.category_id,
                created_at=product.created_at
            )
            self._next_id += 1
        
        self._products[product.id] = product
        
        if product.category_id not in self._category_index:
            self._category_index[product.category_id] = []
        if product.id not in self._category_index[product.category_id]:
            self._category_index[product.category_id].append(product.id)
        
        return product
    
    def delete(self, product_id: int) -> bool:
        product = self._products.pop(product_id, None)
        if product:
            if product.category_id in self._category_index:
                self._category_index[product.category_id] = [
                    pid for pid in self._category_index[product.category_id]
                    if pid != product_id
                ]
            return True
        return False


class RestAdapter:
    """REST API adapter (inbound adapter)."""
    
    def __init__(self, use_cases: ProductUseCases):
        self._use_cases = use_cases
    
    def get_product(self, product_id: int) -> dict:
        product = self._use_cases.get_product(product_id)
        if not product:
            return {'success': False, 'error': 'Product not found', 'status': 404}
        return {'success': True, 'data': self._to_dict(product)}

    def list_products(self, category_id: Optional[int] = None) -> dict:
        # Compare against None so that a category id of 0 is still honored.
        if category_id is not None:
            products = self._use_cases.list_by_category(category_id)
        else:
            products = self._use_cases.list_products()
        return {'success': True, 'data': [self._to_dict(p) for p in products]}
    
    def create_product(self, request: dict) -> dict:
        try:
            product = self._use_cases.create_product(
                name=request['name'],
                description=request.get('description', ''),
                price=request['price'],
                stock=request.get('stock', 0),
                category_id=request['category_id']
            )
            return {'success': True, 'data': self._to_dict(product), 'status': 201}
        except ValueError as e:
            return {'success': False, 'error': str(e), 'status': 400}
    
    def update_product(self, product_id: int, request: dict) -> dict:
        try:
            product = self._use_cases.update_product(
                product_id=product_id,
                name=request.get('name'),
                description=request.get('description'),
                price=request.get('price'),
                stock=request.get('stock')
            )
            return {'success': True, 'data': self._to_dict(product)}
        except ValueError as e:
            return {'success': False, 'error': str(e), 'status': 400}
    
    def _to_dict(self, product: Product) -> dict:
        return {
            'id': product.id,
            'name': product.name,
            'description': product.description,
            'price': product.price,
            'stock': product.stock,
            'category_id': product.category_id,
            'created_at': product.created_at.isoformat()
        }


class CliAdapter:
    """Command-line adapter (inbound adapter)."""
    
    def __init__(self, use_cases: ProductUseCases):
        self._use_cases = use_cases
    
    def list_products(self) -> str:
        products = self._use_cases.list_products()
        if not products:
            return "No products"

        lines = ["ID\tName\tPrice\tStock"]
        lines.extend(f"{p.id}\t{p.name}\t{p.price}\t{p.stock}" for p in products)
        return "\n".join(lines)
    
    def show_product(self, product_id: int) -> str:
        product = self._use_cases.get_product(product_id)
        if not product:
            return f"产品不存在: {product_id}"
        
        return f"""
产品详情
--------
ID: {product.id}
名称: {product.name}
描述: {product.description}
价格: ¥{product.price}
库存: {product.stock}
分类: {product.category_id}
创建时间: {product.created_at}
"""


repository = InMemoryProductRepository()
use_cases = ProductUseCases(repository)

rest_adapter = RestAdapter(use_cases)
cli_adapter = CliAdapter(use_cases)

result = rest_adapter.create_product({
    'name': 'Python Programming',
    'description': 'An introductory book on Python programming',
    'price': 89.0,
    'stock': 100,
    'category_id': 1
})
print(f"REST create product: {result}")

print(f"\nCLI product list:\n{cli_adapter.list_products()}")
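
Because both adapters above talk to the core only through the use-case object, the core can be exercised in a test without any real storage. The following is a minimal, self-contained sketch of that idea, not the chapter's own classes: `Item`, `ItemRepository`, `ItemUseCases` and `RecordingRepository` are hypothetical names introduced for illustration, and the port is expressed with `typing.Protocol`.

```python
from dataclasses import dataclass
from typing import List, Protocol


@dataclass
class Item:
    id: int
    name: str
    price: float


class ItemRepository(Protocol):
    """Outbound port (hypothetical): what the core needs from storage."""

    def save(self, item: Item) -> None: ...

    def find_all(self) -> List[Item]: ...


class ItemUseCases:
    """Application core: depends on the port only, never on a concrete adapter."""

    def __init__(self, repository: ItemRepository):
        self._repository = repository
        self._next_id = 1

    def create_item(self, name: str, price: float) -> Item:
        if price < 0:
            raise ValueError("price must be non-negative")
        item = Item(id=self._next_id, name=name, price=price)
        self._next_id += 1
        self._repository.save(item)
        return item

    def list_items(self) -> List[Item]:
        return self._repository.find_all()


class RecordingRepository:
    """Test double: satisfies the port structurally and records what was saved."""

    def __init__(self) -> None:
        self.saved: List[Item] = []

    def save(self, item: Item) -> None:
        self.saved.append(item)

    def find_all(self) -> List[Item]:
        return list(self.saved)


repo = RecordingRepository()
core = ItemUseCases(repo)
created = core.create_item("notebook", 12.5)
```

Swapping `RecordingRepository` for a database-backed adapter would not require touching `ItemUseCases`, which is the property the hexagonal style is after.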

26.5 CQRS Pattern

26.5.1 Command Query Responsibility Segregation

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable, Type
from datetime import datetime
from abc import ABC, abstractmethod
import json


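@dataclass
class Command:
    """Base class for commands."""
    # kw_only (Python 3.10+) keeps this defaulted field out of the positional
    # parameters, so subclasses may declare required fields after it.
    timestamp: datetime = field(default_factory=datetime.now, kw_only=True)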


@dataclass
class Query:
    """Base class for queries."""
    pass


@dataclass
class CreateOrderCommand(Command):
    """Command: create an order."""
    customer_id: int
    items: List[dict]


@dataclass
class UpdateOrderStatusCommand(Command):
    """Command: update the status of an order."""
    order_id: int
    status: str


@dataclass
class CancelOrderCommand(Command):
    """Command: cancel an order."""
    order_id: int
    reason: str


@dataclass
class GetOrderQuery(Query):
    """Query: fetch a single order."""
    order_id: int


@dataclass
class ListOrdersQuery(Query):
    """Query: list orders."""
    customer_id: Optional[int] = None
    status: Optional[str] = None
    page: int = 1
    page_size: int = 10


@dataclass
class OrderSummary:
    """Order summary (read model)."""
    id: int
    customer_id: int
    customer_name: str
    total: float
    status: str
    item_count: int
    created_at: datetime


@dataclass
class OrderDetail:
    """Order detail (read model)."""
    id: int
    customer_id: int
    customer_name: str
    items: List[dict]
    total: float
    status: str
    created_at: datetime
    updated_at: datetime


class OrderWriteModel:
    """Order write model."""
    
    def __init__(self):
        self._orders: Dict[int, dict] = {}
        self._next_id = 1
    
    def create(self, customer_id: int, items: List[dict]) -> int:
        order_id = self._next_id
        self._next_id += 1
        
        total = sum(item['price'] * item['quantity'] for item in items)
        # Take one timestamp so created_at and updated_at match on creation
        now = datetime.now()
        
        self._orders[order_id] = {
            'id': order_id,
            'customer_id': customer_id,
            'items': items,
            'total': total,
            'status': 'pending',
            'created_at': now,
            'updated_at': now
        }
        
        return order_id
    
    def update_status(self, order_id: int, status: str) -> bool:
        if order_id not in self._orders:
            return False
        
        self._orders[order_id]['status'] = status
        self._orders[order_id]['updated_at'] = datetime.now()
        return True
    
    def cancel(self, order_id: int) -> bool:
        if order_id not in self._orders:
            return False
        
        order = self._orders[order_id]
        if order['status'] in ('shipped', 'delivered'):
            return False
        
        order['status'] = 'cancelled'
        order['updated_at'] = datetime.now()
        return True
    
    def get(self, order_id: int) -> Optional[dict]:
        return self._orders.get(order_id)


class OrderReadModel:
    """Order read model."""
    
    def __init__(self):
        self._summaries: Dict[int, OrderSummary] = {}
        self._details: Dict[int, OrderDetail] = {}
        self._by_customer: Dict[int, List[int]] = {}
        self._by_status: Dict[str, List[int]] = {}
    
    def index(self, order_data: dict, customer_name: Optional[str] = None) -> None:
        order_id = order_data['id']
        # Fall back to a placeholder name when the caller does not supply one
        display_name = customer_name or f"Customer {order_data['customer_id']}"
        
        summary = OrderSummary(
            id=order_id,
            customer_id=order_data['customer_id'],
            customer_name=display_name,
            total=order_data['total'],
            status=order_data['status'],
            item_count=len(order_data['items']),
            created_at=order_data['created_at']
        )
        self._summaries[order_id] = summary
        
        detail = OrderDetail(
            id=order_id,
            customer_id=order_data['customer_id'],
            customer_name=display_name,
            items=order_data['items'],
            total=order_data['total'],
            status=order_data['status'],
            created_at=order_data['created_at'],
            updated_at=order_data['updated_at']
        )
        self._details[order_id] = detail
        
        customer_id = order_data['customer_id']
        if customer_id not in self._by_customer:
            self._by_customer[customer_id] = []
        if order_id not in self._by_customer[customer_id]:
            self._by_customer[customer_id].append(order_id)
        
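        status = order_data['status']
        # Drop the order from every other status bucket first; otherwise a
        # status change leaves a stale entry under the old status
        for other_status, ids in self._by_status.items():
            if other_status != status and order_id in ids:
                ids.remove(order_id)
        if status not in self._by_status:
            self._by_status[status] = []
        if order_id not in self._by_status[status]:
            self._by_status[status].append(order_id)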
    
    def get_summary(self, order_id: int) -> Optional[OrderSummary]:
        return self._summaries.get(order_id)
    
    def get_detail(self, order_id: int) -> Optional[OrderDetail]:
        return self._details.get(order_id)
    
    def list_by_customer(self, customer_id: int) -> List[OrderSummary]:
        order_ids = self._by_customer.get(customer_id, [])
        return [self._summaries[oid] for oid in order_ids if oid in self._summaries]
    
    def list_by_status(self, status: str) -> List[OrderSummary]:
        order_ids = self._by_status.get(status, [])
        return [self._summaries[oid] for oid in order_ids if oid in self._summaries]
    
    def list_all(self, page: int = 1, page_size: int = 10) -> List[OrderSummary]:
        all_summaries = sorted(
            self._summaries.values(),
            key=lambda s: s.created_at,
            reverse=True
        )
        start = (page - 1) * page_size
        return all_summaries[start:start + page_size]


class CommandHandler:
    """Command handler: dispatches each command to the write model."""
    
    def __init__(self, write_model: OrderWriteModel, read_model: OrderReadModel):
        self._write_model = write_model
        self._read_model = read_model
        self._handlers: Dict[Type, Callable] = {
            CreateOrderCommand: self._handle_create,
            UpdateOrderStatusCommand: self._handle_update_status,
            CancelOrderCommand: self._handle_cancel,
        }
    
    def handle(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"Unknown command type: {type(command).__name__}")
        return handler(command)
    
    def _handle_create(self, command: CreateOrderCommand) -> int:
        order_id = self._write_model.create(
            customer_id=command.customer_id,
            items=command.items
        )
        
        order_data = self._write_model.get(order_id)
        self._read_model.index(order_data)
        
        return order_id
    
    def _handle_update_status(self, command: UpdateOrderStatusCommand) -> bool:
        success = self._write_model.update_status(
            order_id=command.order_id,
            status=command.status
        )
        
        if success:
            order_data = self._write_model.get(command.order_id)
            self._read_model.index(order_data)
        
        return success
    
    def _handle_cancel(self, command: CancelOrderCommand) -> bool:
        success = self._write_model.cancel(command.order_id)
        
        if success:
            order_data = self._write_model.get(command.order_id)
            self._read_model.index(order_data)
        
        return success


class QueryHandler:
    """Query handler: answers queries from the read model only."""
    
    def __init__(self, read_model: OrderReadModel):
        self._read_model = read_model
        self._handlers: Dict[Type, Callable] = {
            GetOrderQuery: self._handle_get,
            ListOrdersQuery: self._handle_list,
        }
    
    def handle(self, query: Query) -> Any:
        handler = self._handlers.get(type(query))
        if not handler:
            raise ValueError(f"Unknown query type: {type(query).__name__}")
        return handler(query)
    
    def _handle_get(self, query: GetOrderQuery) -> Optional[OrderDetail]:
        return self._read_model.get_detail(query.order_id)
    
    def _handle_list(self, query: ListOrdersQuery) -> List[OrderSummary]:
        # Compare against None so that a customer_id of 0 still counts as a filter
        if query.customer_id is not None:
            return self._read_model.list_by_customer(query.customer_id)
        if query.status is not None:
            return self._read_model.list_by_status(query.status)
        return self._read_model.list_all(query.page, query.page_size)


write_model = OrderWriteModel()
read_model = OrderReadModel()

command_handler = CommandHandler(write_model, read_model)
query_handler = QueryHandler(read_model)

order_id = command_handler.handle(CreateOrderCommand(
    customer_id=1,
    items=[
        {'product_id': 1, 'name': 'Product A', 'price': 100, 'quantity': 2},
        {'product_id': 2, 'name': 'Product B', 'price': 50, 'quantity': 1},
    ]
))
print(f"Created order ID: {order_id}")

command_handler.handle(CreateOrderCommand(
    customer_id=1,
    items=[{'product_id': 3, 'name': 'Product C', 'price': 200, 'quantity': 1}]
))

command_handler.handle(UpdateOrderStatusCommand(order_id=order_id, status='confirmed'))

detail = query_handler.handle(GetOrderQuery(order_id=order_id))
print(f"Order detail: ID={detail.id}, status={detail.status}, total={detail.total}")

summaries = query_handler.handle(ListOrdersQuery(customer_id=1))
print(f"Orders for customer: {len(summaries)}")
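
In the listing above the command handler updates the read model directly. One common variation is to let the write side publish change notifications and have the read side subscribe to them, so that neither side holds a reference to the other's internals. The sketch below illustrates this under simplifying assumptions: `OrderChanged`, `OrderWriter` and `StatusIndex` are hypothetical names, delivery is synchronous and in-process, and only the status is projected.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Set


@dataclass(frozen=True)
class OrderChanged:
    """Change notification emitted by the write side (hypothetical)."""
    order_id: int
    status: str


class OrderWriter:
    """Write side: owns the state and announces every change."""

    def __init__(self) -> None:
        self._status: Dict[int, str] = {}
        self._subscribers: List[Callable[[OrderChanged], None]] = []
        self._next_id = 1

    def subscribe(self, callback: Callable[[OrderChanged], None]) -> None:
        self._subscribers.append(callback)

    def create(self) -> int:
        order_id = self._next_id
        self._next_id += 1
        self._set(order_id, "pending")
        return order_id

    def set_status(self, order_id: int, status: str) -> None:
        if order_id not in self._status:
            raise KeyError(order_id)
        self._set(order_id, status)

    def _set(self, order_id: int, status: str) -> None:
        self._status[order_id] = status
        for notify in self._subscribers:
            notify(OrderChanged(order_id, status))


class StatusIndex:
    """Read side: a projection keyed by status, built from notifications."""

    def __init__(self) -> None:
        self._status_of: Dict[int, str] = {}
        self._ids_by_status: Dict[str, Set[int]] = {}

    def apply(self, change: OrderChanged) -> None:
        previous = self._status_of.get(change.order_id)
        if previous is not None:
            # Leave the old bucket first so no stale entry survives
            self._ids_by_status[previous].discard(change.order_id)
        self._status_of[change.order_id] = change.status
        self._ids_by_status.setdefault(change.status, set()).add(change.order_id)

    def ids_with_status(self, status: str) -> List[int]:
        return sorted(self._ids_by_status.get(status, set()))


writer = OrderWriter()
index = StatusIndex()
writer.subscribe(index.apply)

first = writer.create()
second = writer.create()
writer.set_status(first, "confirmed")
```

With an asynchronous transport in place of the direct callback, the read side would lag behind the write side for a short time; that eventual consistency is the usual price of separating the two models.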

26.6 Plugin Architecture

26.6.1 Microkernel Architecture

python
from abc import ABC, abstractmethod
from typing import Dict, List, Callable, Any, Optional, Type
from dataclasses import dataclass, field
from enum import Enum, auto
import importlib
import inspect
import os
import json


class PluginState(Enum):
    """Plugin lifecycle state."""
    UNLOADED = auto()
    LOADED = auto()
    ACTIVE = auto()
    ERROR = auto()


@dataclass
class PluginInfo:
    """Plugin metadata."""
    name: str
    version: str
    description: str
    author: str
    dependencies: List[str] = field(default_factory=list)


class PluginInterface(ABC):
    """Interface that every plugin implements."""
    
    @staticmethod
    @abstractmethod
    def get_info() -> PluginInfo:
        """Return the plugin metadata."""
        pass
    
    @abstractmethod
    def initialize(self, context: 'PluginContext') -> None:
        """Initialize the plugin with the shared context."""
        pass
    
    @abstractmethod
    def start(self) -> None:
        """Start the plugin."""
        pass
    
    @abstractmethod
    def stop(self) -> None:
        """Stop the plugin."""
        pass
    
    @abstractmethod
    def shutdown(self) -> None:
        """Release the plugin's resources before it is unregistered."""
        pass


class PluginContext:
    """Plugin context: services, hooks, commands and configuration shared by the kernel."""
    
    def __init__(self):
        self._services: Dict[str, Any] = {}
        self._hooks: Dict[str, List[Callable]] = {}
        self._commands: Dict[str, Callable] = {}
        self._config: Dict[str, Any] = {}
    
    def register_service(self, name: str, service: Any) -> None:
        self._services[name] = service
    
    def get_service(self, name: str) -> Optional[Any]:
        return self._services.get(name)
    
    def register_hook(self, hook_name: str, callback: Callable) -> None:
        if hook_name not in self._hooks:
            self._hooks[hook_name] = []
        self._hooks[hook_name].append(callback)
    
    def execute_hook(self, hook_name: str, *args, **kwargs) -> List[Any]:
        results = []
        for callback in self._hooks.get(hook_name, []):
            try:
                result = callback(*args, **kwargs)
                results.append(result)
            except Exception as e:
                print(f"Hook execution error: {e}")
        return results
    
    def register_command(self, command_name: str, handler: Callable) -> None:
        self._commands[command_name] = handler
    
    def execute_command(self, command_name: str, *args, **kwargs) -> Any:
        handler = self._commands.get(command_name)
        if handler:
            return handler(*args, **kwargs)
        raise ValueError(f"Unknown command: {command_name}")
    
    def set_config(self, key: str, value: Any) -> None:
        self._config[key] = value
    
    def get_config(self, key: str, default: Any = None) -> Any:
        return self._config.get(key, default)


@dataclass
class PluginWrapper:
    """Plugin wrapper: a plugin instance together with its metadata and state."""
    plugin: PluginInterface
    info: PluginInfo
    state: PluginState = PluginState.UNLOADED
    error_message: str = ""


class PluginManager:
    """Plugin manager."""
    
    def __init__(self):
        self._plugins: Dict[str, PluginWrapper] = {}
        self._context = PluginContext()
        self._load_order: List[str] = []
    
    def register_core_service(self, name: str, service: Any) -> None:
        """Register a core service."""
        self._context.register_service(name, service)
    
    def register_plugin(self, plugin: PluginInterface) -> bool:
        """Register and initialize a plugin."""
        info = plugin.get_info()
        
        if info.name in self._plugins:
            print(f"Plugin already registered: {info.name}")
            return False
        
        for dep in info.dependencies:
            dep_wrapper = self._plugins.get(dep)
            # A dependency that failed to initialize does not count as available
            if dep_wrapper is None or dep_wrapper.state == PluginState.ERROR:
                print(f"Missing dependency: {dep} (required by {info.name})")
                return False
        
        wrapper = PluginWrapper(plugin=plugin, info=info)
        self._plugins[info.name] = wrapper
        
        try:
            plugin.initialize(self._context)
            wrapper.state = PluginState.LOADED
            self._load_order.append(info.name)
            print(f"Plugin registered: {info.name} v{info.version}")
            return True
        except Exception as e:
            wrapper.state = PluginState.ERROR
            wrapper.error_message = str(e)
            print(f"Plugin initialization failed: {info.name} - {e}")
            return False
    
    def unregister_plugin(self, name: str) -> bool:
        """Unregister a plugin."""
        if name not in self._plugins:
            return False
        
        wrapper = self._plugins[name]
        
        if wrapper.state == PluginState.ACTIVE:
            self.stop_plugin(name)
        
        try:
            wrapper.plugin.shutdown()
            del self._plugins[name]
            # A plugin whose initialization failed was never added to the load order
            if name in self._load_order:
                self._load_order.remove(name)
            print(f"Plugin unregistered: {name}")
            return True
        except Exception as e:
            print(f"Plugin unregistration failed: {name} - {e}")
            return False
    
    def start_plugin(self, name: str) -> bool:
        """Start a plugin."""
        if name not in self._plugins:
            return False
        
        wrapper = self._plugins[name]
        
        if wrapper.state == PluginState.ACTIVE:
            return True
        
        try:
            wrapper.plugin.start()
            wrapper.state = PluginState.ACTIVE
            print(f"Plugin started: {name}")
            return True
        except Exception as e:
            wrapper.state = PluginState.ERROR
            wrapper.error_message = str(e)
            print(f"Plugin start failed: {name} - {e}")
            return False
    
    def stop_plugin(self, name: str) -> bool:
        """Stop a plugin."""
        if name not in self._plugins:
            return False
        
        wrapper = self._plugins[name]
        
        if wrapper.state != PluginState.ACTIVE:
            return True
        
        try:
            wrapper.plugin.stop()
            wrapper.state = PluginState.LOADED
            print(f"Plugin stopped: {name}")
            return True
        except Exception as e:
            wrapper.state = PluginState.ERROR
            wrapper.error_message = str(e)
            print(f"Plugin stop failed: {name} - {e}")
            return False
    
    def start_all(self) -> None:
        """Start all plugins in load order."""
        for name in self._load_order:
            self.start_plugin(name)
    
    def stop_all(self) -> None:
        """Stop all plugins in reverse load order."""
        for name in reversed(self._load_order):
            self.stop_plugin(name)
    
    def get_plugin(self, name: str) -> Optional[PluginInterface]:
        """Return the plugin instance, or None if it is not registered."""
        wrapper = self._plugins.get(name)
        return wrapper.plugin if wrapper else None
    
    def list_plugins(self) -> List[Dict]:
        """List all registered plugins."""
        return [
            {
                'name': w.info.name,
                'version': w.info.version,
                'state': w.state.name,
                'description': w.info.description
            }
            for w in self._plugins.values()
        ]
    
    def execute_hook(self, hook_name: str, *args, **kwargs) -> List[Any]:
        """Run every callback registered for a hook."""
        return self._context.execute_hook(hook_name, *args, **kwargs)
    
    def execute_command(self, command_name: str, *args, **kwargs) -> Any:
        """Run a registered command."""
        return self._context.execute_command(command_name, *args, **kwargs)


class LoggerPlugin(PluginInterface):
    """Logging plugin."""
    
    @staticmethod
    def get_info() -> PluginInfo:
        return PluginInfo(
            name="logger",
            version="1.0.0",
            description="Logging plugin",
            author="System"
        )
    
    def initialize(self, context: PluginContext) -> None:
        self._context = context
        self._logs: List[str] = []
        context.register_hook("on_log", self._log)
        context.register_command("get_logs", self._get_logs)
    
    def start(self) -> None:
        print("Logger plugin started")
    
    def stop(self) -> None:
        print("Logger plugin stopped")
    
    def shutdown(self) -> None:
        self._logs.clear()
    
    def _log(self, message: str, level: str = "INFO") -> None:
        from datetime import datetime
        entry = f"[{datetime.now().isoformat()}] [{level}] {message}"
        self._logs.append(entry)
        print(entry)
    
    def _get_logs(self) -> List[str]:
        return self._logs.copy()


class MetricsPlugin(PluginInterface):
    """Metrics plugin."""
    
    @staticmethod
    def get_info() -> PluginInfo:
        return PluginInfo(
            name="metrics",
            version="1.0.0",
            description="Performance metrics plugin",
            author="System",
            dependencies=["logger"]
        )
    
    def initialize(self, context: PluginContext) -> None:
        self._context = context
        self._counters: Dict[str, int] = {}
        self._timers: Dict[str, List[float]] = {}
        context.register_hook("on_metric", self._record_metric)
        context.register_command("get_metrics", self._get_metrics)
    
    def start(self) -> None:
        print("Metrics plugin started")
    
    def stop(self) -> None:
        print("Metrics plugin stopped")
    
    def shutdown(self) -> None:
        self._counters.clear()
        self._timers.clear()
    
    def _record_metric(self, name: str, value: float, metric_type: str = "counter") -> None:
        if metric_type == "counter":
            self._counters[name] = self._counters.get(name, 0) + int(value)
        elif metric_type == "timer":
            if name not in self._timers:
                self._timers[name] = []
            self._timers[name].append(value)
    
    def _get_metrics(self) -> Dict:
        return {
            'counters': self._counters.copy(),
            'timers': {
                name: {
                    'count': len(values),
                    'avg': sum(values) / len(values) if values else 0,
                    'min': min(values) if values else 0,
                    'max': max(values) if values else 0
                }
                for name, values in self._timers.items()
            }
        }


manager = PluginManager()

manager.register_plugin(LoggerPlugin())
manager.register_plugin(MetricsPlugin())

manager.start_all()

manager.execute_hook("on_log", "System started", "INFO")
manager.execute_hook("on_metric", "requests", 1, "counter")
manager.execute_hook("on_metric", "response_time", 0.15, "timer")

print(f"\nPlugins: {manager.list_plugins()}")
print(f"Logs: {manager.execute_command('get_logs')}")
print(f"Metrics: {manager.execute_command('get_metrics')}")

manager.stop_all()
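
The manager above only accepts a plugin whose dependencies are already registered, so the caller has to register plugins in a suitable order. One way to derive that order from the declared dependencies is a topological sort. The sketch below uses the standard-library `graphlib.TopologicalSorter` (Python 3.9+); `resolve_load_order` is a hypothetical helper and the plugin names are illustrative.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def resolve_load_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Order plugin names so that every dependency precedes its dependents.

    `dependencies` maps a plugin name to the names it depends on.
    """
    known = set(dependencies)
    for name, deps in dependencies.items():
        missing = [dep for dep in deps if dep not in known]
        if missing:
            raise ValueError(f"{name} requires unknown plugins: {missing}")
    try:
        # TopologicalSorter takes node -> predecessors, which matches the input
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The second element of args holds the detected cycle
        raise ValueError(f"circular plugin dependency: {exc.args[1]}") from exc


order = resolve_load_order({
    "metrics": ["logger"],
    "logger": [],
    "dashboard": ["metrics", "logger"],
})
```

Registering plugins in the returned order satisfies the dependency check regardless of the order in which the plugins were discovered.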

26.7 Event-Driven Architecture

26.7.1 Event Bus and Event Sourcing

python
from dataclasses import dataclass, field
from typing import Dict, List, Any, Callable, Optional, Type
from datetime import datetime
from abc import ABC, abstractmethod
import json
import asyncio


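# kw_only=True (Python 3.10+) makes every field keyword-only, so subclasses can
# add required fields after inherited fields that have defaults.
@dataclass(kw_only=True)
class Event:
    """Base class for events."""
    event_id: str
    event_type: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass(kw_only=True)
class DomainEvent(Event):
    """Domain event."""
    aggregate_id: str
    aggregate_type: str
    version: int = 1


@dataclass(kw_only=True)
class OrderCreatedEvent(DomainEvent):
    """Event: an order was created."""
    customer_id: int
    items: List[dict]
    total: float


@dataclass(kw_only=True)
class OrderStatusChangedEvent(DomainEvent):
    """Event: the status of an order changed."""
    old_status: str
    new_status: str


@dataclass(kw_only=True)
class PaymentProcessedEvent(DomainEvent):
    """Event: a payment was processed."""
    amount: float
    payment_method: str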


class EventHandler(ABC):
    """Event handler interface."""
    
    @abstractmethod
    def handle(self, event: Event) -> None:
        pass


class EventBus:
    """Event bus."""
    
    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}
        self._async_handlers: Dict[str, List[Callable]] = {}
        self._middleware: List[Callable] = []
        self._dead_letter_queue: List[Event] = []
    
    def subscribe(self, event_type: str, handler: Callable) -> None:
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)
    
    def subscribe_async(self, event_type: str, handler: Callable) -> None:
        if event_type not in self._async_handlers:
            self._async_handlers[event_type] = []
        self._async_handlers[event_type].append(handler)
    
    def add_middleware(self, middleware: Callable) -> None:
        self._middleware.append(middleware)
    
    def publish(self, event: Event) -> None:
        for middleware in self._middleware:
            try:
                middleware(event)
            except Exception as e:
                print(f"Middleware error: {e}")
                return
        
        handlers = self._handlers.get(event.event_type, [])
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Handler error: {e}")
                self._dead_letter_queue.append(event)
    
    async def publish_async(self, event: Event) -> None:
        for middleware in self._middleware:
            try:
                if asyncio.iscoroutinefunction(middleware):
                    await middleware(event)
                else:
                    middleware(event)
            except Exception as e:
                print(f"Middleware error: {e}")
                return
        
        handlers = self._handlers.get(event.event_type, [])
        async_handlers = self._async_handlers.get(event.event_type, [])
        
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Handler error: {e}")
                self._dead_letter_queue.append(event)
        
        for handler in async_handlers:
            try:
                await handler(event)
            except Exception as e:
                print(f"Async handler error: {e}")
                self._dead_letter_queue.append(event)
    
    def get_dead_letter_queue(self) -> List[Event]:
        return self._dead_letter_queue.copy()


class EventStore:
    """Event store: an append-only log indexed by aggregate and by event type."""
    
    def __init__(self):
        self._events: List[Event] = []
        self._by_aggregate: Dict[str, List[Event]] = {}
        self._by_type: Dict[str, List[Event]] = {}
    
    def append(self, event: Event) -> None:
        self._events.append(event)
        
        if isinstance(event, DomainEvent):
            if event.aggregate_id not in self._by_aggregate:
                self._by_aggregate[event.aggregate_id] = []
            self._by_aggregate[event.aggregate_id].append(event)
        
        if event.event_type not in self._by_type:
            self._by_type[event.event_type] = []
        self._by_type[event.event_type].append(event)
    
    def get_events(self, aggregate_id: str) -> List[Event]:
        return self._by_aggregate.get(aggregate_id, []).copy()
    
    def get_events_by_type(self, event_type: str) -> List[Event]:
        return self._by_type.get(event_type, []).copy()
    
    def get_all_events(self) -> List[Event]:
        return self._events.copy()


class OrderAggregate:
    """Order aggregate root (event sourced)."""
    
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.customer_id: Optional[int] = None
        self.items: List[dict] = []
        self.total: float = 0
        self.status: str = "pending"
        self.version: int = 0
        self._uncommitted_events: List[Event] = []
    
    def create(self, customer_id: int, items: List[dict]) -> None:
        if self.version > 0:
            raise ValueError("Order already exists")
        
        total = sum(item['price'] * item['quantity'] for item in items)
        
        event = OrderCreatedEvent(
            event_id=f"evt-{self.order_id}-{self.version + 1}",
            event_type="order_created",
            aggregate_id=self.order_id,
            aggregate_type="order",
            version=self.version + 1,
            customer_id=customer_id,
            items=items,
            total=total
        )
        self._apply_event(event)
    
    def change_status(self, new_status: str) -> None:
        if self.status == new_status:
            return
        
        valid_transitions = {
            "pending": ["confirmed", "cancelled"],
            "confirmed": ["shipped", "cancelled"],
            "shipped": ["delivered"],
            "delivered": [],
            "cancelled": []
        }
        
        if new_status not in valid_transitions.get(self.status, []):
            raise ValueError(f"Invalid status transition: {self.status} -> {new_status}")
        
        event = OrderStatusChangedEvent(
            event_id=f"evt-{self.order_id}-{self.version + 1}",
            event_type="order_status_changed",
            aggregate_id=self.order_id,
            aggregate_type="order",
            version=self.version + 1,
            old_status=self.status,
            new_status=new_status
        )
        self._apply_event(event)
    
    def _apply_event(self, event: Event) -> None:
        if isinstance(event, OrderCreatedEvent):
            self.customer_id = event.customer_id
            self.items = event.items
            self.total = event.total
            self.status = "pending"
        elif isinstance(event, OrderStatusChangedEvent):
            self.status = event.new_status
        
        self.version = event.version
        self._uncommitted_events.append(event)
    
    def load_from_history(self, events: List[Event]) -> None:
        for event in events:
            self._apply_event(event)
        self._uncommitted_events.clear()
    
    def get_uncommitted_events(self) -> List[Event]:
        return self._uncommitted_events.copy()
    
    def mark_events_as_committed(self) -> None:
        self._uncommitted_events.clear()


class OrderService:
    """Order service."""
    
    def __init__(self, event_bus: EventBus, event_store: EventStore):
        self._event_bus = event_bus
        self._event_store = event_store
    
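    def create_order(self, order_id: str, customer_id: int, items: List[dict]) -> OrderAggregate:
        # Reject a duplicate id; otherwise a second creation event would be appended
        if self._event_store.get_events(order_id):
            raise ValueError(f"Order already exists: {order_id}")
        order = OrderAggregate(order_id)
        order.create(customer_id, items)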
        
        for event in order.get_uncommitted_events():
            self._event_store.append(event)
            self._event_bus.publish(event)
        
        order.mark_events_as_committed()
        return order
    
    def update_status(self, order_id: str, new_status: str) -> OrderAggregate:
        order = self._get_order(order_id)
        order.change_status(new_status)
        
        for event in order.get_uncommitted_events():
            self._event_store.append(event)
            self._event_bus.publish(event)
        
        order.mark_events_as_committed()
        return order
    
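    def _get_order(self, order_id: str) -> OrderAggregate:
        events = self._event_store.get_events(order_id)
        # Without history there is nothing to rebuild; refuse to act on an
        # order that was never created
        if not events:
            raise ValueError(f"Order not found: {order_id}")
        order = OrderAggregate(order_id)
        order.load_from_history(events)
        return order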


class InventoryProjection:
    """Inventory projection (read model)."""
    
    def __init__(self, event_bus: EventBus):
        self._inventory: Dict[str, int] = {}
        # order id -> list of (product id, reserved quantity)
        self._reservations: Dict[str, List[tuple]] = {}
        event_bus.subscribe("order_created", self._on_order_created)
        event_bus.subscribe("order_status_changed", self._on_status_changed)
    
    def _on_order_created(self, event: OrderCreatedEvent) -> None:
        for item in event.items:
            product_id = str(item['product_id'])
            quantity = item['quantity']
            
            # Demo assumption: a product seen for the first time starts with 100 units
            if product_id not in self._inventory:
                self._inventory[product_id] = 100
            
            self._inventory[product_id] -= quantity
            
            if event.aggregate_id not in self._reservations:
                self._reservations[event.aggregate_id] = []
            self._reservations[event.aggregate_id].append((product_id, quantity))
    
    def _on_status_changed(self, event: OrderStatusChangedEvent) -> None:
        if event.new_status == "cancelled":
            # Return the reserved quantities to stock when the order is cancelled
            for product_id, quantity in self._reservations.pop(event.aggregate_id, []):
                self._inventory[product_id] += quantity
    
    def get_stock(self, product_id: str) -> int:
        return self._inventory.get(product_id, 0)


class NotificationProjection:
    """Notification projection."""
    
    def __init__(self, event_bus: EventBus):
        self._notifications: List[dict] = []
        event_bus.subscribe("order_created", self._on_order_created)
        event_bus.subscribe("order_status_changed", self._on_status_changed)
    
    def _on_order_created(self, event: OrderCreatedEvent) -> None:
        self._notifications.append({
            'type': 'order_confirmation',
            'order_id': event.aggregate_id,
            'customer_id': event.customer_id,
            'message': f"Order {event.aggregate_id} created successfully"
        })
    
    def _on_status_changed(self, event: OrderStatusChangedEvent) -> None:
        self._notifications.append({
            'type': 'status_update',
            'order_id': event.aggregate_id,
            'message': f"Order status changed from {event.old_status} to {event.new_status}"
        })
    
    def get_notifications(self) -> List[dict]:
        return self._notifications.copy()


event_bus = EventBus()
event_store = EventStore()

inventory_projection = InventoryProjection(event_bus)
notification_projection = NotificationProjection(event_bus)

order_service = OrderService(event_bus, event_store)

order = order_service.create_order(
    order_id="ORD-001",
    customer_id=1,
    items=[
        {'product_id': 1, 'name': 'Product A', 'price': 100, 'quantity': 2},
        {'product_id': 2, 'name': 'Product B', 'price': 50, 'quantity': 1},
    ]
)
print(f"Order created: {order.order_id}, total: {order.total}")

order = order_service.update_status("ORD-001", "confirmed")
print(f"Order status: {order.status}")

print(f"\nStock: {inventory_projection.get_stock('1')}")
print(f"Notifications: {notification_projection.get_notifications()}")
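
The core idea behind the event-sourced aggregate above is that current state is a left fold over the event history, which also means any earlier state can be recovered by replaying a prefix of that history. The following self-contained sketch shows this with deliberately reduced types; `Created`, `StatusChanged`, `OrderState`, `apply` and `replay` are hypothetical names and omit the ids, timestamps and validation of the full listing.

```python
from dataclasses import dataclass, replace
from functools import reduce
from typing import List, Optional, Union


@dataclass(frozen=True)
class Created:
    total: float


@dataclass(frozen=True)
class StatusChanged:
    new_status: str


OrderEvent = Union[Created, StatusChanged]


@dataclass(frozen=True)
class OrderState:
    total: float = 0.0
    status: str = "none"
    version: int = 0


def apply(state: OrderState, event: OrderEvent) -> OrderState:
    """Pure transition: return the state that results from one event."""
    if isinstance(event, Created):
        return replace(state, total=event.total, status="pending",
                       version=state.version + 1)
    if isinstance(event, StatusChanged):
        return replace(state, status=event.new_status,
                       version=state.version + 1)
    raise TypeError(f"unsupported event: {event!r}")


def replay(events: List[OrderEvent], upto: Optional[int] = None) -> OrderState:
    """Fold the first `upto` events (all of them by default) into a state."""
    selected = events if upto is None else events[:upto]
    return reduce(apply, selected, OrderState())


history = [Created(250.0), StatusChanged("confirmed"), StatusChanged("shipped")]
current = replay(history)
as_of_version_2 = replay(history, upto=2)
```

Because `apply` has no side effects, the same history always yields the same state, which is what makes audit trails and rebuilt projections trustworthy.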

26.8 架构决策指南

26.8.1 架构选择决策树

┌─────────────────────────────────────────────────────────────────┐
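│              Architecture Pattern Selection Guide               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  How complex is the system?                                     │
│         │                                                       │
│         ├── Simple ──→ Monolithic architecture                  │
│         │                                                       │
│         └── Complex ──┐                                         │
│                       │                                         │
│                       ▼                                         │
│         How large is the team?                                  │
│         │                                                       │
│         ├── Small team ──→ Modular monolith                     │
│         │                                                       │
│         └── Large team ──┐                                      │
│                          │                                      │
│                          ▼                                      │
│         Are the business domains independent?                   │
│         │                                                       │
│         ├── Yes ──→ Microservices architecture                  │
│         │                                                       │
│         └── No ──┐                                              │
│                  │                                              │
│                  ▼                                              │
│         Do read and write loads differ greatly?                 │
│         │                                                       │
│         ├── Yes ──→ CQRS                                        │
│         │                                                       │
│         └── No ──→ Layered architecture                         │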
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
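The decision tree above can be read as a chain of early returns. The following is a minimal sketch under that reading; `SystemProfile`, its field names, and `recommend_architecture` are hypothetical names introduced only for illustration, and the four yes/no inputs are a simplification of judgments that are rarely binary in practice.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SystemProfile:
    """Hypothetical answers to the four questions in the decision tree."""
    is_complex: bool            # How complex is the system?
    large_team: bool            # How large is the team?
    independent_domains: bool   # Are the business domains independent?
    read_write_skew: bool       # Do read and write loads differ greatly?


def recommend_architecture(profile: SystemProfile) -> str:
    """Walk the tree from top to bottom and return the first matching leaf."""
    if not profile.is_complex:
        return "monolithic architecture"
    if not profile.large_team:
        return "modular monolith"
    if profile.independent_domains:
        return "microservices architecture"
    if profile.read_write_skew:
        return "CQRS"
    return "layered architecture"


# A complex system, large team, entangled domains, read-heavy load
print(recommend_architecture(SystemProfile(True, True, False, True)))  # CQRS
```

Because each question is only asked when the previous one did not settle the choice, later fields are ignored for simple systems: a simple system is steered to a monolith regardless of team size.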

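26.8.2 Architecture Pattern Comparison

Feature                 Layered           Hexagonal         Microservices     Event-driven
Complexity
Testability
Scalability
Deployment complexity
Learning curve
Team collaboration
Applicable scale        Small to medium   Medium            Large             Large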

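26.9 Summary

Architectural design patterns define the high-level structure of a software system: how it is organized, how its components interact, and in which direction it can evolve.

Key Points

  1. Layered architecture: separates concerns; well suited to traditional enterprise applications
  2. Hexagonal architecture: inverts dependencies, which makes testing and adapter replacement easier
  3. CQRS: separates reads from writes; suited to systems whose read and write loads differ greatly
  4. Plugin architecture: supports dynamic extension; suited to systems that need flexible customization
  5. Event-driven architecture: decouples components asynchronously; suited to complex business processes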

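Practical Recommendations

  1. Choose an architecture based on business complexity and team size
  2. Start with a simple architecture and evolve it as needs arise
  3. Pay attention to architectural boundaries and the direction of dependencies
  4. Keep the architecture testable
  5. Regularly reassess whether the architecture still fits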
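
Items 3 and 4 in the list above tend to reinforce each other: when the core depends only on an interface it owns, a test can swap in an in-memory adapter. The sketch below illustrates this under stated assumptions; `OrderRepository`, `PlaceOrder`, and `InMemoryOrderRepository` are hypothetical names, not classes defined earlier in the chapter.

```python
from typing import Dict, List, Optional, Protocol


class OrderRepository(Protocol):
    """Port owned by the core; adapters depend on it, not the reverse."""

    def save(self, order_id: str, total: float) -> None: ...

    def get_total(self, order_id: str) -> Optional[float]: ...


class PlaceOrder:
    """Core use case (hypothetical): knows the port, never a concrete database."""

    def __init__(self, repository: OrderRepository) -> None:
        self._repository = repository

    def execute(self, order_id: str, prices: List[float]) -> float:
        total = sum(prices)
        self._repository.save(order_id, total)
        return total


class InMemoryOrderRepository:
    """Test adapter: satisfies the port structurally, without inheritance."""

    def __init__(self) -> None:
        self._totals: Dict[str, float] = {}

    def save(self, order_id: str, total: float) -> None:
        self._totals[order_id] = total

    def get_total(self, order_id: str) -> Optional[float]:
        return self._totals.get(order_id)


# The core is exercised without any real infrastructure
repository = InMemoryOrderRepository()
use_case = PlaceOrder(repository)
total = use_case.execute("ORD-001", [100, 100, 50])
print(total, repository.get_total("ORD-001"))
```

A production adapter backed by a database would implement the same two methods, so the dependency arrow always points from the adapter toward the core.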

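Next Chapter Preview

The next chapter introduces domain-driven design and explores how strategic design and tactical design are used to build complex business systems.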

Python技术丛书 - 江苏省宿城中等专业学校