Skip to content

第13章 责任链模式

学习目标

完成本章学习后,读者将能够:

  • 理解责任链模式的核心概念与形式化定义
  • 掌握链式处理请求的多种实现方式
  • 设计纯责任链与非纯责任链
  • 实现中间件管道与事件处理系统
  • 分析责任链模式的性能特征与优化策略
  • 识别责任链模式的适用场景与反模式

13.1 模式定义

13.1.1 正式定义

责任链模式(Chain of Responsibility Pattern) 使多个对象都有机会处理请求,从而避免请求的发送者和接收者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。

$$\text{CoR}: \text{Request} \xrightarrow{h_1 \to h_2 \to ... \to h_n} \text{Response}$$

其中:

  • $h_1, h_2, ..., h_n$ 为处理器链
  • $\to$ 表示请求传递方向
  • 处理器 $h_i$ 可选择处理或传递

责任链传递规则

$$\text{Handle}(r, i) = \begin{cases} \text{process}(h_i, r) & \text{if } \text{can_handle}(h_i, r) \ \text{Handle}(r, i+1) & \text{if } i < n \ \text{unhandled} & \text{otherwise} \end{cases}$$

链长度与复杂度

$$T_{\text{worst}} = O(n) \times T_{\text{check}}$$

其中 $n$ 为链长度,$T_{\text{check}}$ 为单个处理器的判断时间。

13.1.2 历史背景与学术脉络

时期发展阶段关键贡献代表人物/文献
1987概念萌芽Smalltalk中的异常处理机制ParcPlace Systems
1994GoF正式定义《设计模式》收录为行为型模式Gamma, Helm, Johnson, Vlissides
1995GUI事件事件冒泡与捕获模型Netscape, Microsoft
1998Java ServletFilter链式处理Sun Microsystems
2000中间件模式Web中间件管道Various Frameworks
2005Django中间件Python Web中间件链Django Community
2010Express.jsNode.js中间件模式TJ Holowaychuk
2015微服务服务网格过滤器链Netflix, Istio
2020现代演进异步责任链与响应式处理Async/Await Pattern

13.1.3 模式动机

问题场景:在软件系统中,请求处理面临以下挑战:

  1. 处理者不确定:请求可能需要不同层级处理
  2. 解耦需求:发送者不应知道具体处理者
  3. 动态配置:处理链需要运行时调整
  4. 扩展性:新增处理器不应影响现有代码

解决方案:将处理器组织成链式结构:

┌─────────────────────────────────────────────────────────────────────────┐
│                         责任链模式架构                                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│    ┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐  │
│    │  Client  │─────▶│ Handler1 │─────▶│ Handler2 │─────▶│ Handler3 │  │
│    └──────────┘      └──────────┘      └──────────┘      └──────────┘  │
│                           │                  │                  │        │
│                           ▼                  ▼                  ▼        │
│                      ┌─────────┐        ┌─────────┐        ┌─────────┐  │
│                      │ 处理或  │        │ 处理或  │        │ 处理或  │  │
│                      │ 传递    │        │ 传递    │        │ 传递    │  │
│                      └─────────┘        └─────────┘        └─────────┘  │
│                                                                         │
│    处理流程:                                                             │
│    1. 请求进入链首                                                       │
│    2. 每个处理器判断是否处理                                              │
│    3. 处理则返回结果,否则传递给下一处理器                                │
│    4. 直到被处理或到达链尾                                                │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

13.1.4 纯责任链与非纯责任链

类型特征示例
纯责任链一个请求只能被一个处理器处理审批流程、异常处理
非纯责任链多个处理器可依次处理同一请求中间件管道、日志链
混合型部分处理器可终止链,部分继续传递HTTP过滤器链

13.2 理论基础

13.2.1 UML结构模型

┌─────────────────────────────────────────────────────────────────────────┐
│                         责任链模式结构图                                 │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────┐         ┌──────────────────────────┐  │
│  │      <<abstract>>           │         │        Client            │  │
│  │         Handler             │         ├──────────────────────────┤  │
│  ├─────────────────────────────┤         │ - chain: Handler         │  │
│  │ # successor: Handler        │◄────────│ + send_request(req)      │  │
│  ├─────────────────────────────┤         └──────────────────────────┘  │
│  │ + set_next(h): Handler      │                                        │
│  │ + handle(request): Response │                                        │
│  │ # can_handle(request): bool │                                        │
│  │ # process(request): Response│                                        │
│  └─────────────────────────────┘                                        │
│              △                                                          │
│              │ extends                                                  │
│     ┌────────┴────────┐                                                 │
│     │                 │                                                 │
│  ┌──┴──────────┐  ┌───┴──────────┐  ┌──────────────┐                   │
│  │ConcreteHdlr1│  │ConcreteHdlr2 │  │ConcreteHdlrN │                   │
│  ├─────────────┤  ├──────────────┤  ├──────────────┤                   │
│  │ - _config   │  │ - _config    │  │ - _config    │                   │
│  │ + handle()  │  │ + handle()   │  │ + handle()   │                   │
│  └─────────────┘  └──────────────┘  └──────────────┘                   │
│                                                                         │
│  关键设计决策:                                                           │
│  • 处理器是否自动传递(模板方法)                                        │
│  • 链的构建方式(构造时 vs 运行时)                                      │
│  • 处理结果是否影响后续传递                                              │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

13.2.2 参与者职责

参与者职责Python实现要点
Handler定义处理接口,实现后继者链接ABC + 模板方法
ConcreteHandler处理负责的请求,访问后继者继承Handler,实现具体逻辑
Client向链首提交请求构建链,发起请求

13.2.3 链式构建模式

┌─────────────────────────────────────────────────────────────────────────┐
│                      责任链构建方式对比                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  1. 流式构建(Fluent Builder)                                          │
│     handler1.set_next(handler2).set_next(handler3)                     │
│                                                                         │
│  2. 列表构建(List-based)                                              │
│     chain = Chain([handler1, handler2, handler3])                      │
│                                                                         │
│  3. 装饰器构建(Decorator-based)                                       │
│     @middleware1                                                        │
│     @middleware2                                                        │
│     def handler(request): ...                                           │
│                                                                         │
│  4. 配置驱动(Config-driven)                                           │
│     chain = ChainFactory.from_config(config)                           │
│                                                                         │
│  5. 依赖注入(DI Container)                                            │
│     container.build_chain('approval_chain')                            │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

13.3 Python实现

13.3.1 标准ABC实现

python
from abc import ABC, abstractmethod
from typing import Optional, Any, TypeVar, Generic
from dataclasses import dataclass


T = TypeVar('T')
R = TypeVar('R')


@dataclass
class Request:
    """请求对象"""
    id: str
    type: str
    data: Any
    metadata: dict = None
    
    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}


@dataclass
class Response:
    """响应对象"""
    success: bool
    result: Any = None
    message: str = ""
    handler: str = ""


