第13章 责任链模式
学习目标
完成本章学习后,读者将能够:
- 理解责任链模式的核心概念与形式化定义
- 掌握链式处理请求的多种实现方式
- 设计纯责任链与非纯责任链
- 实现中间件管道与事件处理系统
- 分析责任链模式的性能特征与优化策略
- 识别责任链模式的适用场景与反模式
13.1 模式定义
13.1.1 正式定义
责任链模式(Chain of Responsibility Pattern) 使多个对象都有机会处理请求,从而避免请求的发送者和接收者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。
$$\text{CoR}: \text{Request} \xrightarrow{h_1 \to h_2 \to ... \to h_n} \text{Response}$$
其中:
- $h_1, h_2, ..., h_n$ 为处理器链
- $\to$ 表示请求传递方向
- 处理器 $h_i$ 可选择处理或传递
责任链传递规则:
$$\text{Handle}(r, i) = \begin{cases} \text{process}(h_i, r) & \text{if } \text{can_handle}(h_i, r) \ \text{Handle}(r, i+1) & \text{if } i < n \ \text{unhandled} & \text{otherwise} \end{cases}$$
链长度与复杂度:
$$T_{\text{worst}} = O(n) \times T_{\text{check}}$$
其中 $n$ 为链长度,$T_{\text{check}}$ 为单个处理器的判断时间。
13.1.2 历史背景与学术脉络
| 时期 | 发展阶段 | 关键贡献 | 代表人物/文献 |
|---|---|---|---|
| 1987 | 概念萌芽 | Smalltalk中的异常处理机制 | ParcPlace Systems |
| 1994 | GoF正式定义 | 《设计模式》收录为行为型模式 | Gamma, Helm, Johnson, Vlissides |
| 1995 | GUI事件 | 事件冒泡与捕获模型 | Netscape, Microsoft |
| 1998 | Java Servlet | Filter链式处理 | Sun Microsystems |
| 2000 | 中间件模式 | Web中间件管道 | Various Frameworks |
| 2005 | Django中间件 | Python Web中间件链 | Django Community |
| 2010 | Express.js | Node.js中间件模式 | TJ Holowaychuk |
| 2015 | 微服务 | 服务网格过滤器链 | Netflix, Istio |
| 2020 | 现代演进 | 异步责任链与响应式处理 | Async/Await Pattern |
13.1.3 模式动机
问题场景:在软件系统中,请求处理面临以下挑战:
- 处理者不确定:请求可能需要不同层级处理
- 解耦需求:发送者不应知道具体处理者
- 动态配置:处理链需要运行时调整
- 扩展性:新增处理器不应影响现有代码
解决方案:将处理器组织成链式结构:
┌─────────────────────────────────────────────────────────────────────────┐
│ 责任链模式架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Client │─────▶│ Handler1 │─────▶│ Handler2 │─────▶│ Handler3 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 处理或 │ │ 处理或 │ │ 处理或 │ │
│ │ 传递 │ │ 传递 │ │ 传递 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 处理流程: │
│ 1. 请求进入链首 │
│ 2. 每个处理器判断是否处理 │
│ 3. 处理则返回结果,否则传递给下一处理器 │
│ 4. 直到被处理或到达链尾 │
│ │
└─────────────────────────────────────────────────────────────────────────┘13.1.4 纯责任链与非纯责任链
| 类型 | 特征 | 示例 |
|---|---|---|
| 纯责任链 | 一个请求只能被一个处理器处理 | 审批流程、异常处理 |
| 非纯责任链 | 多个处理器可依次处理同一请求 | 中间件管道、日志链 |
| 混合型 | 部分处理器可终止链,部分继续传递 | HTTP过滤器链 |
13.2 理论基础
13.2.1 UML结构模型
┌─────────────────────────────────────────────────────────────────────────┐
│ 责任链模式结构图 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────┐ ┌──────────────────────────┐ │
│ │ <<abstract>> │ │ Client │ │
│ │ Handler │ ├──────────────────────────┤ │
│ ├─────────────────────────────┤ │ - chain: Handler │ │
│ │ # successor: Handler │◄────────│ + send_request(req) │ │
│ ├─────────────────────────────┤ └──────────────────────────┘ │
│ │ + set_next(h): Handler │ │
│ │ + handle(request): Response │ │
│ │ # can_handle(request): bool │ │
│ │ # process(request): Response│ │
│ └─────────────────────────────┘ │
│ △ │
│ │ extends │
│ ┌────────┴────────┐ │
│ │ │ │
│ ┌──┴──────────┐ ┌───┴──────────┐ ┌──────────────┐ │
│ │ConcreteHdlr1│ │ConcreteHdlr2 │ │ConcreteHdlrN │ │
│ ├─────────────┤ ├──────────────┤ ├──────────────┤ │
│ │ - _config │ │ - _config │ │ - _config │ │
│ │ + handle() │ │ + handle() │ │ + handle() │ │
│ └─────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ 关键设计决策: │
│ • 处理器是否自动传递(模板方法) │
│ • 链的构建方式(构造时 vs 运行时) │
│ • 处理结果是否影响后续传递 │
│ │
└─────────────────────────────────────────────────────────────────────────┘13.2.2 参与者职责
| 参与者 | 职责 | Python实现要点 |
|---|---|---|
| Handler | 定义处理接口,实现后继者链接 | ABC + 模板方法 |
| ConcreteHandler | 处理负责的请求,访问后继者 | 继承Handler,实现具体逻辑 |
| Client | 向链首提交请求 | 构建链,发起请求 |
13.2.3 链式构建模式
┌─────────────────────────────────────────────────────────────────────────┐
│ 责任链构建方式对比 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 流式构建(Fluent Builder) │
│ handler1.set_next(handler2).set_next(handler3) │
│ │
│ 2. 列表构建(List-based) │
│ chain = Chain([handler1, handler2, handler3]) │
│ │
│ 3. 装饰器构建(Decorator-based) │
│ @middleware1 │
│ @middleware2 │
│ def handler(request): ... │
│ │
│ 4. 配置驱动(Config-driven) │
│ chain = ChainFactory.from_config(config) │
│ │
│ 5. 依赖注入(DI Container) │
│ container.build_chain('approval_chain') │
│ │
└─────────────────────────────────────────────────────────────────────────┘13.3 Python实现
13.3.1 标准ABC实现
from abc import ABC, abstractmethod
from typing import Optional, Any, TypeVar, Generic
from dataclasses import dataclass
T = TypeVar('T')
R = TypeVar('R')
@dataclass
class Request:
"""请求对象"""
id: str
type: str
data: Any
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
@dataclass
class Response:
"""响应对象"""
success: bool
result: Any = None
message: str = ""
handler: str = ""
class Handler(ABC, Generic[T, R]):
"""处理器抽象基类"""
def __init__(self, name: str = ""):
self._name = name or self.__class__.__name__
self._next: Optional['Handler'] = None
def set_next(self, handler: 'Handler') -> 'Handler':
"""设置下一个处理器(支持链式调用)"""
self._next = handler
return handler
def handle(self, request: T) -> Optional[R]:
"""处理请求(模板方法)"""
if self.can_handle(request):
result = self.process(request)
if result is not None:
return result
if self._next:
return self._next.handle(request)
return None
def can_handle(self, request: T) -> bool:
"""判断是否能处理该请求"""
return True
@abstractmethod
def process(self, request: T) -> Optional[R]:
"""具体处理逻辑"""
pass
@property
def name(self) -> str:
return self._name
class MonkeyHandler(Handler[Request, Response]):
"""猴子处理器"""
def can_handle(self, request: Request) -> bool:
return request.type == "banana"
def process(self, request: Request) -> Optional[Response]:
return Response(
success=True,
result=f"猴子吃了{request.data}",
handler=self._name
)
class SquirrelHandler(Handler[Request, Response]):
"""松鼠处理器"""
def can_handle(self, request: Request) -> bool:
return request.type == "nut"
def process(self, request: Request) -> Optional[Response]:
return Response(
success=True,
result=f"松鼠吃了{request.data}",
handler=self._name
)
class DogHandler(Handler[Request, Response]):
"""狗处理器"""
def can_handle(self, request: Request) -> bool:
return request.type == "meat"
def process(self, request: Request) -> Optional[Response]:
return Response(
success=True,
result=f"狗吃了{request.data}",
handler=self._name
)
class DefaultHandler(Handler[Request, Response]):
"""默认处理器:处理所有未被处理的请求"""
def process(self, request: Request) -> Optional[Response]:
return Response(
success=False,
result=None,
message=f"没有处理器能处理类型为 '{request.type}' 的请求",
handler=self._name
)
class ChainBuilder:
"""责任链构建器"""
@staticmethod
def build(*handlers: Handler) -> Handler:
"""构建责任链"""
if not handlers:
raise ValueError("至少需要一个处理器")
first = handlers[0]
current = first
for handler in handlers[1:]:
current.set_next(handler)
current = handler
return first
if __name__ == "__main__":
chain = ChainBuilder.build(
MonkeyHandler("Monkey"),
SquirrelHandler("Squirrel"),
DogHandler("Dog"),
DefaultHandler("Default")
)
requests = [
Request("REQ-001", "banana", "香蕉"),
Request("REQ-002", "nut", "坚果"),
Request("REQ-003", "meat", "肉"),
Request("REQ-004", "coffee", "咖啡"),
]
for req in requests:
response = chain.handle(req)
print(f"{req.type}: {response.result or response.message} (by {response.handler})")13.3.2 Protocol实现(结构化类型)
from typing import Protocol, Optional, Any, Callable, List
from dataclasses import dataclass
from functools import wraps
@dataclass
class Context:
"""请求上下文"""
request_id: str
data: Any
attributes: dict = None
processed: bool = False
result: Any = None
def __post_init__(self):
if self.attributes is None:
self.attributes = {}
class HandlerProtocol(Protocol):
"""处理器协议"""
def __call__(self, ctx: Context, next_handler: Callable) -> Optional[Context]:
...
def create_middleware(func: Callable) -> HandlerProtocol:
"""将函数转换为中间件"""
@wraps(func)
def wrapper(ctx: Context, next_handler: Callable) -> Optional[Context]:
return func(ctx, next_handler)
return wrapper
class MiddlewareChain:
"""中间件链"""
def __init__(self):
self._middlewares: List[HandlerProtocol] = []
def use(self, middleware: HandlerProtocol) -> 'MiddlewareChain':
"""添加中间件"""
self._middlewares.append(middleware)
return self
def execute(self, ctx: Context) -> Context:
"""执行中间件链"""
def create_runner(index: int) -> Callable:
if index >= len(self._middlewares):
return lambda c: c
middleware = self._middlewares[index]
def runner(context: Context) -> Context:
return middleware(context, create_runner(index + 1))
return runner
runner = create_runner(0)
return runner(ctx)
@create_middleware
def logging_middleware(ctx: Context, next_handler: Callable) -> Context:
"""日志中间件"""
print(f"[LOG] 处理请求: {ctx.request_id}")
result = next_handler(ctx)
print(f"[LOG] 请求完成: {ctx.request_id}")
return result
@create_middleware
def validation_middleware(ctx: Context, next_handler: Callable) -> Context:
"""验证中间件"""
if not ctx.data:
ctx.processed = True
ctx.result = "错误:数据为空"
return ctx
return next_handler(ctx)
@create_middleware
def timing_middleware(ctx: Context, next_handler: Callable) -> Context:
"""计时中间件"""
import time
start = time.time()
result = next_handler(ctx)
elapsed = (time.time() - start) * 1000
ctx.attributes['elapsed_ms'] = elapsed
return result
@create_middleware
def business_middleware(ctx: Context, next_handler: Callable) -> Context:
"""业务处理中间件"""
ctx.result = f"处理完成: {ctx.data}"
ctx.processed = True
return next_handler(ctx)
if __name__ == "__main__":
chain = MiddlewareChain()
chain.use(logging_middleware)
chain.use(timing_middleware)
chain.use(validation_middleware)
chain.use(business_middleware)
ctx = Context("REQ-001", "测试数据")
result = chain.execute(ctx)
print(f"\n结果: {result.result}")
print(f"耗时: {result.attributes.get('elapsed_ms', 0):.2f}ms")13.3.3 异步责任链
from abc import ABC, abstractmethod
from typing import Optional, Any, List, Awaitable, Callable
from dataclasses import dataclass
import asyncio
@dataclass
class AsyncRequest:
"""异步请求"""
id: str
payload: Any
metadata: dict = None
@dataclass
class AsyncResponse:
"""异步响应"""
success: bool
data: Any = None
error: str = ""
class AsyncHandler(ABC):
"""异步处理器基类"""
def __init__(self):
self._next: Optional['AsyncHandler'] = None
def set_next(self, handler: 'AsyncHandler') -> 'AsyncHandler':
self._next = handler
return handler
async def handle(self, request: AsyncRequest) -> Optional[AsyncResponse]:
"""异步处理请求"""
result = await self.process(request)
if result is not None:
return result
if self._next:
return await self._next.handle(request)
return None
@abstractmethod
async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
"""异步处理逻辑"""
pass
class AsyncValidationHandler(AsyncHandler):
"""异步验证处理器"""
async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
await asyncio.sleep(0.01)
if not request.payload:
return AsyncResponse(
success=False,
error="负载不能为空"
)
return None
class AsyncAuthHandler(AsyncHandler):
"""异步认证处理器"""
def __init__(self, valid_tokens: set):
super().__init__()
self._valid_tokens = valid_tokens
async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
await asyncio.sleep(0.01)
token = request.metadata.get("token") if request.metadata else None
if not token or token not in self._valid_tokens:
return AsyncResponse(
success=False,
error="认证失败"
)
return None
class AsyncBusinessHandler(AsyncHandler):
"""异步业务处理器"""
async def process(self, request: AsyncRequest) -> Optional[AsyncResponse]:
await asyncio.sleep(0.02)
return AsyncResponse(
success=True,
data=f"处理完成: {request.payload}"
)
class AsyncChain:
"""异步责任链"""
def __init__(self):
self._handlers: List[AsyncHandler] = []
def add(self, handler: AsyncHandler) -> 'AsyncChain':
self._handlers.append(handler)
return self
def build(self) -> Optional[AsyncHandler]:
if not self._handlers:
return None
for i in range(len(self._handlers) - 1):
self._handlers[i].set_next(self._handlers[i + 1])
return self._handlers[0]
async def execute(self, request: AsyncRequest) -> Optional[AsyncResponse]:
chain = self.build()
if chain:
return await chain.handle(request)
return None
async def main():
chain = AsyncChain()
chain.add(AsyncValidationHandler())
chain.add(AsyncAuthHandler({"secret_token", "admin_token"}))
chain.add(AsyncBusinessHandler())
requests = [
AsyncRequest("REQ-001", "数据1", {"token": "secret_token"}),
AsyncRequest("REQ-002", "", {"token": "secret_token"}),
AsyncRequest("REQ-003", "数据3", {"token": "invalid"}),
]
for req in requests:
response = await chain.execute(req)
status = "成功" if response and response.success else "失败"
print(f"{req.id}: {status} - {response.data if response else 'N/A'}")
if __name__ == "__main__":
asyncio.run(main())13.3.4 泛型责任链
from typing import TypeVar, Generic, Optional, Callable, List, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
TRequest = TypeVar('TRequest')
TResponse = TypeVar('TResponse')
@dataclass
class ChainResult(Generic[TResponse]):
"""链处理结果"""
handled: bool
response: Optional[TResponse] = None
handler_name: str = ""
chain_position: int = -1
class GenericHandler(ABC, Generic[TRequest, TResponse]):
"""泛型处理器"""
def __init__(self, name: str = ""):
self._name = name or self.__class__.__name__
self._next: Optional['GenericHandler'] = None
@property
def name(self) -> str:
return self._name
def set_next(self, handler: 'GenericHandler') -> 'GenericHandler':
self._next = handler
return handler
def handle(
self,
request: TRequest,
position: int = 0
) -> ChainResult[TResponse]:
"""处理请求并返回详细结果"""
if self.can_handle(request):
response = self.do_handle(request)
if response is not None:
return ChainResult(
handled=True,
response=response,
handler_name=self._name,
chain_position=position
)
if self._next:
return self._next.handle(request, position + 1)
return ChainResult(handled=False)
@abstractmethod
def can_handle(self, request: TRequest) -> bool:
"""判断是否能处理"""
pass
@abstractmethod
def do_handle(self, request: TRequest) -> Optional[TResponse]:
"""执行处理"""
pass
class ChainExecutor(Generic[TRequest, TResponse]):
"""责任链执行器"""
def __init__(self):
self._handlers: List[GenericHandler[TRequest, TResponse]] = []
self._fallback: Optional[GenericHandler[TRequest, TResponse]] = None
def add(self, handler: GenericHandler[TRequest, TResponse]) -> 'ChainExecutor':
self._handlers.append(handler)
return self
def set_fallback(self, handler: GenericHandler[TRequest, TResponse]) -> 'ChainExecutor':
self._fallback = handler
return self
def build(self) -> Optional[GenericHandler[TRequest, TResponse]]:
if not self._handlers:
return self._fallback
for i in range(len(self._handlers) - 1):
self._handlers[i].set_next(self._handlers[i + 1])
if self._fallback:
self._handlers[-1].set_next(self._fallback)
return self._handlers[0]
def execute(self, request: TRequest) -> ChainResult[TResponse]:
chain = self.build()
if chain:
return chain.handle(request)
return ChainResult(handled=False)
def get_handler_names(self) -> List[str]:
return [h.name for h in self._handlers]
@dataclass
class PaymentRequest:
"""支付请求"""
amount: float
currency: str
method: str
@dataclass
class PaymentResponse:
"""支付响应"""
transaction_id: str
status: str
message: str
class CreditCardHandler(GenericHandler[PaymentRequest, PaymentResponse]):
"""信用卡支付处理器"""
def can_handle(self, request: PaymentRequest) -> bool:
return request.method == "credit_card"
def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
return PaymentResponse(
transaction_id=f"CC-{request.amount}",
status="success",
message="信用卡支付成功"
)
class PayPalHandler(GenericHandler[PaymentRequest, PaymentResponse]):
"""PayPal支付处理器"""
def can_handle(self, request: PaymentRequest) -> bool:
return request.method == "paypal"
def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
return PaymentResponse(
transaction_id=f"PP-{request.amount}",
status="success",
message="PayPal支付成功"
)
class CryptoHandler(GenericHandler[PaymentRequest, PaymentResponse]):
"""加密货币支付处理器"""
def can_handle(self, request: PaymentRequest) -> bool:
return request.method == "crypto"
def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
return PaymentResponse(
transaction_id=f"CR-{request.amount}",
status="success",
message="加密货币支付成功"
)
class DefaultPaymentHandler(GenericHandler[PaymentRequest, PaymentResponse]):
"""默认支付处理器"""
def can_handle(self, request: PaymentRequest) -> bool:
return True
def do_handle(self, request: PaymentRequest) -> Optional[PaymentResponse]:
return PaymentResponse(
transaction_id="",
status="failed",
message=f"不支持的支付方式: {request.method}"
)
if __name__ == "__main__":
executor = ChainExecutor[PaymentRequest, PaymentResponse]()
executor.add(CreditCardHandler("CreditCard"))
executor.add(PayPalHandler("PayPal"))
executor.add(CryptoHandler("Crypto"))
executor.set_fallback(DefaultPaymentHandler("Default"))
print(f"处理器链: {' -> '.join(executor.get_handler_names())}")
requests = [
PaymentRequest(100.0, "USD", "credit_card"),
PaymentRequest(50.0, "EUR", "paypal"),
PaymentRequest(0.1, "BTC", "crypto"),
PaymentRequest(25.0, "USD", "bank_transfer"),
]
for req in requests:
result = executor.execute(req)
print(f"\n{req.method}: {result.response.status}")
print(f" 处理器: {result.handler_name}")
print(f" 消息: {result.response.message}")13.4 企业级应用示例
13.4.1 审批流程系统
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum, auto
class ApprovalStatus(Enum):
"""审批状态"""
PENDING = auto()
APPROVED = auto()
REJECTED = auto()
ESCALATED = auto()
class ApprovalAction(Enum):
"""审批动作"""
APPROVE = auto()
REJECT = auto()
ESCALATE = auto()
@dataclass
class ApprovalRequest:
"""审批请求"""
request_id: str
title: str
amount: Decimal
category: str
requester: str
description: str = ""
created_at: datetime = field(default_factory=datetime.now)
history: List[Dict[str, Any]] = field(default_factory=list)
def add_history(self, approver: str, action: ApprovalAction, comment: str = ""):
self.history.append({
'approver': approver,
'action': action.name,
'comment': comment,
'timestamp': datetime.now().isoformat()
})
@dataclass
class ApprovalResult:
"""审批结果"""
request_id: str
status: ApprovalStatus
approver: str
comment: str = ""
next_level: str = ""
class Approver(ABC):
"""审批人抽象类"""
def __init__(self, name: str, title: str, approval_limit: Decimal):
self._name = name
self._title = title
self._approval_limit = approval_limit
self._next: Optional['Approver'] = None
def set_next(self, approver: 'Approver') -> 'Approver':
self._next = approver
return approver
@property
def name(self) -> str:
return f"{self._title} {self._name}"
@property
def limit(self) -> Decimal:
return self._approval_limit
def can_approve(self, request: ApprovalRequest) -> bool:
"""判断是否能审批"""
return request.amount <= self._approval_limit
def approve(self, request: ApprovalRequest) -> ApprovalResult:
"""处理审批请求"""
if self.can_approve(request):
request.add_history(self.name, ApprovalAction.APPROVE)
return ApprovalResult(
request_id=request.request_id,
status=ApprovalStatus.APPROVED,
approver=self.name,
comment=f"金额 ¥{request.amount} 在审批权限内"
)
if self._next:
request.add_history(self.name, ApprovalAction.ESCALATE,
f"金额超出权限,转交上级")
return self._next.approve(request)
request.add_history(self.name, ApprovalAction.REJECT, "无上级审批人")
return ApprovalResult(
request_id=request.request_id,
status=ApprovalStatus.REJECTED,
approver=self.name,
comment="无法找到合适的审批人"
)
class Manager(Approver):
"""经理"""
def __init__(self, name: str):
super().__init__(name, "经理", Decimal("10000"))
class Director(Approver):
"""总监"""
def __init__(self, name: str):
super().__init__(name, "总监", Decimal("50000"))
class VicePresident(Approver):
"""副总裁"""
def __init__(self, name: str):
super().__init__(name, "副总裁", Decimal("200000"))
class CEO(Approver):
"""CEO"""
def __init__(self, name: str):
super().__init__(name, "CEO", Decimal("1000000"))
class ApprovalSystem:
"""审批系统"""
def __init__(self):
self._approval_chain: Optional[Approver] = None
self._requests: Dict[str, ApprovalRequest] = {}
def setup_chain(self, *approvers: Approver) -> None:
"""设置审批链"""
if not approvers:
return
self._approval_chain = approvers[0]
current = self._approval_chain
for approver in approvers[1:]:
current.set_next(approver)
current = approver
def submit_request(self, request: ApprovalRequest) -> ApprovalResult:
"""提交审批请求"""
self._requests[request.request_id] = request
if not self._approval_chain:
return ApprovalResult(
request_id=request.request_id,
status=ApprovalStatus.REJECTED,
approver="系统",
comment="审批链未配置"
)
return self._approval_chain.approve(request)
def get_request(self, request_id: str) -> Optional[ApprovalRequest]:
return self._requests.get(request_id)
if __name__ == "__main__":
system = ApprovalSystem()
system.setup_chain(
Manager("张三"),
Director("李四"),
VicePresident("王五"),
CEO("赵六")
)
requests = [
ApprovalRequest("REQ-001", "办公用品采购", Decimal("5000"), "采购", "员工A"),
ApprovalRequest("REQ-002", "服务器设备", Decimal("30000"), "IT", "员工B"),
ApprovalRequest("REQ-003", "市场推广活动", Decimal("100000"), "市场", "员工C"),
ApprovalRequest("REQ-004", "新办公室装修", Decimal("500000"), "行政", "员工D"),
]
for req in requests:
print(f"\n{'='*50}")
print(f"请求: {req.title} (¥{req.amount})")
print(f"申请人: {req.requester}")
result = system.submit_request(req)
print(f"\n审批结果: {result.status.name}")
print(f"审批人: {result.approver}")
print(f"备注: {result.comment}")
print(f"\n审批历史:")
for h in req.history:
print(f" - {h['approver']}: {h['action']} ({h['comment']})")13.4.2 Web中间件管道
from typing import Optional, Dict, Any, Callable, List
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from datetime import datetime
import time
import json
@dataclass
class HTTPRequest:
"""HTTP请求"""
method: str
path: str
headers: Dict[str, str] = field(default_factory=dict)
query_params: Dict[str, str] = field(default_factory=dict)
body: Any = None
client_ip: str = ""
attributes: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HTTPResponse:
"""HTTP响应"""
status_code: int = 200
headers: Dict[str, str] = field(default_factory=dict)
body: Any = None
def json(self) -> dict:
return {"status": self.status_code, "data": self.body}
class Middleware(ABC):
"""中间件抽象类"""
def __init__(self):
self._next: Optional['Middleware'] = None
def set_next(self, middleware: 'Middleware') -> 'Middleware':
self._next = middleware
return middleware
@abstractmethod
def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
pass
def _call_next(self, request: HTTPRequest) -> HTTPResponse:
if self._next:
return self._next.process(request, lambda r: self._next._call_next(r))
return HTTPResponse(status_code=404, body={"error": "No handler found"})
class CORSMiddleware(Middleware):
"""CORS中间件"""
def __init__(self, allowed_origins: List[str] = None):
super().__init__()
self._allowed_origins = allowed_origins or ["*"]
def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
if request.method == "OPTIONS":
return HTTPResponse(
status_code=200,
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization"
}
)
response = next_handler(request)
response.headers["Access-Control-Allow-Origin"] = "*"
return response
class RateLimitMiddleware(Middleware):
"""限流中间件"""
def __init__(self, max_requests: int = 100, window_seconds: int = 60):
super().__init__()
self._max_requests = max_requests
self._window_seconds = window_seconds
self._request_counts: Dict[str, List[float]] = {}
def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
client_id = request.client_ip
current_time = time.time()
if client_id not in self._request_counts:
self._request_counts[client_id] = []
self._request_counts[client_id] = [
t for t in self._request_counts[client_id]
if current_time - t < self._window_seconds
]
if len(self._request_counts[client_id]) >= self._max_requests:
return HTTPResponse(
status_code=429,
body={"error": "Too Many Requests"}
)
self._request_counts[client_id].append(current_time)
return next_handler(request)
class AuthMiddleware(Middleware):
"""认证中间件"""
def __init__(self, public_paths: List[str] = None):
super().__init__()
self._public_paths = public_paths or ["/health", "/login"]
self._tokens: Dict[str, dict] = {}
def add_token(self, token: str, user_info: dict) -> None:
self._tokens[token] = user_info
def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
if request.path in self._public_paths:
return next_handler(request)
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return HTTPResponse(
status_code=401,
body={"error": "Missing or invalid Authorization header"}
)
token = auth_header[7:]
if token not in self._tokens:
return HTTPResponse(
status_code=401,
body={"error": "Invalid token"}
)
request.attributes["user"] = self._tokens[token]
return next_handler(request)
class LoggingMiddleware(Middleware):
"""日志中间件"""
def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
start_time = time.time()
print(f"[{datetime.now().isoformat()}] {request.method} {request.path}")
response = next_handler(request)
elapsed = (time.time() - start_time) * 1000
print(f" -> {response.status_code} ({elapsed:.2f}ms)")
response.headers["X-Response-Time"] = f"{elapsed:.2f}ms"
return response
class RouterMiddleware(Middleware):
"""路由中间件"""
def __init__(self):
super().__init__()
self._routes: Dict[str, Dict[str, Callable]] = {}
def add_route(self, method: str, path: str, handler: Callable) -> None:
if path not in self._routes:
self._routes[path] = {}
self._routes[path][method] = handler
def process(self, request: HTTPRequest, next_handler: Callable) -> HTTPResponse:
if request.path not in self._routes:
return HTTPResponse(
status_code=404,
body={"error": f"Path not found: {request.path}"}
)
handler = self._routes[request.path].get(request.method)
if not handler:
return HTTPResponse(
status_code=405,
body={"error": f"Method not allowed: {request.method}"}
)
return handler(request)
class Application:
"""Web应用"""
def __init__(self):
self._middleware_chain: Optional[Middleware] = None
self._router = RouterMiddleware()
self._middlewares: List[Middleware] = []
def use(self, middleware: Middleware) -> 'Application':
self._middlewares.append(middleware)
return self
def get(self, path: str, handler: Callable) -> 'Application':
self._router.add_route("GET", path, handler)
return self
def post(self, path: str, handler: Callable) -> 'Application':
self._router.add_route("POST", path, handler)
return self
def build(self) -> None:
all_middlewares = self._middlewares + [self._router]
for i in range(len(all_middlewares) - 1):
all_middlewares[i].set_next(all_middlewares[i + 1])
self._middleware_chain = all_middlewares[0] if all_middlewares else None
def handle(self, request: HTTPRequest) -> HTTPResponse:
if not self._middleware_chain:
self.build()
if self._middleware_chain:
return self._middleware_chain.process(
request,
lambda r: self._middleware_chain._call_next(r)
)
return HTTPResponse(status_code=500, body={"error": "No middleware configured"})
if __name__ == "__main__":
auth = AuthMiddleware()
auth.add_token("secret_token", {"user_id": 1, "username": "admin", "role": "admin"})
app = Application()
app.use(CORSMiddleware())
app.use(LoggingMiddleware())
app.use(RateLimitMiddleware(max_requests=10))
app.use(auth)
app.get("/health", lambda req: HTTPResponse(200, {"status": "ok"}))
app.get("/api/users", lambda req: HTTPResponse(200, {"users": ["Alice", "Bob"]}))
app.post("/api/posts", lambda req: HTTPResponse(201, {"created": req.body}))
requests = [
HTTPRequest("GET", "/health", client_ip="127.0.0.1"),
HTTPRequest("GET", "/api/users", {"Authorization": "Bearer secret_token"}, client_ip="127.0.0.1"),
HTTPRequest("POST", "/api/posts", {"Authorization": "Bearer secret_token"}, body={"title": "Hello"}, client_ip="127.0.0.1"),
HTTPRequest("GET", "/api/users", {}, client_ip="127.0.0.1"),
HTTPRequest("GET", "/unknown", client_ip="127.0.0.1"),
]
for req in requests:
response = app.handle(req)
print(f"Response: {response.json()}\n")13.4.3 事件处理系统
from typing import Optional, List, Dict, Any, Callable, Set
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum, auto
import asyncio
class EventPriority(Enum):
"""事件优先级"""
LOWEST = 0
LOW = 25
NORMAL = 50
HIGH = 75
HIGHEST = 100
MONITOR = 125
class EventResult(Enum):
"""事件处理结果"""
CONTINUE = auto()
HANDLED = auto()
CANCELLED = auto()
@dataclass
class Event:
"""事件"""
event_type: str
source: str
data: Any = None
timestamp: datetime = field(default_factory=datetime.now)
cancelled: bool = False
metadata: Dict[str, Any] = field(default_factory=dict)
def cancel(self) -> None:
self.cancelled = True
@dataclass
class EventContext:
"""事件上下文"""
event: Event
handler_name: str = ""
result: EventResult = EventResult.CONTINUE
processing_time_ms: float = 0.0
class EventHandler(ABC):
"""事件处理器"""
def __init__(self, name: str = "", priority: EventPriority = EventPriority.NORMAL):
self._name = name or self.__class__.__name__
self._priority = priority
self._event_types: Set[str] = set()
@property
def name(self) -> str:
return self._name
@property
def priority(self) -> EventPriority:
return self._priority
def listens_to(self, *event_types: str) -> 'EventHandler':
self._event_types.update(event_types)
return self
def can_handle(self, event: Event) -> bool:
return not event.cancelled and (
not self._event_types or event.event_type in self._event_types
)
@abstractmethod
def handle(self, event: Event) -> EventResult:
pass
class EventDispatcher:
"""事件分发器"""
def __init__(self):
self._handlers: List[EventHandler] = []
self._event_log: List[EventContext] = []
def register(self, handler: EventHandler) -> 'EventDispatcher':
self._handlers.append(handler)
self._handlers.sort(key=lambda h: h.priority.value, reverse=True)
return self
def unregister(self, handler_name: str) -> bool:
for i, h in enumerate(self._handlers):
if h.name == handler_name:
del self._handlers[i]
return True
return False
def dispatch(self, event: Event) -> EventContext:
"""分发事件"""
import time
start = time.time()
context = EventContext(event=event)
for handler in self._handlers:
if not handler.can_handle(event):
continue
context.handler_name = handler.name
try:
result = handler.handle(event)
context.result = result
if result == EventResult.CANCELLED or event.cancelled:
break
except Exception as e:
context.result = EventResult.CANCELLED
event.metadata['error'] = str(e)
break
context.processing_time_ms = (time.time() - start) * 1000
self._event_log.append(context)
return context
def get_handlers(self) -> List[str]:
return [h.name for h in self._handlers]
def get_event_log(self, limit: int = 100) -> List[EventContext]:
return self._event_log[-limit:]
class LoggingHandler(EventHandler):
"""日志处理器"""
def __init__(self):
super().__init__("Logger", EventPriority.MONITOR)
def handle(self, event: Event) -> EventResult:
print(f"[LOG] {event.event_type} from {event.source}: {event.data}")
return EventResult.CONTINUE
class ValidationHandler(EventHandler):
"""验证处理器"""
def __init__(self):
super().__init__("Validator", EventPriority.HIGHEST)
self.listens_to("user.create", "user.update")
def handle(self, event: Event) -> EventResult:
if not event.data:
print(f"[VALIDATOR] 数据为空,取消事件")
event.cancel()
return EventResult.CANCELLED
return EventResult.CONTINUE
class AuthHandler(EventHandler):
"""认证处理器"""
def __init__(self):
super().__init__("Auth", EventPriority.HIGH)
self.listens_to("user.delete", "admin.action")
def handle(self, event: Event) -> EventResult:
user = event.metadata.get("user")
if not user or user.get("role") != "admin":
print(f"[AUTH] 权限不足,取消事件")
event.cancel()
return EventResult.CANCELLED
return EventResult.CONTINUE
class NotificationHandler(EventHandler):
"""通知处理器"""
def __init__(self):
super().__init__("Notifier", EventPriority.LOW)
self.listens_to("user.create", "order.complete")
def handle(self, event: Event) -> EventResult:
print(f"[NOTIFY] 发送通知: {event.event_type}")
return EventResult.CONTINUE
class DatabaseHandler(EventHandler):
"""数据库处理器"""
def __init__(self):
super().__init__("Database", EventPriority.NORMAL)
def handle(self, event: Event) -> EventResult:
print(f"[DB] 保存事件: {event.event_type} -> {event.data}")
return EventResult.HANDLED
if __name__ == "__main__":
dispatcher = EventDispatcher()
dispatcher.register(LoggingHandler())
dispatcher.register(ValidationHandler())
dispatcher.register(AuthHandler())
dispatcher.register(NotificationHandler())
dispatcher.register(DatabaseHandler())
print(f"处理器链: {' -> '.join(dispatcher.get_handlers())}\n")
events = [
Event("user.create", "API", {"name": "Alice", "email": "alice@example.com"}),
Event("user.create", "API", None),
Event("user.delete", "API", {"user_id": 123}, metadata={"user": {"role": "user"}}),
Event("user.delete", "API", {"user_id": 456}, metadata={"user": {"role": "admin"}}),
Event("order.complete", "System", {"order_id": "ORD-001"}),
]
for event in events:
print(f"\n--- 处理事件: {event.event_type} ---")
context = dispatcher.dispatch(event)
print(f"结果: {context.result.name}, 处理器: {context.handler_name}")
print(f"耗时: {context.processing_time_ms:.2f}ms")13.5 模式变体与扩展
13.5.1 双向责任链
from typing import Optional, Any, List
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class BidirectionalRequest:
"""双向请求"""
data: Any
forward_processed: bool = False
backward_processed: bool = False
forward_result: Any = None
backward_result: Any = None
class BidirectionalHandler(ABC):
"""双向处理器"""
def __init__(self, name: str = ""):
self._name = name or self.__class__.__name__
self._next: Optional['BidirectionalHandler'] = None
self._prev: Optional['BidirectionalHandler'] = None
def set_next(self, handler: 'BidirectionalHandler') -> 'BidirectionalHandler':
self._next = handler
handler._prev = self
return handler
def process_forward(self, request: BidirectionalRequest) -> None:
"""前向处理"""
self.do_forward(request)
request.forward_processed = True
if self._next:
self._next.process_forward(request)
def process_backward(self, request: BidirectionalRequest) -> None:
"""后向处理"""
if self._next:
self._next.process_backward(request)
self.do_backward(request)
request.backward_processed = True
@abstractmethod
def do_forward(self, request: BidirectionalRequest) -> None:
pass
@abstractmethod
def do_backward(self, request: BidirectionalRequest) -> None:
pass
@property
def name(self) -> str:
return self._name
class EncodingHandler(BidirectionalHandler):
"""编码处理器"""
def do_forward(self, request: BidirectionalRequest) -> None:
request.data = request.data.encode('utf-8')
print(f"[{self.name}] 前向: 编码数据")
def do_backward(self, request: BidirectionalRequest) -> None:
request.backward_result = request.data.decode('utf-8')
print(f"[{self.name}] 后向: 解码数据")
class CompressionHandler(BidirectionalHandler):
"""压缩处理器"""
def do_forward(self, request: BidirectionalRequest) -> None:
import zlib
request.data = zlib.compress(request.data)
print(f"[{self.name}] 前向: 压缩数据")
def do_backward(self, request: BidirectionalRequest) -> None:
import zlib
request.data = zlib.decompress(request.data)
print(f"[{self.name}] 后向: 解压数据")
class EncryptionHandler(BidirectionalHandler):
"""加密处理器"""
def do_forward(self, request: BidirectionalRequest) -> None:
request.data = bytes([b ^ 0x55 for b in request.data])
print(f"[{self.name}] 前向: 加密数据")
def do_backward(self, request: BidirectionalRequest) -> None:
request.data = bytes([b ^ 0x55 for b in request.data])
print(f"[{self.name}] 后向: 解密数据")
if __name__ == "__main__":
encoding = EncodingHandler("Encoding")
compression = CompressionHandler("Compression")
encryption = EncryptionHandler("Encryption")
encoding.set_next(compression).set_next(encryption)
request = BidirectionalRequest(data="Hello, World!")
print("=== 前向处理(发送)===")
encoding.process_forward(request)
print(f"处理后数据: {request.data[:20]}...")
print("\n=== 后向处理(接收)===")
encoding.process_backward(request)
print(f"最终结果: {request.backward_result}")13.5.2 树形责任链
from typing import Optional, Any, List
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class TreeRequest:
"""树形请求"""
path: str
data: Any
matched: bool = False
result: Any = None
class TreeNode(ABC):
"""树节点处理器"""
def __init__(self, name: str):
self._name = name
self._children: List['TreeNode'] = []
self._parent: Optional['TreeNode'] = None
def add_child(self, child: 'TreeNode') -> 'TreeNode':
self._children.append(child)
child._parent = self
return child
@property
def name(self) -> str:
return self._name
@abstractmethod
def can_handle(self, request: TreeRequest) -> bool:
pass
@abstractmethod
def process(self, request: TreeRequest) -> Any:
pass
def handle(self, request: TreeRequest) -> bool:
"""处理请求(深度优先)"""
if self.can_handle(request):
request.result = self.process(request)
request.matched = True
return True
for child in self._children:
if child.handle(request):
return True
return False
class PathNode(TreeNode):
"""路径节点"""
def __init__(self, path_segment: str):
super().__init__(path_segment)
self._segment = path_segment
def can_handle(self, request: TreeRequest) -> bool:
parts = request.path.strip("/").split("/")
return self._segment in parts or self._segment == "*"
def process(self, request: TreeRequest) -> Any:
return f"处理路径节点: {self._segment}"
class RouterTree:
"""路由树"""
def __init__(self):
self._root = TreeNode("root")
def add_route(self, path: str, handler: TreeNode) -> None:
segments = path.strip("/").split("/")
current = self._root
for segment in segments:
found = None
for child in current._children:
if child.name == segment:
found = child
break
if found:
current = found
else:
new_node = PathNode(segment)
current.add_child(new_node)
current = new_node
current.add_child(handler)
def route(self, request: TreeRequest) -> bool:
return self._root.handle(request)
if __name__ == "__main__":
class APIHandler(TreeNode):
def __init__(self, name: str, response: str):
super().__init__(name)
self._response = response
def can_handle(self, request: TreeRequest) -> bool:
return True
def process(self, request: TreeRequest) -> Any:
return self._response
router = RouterTree()
router.add_route("/api/users", APIHandler("UsersHandler", "用户列表"))
router.add_route("/api/users/{id}", APIHandler("UserHandler", "用户详情"))
router.add_route("/api/posts", APIHandler("PostsHandler", "文章列表"))
router.add_route("/api/posts/{id}/comments", APIHandler("CommentsHandler", "评论列表"))
requests = [
TreeRequest("/api/users", None),
TreeRequest("/api/posts", None),
TreeRequest("/api/posts/123/comments", None),
TreeRequest("/api/unknown", None),
]
for req in requests:
matched = router.route(req)
print(f"{req.path}: {'匹配' if matched else '未匹配'} -> {req.result}")13.6 反模式与最佳实践
13.6.1 常见反模式
反模式1:过长的责任链
from typing import Optional, Any
from abc import ABC, abstractmethod
class Handler(ABC):
def __init__(self):
self._next: Optional['Handler'] = None
def set_next(self, handler: 'Handler') -> 'Handler':
self._next = handler
return handler
@abstractmethod
def handle(self, request: Any) -> Any:
pass
class SlowHandler(Handler):
"""错误示例:每个处理器都有延迟"""
def __init__(self, name: str):
super().__init__()
self._name = name
def handle(self, request: Any) -> Any:
import time
time.sleep(0.01)
print(f"[{self._name}] 处理中...")
if self._next:
return self._next.handle(request)
return "完成"
class OptimizedHandler(Handler):
"""正确示例:快速判断,避免不必要的传递"""
def __init__(self, name: str, condition: callable):
super().__init__()
self._name = name
self._condition = condition
def handle(self, request: Any) -> Any:
if not self._condition(request):
if self._next:
return self._next.handle(request)
return None
print(f"[{self._name}] 处理请求")
return f"由 {self._name} 处理"
if __name__ == "__main__":
import time
print("=== 过长责任链(性能问题)===")
slow_chain = SlowHandler("H1")
current = slow_chain
for i in range(2, 11):
current.set_next(SlowHandler(f"H{i}"))
current = current._next
start = time.time()
result = slow_chain.handle("request")
print(f"耗时: {(time.time() - start)*1000:.2f}ms\n")
print("=== 优化后的责任链 ===")
fast_chain = OptimizedHandler("H1", lambda r: r == "target1")
fast_chain.set_next(OptimizerHandler("H2", lambda r: r == "target2"))
start = time.time()
result = fast_chain.handle("target2")
print(f"耗时: {(time.time() - start)*1000:.2f}ms")反模式2:循环链
from typing import Optional, Any
class SafeHandler:
"""安全处理器:防止循环"""
MAX_DEPTH = 100
def __init__(self, name: str):
self._name = name
self._next: Optional['SafeHandler'] = None
def set_next(self, handler: 'SafeHandler') -> 'SafeHandler':
if self._would_create_cycle(handler):
raise ValueError("检测到循环链")
self._next = handler
return handler
def _would_create_cycle(self, handler: 'SafeHandler') -> bool:
current = handler
while current:
if current is self:
return True
current = current._next
return False
def handle(self, request: Any, depth: int = 0) -> Any:
if depth > self.MAX_DEPTH:
raise RuntimeError("责任链深度超过限制,可能存在循环")
print(f"[{self._name}] 处理请求 (深度: {depth})")
if self._next:
return self._next.handle(request, depth + 1)
return "处理完成"
if __name__ == "__main__":
h1 = SafeHandler("H1")
h2 = SafeHandler("H2")
h3 = SafeHandler("H3")
h1.set_next(h2)
h2.set_next(h3)
try:
h3.set_next(h1)
except ValueError as e:
print(f"错误: {e}")
result = h1.handle("test")
print(f"结果: {result}")13.6.2 最佳实践清单
| 实践 | 说明 | 代码示例 |
|---|---|---|
| 限制链长度 | 避免过长的责任链 | MAX_DEPTH = 20 |
| 快速判断 | 处理器快速判断是否能处理 | if not can_handle(): return next |
| 防止循环 | 检测并防止循环链 | 构建时检查 |
| 日志追踪 | 记录请求经过的处理器 | request.trace.append(handler_name) |
| 默认处理 | 链尾添加默认处理器 | DefaultHandler |
| 优先级排序 | 按优先级组织处理器 | handlers.sort(key=priority) |
| 异常处理 | 处理器异常不应中断链 | try-except 包装 |
| 性能监控 | 监控链处理时间 | elapsed = time.time() - start |
13.7 决策指南
13.7.1 是否使用责任链模式?
┌─────────────────────────────────────────────────────────────┐
│ 责任链模式决策树 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 问题:是否有多个对象可能处理同一请求? │
│ │ │
│ ├── 否 ──→ 考虑策略模式 │
│ │ │
│ └── 是 │
│ │ │
│ ▼ │
│ 问题:处理者是否需要在运行时动态确定? │
│ │ │
│ ├── 否 ──→ 考虑硬编码的条件判断 │
│ │ │
│ └── 是 │
│ │ │
│ ▼ │
│ 问题:是否需要解耦发送者和接收者? │
│ │ │
│ ├── 否 ──→ 考虑直接调用 │
│ │ │
│ └── 是 ──→ ✓ 使用责任链模式 │
│ │
└─────────────────────────────────────────────────────────────┘13.7.2 实现技术选择
| 场景 | 推荐实现 | 理由 |
|---|---|---|
| 简单链 | 类继承 + set_next | 直观、易理解 |
| 中间件管道 | 函数 + 闭包 | 灵活、可组合 |
| 异步处理 | async/await | 非阻塞、高性能 |
| 事件系统 | 观察者 + 责任链 | 支持多播 |
| 路由匹配 | 树形结构 | 高效路径查找 |
13.7.3 与其他模式的关系
| 模式 | 关系 | 协作方式 |
|---|---|---|
| 组合 | 结构相似 | 责任链是线性组合 |
| 命令 | 请求封装 | 命令对象作为请求传递 |
| 装饰器 | 链式增强 | 装饰器链类似责任链 |
| 观察者 | 通知机制 | 观察者可组成责任链 |
| 策略 | 算法选择 | 策略选择可由责任链决定 |
13.8 快速参考卡片
┌─────────────────────────────────────────────────────────────────────────┐
│ 责任链模式速查表 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 定义: 使多个对象都有机会处理请求,避免发送者与接收者耦合 │
│ │
│ 核心公式: │
│ Handle(r, i) = process(hi, r) if can_handle(hi, r) else Handle(r, i+1)│
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 参与者: │
│ • Handler → 定义处理接口和后继者 │
│ • ConcreteHandler → 处理负责的请求,传递不负责的 │
│ • Client → 向链首提交请求 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 链类型: │
│ 纯责任链: 一个请求只被一个处理器处理 │
│ 非纯责任链: 多个处理器可依次处理 │
│ 双向链: 前向处理 + 后向处理 │
│ 树形链: 分支处理 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ Python实现要点: │
│ class Handler: │
│ def __init__(self): │
│ self._next = None │
│ │
│ def set_next(self, handler): │
│ self._next = handler │
│ return handler # 支持链式调用 │
│ │
│ def handle(self, request): │
│ if self.can_handle(request): │
│ return self.process(request) │
│ if self._next: │
│ return self._next.handle(request) │
│ return None │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 适用场景: │
│ ✓ 有多个对象可以处理请求 │
│ ✓ 处理者在运行时确定 │
│ ✓ 需要解耦发送者和接收者 │
│ ✓ 需要动态配置处理流程 │
│ │
│ 不适用场景: │
│ ✗ 处理者固定且数量少 │
│ ✗ 性能敏感的热点代码 │
│ ✗ 请求必须被所有处理器处理 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 经典案例: │
│ • 审批流程 │
│ • HTTP中间件 │
│ • 事件处理系统 │
│ • 异常处理链 │
│ • 日志处理器链 │
│ │
└─────────────────────────────────────────────────────────────────────────┘13.9 思考与实践
13.9.1 思考题
概念辨析:责任链模式与装饰器模式都使用链式结构,它们的核心区别是什么?什么情况下两者可以结合使用?
性能优化:当责任链很长时,如何优化性能?考虑使用跳表、索引或缓存等技术。
循环检测:如何在不增加额外空间的情况下检测责任链中的循环?
中断与继续:设计一个支持"中断后继续"的责任链,即某个处理器可以中断处理,但允许后续处理器继续执行。
分布式责任链:如何将责任链模式扩展到分布式环境?考虑服务间通信和容错机制。
13.9.2 实践练习
练习1:实现可配置的审批流程
设计一个审批流程系统,支持:
- 从配置文件加载审批链
- 动态添加/移除审批人
- 条件分支(根据金额、类型等选择不同审批路径)
- 审批历史记录和回溯
练习2:实现Servlet风格的过滤器链
设计一个Web过滤器框架,支持:
- 过滤器注册和排序
- 请求/响应双向处理
- 异步过滤器支持
- 过滤器生命周期管理
练习3:实现事件总线
设计一个事件总线系统,支持:
- 多层级事件处理
- 事件优先级
- 事件传播控制(冒泡/捕获)
- 异步事件处理
13.10 小结
责任链模式是一种行为型设计模式,通过将处理器组织成链式结构,实现请求的灵活处理和发送者与接收者的解耦。本章深入探讨了:
- 理论基础:责任链模式的形式化定义、纯与非纯责任链的区别
- 实现技术:ABC、Protocol、异步责任链、泛型责任链等多种Python实现方式
- 企业应用:审批流程系统、Web中间件管道、事件处理系统等实际案例
- 模式变体:双向责任链、树形责任链等扩展形式
- 最佳实践:避免过长责任链、循环链等反模式,掌握性能优化技巧
责任链模式的核心价值在于:通过链式结构实现请求处理的灵活性和可扩展性,同时保持发送者和接收者的解耦。
参考资料
- Gamma, E., et al. Design Patterns: Elements of Reusable Object-Oriented Software. Addison-Wesley, 1994.
- Schmidt, D., et al. Pattern-Oriented Software Architecture. Wiley, 1996.
- Fowler, M. Patterns of Enterprise Application Architecture. Addison-Wesley, 2002.
- Python Documentation. abc — Abstract Base Classes. https://docs.python.org/3/library/abc.html
- Django Documentation. Middleware. https://docs.djangoproject.com/en/stable/topics/http/middleware/