class Handler(ABC, Generic[T, R]):
    """处理器抽象基类"""
    
    def __init__(self, name: str = ""):
        self._name = name or self.__class__.__name__
        self._next: Optional['Handler'] = None
    
    def set_next(self, handler: 'Handler') -> 'Handler':
        """设置下一个处理器(支持链式调用)"""
        self._next = handler
        return handler
    
    def handle(self, request: T) -> Optional[R]:
        """处理请求(模板方法)"""
        if self.can_handle(request):
            result = self.process(request)
            if result is not None:
                return result
        
        if self._next:
            return self._next.handle(request)
        
        return None
    
    def can_handle(self, request: T) -> bool:
        """判断是否能处理该请求"""
        return True
    
    @abstractmethod
    def process(self, request: T) -> Optional[R]:
        """具体处理逻辑"""
        pass
    
    @property
    def name(self) -> str:
        return self._name


class MonkeyHandler(Handler[Request, Response]):
    """猴子处理器"""
    
    def can_handle(self, request: Request) -> bool:
        return request.type == "banana"
    
    def process(self, request: Request) -> Optional[Response]:
        return Response(
            success=True,
            result=f"猴子吃了{request.data}",
            handler=self._name
        )


class SquirrelHandler(Handler[Request, Response]):
    """松鼠处理器"""
    
    def can_handle(self, request: Request) -> bool:
        return request.type == "nut"
    
    def process(self, request: Request) -> Optional[Response]:
        return Response(
            success=True,
            result=f"松鼠吃了{request.data}",
            handler=self._name
        )


class DogHandler(Handler[Request, Response]):
    """狗处理器"""
    
    def can_handle(self, request: Request) -> bool:
        return request.type == "meat"
    
    def process(self, request: Request) -> Optional[Response]:
        return Response(
            success=True,
            result=f"狗吃了{request.data}",
            handler=self._name
        )


class DefaultHandler(Handler[Request, Response]):
    """默认处理器:处理所有未被处理的请求"""
    
    def process(self, request: Request) -> Optional[Response]:
        return Response(
            success=False,
            result=None,
            message=f"没有处理器能处理类型为 '{request.type}' 的请求",
            handler=self._name
        )


class ChainBuilder:
    """责任链构建器"""
    
    @staticmethod
    def build(*handlers: Handler) -> Handler:
        """构建责任链"""
        if not handlers:
            raise ValueError("至少需要一个处理器")
        
        first = handlers[0]
        current = first
        
        for handler in handlers[1:]:
            current.set_next(handler)
            current = handler
        
        return first


if __name__ == "__main__":
    chain = ChainBuilder.build(
        MonkeyHandler("Monkey"),
        SquirrelHandler("Squirrel"),
        DogHandler("Dog"),
        DefaultHandler("Default")
    )
    
    requests = [
        Request("REQ-001", "banana", "香蕉"),
        Request("REQ-002", "nut", "坚果"),
        Request("REQ-003", "meat", "肉"),
        Request("REQ-004", "coffee", "咖啡"),
    ]
    
    for req in requests:
        response = chain.handle(req)
        print(f"{req.type}: {response.result or response.message} (by {response.handler})")

13.3.2 Protocol实现(结构化类型)

python
from typing import Protocol, Optional, Any, Callable, List
from dataclasses import dataclass
from functools import wraps


@dataclass
class Context:
    """请求上下文"""
    request_id: str
    data: Any
    attributes: dict = None
    processed: bool = False
    result: Any = None
    
    def __post_init__(self):
        if self.attributes is None:
            self.attributes = {}


class HandlerProtocol(Protocol):
    """处理器协议"""
    
    def __call__(self, ctx: Context, next_handler: Callable) -> Optional[Context]:
        ...


def create_middleware(func: Callable) -> HandlerProtocol:
    """将函数转换为中间件"""
    @wraps(func)
    def wrapper(ctx: Context, next_handler: Callable) -> Optional[Context]:
        return func(ctx, next_handler)
    return wrapper


class MiddlewareChain:
    """中间件链"""
    
    def __init__(self):
        self._middlewares: List[HandlerProtocol] = []
    
    def use(self, middleware: HandlerProtocol) -> 'MiddlewareChain':
        """添加中间件"""
        self._middlewares.append(middleware)
        return self
    
    def execute(self, ctx: Context) -> Context:
        """执行中间件链"""
        def create_runner(index: int) -> Callable:
            if index >= len(self._middlewares):
                return lambda c: c
            
            middleware = self._middlewares[index]
            
            def runner(context: Context) -> Context:
                return middleware(context, create_runner(index + 1))
            
            return runner
        
        runner = create_runner(0)
        return runner(ctx)


@create_middleware
def logging_middleware(ctx: Context, next_handler: Callable) -> Context:
    """日志中间件"""
    print(f"[LOG] 处理请求: {ctx.request_id}")
    result = next_handler(ctx)
    print(f"[LOG] 请求完成: {ctx.request_id}")
    return result


@create_middleware
def validation_middleware(ctx: Context, next_handler: Callable) -> Context:
    """验证中间件"""
    if not ctx.data:
        ctx.processed = True
        ctx.result = "错误:数据为空"
        return ctx
    return next_handler(ctx)


@create_middleware
def timing_middleware(ctx: Context, next_handler: Callable) -> Context:
    """计时中间件"""
    import time
    start = time.time()
    result = next_handler(ctx)
    elapsed = (time.time() - start) * 1000
    ctx.attributes['elapsed_ms'] = elapsed
    return result


@create_middleware
def business_middleware(ctx: Context, next_handler: Callable) -> Context:
    """业务处理中间件"""
    ctx.result = f"处理完成: {ctx.data}"
    ctx.processed = True
    return next_handler(ctx)


if __name__ == "__main__":
    chain = MiddlewareChain()
    chain.use(logging_middleware)
    chain.use(timing_middleware)
    chain.use(validation_middleware)
    chain.use(business_middleware)
    
    ctx = Context("REQ-001", "测试数据")
    result = chain.execute(ctx)
    
    print(f"\n结果: {result.result}")
    print(f"耗时: {result.attributes.get('elapsed_ms', 0):.2f}ms")

13.3.3 异步责任链

python
from abc import ABC, abstractmethod
from typing import Optional, Any, List, Awaitable, Callable
from dataclasses import dataclass
import asyncio


@dataclass
class AsyncRequest:
    """异步请求"""
    id: str
    payload: Any
    metadata: dict = None


@dataclass
class AsyncResponse:
    """异步响应"""
    success: bool
    data: Any = None
    error: str = ""


class AsyncHandler(ABC):
    """异步处理器基类"""
    
    def __init__(self):
        self._next: Optional['AsyncHandler'] = None
    
    def set_next(self, handler: 'AsyncHandler') -> 'AsyncHandler':
        self._next = handler
        return handler
    
    async def handle(self, request: AsyncRequest) -> Optional[AsyncResponse]:
        """异步处理请求"""
        result = await self.process(request)
        
        if result is not None:
            return result
        
        if self._next:
            return await self._next.handle(request)
        
        return None
    
    @abstractmethod
    async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
        """异步处理逻辑"""
        pass


class AsyncValidationHandler(AsyncHandler):
    """异步验证处理器"""
    
    async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
        await asyncio.sleep(0.01)
        
        if not request.payload:
            return AsyncResponse(
                success=False,
                error="负载不能为空"
            )
        
        return None


class AsyncAuthHandler(AsyncHandler):
    """异步认证处理器"""
    
    def __init__(self, valid_tokens: set):
        super().__init__()
        self._valid_tokens = valid_tokens
    
    async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
        await asyncio.sleep(0.01)
        
        token = request.metadata.get("token") if request.metadata else None
        
        if not token or token not in self._valid_tokens:
            return AsyncResponse(
                success=False,
                error="认证失败"
            )
        
        return None


class AsyncBusinessHandler(AsyncHandler):
    """异步业务处理器"""
    
    async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
        await asyncio.sleep(0.02)
        
        return AsyncResponse(
            success=True,
            data=f"处理完成: {request.payload}"
        )


class AsyncChain:
    """异步责任链"""
    
    def __init__(self):
        self._handlers: List[AsyncHandler] = []
    
    def add(self, handler: AsyncHandler) -> 'AsyncChain':
        self._handlers.append(handler)
        return self
    
    def build(self) -> Optional[AsyncHandler]:
        if not self._handlers:
            return None
        
        for i in range(len(self._handlers) - 1):
            self._handlers[i].set_next(self._handlers[i + 1])
        
        return self._handlers[0]
    
    async def execute(self, request: AsyncRequest) -> Optional[AsyncResponse]:
        chain = self.build()
        if chain:
            return await chain.handle(request)
        return None


async def main():
    chain = AsyncChain()
    chain.add(AsyncValidationHandler())
    chain.add(AsyncAuthHandler({"secret_token", "admin_token"}))
    chain.add(AsyncBusinessHandler())
    
    requests = [
        AsyncRequest("REQ-001", "数据1", {"token": "secret_token"}),
        AsyncRequest("REQ-002", "", {"token": "secret_token"}),
        AsyncRequest("REQ-003", "数据3", {"token": "invalid"}),
    ]
    
    for req in requests:
        response = await chain.execute(req)
        status = "成功" if response and response.success else "失败"
        print(f"{req.id}: {status} - {response.data if response else 'N/A'}")


if __name__ == "__main__":
    asyncio.run(main())

13.3.4 泛型责任链

python
from typing import TypeVar, Generic, Optional, Callable, List, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod


TRequest = TypeVar('TRequest')
TResponse = TypeVar('TResponse')


@dataclass
class ChainResult(Generic[TResponse]):
    """链处理结果"""
    handled: bool
    response: Optional[TResponse] = None
    handler_name: str = ""
    chain_position: int = -1


class GenericHandler(ABC, Generic[TRequest, TResponse]):
    """泛型处理器"""
    
    def __init__(self, name: str = ""):
        self._name = name or self.__class__.__name__
        self._next: Optional['GenericHandler'] = None
    
    @property
    def name(self) -> str:
        return self._name
    
    def set_next(self, handler: 'GenericHandler') -> 'GenericHandler':
        self._next = handler
        return handler
    
    def handle(
        self,
        request: TRequest,
        position: int = 0
    ) -> ChainResult[TResponse]:
        """处理请求并返回详细结果"""
        if self.can_handle(request):
            response = self.do_handle(request)
            if response is not None:
                return ChainResult(
                    handled=True,
                    response=response,
                    handler_name=self._name,
                    chain_position=position
                )
        
        if self._next:
            return self._next.handle(request, position + 1)
        
        return ChainResult(handled=False)
    
    @abstractmethod
    def can_handle(self, request: TRequest) -> bool:
        """判断是否能处理"""
        pass
    
    @abstractmethod
    def do_handle(self, request: TRequest) -> Optional[TResponse]:
        """执行处理"""
        pass


class ChainExecutor(Generic[TRequest, TResponse]):
    """责任链执行器"""
    
    def __init__(self):
        self._handlers: List[GenericHandler[TRequest, TResponse]] = []
        self._fallback: Optional[GenericHandler[TRequest, TResponse]] = None
    
    def add(self, handler: GenericHandler[TRequest, TResponse]) -> 'ChainExecutor':
        self._handlers.append(handler)
        return self
    
    def set_fallback(self, handler: GenericHandler[TRequest, TResponse]) -> 'ChainExecutor':
        self._fallback = handler
        return self
    
    def build(self) -> Optional[GenericHandler[TRequest, TResponse]]:
        if not self._handlers:
            return self._fallback
        
        for i in range(len(self._handlers) - 1):
            self._handlers[i].set_next(self._handlers[i + 1])
        
        if self._fallback:
            self._handlers[-1].set_next(self._fallback)
        
        return self._handlers[0]
    
    def execute(self, request: TRequest) -> ChainResult[TResponse]:
        chain = self.build()
        if chain:
            return chain.handle(request)
        return ChainResult(handled=False)
    
    def get_handler_names(self) -> List[str]:
        return [h.name for h in self._handlers]


@dataclass
class PaymentRequest:
    """支付请求"""
    amount: float
    currency: str
    method: str


@dataclass
class PaymentResponse:
    """支付响应"""
    transaction_id: str
    status: str
    message: str


class CreditCardHandler(GenericHandler[PaymentRequest, PaymentResponse]):
    """信用卡支付处理器"""
    
    def can_handle(self, request: PaymentRequest) -> bool:
        return request.method == "credit_card"
    
    def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
        return PaymentResponse(
            transaction_id=f"CC-{request.amount}",
            status="success",
            message="信用卡支付成功"
        )


class PayPalHandler(GenericHandler[PaymentRequest, PaymentResponse]):
    """PayPal支付处理器"""
    
    def can_handle(self, request: PaymentRequest) -> bool:
        return request.method == "paypal"
    
    def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
        return PaymentResponse(
            transaction_id=f"PP-{request.amount}",
            status="success",
            message="PayPal支付成功"
        )


class CryptoHandler(GenericHandler[PaymentRequest, PaymentResponse]):
    """加密货币支付处理器"""
    
    def can_handle(self, request: PaymentRequest) -> bool:
        return request.method == "crypto"
    
    def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
        return PaymentResponse(
            transaction_id=f"CR-{request.amount}",
            status="success",
            message="加密货币支付成功"
        )


class DefaultPaymentHandler(GenericHandler[PaymentRequest, PaymentResponse]):
    """默认支付处理器"""
    
    def can_handle(self, request: PaymentRequest) -> bool:
        return True
    
    def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
        return PaymentResponse(
            transaction_id="",
            status="failed",
            message=f"不支持的支付方式: {request.method}"
        )


if __name__ == "__main__":
    executor = ChainExecutor[PaymentRequest, PaymentResponse]()
    executor.add(CreditCardHandler("CreditCard"))
    executor.add(PayPalHandler("PayPal"))
    executor.add(CryptoHandler("Crypto"))
    executor.set_fallback(DefaultPaymentHandler("Default"))
    
    print(f"处理器链: {' -> '.join(executor.get_handler_names())}")
    
    requests = [
        PaymentRequest(100.0, "USD", "credit_card"),
        PaymentRequest(50.0, "EUR", "paypal"),
        PaymentRequest(0.1, "BTC", "crypto"),
        PaymentRequest(25.0, "USD", "bank_transfer"),
    ]
    
    for req in requests:
        result = executor.execute(req)
        print(f"\n{req.method}: {result.response.status}")
        print(f"  处理器: {result.handler_name}")
        print(f"  消息: {result.response.message}")

13.4 企业级应用示例

13.4.1 审批流程系统

python
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum, auto


class ApprovalStatus(Enum):
    """审批状态"""
    PENDING = auto()
    APPROVED = auto()
    REJECTED = auto()
    ESCALATED = auto()


class ApprovalAction(Enum):
    """审批动作"""
    APPROVE = auto()
    REJECT = auto()
    ESCALATE = auto()


@dataclass
class ApprovalRequest:
    """审批请求"""
    request_id: str
    title: str
    amount: Decimal
    category: str
    requester: str
    description: str = ""
    created_at: datetime = field(default_factory=datetime.now)
    history: List[Dict[str, Any]] = field(default_factory=list)
    
    def add_history(self, approver: str, action: ApprovalAction, comment: str = ""):
        self.history.append({
            'approver': approver,
            'action': action.name,
            'comment': comment,
            'timestamp': datetime.now().isoformat()
        })


@dataclass
class ApprovalResult:
    """审批结果"""
    request_id: str
    status: ApprovalStatus
    approver: str
    comment: str = ""
    next_level: str = ""


class Approver(ABC):
    """审批人抽象类"""
    
    def __init__(self, name: str, title: str, approval_limit: Decimal):
        self._name = name
        self._title = title
        self._approval_limit = approval_limit
        self._next: Optional['Approver'] = None
    
    def set_next(self, approver: 'Approver') -> 'Approver':
        self._next = approver
        return approver
    
    @property
    def name(self) -> str:
        return f"{self._title} {self._name}"
    
    @property
    def limit(self) -> Decimal:
        return self._approval_limit
    
    def can_approve(self, request: ApprovalRequest) -> bool:
        """判断是否能审批"""
        return request.amount <= self._approval_limit
    
    def approve(self, request: ApprovalRequest) -> ApprovalResult:
        """处理审批请求"""
        if self.can_approve(request):
            request.add_history(self.name, ApprovalAction.APPROVE)
            return ApprovalResult(
                request_id=request.request_id,
                status=ApprovalStatus.APPROVED,
                approver=self.name,
                comment=f"金额 ¥{request.amount} 在审批权限内"
            )
        
        if self._next:
            request.add_history(self.name, ApprovalAction.ESCALATE, 
                              f"金额超出权限,转交上级")
            return self._next.approve(request)
        
        request.add_history(self.name, ApprovalAction.REJECT, "无上级审批人")
        return ApprovalResult(
            request_id=request.request_id,
            status=ApprovalStatus.REJECTED,
            approver=self.name,
            comment="无法找到合适的审批人"
        )


class Manager(Approver):
    """经理"""
    
    def __init__(self, name: str):
        super().__init__(name, "经理", Decimal("10000"))


class Director(Approver):
    """总监"""
    
    def __init__(self, name: str):
        super().__init__(name, "总监", Decimal("50000"))


class VicePresident(Approver):
    """副总裁"""
    
    def __init__(self, name: str):
        super().__init__(name, "副总裁", Decimal("200000"))


class CEO(Approver):
    """CEO"""
    
    def __init__(self, name: str):
        super().__init__(name, "CEO", Decimal("1000000"))


class ApprovalSystem:
    """审批系统"""
    
    def __init__(self):
        self._approval_chain: Optional[Approver] = None
        self._requests: Dict[str, ApprovalRequest] = {}
    
    def setup_chain(self, *approvers: Approver) -> None:
        """设置审批链"""
        if not approvers:
            return
        
        self._approval_chain = approvers[0]
        current = self._approval_chain
        
        for approver in approvers[1:]:
            current.set_next(approver)
            current = approver
    
    def submit_request(self, request: ApprovalRequest) -> ApprovalResult:
        """提交审批请求"""
        self._requests[request.request_id] = request
        
        if not self._approval_chain:
            return ApprovalResult(
                request_id=request.request_id,
                status=ApprovalStatus.REJECTED,
                approver="系统",
                comment="审批链未配置"
            )
        
        return self._approval_chain.approve(request)
    
    def get_request(self, request_id: str) -> Optional[ApprovalRequest]:
        return self._requests.get(request_id)


if __name__ == "__main__":
    system = ApprovalSystem()
    
    system.setup_chain(
        Manager("张三"),
        Director("李四"),
        VicePresident("王五"),
        CEO("赵六")
    )
    
    requests = [
        ApprovalRequest("REQ-001", "办公用品采购", Decimal("5000"), "采购", "员工A"),
        ApprovalRequest("REQ-002", "服务器设备", Decimal("30000"), "IT", "员工B"),
        ApprovalRequest("REQ-003", "市场推广活动", Decimal("100000"), "市场", "员工C"),
        ApprovalRequest("REQ-004", "新办公室装修", Decimal("500000"), "行政", "员工D"),
    ]
    
    for req in requests:
        print(f"\n{'='*50}")
        print(f"请求: {req.title}{req.amount})")
        print(f"申请人: {req.requester}")
        
        result = system.submit_request(req)
        
        print(f"\n审批结果: {result.status.name}")
        print(f"审批人: {result.approver}")
        print(f"备注: {result.comment}")
        
        print(f"\n审批历史:")
        for h in req.history:
            print(f"  - {h['approver']}: {h['action']} ({h['comment']})")

13.4.2 Web中间件管道

python
from typing import Optional, Dict, Any, Callable, List
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from datetime import datetime
import time
import json


@dataclass
class HTTPRequest:
    """HTTP请求"""
    method: str
    path: str
    headers: Dict[str, str] = field(default_factory=dict)
    query_params: Dict[str, str] = field(default_factory=dict)
    body: Any = None
    client_ip: str = ""
    attributes: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HTTPResponse:
    """HTTP响应"""
    status_code: int = 200
    headers: Dict[str, str] = field(default_factory=dict)
    body: Any = None
    
    def json(self) -> dict:
        return {"status": self.status_code, "data": self.body}


class Middleware(ABC):
    """中间件抽象类"""
    
    def __init__(self):
        self._next: Optional['Middleware'] = None
    
    def set_next(self, middleware: 'Middleware') -> 'Middleware':
        self._next = middleware
        return middleware
    
    @abstractmethod
    def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
        pass
    
    def _call_next(self, request: HTTPRequest) -> HTTPResponse:
        if self._next:
            return self._next.process(request, lambda r: self._next._call_next(r))
        return HTTPResponse(status_code=404, body={"error": "No handler found"})


class CORSMiddleware(Middleware):
    """CORS中间件"""
    
    def __init__(self, allowed_origins: List[str] = None):
        super().__init__()
        self._allowed_origins = allowed_origins or ["*"]
    
    def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
        if request.method == "OPTIONS":
            return HTTPResponse(
                status_code=200,
                headers={
                    "Access-Control-Allow-Origin": "*",
                    "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
                    "Access-Control-Allow-Headers": "Content-Type, Authorization"
                }
            )
        
        response = next_handler(request)
        response.headers["Access-Control-Allow-Origin"] = "*"
        return response


class RateLimitMiddleware(Middleware):
    """限流中间件"""
    
    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        super().__init__()
        self._max_requests = max_requests
        self._window_seconds = window_seconds
        self._request_counts: Dict[str, List[float]] = {}
    
    def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
        client_id = request.client_ip
        current_time = time.time()
        
        if client_id not in self._request_counts:
            self._request_counts[client_id] = []
        
        self._request_counts[client_id] = [
            t for t in self._request_counts[client_id]
            if current_time - t < self._window_seconds
        ]
        
        if len(self._request_counts[client_id]) >= self._max_requests:
            return HTTPResponse(
                status_code=429,
                body={"error": "Too Many Requests"}
            )
        
        self._request_counts[client_id].append(current_time)
        return next_handler(request)


class AuthMiddleware(Middleware):
    """认证中间件"""
    
    def __init__(self, public_paths: List[str] = None):
        super().__init__()
        self._public_paths = public_paths or ["/health", "/login"]
        self._tokens: Dict[str, dict] = {}
    
    def add_token(self, token: str, user_info: dict) -> None:
        self._tokens[token] = user_info
    
    def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
        if request.path in self._public_paths:
            return next_handler(request)
        
        auth_header = request.headers.get("Authorization", "")
        
        if not auth_header.startswith("Bearer "):
            return HTTPResponse(
                status_code=401,
                body={"error": "Missing or invalid Authorization header"}
            )
        
        token = auth_header[7:]
        
        if token not in self._tokens:
            return HTTPResponse(
                status_code=401,
                body={"error": "Invalid token"}
            )
        
        request.attributes["user"] = self._tokens[token]
        return next_handler(request)


class LoggingMiddleware(Middleware):
    """日志中间件"""
    
    def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
        start_time = time.time()
        
        print(f"[{datetime.now().isoformat()}] {request.method} {request.path}")
        
        response = next_handler(request)
        
        elapsed = (time.time() - start_time) * 1000
        print(f"  -> {response.status_code} ({elapsed:.2f}ms)")
        
        response.headers["X-Response-Time"] = f"{elapsed:.2f}ms"
        return response


class RouterMiddleware(Middleware):
    """路由中间件"""
    
    def __init__(self):
        super().__init__()
        self._routes: Dict[str, Dict[str, Callable]] = {}
    
    def add_route(self, method: str, path: str, handler: Callable) -> None:
        if path not in self._routes:
            self._routes[path] = {}
        self._routes[path][method] = handler
    
    def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
        if request.path not in self._routes:
            return HTTPResponse(
                status_code=404,
                body={"error": f"Path not found: {request.path}"}
            )
        
        handler = self._routes[request.path].get(request.method)
        
        if not handler:
            return HTTPResponse(
                status_code=405,
                body={"error": f"Method not allowed: {request.method}"}
            )
        
        return handler(request)


class Application:
    """Web应用"""
    
    def __init__(self):
        self._middleware_chain: Optional[Middleware] = None
        self._router = RouterMiddleware()
        self._middlewares: List[Middleware] = []
    
    def use(self, middleware: Middleware) -> 'Application':
        self._middlewares.append(middleware)
        return self
    
    def get(self, path: str, handler: Callable) -> 'Application':
        self._router.add_route("GET", path, handler)
        return self
    
    def post(self, path: str, handler: Callable) -> 'Application':
        self._router.add_route("POST", path, handler)
        return self
    
    def build(self) -> None:
        all_middlewares = self._middlewares + [self._router]
        
        for i in range(len(all_middlewares) - 1):
            all_middlewares[i].set_next(all_middlewares[i + 1])
        
        self._middleware_chain = all_middlewares[0] if all_middlewares else None
    
    def handle(self, request: HTTPRequest) -> HTTPResponse:
        if not self._middleware_chain:
            self.build()
        
        if self._middleware_chain:
            return self._middleware_chain.process(
                request,
                lambda r: self._middleware_chain._call_next(r)
            )
        
        return HTTPResponse(status_code=500, body={"error": "No middleware configured"})


if __name__ == "__main__":
    auth = AuthMiddleware()
    auth.add_token("secret_token", {"user_id": 1, "username": "admin", "role": "admin"})
    
    app = Application()
    app.use(CORSMiddleware())
    app.use(LoggingMiddleware())
    app.use(RateLimitMiddleware(max_requests=10))
    app.use(auth)
    
    app.get("/health", lambda req: HTTPResponse(200, {"status": "ok"}))
    app.get("/api/users", lambda req: HTTPResponse(200, {"users": ["Alice", "Bob"]}))
    app.post("/api/posts", lambda req: HTTPResponse(201, {"created": req.body}))
    
    requests = [
        HTTPRequest("GET", "/health", client_ip="127.0.0.1"),
        HTTPRequest("GET", "/api/users", {"Authorization": "Bearer secret_token"}, client_ip="127.0.0.1"),
        HTTPRequest("POST", "/api/posts", {"Authorization": "Bearer secret_token"}, body={"title": "Hello"}, client_ip="127.0.0.1"),
        HTTPRequest("GET", "/api/users", {}, client_ip="127.0.0.1"),
        HTTPRequest("GET", "/unknown", client_ip="127.0.0.1"),
    ]
    
    for req in requests:
        response = app.handle(req)
        print(f"Response: {response.json()}\n")

13.4.3 事件处理系统

python
from typing import Optional, List, Dict, Any, Callable, Set
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum, auto
import asyncio


class EventPriority(Enum):
    """事件优先级"""
    LOWEST = 0
    LOW = 25
    NORMAL = 50
    HIGH = 75
    HIGHEST = 100
    MONITOR = 125


class EventResult(Enum):
    """事件处理结果"""
    CONTINUE = auto()
    HANDLED = auto()
    CANCELLED = auto()


@dataclass
class Event:
    """事件"""
    event_type: str
    source: str
    data: Any = None
    timestamp: datetime = field(default_factory=datetime.now)
    cancelled: bool = False
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def cancel(self) -> None:
        self.cancelled = True


@dataclass
class EventContext:
    """事件上下文"""
    event: Event
    handler_name: str = ""
    result: EventResult = EventResult.CONTINUE
    processing_time_ms: float = 0.0


class EventHandler(ABC):
    """事件处理器"""
    
    def __init__(self, name: str = "", priority: EventPriority = EventPriority.NORMAL):
        self._name = name or self.__class__.__name__
        self._priority = priority
        self._event_types: Set[str] = set()
    
    @property
    def name(self) -> str:
        return self._name
    
    @property
    def priority(self) -> EventPriority:
        return self._priority
    
    def listens_to(self, *event_types: str) -> 'EventHandler':
        self._event_types.update(event_types)
        return self
    
    def can_handle(self, event: Event) -> bool:
        return not event.cancelled and (
            not self._event_types or event.event_type in self._event_types
        )
    
    @abstractmethod
    def handle(self, event: Event) -> EventResult:
        pass


class EventDispatcher:
    """事件分发器"""
    
    def __init__(self):
        self._handlers: List[EventHandler] = []
        self._event_log: List[EventContext] = []
    
    def register(self, handler: EventHandler) -> 'EventDispatcher':
        self._handlers.append(handler)
        self._handlers.sort(key=lambda h: h.priority.value, reverse=True)
        return self
    
    def unregister(self, handler_name: str) -> bool:
        for i, h in enumerate(self._handlers):
            if h.name == handler_name:
                del self._handlers[i]
                return True
        return False
    
    def dispatch(self, event: Event) -> EventContext:
        """分发事件"""
        import time
        start = time.time()
        
        context = EventContext(event=event)
        
        for handler in self._handlers:
            if not handler.can_handle(event):
                continue
            
            context.handler_name = handler.name
            
            try:
                result = handler.handle(event)
                context.result = result
                
                if result == EventResult.CANCELLED or event.cancelled:
                    break
                
            except Exception as e:
                context.result = EventResult.CANCELLED
                event.metadata['error'] = str(e)
                break
        
        context.processing_time_ms = (time.time() - start) * 1000
        self._event_log.append(context)
        
        return context
    
    def get_handlers(self) -> List[str]:
        return [h.name for h in self._handlers]
    
    def get_event_log(self, limit: int = 100) -> List[EventContext]:
        return self._event_log[-limit:]


class LoggingHandler(EventHandler):
    """日志处理器"""
    
    def __init__(self):
        super().__init__("Logger", EventPriority.MONITOR)
    
    def handle(self, event: Event) -> EventResult:
        print(f"[LOG] {event.event_type} from {event.source}: {event.data}")
        return EventResult.CONTINUE


class ValidationHandler(EventHandler):
    """验证处理器"""
    
    def __init__(self):
        super().__init__("Validator", EventPriority.HIGHEST)
        self.listens_to("user.create", "user.update")
    
    def handle(self, event: Event) -> EventResult:
        if not event.data:
            print(f"[VALIDATOR] 数据为空,取消事件")
            event.cancel()
            return EventResult.CANCELLED
        return EventResult.CONTINUE


class AuthHandler(EventHandler):
    """认证处理器"""
    
    def __init__(self):
        super().__init__("Auth", EventPriority.HIGH)
        self.listens_to("user.delete", "admin.action")
    
    def handle(self, event: Event) -> EventResult:
        user = event.metadata.get("user")
        if not user or user.get("role") != "admin":
            print(f"[AUTH] 权限不足,取消事件")
            event.cancel()
            return EventResult.CANCELLED
        return EventResult.CONTINUE


class NotificationHandler(EventHandler):
    """通知处理器"""
    
    def __init__(self):
        super().__init__("Notifier", EventPriority.LOW)
        self.listens_to("user.create", "order.complete")
    
    def handle(self, event: Event) -> EventResult:
        print(f"[NOTIFY] 发送通知: {event.event_type}")
        return EventResult.CONTINUE


class DatabaseHandler(EventHandler):
    """数据库处理器"""
    
    def __init__(self):
        super().__init__("Database", EventPriority.NORMAL)
    
    def handle(self, event: Event) -> EventResult:
        print(f"[DB] 保存事件: {event.event_type} -> {event.data}")
        return EventResult.HANDLED


if __name__ == "__main__":
    dispatcher = EventDispatcher()
    
    dispatcher.register(LoggingHandler())
    dispatcher.register(ValidationHandler())
    dispatcher.register(AuthHandler())
    dispatcher.register(NotificationHandler())
    dispatcher.register(DatabaseHandler())
    
    print(f"处理器链: {' -> '.join(dispatcher.get_handlers())}\n")
    
    events = [
        Event("user.create", "API", {"name": "Alice", "email": "alice@example.com"}),
        Event("user.create", "API", None),
        Event("user.delete", "API", {"user_id": 123}, metadata={"user": {"role": "user"}}),
        Event("user.delete", "API", {"user_id": 456}, metadata={"user": {"role": "admin"}}),
        Event("order.complete", "System", {"order_id": "ORD-001"}),
    ]
    
    for event in events:
        print(f"\n--- 处理事件: {event.event_type} ---")
        context = dispatcher.dispatch(event)
        print(f"结果: {context.result.name}, 处理器: {context.handler_name}")
        print(f"耗时: {context.processing_time_ms:.2f}ms")

13.5 模式变体与扩展

13.5.1 双向责任链

python
from typing import Optional, Any, List
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class BidirectionalRequest:
    """双向请求"""
    data: Any
    forward_processed: bool = False
    backward_processed: bool = False
    forward_result: Any = None
    backward_result: Any = None


class BidirectionalHandler(ABC):
    """双向处理器"""
    
    def __init__(self, name: str = ""):
        self._name = name or self.__class__.__name__
        self._next: Optional['BidirectionalHandler'] = None
        self._prev: Optional['BidirectionalHandler'] = None
    
    def set_next(self, handler: 'BidirectionalHandler') -> 'BidirectionalHandler':
        self._next = handler
        handler._prev = self
        return handler
    
    def process_forward(self, request: BidirectionalRequest) -> None:
        """前向处理"""
        self.do_forward(request)
        request.forward_processed = True
        
        if self._next:
            self._next.process_forward(request)
    
    def process_backward(self, request: BidirectionalRequest) -> None:
        """后向处理"""
        if self._next:
            self._next.process_backward(request)
        
        self.do_backward(request)
        request.backward_processed = True
    
    @abstractmethod
    def do_forward(self, request: BidirectionalRequest) -> None:
        pass
    
    @abstractmethod
    def do_backward(self, request: BidirectionalRequest) -> None:
        pass
    
    @property
    def name(self) -> str:
        return self._name


class EncodingHandler(BidirectionalHandler):
    """编码处理器"""
    
    def do_forward(self, request: BidirectionalRequest) -> None:
        request.data = request.data.encode('utf-8')
        print(f"[{self.name}] 前向: 编码数据")
    
    def do_backward(self, request: BidirectionalRequest) -> None:
        request.backward_result = request.data.decode('utf-8')
        print(f"[{self.name}] 后向: 解码数据")


class CompressionHandler(BidirectionalHandler):
    """压缩处理器"""
    
    def do_forward(self, request: BidirectionalRequest) -> None:
        import zlib
        request.data = zlib.compress(request.data)
        print(f"[{self.name}] 前向: 压缩数据")
    
    def do_backward(self, request: BidirectionalRequest) -> None:
        import zlib
        request.data = zlib.decompress(request.data)
        print(f"[{self.name}] 后向: 解压数据")


class EncryptionHandler(BidirectionalHandler):
    """加密处理器"""
    
    def do_forward(self, request: BidirectionalRequest) -> None:
        request.data = bytes([b ^ 0x55 for b in request.data])
        print(f"[{self.name}] 前向: 加密数据")
    
    def do_backward(self, request: BidirectionalRequest) -> None:
        request.data = bytes([b ^ 0x55 for b in request.data])
        print(f"[{self.name}] 后向: 解密数据")


if __name__ == "__main__":
    encoding = EncodingHandler("Encoding")
    compression = CompressionHandler("Compression")
    encryption = EncryptionHandler("Encryption")
    
    encoding.set_next(compression).set_next(encryption)
    
    request = BidirectionalRequest(data="Hello, World!")
    
    print("=== 前向处理(发送)===")
    encoding.process_forward(request)
    print(f"处理后数据: {request.data[:20]}...")
    
    print("\n=== 后向处理(接收)===")
    encoding.process_backward(request)
    print(f"最终结果: {request.backward_result}")

13.5.2 树形责任链

python
from typing import Optional, Any, List
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class TreeRequest:
    """树形请求"""
    path: str
    data: Any
    matched: bool = False
    result: Any = None


class TreeNode(ABC):
    """树节点处理器"""
    
    def __init__(self, name: str):
        self._name = name
        self._children: List['TreeNode'] = []
        self._parent: Optional['TreeNode'] = None
    
    def add_child(self, child: 'TreeNode') -> 'TreeNode':
        self._children.append(child)
        child._parent = self
        return child
    
    @property
    def name(self) -> str:
        return self._name
    
    @abstractmethod
    def can_handle(self, request: TreeRequest) -> bool:
        pass
    
    @abstractmethod
    def process(self, request: TreeRequest) -> Any:
        pass
    
    def handle(self, request: TreeRequest) -> bool:
        """处理请求(深度优先)"""
        if self.can_handle(request):
            request.result = self.process(request)
            request.matched = True
            return True
        
        for child in self._children:
            if child.handle(request):
                return True
        
        return False


class PathNode(TreeNode):
    """路径节点"""
    
    def __init__(self, path_segment: str):
        super().__init__(path_segment)
        self._segment = path_segment
    
    def can_handle(self, request: TreeRequest) -> bool:
        parts = request.path.strip("/").split("/")
        return self._segment in parts or self._segment == "*"
    
    def process(self, request: TreeRequest) -> Any:
        return f"处理路径节点: {self._segment}"


class RouterTree:
    """路由树"""
    
    def __init__(self):
        self._root = TreeNode("root")
    
    def add_route(self, path: str, handler: TreeNode) -> None:
        segments = path.strip("/").split("/")
        current = self._root
        
        for segment in segments:
            found = None
            for child in current._children:
                if child.name == segment:
                    found = child
                    break
            
            if found:
                current = found
            else:
                new_node = PathNode(segment)
                current.add_child(new_node)
                current = new_node
        
        current.add_child(handler)
    
    def route(self, request: TreeRequest) -> bool:
        return self._root.handle(request)


if __name__ == "__main__":
    class APIHandler(TreeNode):
        def __init__(self, name: str, response: str):
            super().__init__(name)
            self._response = response
        
        def can_handle(self, request: TreeRequest) -> bool:
            return True
        
        def process(self, request: TreeRequest) -> Any:
            return self._response
    
    router = RouterTree()
    
    router.add_route("/api/users", APIHandler("UsersHandler", "用户列表"))
    router.add_route("/api/users/{id}", APIHandler("UserHandler", "用户详情"))
    router.add_route("/api/posts", APIHandler("PostsHandler", "文章列表"))
    router.add_route("/api/posts/{id}/comments", APIHandler("CommentsHandler", "评论列表"))
    
    requests = [
        TreeRequest("/api/users", None),
        TreeRequest("/api/posts", None),
        TreeRequest("/api/posts/123/comments", None),
        TreeRequest("/api/unknown", None),
    ]
    
    for req in requests:
        matched = router.route(req)
        print(f"{req.path}: {'匹配' if matched else '未匹配'} -> {req.result}")

13.6 反模式与最佳实践

13.6.1 常见反模式

反模式1:过长的责任链

python
from typing import Optional, Any
from abc import ABC, abstractmethod


class Handler(ABC):
    def __init__(self):
        self._next: Optional['Handler'] = None
    
    def set_next(self, handler: 'Handler') -> 'Handler':
        self._next = handler
        return handler
    
    @abstractmethod
    def handle(self, request: Any) -> Any:
        pass


class SlowHandler(Handler):
    """错误示例:每个处理器都有延迟"""
    
    def __init__(self, name: str):
        super().__init__()
        self._name = name
    
    def handle(self, request: Any) -> Any:
        import time
        time.sleep(0.01)
        print(f"[{self._name}] 处理中...")
        
        if self._next:
            return self._next.handle(request)
        return "完成"


class OptimizedHandler(Handler):
    """正确示例:快速判断,避免不必要的传递"""
    
    def __init__(self, name: str, condition: callable):
        super().__init__()
        self._name = name
        self._condition = condition
    
    def handle(self, request: Any) -> Any:
        if not self._condition(request):
            if self._next:
                return self._next.handle(request)
            return None
        
        print(f"[{self._name}] 处理请求")
        return f"由 {self._name} 处理"


if __name__ == "__main__":
    import time
    
    print("=== 过长责任链(性能问题)===")
    slow_chain = SlowHandler("H1")
    current = slow_chain
    for i in range(2, 11):
        current.set_next(SlowHandler(f"H{i}"))
        current = current._next
    
    start = time.time()
    result = slow_chain.handle("request")
    print(f"耗时: {(time.time() - start)*1000:.2f}ms\n")
    
    print("=== 优化后的责任链 ===")
    fast_chain = OptimizedHandler("H1", lambda r: r == "target1")
    fast_chain.set_next(OptimizerHandler("H2", lambda r: r == "target2"))
    
    start = time.time()
    result = fast_chain.handle("target2")
    print(f"耗时: {(time.time() - start)*1000:.2f}ms")

反模式2:循环链

python
from typing import Optional, Any


class SafeHandler:
    """安全处理器:防止循环"""
    
    MAX_DEPTH = 100
    
    def __init__(self, name: str):
        self._name = name
        self._next: Optional['SafeHandler'] = None
    
    def set_next(self, handler: 'SafeHandler') -> 'SafeHandler':
        if self._would_create_cycle(handler):
            raise ValueError("检测到循环链")
        self._next = handler
        return handler
    
    def _would_create_cycle(self, handler: 'SafeHandler') -> bool:
        current = handler
        while current:
            if current is self:
                return True
            current = current._next
        return False
    
    def handle(self, request: Any, depth: int = 0) -> Any:
        if depth > self.MAX_DEPTH:
            raise RuntimeError("责任链深度超过限制,可能存在循环")
        
        print(f"[{self._name}] 处理请求 (深度: {depth})")
        
        if self._next:
            return self._next.handle(request, depth + 1)
        
        return "处理完成"


if __name__ == "__main__":
    h1 = SafeHandler("H1")
    h2 = SafeHandler("H2")
    h3 = SafeHandler("H3")
    
    h1.set_next(h2)
    h2.set_next(h3)
    
    try:
        h3.set_next(h1)
    except ValueError as e:
        print(f"错误: {e}")
    
    result = h1.handle("test")
    print(f"结果: {result}")

13.6.2 最佳实践清单

实践说明代码示例
限制链长度避免过长的责任链MAX_DEPTH = 20
快速判断处理器快速判断是否能处理if not can_handle(): return next
防止循环检测并防止循环链构建时检查
日志追踪记录请求经过的处理器request.trace.append(handler_name)
默认处理链尾添加默认处理器DefaultHandler
优先级排序按优先级组织处理器handlers.sort(key=priority)
异常处理处理器异常不应中断链try-except 包装
性能监控监控链处理时间elapsed = time.time() - start

13.7 决策指南

13.7.1 是否使用责任链模式?

┌─────────────────────────────────────────────────────────────┐
│                    责任链模式决策树                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  问题:是否有多个对象可能处理同一请求?                      │
│         │                                                   │
│         ├── 否 ──→ 考虑策略模式                             │
│         │                                                   │
│         └── 是                                              │
│              │                                              │
│              ▼                                              │
│  问题:处理者是否需要在运行时动态确定?                      │
│         │                                                   │
│         ├── 否 ──→ 考虑硬编码的条件判断                     │
│         │                                                   │
│         └── 是                                              │
│              │                                              │
│              ▼                                              │
│  问题:是否需要解耦发送者和接收者?                          │
│         │                                                   │
│         ├── 否 ──→ 考虑直接调用                             │
│         │                                                   │
│         └── 是 ──→ ✓ 使用责任链模式                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

13.7.2 实现技术选择

场景推荐实现理由
简单链类继承 + set_next直观、易理解
中间件管道函数 + 闭包灵活、可组合
异步处理async/await非阻塞、高性能
事件系统观察者 + 责任链支持多播
路由匹配树形结构高效路径查找

13.7.3 与其他模式的关系

模式关系协作方式
组合结构相似责任链是线性组合
命令请求封装命令对象作为请求传递
装饰器链式增强装饰器链类似责任链
观察者通知机制观察者可组成责任链
策略算法选择策略选择可由责任链决定

13.8 快速参考卡片

┌─────────────────────────────────────────────────────────────────────────┐
│                        责任链模式速查表                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  定义: 使多个对象都有机会处理请求,避免发送者与接收者耦合                │
│                                                                         │
│  核心公式:                                                               │
│    Handle(r, i) = process(hi, r) if can_handle(hi, r) else Handle(r, i+1)│
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  参与者:                                                                 │
│    • Handler         → 定义处理接口和后继者                             │
│    • ConcreteHandler → 处理负责的请求,传递不负责的                     │
│    • Client          → 向链首提交请求                                   │
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  链类型:                                                                 │
│    纯责任链:   一个请求只被一个处理器处理                                │
│    非纯责任链: 多个处理器可依次处理                                      │
│    双向链:     前向处理 + 后向处理                                       │
│    树形链:     分支处理                                                  │
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  Python实现要点:                                                         │
│    class Handler:                                                       │
│        def __init__(self):                                              │
│            self._next = None                                            │
│                                                                         │
│        def set_next(self, handler):                                     │
│            self._next = handler                                         │
│            return handler  # 支持链式调用                               │
│                                                                         │
│        def handle(self, request):                                       │
│            if self.can_handle(request):                                 │
│                return self.process(request)                             │
│            if self._next:                                               │
│                return self._next.handle(request)                        │
│            return None                                                  │
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  适用场景:                                                               │
│    ✓ 有多个对象可以处理请求                                              │
│    ✓ 处理者在运行时确定                                                  │
│    ✓ 需要解耦发送者和接收者                                              │
│    ✓ 需要动态配置处理流程                                                │
│                                                                         │
│  不适用场景:                                                             │
│    ✗ 处理者固定且数量少                                                  │
│    ✗ 性能敏感的热点代码                                                  │
│    ✗ 请求必须被所有处理器处理                                            │
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  经典案例:                                                               │
│    • 审批流程                                                            │
│    • HTTP中间件                                                          │
│    • 事件处理系统                                                        │
│    • 异常处理链                                                          │
│    • 日志处理器链                                                        │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

13.9 思考与实践

13.9.1 思考题

  1. 概念辨析:责任链模式与装饰器模式都使用链式结构,它们的核心区别是什么?什么情况下两者可以结合使用?

  2. 性能优化:当责任链很长时,如何优化性能?考虑使用跳表、索引或缓存等技术。

  3. 循环检测:如何在不增加额外空间的情况下检测责任链中的循环?

  4. 中断与继续:设计一个支持"中断后继续"的责任链,即某个处理器可以中断处理,但允许后续处理器继续执行。

  5. 分布式责任链:如何将责任链模式扩展到分布式环境?考虑服务间通信和容错机制。

13.9.2 实践练习

练习1:实现可配置的审批流程

设计一个审批流程系统,支持:

  • 从配置文件加载审批链
  • 动态添加/移除审批人
  • 条件分支(根据金额、类型等选择不同审批路径)
  • 审批历史记录和回溯

练习2:实现Servlet风格的过滤器链

设计一个Web过滤器框架,支持:

  • 过滤器注册和排序
  • 请求/响应双向处理
  • 异步过滤器支持
  • 过滤器生命周期管理

练习3:实现事件总线

设计一个事件总线系统,支持:

  • 多层级事件处理
  • 事件优先级
  • 事件传播控制(冒泡/捕获)
  • 异步事件处理

13.10 小结

责任链模式是一种行为型设计模式,通过将处理器组织成链式结构,实现请求的灵活处理和发送者与接收者的解耦。本章深入探讨了:

  1. 理论基础:责任链模式的形式化定义、纯与非纯责任链的区别
  2. 实现技术:ABC、Protocol、异步责任链、泛型责任链等多种Python实现方式
  3. 企业应用:审批流程系统、Web中间件管道、事件处理系统等实际案例
  4. 模式变体:双向责任链、树形责任链等扩展形式
  5. 最佳实践:避免过长责任链、循环链等反模式,掌握性能优化技巧

责任链模式的核心价值在于:通过链式结构实现请求处理的灵活性和可扩展性,同时保持发送者和接收者的解耦。


参考资料

  1. Gamma, E., et al. Design Patterns: Elements of Reusable Object-Oriented Software. Addison-Wesley, 1994.
  2. Schmidt, D., et al. Pattern-Oriented Software Architecture. Wiley, 1996.
  3. Fowler, M. Patterns of Enterprise Application Architecture. Addison-Wesley, 2002.
  4. Python Documentation. abc — Abstract Base Classes. https://docs.python.org/3/library/abc.html
  5. Django Documentation. Middleware. https://docs.djangoproject.com/en/stable/topics/http/middleware/

Python技术丛书 - 江苏省宿城中等专业学校