第22章 模板方法模式
学习目标
完成本章学习后,读者将能够:
- 理解模板方法模式的核心概念和形式化定义
- 掌握算法骨架的定义方法与不变性保证
- 实现可扩展的算法框架与钩子方法机制
- 识别模板方法模式的适用场景与局限性
- 应用模板方法模式解决实际软件设计问题
22.1 模式定义
22.1.1 核心定义
模板方法模式(Template Method Pattern) 是一种行为型设计模式,它定义一个操作中的算法骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重定义该算法的某些特定步骤。
22.1.2 形式化定义
从形式化角度,模板方法模式可以定义为一个四元组:
$$\mathcal{T} = \langle A, S, \sigma, \phi \rangle$$
其中:
- $A$ 是抽象类(Abstract Class),定义算法骨架
- $S = {s_1, s_2, \ldots, s_n}$ 是步骤集合(Step Set)
- $\sigma: S \rightarrow {concrete, abstract, hook}$ 是步骤类型函数
- $\phi: S^* \rightarrow Algorithm$ 是算法组合函数
步骤类型定义:
$$\sigma(s) = \begin{cases} concrete & \text{如果 } s \text{ 是具体步骤(父类实现)} \ abstract & \text{如果 } s \text{ 是抽象步骤(子类必须实现)} \ hook & \text{如果 } s \text{ 是钩子步骤(子类可选实现)} \end{cases}$$
算法骨架定义:
模板方法 $T$ 是一个有序步骤序列:
$$T = \langle s_1, s_2, \ldots, s_n \rangle$$
其中执行顺序由 $\phi$ 确定,且满足不变性约束:
$$\forall c \in ConcreteClasses: execute(T, c) = \phi(s_1, s_2, \ldots, s_n)$$
扩展点定义:
扩展点 $E$ 是子类可以定制的步骤集合:
$$E = {s \in S \mid \sigma(s) \in {abstract, hook}}$$
不变性定理:
对于任意具体类 $c_1, c_2$:
$$structure(T, c_1) = structure(T, c_2)$$
即所有具体类的算法结构相同,只有具体步骤的实现不同。
22.1.3 设计原则体现
模板方法模式体现了以下设计原则:
| 设计原则 | 体现方式 |
|---|---|
| 好莱坞原则 | "不要调用我们,我们会调用你"——父类调用子类方法 |
| 开闭原则 | 对扩展开放(新增子类),对修改关闭(算法骨架不变) |
| 单一职责原则 | 父类负责算法结构,子类负责具体步骤实现 |
| 里氏替换原则 | 子类可以替换父类而不影响算法执行 |
22.2 历史背景与演进
22.2.1 历史发展
| 年代 | 里程碑 | 描述 |
|---|---|---|
| 1970s | 框架起源 | 框架编程概念出现,模板方法作为核心机制 |
| 1985 | Hollywood Principle | Hollywood原则被正式提出 |
| 1994 | GoF经典 | 《设计模式》将模板方法列为经典模式 |
| 1995 | Hollywood Template | Martin Fowler提出Hollywood Template概念 |
| 2000s | 框架应用 | Spring、JUnit等框架大量使用模板方法 |
| 2010s | 函数式变体 | 函数式编程带来新的实现方式 |
| 2020s | 现代实践 | 与依赖注入、组合模式结合使用 |
22.2.2 理论基础
模板方法模式的理论基础来源于:
- 框架理论:框架定义了应用的骨架,开发者填充具体实现
- 控制反转(IoC):控制流从框架流向应用代码
- 继承与多态:利用继承实现代码复用,多态实现行为变化
22.3 UML结构图
22.3.1 标准结构
┌─────────────────────────────────────┐
│ <<abstract>> │
│ AbstractClass │
├─────────────────────────────────────┤
│ + template_method() │
│ # primitive_operation1() : abstract │
│ # primitive_operation2() : abstract │
│ # concrete_operation() │
│ # hook() │
└───────────────┬─────────────────────┘
│
│ extends
│
┌───────────┴───────────┐
│ │
┌───┴───────────┐ ┌───────┴───────┐
│ ConcreteClass1│ │ ConcreteClass2│
├───────────────┤ ├───────────────┤
│ # primitive_ │ │ # primitive_ │
│ operation1()│ │ operation1()│
│ # primitive_ │ │ # primitive_ │
│ operation2()│ │ operation2()│
│ # hook() │ │ │
└───────────────┘ └───────────────┘22.3.2 方法类型详解
┌────────────────────────────────────────────────────────────────┐
│ AbstractClass │
├────────────────────────────────────────────────────────────────┤
│ │
│ template_method(): ← 算法骨架(final,不可覆盖) │
│ primitive_operation1() ← 抽象方法(子类必须实现) │
│ concrete_operation() ← 具体方法(子类继承) │
│ hook() ← 钩子方法(子类可选覆盖) │
│ primitive_operation2() ← 抽象方法(子类必须实现) │
│ │
├────────────────────────────────────────────────────────────────┤
│ 方法类型说明: │
│ │
│ ┌──────────────┬─────────────────────────────────────────────┐ │
│ │ 抽象方法 │ @abstractmethod,子类必须实现 │ │
│ ├──────────────┼─────────────────────────────────────────────┤ │
│ │ 具体方法 │ 父类提供默认实现,子类继承使用 │ │
│ ├──────────────┼─────────────────────────────────────────────┤ │
│ │ 钩子方法 │ 空实现或默认实现,子类可选覆盖 │ │
│ ├──────────────┼─────────────────────────────────────────────┤ │
│ │ 模板方法 │ 调用其他方法组成算法,建议final │ │
│ └──────────────┴─────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘22.3.3 执行流程图
┌─────────────────────────────────────────────────────────────────┐
│ template_method() 执行流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Client │
│ │ │
│ │ template_method() │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ AbstractClass │ │
│ │ │ │
│ │ Step 1: base_operation() ← 具体方法(父类实现) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Step 2: required_operation1() ← 抽象方法(子类实现) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Step 3: hook1() ← 钩子方法(可选覆盖) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Step 4: required_operation2() ← 抽象方法(子类实现) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Step 5: hook2() ← 钩子方法(可选覆盖) │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘22.4 Python实现
22.4.1 基础实现
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from functools import wraps
class AbstractClass(ABC):
"""
抽象类:定义算法骨架和基本方法
"""
def template_method(self) -> None:
"""
模板方法:定义算法骨架
子类不应覆盖此方法(Python中无法强制final,但约定不覆盖)
"""
self.base_operation1()
self.required_operation1()
self.base_operation2()
self.hook1()
self.required_operation2()
self.base_operation3()
self.hook2()
def base_operation1(self) -> None:
"""具体方法:基础操作1"""
print("AbstractClass: 基础操作1")
def base_operation2(self) -> None:
"""具体方法:基础操作2"""
print("AbstractClass: 基础操作2")
def base_operation3(self) -> None:
"""具体方法:基础操作3"""
print("AbstractClass: 基础操作3")
@abstractmethod
def required_operation1(self) -> None:
"""抽象方法:必须由子类实现"""
pass
@abstractmethod
def required_operation2(self) -> None:
"""抽象方法:必须由子类实现"""
pass
def hook1(self) -> None:
"""钩子方法:子类可选覆盖"""
pass
def hook2(self) -> None:
"""钩子方法:子类可选覆盖"""
pass
class ConcreteClass1(AbstractClass):
"""具体类1:实现所有抽象方法"""
def required_operation1(self) -> None:
print("ConcreteClass1: 实现操作1")
def required_operation2(self) -> None:
print("ConcreteClass1: 实现操作2")
class ConcreteClass2(AbstractClass):
"""具体类2:实现抽象方法并覆盖钩子"""
def required_operation1(self) -> None:
print("ConcreteClass2: 实现操作1")
def required_operation2(self) -> None:
print("ConcreteClass2: 实现操作2")
def hook1(self) -> None:
print("ConcreteClass2: 覆盖钩子1")
def hook2(self) -> None:
print("ConcreteClass2: 覆盖钩子2")
print("=== ConcreteClass1 执行 ===")
obj1 = ConcreteClass1()
obj1.template_method()
print("\n=== ConcreteClass2 执行 ===")
obj2 = ConcreteClass2()
obj2.template_method()22.4.2 带返回值的模板方法
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, List, Any
T = TypeVar('T')
R = TypeVar('R')
class TemplateWithResult(ABC, Generic[T, R]):
"""
支持返回值的模板方法
"""
def execute(self, data: T) -> R:
"""
模板方法:处理数据并返回结果
"""
preprocessed = self.preprocess(data)
validated = self.validate(preprocessed)
processed = self.process(validated)
result = self.postprocess(processed)
self.cleanup()
return result
def preprocess(self, data: T) -> T:
"""预处理:默认不做任何处理"""
return data
@abstractmethod
def validate(self, data: T) -> T:
"""验证:子类必须实现"""
pass
@abstractmethod
def process(self, data: T) -> R:
"""处理:子类必须实现"""
pass
def postprocess(self, result: R) -> R:
"""后处理:默认不做任何处理"""
return result
def cleanup(self) -> None:
"""清理:默认不做任何处理"""
pass
class NumberProcessor(TemplateWithResult[List[int], int]):
"""数字列表处理器:计算总和"""
def validate(self, data: List[int]) -> List[int]:
if not data:
raise ValueError("数据列表不能为空")
return data
def process(self, data: List[int]) -> int:
return sum(data)
def postprocess(self, result: int) -> int:
print(f"计算完成,结果: {result}")
return result
class StringProcessor(TemplateWithResult[str, dict]):
"""字符串处理器:统计字符频率"""
def preprocess(self, data: str) -> str:
return data.lower().strip()
def validate(self, data: str) -> str:
if not data:
raise ValueError("字符串不能为空")
return data
def process(self, data: str) -> dict:
freq = {}
for char in data:
freq[char] = freq.get(char, 0) + 1
return freq
def cleanup(self) -> None:
print("字符串处理完成,资源已释放")
num_processor = NumberProcessor()
result = num_processor.execute([1, 2, 3, 4, 5])
print(f"最终结果: {result}")
print()
str_processor = StringProcessor()
freq = str_processor.execute(" Hello World ")
print(f"字符频率: {freq}")22.4.3 函数式模板方法
from typing import Callable, TypeVar, Any, Optional
from functools import wraps
from dataclasses import dataclass
T = TypeVar('T')
R = TypeVar('R')
@dataclass
class TemplateStep:
"""模板步骤定义"""
name: str
required: bool = True
default: Optional[Callable] = None
class FunctionalTemplate:
"""
函数式模板方法:使用组合代替继承
"""
def __init__(
self,
steps: list[TemplateStep],
implementations: Optional[dict[str, Callable]] = None
):
self.steps = steps
self.implementations = implementations or {}
self._validate_implementations()
def _validate_implementations(self) -> None:
"""验证所有必需步骤都有实现"""
for step in self.steps:
if step.required:
if step.name not in self.implementations:
if step.default is None:
raise ValueError(
f"必需步骤 '{step.name}' 缺少实现"
)
def execute(self, *args, **kwargs) -> Any:
"""执行模板方法"""
result = None
context = {'args': args, 'kwargs': kwargs, 'result': None}
for step in self.steps:
impl = self.implementations.get(step.name, step.default)
if impl is not None:
result = impl(context)
context['result'] = result
context[step.name] = result
return result
def with_implementation(
self,
step_name: str,
implementation: Callable
) -> 'FunctionalTemplate':
"""添加步骤实现(返回新实例)"""
new_impls = self.implementations.copy()
new_impls[step_name] = implementation
return FunctionalTemplate(self.steps, new_impls)
def create_data_processing_template() -> FunctionalTemplate:
"""创建数据处理模板"""
steps = [
TemplateStep('parse', required=True),
TemplateStep('validate', required=False, default=lambda ctx: ctx['result']),
TemplateStep('transform', required=True),
TemplateStep('output', required=False, default=lambda ctx: str(ctx['result'])),
]
return FunctionalTemplate(steps)
def parse_json(context: dict) -> dict:
import json
return json.loads(context['args'][0])
def validate_data(context: dict) -> dict:
data = context['result']
if not data:
raise ValueError("数据为空")
return data
def transform_upper(context: dict) -> dict:
data = context['result']
return {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}
def output_json(context: dict) -> str:
import json
return json.dumps(context['result'], ensure_ascii=False)
json_template = (
create_data_processing_template()
.with_implementation('parse', parse_json)
.with_implementation('validate', validate_data)
.with_implementation('transform', transform_upper)
.with_implementation('output', output_json)
)
result = json_template.execute('{"name": "张三", "city": "北京"}')
print(f"处理结果: {result}")22.4.4 钩子方法详解
from abc import ABC, abstractmethod
from typing import Optional, List, Any
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TemplateWithHooks(ABC):
"""
展示各种钩子方法的使用
"""
def execute(self, *args, **kwargs) -> Any:
"""
主模板方法,包含完整的生命周期钩子
"""
self._on_start()
try:
if self._should_execute(*args, **kwargs):
self._pre_execute(*args, **kwargs)
result = self._do_execute(*args, **kwargs)
result = self._post_execute(result)
self._on_success(result)
return result
else:
self._on_skip()
return None
except Exception as e:
self._on_error(e)
raise
finally:
self._on_end()
@abstractmethod
def _do_execute(self, *args, **kwargs) -> Any:
"""核心执行逻辑:子类必须实现"""
pass
def _should_execute(self, *args, **kwargs) -> bool:
"""钩子:是否执行(默认True)"""
return True
def _on_start(self) -> None:
"""钩子:开始时调用"""
logger.info(f"[{datetime.now()}] 开始执行")
def _pre_execute(self, *args, **kwargs) -> None:
"""钩子:执行前调用"""
pass
def _post_execute(self, result: Any) -> Any:
"""钩子:执行后调用"""
return result
def _on_success(self, result: Any) -> None:
"""钩子:成功时调用"""
logger.info(f"[{datetime.now()}] 执行成功")
def _on_error(self, error: Exception) -> None:
"""钩子:错误时调用"""
logger.error(f"[{datetime.now()}] 执行失败: {error}")
def _on_skip(self) -> None:
"""钩子:跳过时调用"""
logger.info(f"[{datetime.now()}] 执行被跳过")
def _on_end(self) -> None:
"""钩子:结束时调用(无论成功或失败)"""
logger.info(f"[{datetime.now()}] 执行结束")
class ConditionalProcessor(TemplateWithHooks):
"""
条件处理器:演示条件钩子
"""
def __init__(self, threshold: int = 10):
self.threshold = threshold
self._skip = False
def _should_execute(self, *args, **kwargs) -> bool:
"""当数据量小于阈值时跳过"""
data = args[0] if args else []
self._skip = len(data) < self.threshold
return not self._skip
def _do_execute(self, *args, **kwargs) -> Any:
data = args[0]
return sum(data) / len(data)
def _on_skip(self) -> None:
print(f"数据量不足 {self.threshold} 条,跳过处理")
class RetryProcessor(TemplateWithHooks):
"""
重试处理器:演示错误处理钩子
"""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
self.retry_count = 0
def _do_execute(self, *args, **kwargs) -> Any:
import random
if random.random() < 0.7:
raise RuntimeError("随机失败")
return "处理成功"
def _on_error(self, error: Exception) -> None:
self.retry_count += 1
if self.retry_count <= self.max_retries:
print(f"第 {self.retry_count} 次重试...")
try:
result = self._do_execute(*self._last_args, **self._last_kwargs)
self._on_success(result)
return result
except Exception as e:
self._on_error(e)
else:
print(f"超过最大重试次数 {self.max_retries}")
def _pre_execute(self, *args, **kwargs) -> None:
self._last_args = args
self._last_kwargs = kwargs
print("=== 条件处理器测试 ===")
processor = ConditionalProcessor(threshold=5)
processor.execute([1, 2, 3])
processor.execute([1, 2, 3, 4, 5, 6, 7])22.5 企业级应用示例
22.5.1 ETL数据处理管道
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Any, Optional, Iterator
from dataclasses import dataclass, field
from datetime import datetime
import logging
import json
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
T = TypeVar('T')
U = TypeVar('U')
@dataclass
class ETLContext:
"""ETL执行上下文"""
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
records_processed: int = 0
records_failed: int = 0
errors: list[str] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
class ETLPipeline(ABC, Generic[T, U]):
"""
ETL管道模板:Extract-Transform-Load
"""
def __init__(self, name: str):
self.name = name
self._context = ETLContext()
def run(self, source: Any) -> list[U]:
"""
执行ETL管道
"""
logger.info(f"[{self.name}] 开始ETL管道")
self._context = ETLContext()
results = []
try:
self._pre_extract(source)
for record in self._extract(source):
try:
if self._validate(record):
transformed = self._transform(record)
self._load(transformed)
results.append(transformed)
self._context.records_processed += 1
else:
self._context.records_failed += 1
except Exception as e:
self._context.records_failed += 1
self._context.errors.append(str(e))
self._on_record_error(record, e)
self._post_load()
except Exception as e:
self._context.errors.append(f"Pipeline error: {e}")
raise
finally:
self._context.end_time = datetime.now()
self._log_summary()
return results
@abstractmethod
def _extract(self, source: Any) -> Iterator[T]:
"""提取数据:子类必须实现"""
pass
@abstractmethod
def _transform(self, record: T) -> U:
"""转换数据:子类必须实现"""
pass
@abstractmethod
def _load(self, record: U) -> None:
"""加载数据:子类必须实现"""
pass
def _pre_extract(self, source: Any) -> None:
"""钩子:提取前准备"""
pass
def _validate(self, record: T) -> bool:
"""钩子:验证记录(默认通过)"""
return True
def _post_load(self) -> None:
"""钩子:加载后处理"""
pass
def _on_record_error(self, record: T, error: Exception) -> None:
"""钩子:记录处理错误"""
logger.warning(f"记录处理失败: {error}")
def _log_summary(self) -> None:
"""记录执行摘要"""
duration = (self._context.end_time - self._context.start_time).total_seconds()
logger.info(
f"[{self.name}] 完成: "
f"处理 {self._context.records_processed} 条, "
f"失败 {self._context.records_failed} 条, "
f"耗时 {duration:.2f}秒"
)
@dataclass
class UserRecord:
"""用户记录"""
user_id: str
name: str
email: str
created_at: str
@dataclass
class UserDocument:
"""用户文档(转换后)"""
id: str
display_name: str
contact_email: str
registration_date: datetime
status: str
class UserETLPipeline(ETLPipeline[UserRecord, UserDocument]):
"""
用户数据ETL管道
"""
def __init__(self):
super().__init__("UserETL")
self._loaded_count = 0
def _extract(self, source: list[dict]) -> Iterator[UserRecord]:
"""从源数据提取用户记录"""
for item in source:
yield UserRecord(
user_id=item.get('id', ''),
name=item.get('name', ''),
email=item.get('email', ''),
created_at=item.get('created_at', '')
)
def _validate(self, record: UserRecord) -> bool:
"""验证用户记录"""
if not record.user_id or not record.email:
return False
if '@' not in record.email:
return False
return True
def _transform(self, record: UserRecord) -> UserDocument:
"""转换用户记录"""
return UserDocument(
id=record.user_id,
display_name=record.name.title(),
contact_email=record.email.lower(),
registration_date=datetime.fromisoformat(record.created_at),
status='active'
)
def _load(self, document: UserDocument) -> None:
"""加载用户文档"""
self._loaded_count += 1
logger.debug(f"加载用户: {document.id}")
def _post_load(self) -> None:
"""加载后处理"""
logger.info(f"共加载 {self._loaded_count} 个用户文档")
source_data = [
{'id': '001', 'name': 'john doe', 'email': 'JOHN@EXAMPLE.COM', 'created_at': '2024-01-15'},
{'id': '002', 'name': 'jane smith', 'email': 'jane@example.com', 'created_at': '2024-02-20'},
{'id': '', 'name': 'invalid', 'email': 'no-email', 'created_at': '2024-03-01'},
{'id': '003', 'name': 'bob wilson', 'email': 'bob@test.com', 'created_at': '2024-03-10'},
]
pipeline = UserETLPipeline()
results = pipeline.run(source_data)
print(f"\n成功处理 {len(results)} 条记录")
for doc in results:
print(f" - {doc.display_name} ({doc.contact_email})")22.5.2 Web请求处理器
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
import time
T = TypeVar('T')
R = TypeVar('R')
class HttpMethod(Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
@dataclass
class HttpRequest:
"""HTTP请求"""
method: HttpMethod
path: str
headers: dict[str, str]
body: Optional[str] = None
query_params: dict[str, str] = None
def __post_init__(self):
if self.query_params is None:
self.query_params = {}
@dataclass
class HttpResponse:
"""HTTP响应"""
status_code: int
body: str
headers: dict[str, str] = None
def __post_init__(self):
if self.headers is None:
self.headers = {}
class RequestHandler(ABC, Generic[T, R]):
"""
请求处理器模板
"""
def handle(self, request: HttpRequest) -> HttpResponse:
"""
处理请求的模板方法
"""
start_time = time.time()
try:
if not self._authenticate(request):
return self._unauthorized_response()
if not self._authorize(request):
return self._forbidden_response()
parsed = self._parse_request(request)
validated = self._validate(parsed)
if not validated:
return self._bad_request_response("验证失败")
result = self._process(parsed)
response = self._build_response(result)
self._log_request(request, response, time.time() - start_time)
return response
except ValueError as e:
return self._bad_request_response(str(e))
except Exception as e:
return self._internal_error_response(str(e))
@abstractmethod
def _parse_request(self, request: HttpRequest) -> T:
"""解析请求:子类实现"""
pass
@abstractmethod
def _process(self, data: T) -> R:
"""处理业务:子类实现"""
pass
def _authenticate(self, request: HttpRequest) -> bool:
"""钩子:认证(默认通过)"""
auth_header = request.headers.get('Authorization', '')
return auth_header.startswith('Bearer ')
def _authorize(self, request: HttpRequest) -> bool:
"""钩子:授权(默认通过)"""
return True
def _validate(self, data: T) -> bool:
"""钩子:验证(默认通过)"""
return True
def _build_response(self, result: R) -> HttpResponse:
"""构建响应"""
return HttpResponse(
status_code=200,
body=json.dumps(result, ensure_ascii=False, default=str),
headers={'Content-Type': 'application/json'}
)
def _log_request(
self,
request: HttpRequest,
response: HttpResponse,
duration: float
) -> None:
"""钩子:记录请求日志"""
print(
f"[{request.method.value}] {request.path} "
f"-> {response.status_code} ({duration*1000:.2f}ms)"
)
def _unauthorized_response(self) -> HttpResponse:
return HttpResponse(401, json.dumps({'error': '未授权'}))
def _forbidden_response(self) -> HttpResponse:
return HttpResponse(403, json.dumps({'error': '禁止访问'}))
def _bad_request_response(self, message: str) -> HttpResponse:
return HttpResponse(400, json.dumps({'error': message}))
def _internal_error_response(self, message: str) -> HttpResponse:
return HttpResponse(500, json.dumps({'error': '服务器错误'}))
@dataclass
class UserData:
name: str
email: str
age: int
class UserCreateHandler(RequestHandler[UserData, dict]):
"""用户创建处理器"""
def _parse_request(self, request: HttpRequest) -> UserData:
if not request.body:
raise ValueError("请求体不能为空")
data = json.loads(request.body)
return UserData(
name=data['name'],
email=data['email'],
age=data['age']
)
def _validate(self, data: UserData) -> bool:
if not data.name or len(data.name) < 2:
return False
if '@' not in data.email:
return False
if data.age < 0 or data.age > 150:
return False
return True
def _process(self, data: UserData) -> dict:
return {
'id': 'user_001',
'name': data.name,
'email': data.email,
'age': data.age,
'created': True
}
handler = UserCreateHandler()
valid_request = HttpRequest(
method=HttpMethod.POST,
path='/api/users',
headers={
'Authorization': 'Bearer token123',
'Content-Type': 'application/json'
},
body='{"name": "张三", "email": "zhangsan@example.com", "age": 25}'
)
response = handler.handle(valid_request)
print(f"状态码: {response.status_code}")
print(f"响应: {response.body}")22.5.3 文档生成器
from abc import ABC, abstractmethod
from typing import Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class DocumentFormat(Enum):
PDF = "pdf"
HTML = "html"
MARKDOWN = "markdown"
DOCX = "docx"
@dataclass
class DocumentMetadata:
"""文档元数据"""
title: str
author: str
created_at: datetime = field(default_factory=datetime.now)
version: str = "1.0"
keywords: list[str] = field(default_factory=list)
@dataclass
class Document:
"""生成的文档"""
metadata: DocumentMetadata
content: str
format: DocumentFormat
size_bytes: int = 0
class DocumentGenerator(ABC):
"""
文档生成器模板
"""
def generate(
self,
data: Any,
metadata: DocumentMetadata,
format: DocumentFormat = DocumentFormat.PDF
) -> Document:
"""
生成文档的模板方法
"""
self._validate_data(data)
content_parts = []
content_parts.append(self._generate_header(metadata))
content_parts.append(self._generate_toc(data))
content_parts.append(self._generate_body(data))
content_parts.append(self._generate_footer(metadata))
full_content = self._combine_content(content_parts)
formatted_content = self._format_output(full_content, format)
document = Document(
metadata=metadata,
content=formatted_content,
format=format,
size_bytes=len(formatted_content.encode('utf-8'))
)
self._post_generate(document)
return document
@abstractmethod
def _generate_body(self, data: Any) -> str:
"""生成正文:子类必须实现"""
pass
def _validate_data(self, data: Any) -> None:
"""钩子:验证数据"""
if data is None:
raise ValueError("数据不能为空")
def _generate_header(self, metadata: DocumentMetadata) -> str:
"""钩子:生成页眉"""
return f"# {metadata.title}\n\n"
def _generate_toc(self, data: Any) -> str:
"""钩子:生成目录(默认空)"""
return ""
def _generate_footer(self, metadata: DocumentMetadata) -> str:
"""钩子:生成页脚"""
return f"\n\n---\n生成时间: {metadata.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n作者: {metadata.author}"
def _combine_content(self, parts: list[str]) -> str:
"""组合内容"""
return "\n".join(parts)
def _format_output(self, content: str, format: DocumentFormat) -> str:
"""钩子:格式化输出"""
return content
def _post_generate(self, document: Document) -> None:
"""钩子:生成后处理"""
print(f"文档生成完成: {document.metadata.title} ({document.size_bytes} bytes)")
@dataclass
class ReportData:
"""报表数据"""
sections: list[dict]
summary: dict
class ReportGenerator(DocumentGenerator):
"""报表生成器"""
def _generate_toc(self, data: ReportData) -> str:
toc = "## 目录\n\n"
for i, section in enumerate(data.sections, 1):
toc += f"{i}. {section['title']}\n"
return toc + "\n"
def _generate_body(self, data: ReportData) -> str:
body = ""
for section in data.sections:
body += f"## {section['title']}\n\n"
body += f"{section['content']}\n\n"
if 'items' in section:
for item in section['items']:
body += f"- {item}\n"
body += "\n"
body += "## 总结\n\n"
for key, value in data.summary.items():
body += f"- **{key}**: {value}\n"
return body
def _format_output(self, content: str, format: DocumentFormat) -> str:
if format == DocumentFormat.HTML:
lines = content.split('\n')
html_lines = []
for line in lines:
if line.startswith('# '):
html_lines.append(f'<h1>{line[2:]}</h1>')
elif line.startswith('## '):
html_lines.append(f'<h2>{line[3:]}</h2>')
elif line.startswith('- '):
html_lines.append(f'<li>{line[2:]}</li>')
else:
html_lines.append(f'<p>{line}</p>')
return '\n'.join(html_lines)
return content
report_data = ReportData(
sections=[
{
'title': '项目概述',
'content': '本项目旨在提升系统性能。',
'items': ['性能优化', '代码重构', '测试覆盖']
},
{
'title': '实施计划',
'content': '分三个阶段实施。',
'items': ['第一阶段:分析', '第二阶段:开发', '第三阶段:部署']
}
],
summary={
'预计工期': '3个月',
'预算': '50万元',
'风险等级': '中'
}
)
metadata = DocumentMetadata(
title="项目实施报告",
author="项目经理",
keywords=["项目", "报告", "实施"]
)
generator = ReportGenerator()
doc = generator.generate(report_data, metadata, DocumentFormat.MARKDOWN)
print("\n" + "="*50)
print(doc.content)22.6 模式变体与扩展
22.6.1 策略化模板方法
from abc import ABC, abstractmethod
from typing import Callable, Any, Optional
from dataclasses import dataclass
@dataclass
class StepConfig:
"""步骤配置"""
name: str
strategy: Callable
is_required: bool = True
class StrategyTemplate:
"""
策略化模板方法:将步骤实现作为策略注入
"""
def __init__(
self,
steps: list[StepConfig],
hooks: Optional[dict[str, Callable]] = None
):
self.steps = steps
self.hooks = hooks or {}
def execute(self, *args, **kwargs) -> Any:
"""执行模板"""
self._call_hook('on_start')
context = {'args': args, 'kwargs': kwargs, 'results': {}}
result = None
try:
for step in self.steps:
step_result = step.strategy(context)
context['results'][step.name] = step_result
context['last_result'] = step_result
result = step_result
self._call_hook('on_success', result)
return result
except Exception as e:
self._call_hook('on_error', e)
raise
finally:
self._call_hook('on_end')
def _call_hook(self, hook_name: str, *args) -> None:
if hook_name in self.hooks:
self.hooks[hook_name](*args)
def create_api_template() -> StrategyTemplate:
"""创建API处理模板"""
def parse_request(ctx: dict) -> dict:
return {'data': ctx['kwargs'].get('data', {})}
def validate_data(ctx: dict) -> dict:
data = ctx['last_result']['data']
if not data:
raise ValueError("数据为空")
return data
def process_data(ctx: dict) -> dict:
data = ctx['last_result']
return {'processed': True, 'data': data}
def format_response(ctx: dict) -> dict:
return {
'status': 'success',
'result': ctx['last_result']
}
steps = [
StepConfig('parse', parse_request),
StepConfig('validate', validate_data),
StepConfig('process', process_data),
StepConfig('format', format_response),
]
hooks = {
'on_start': lambda: print("API处理开始"),
'on_success': lambda r: print(f"API处理成功: {r}"),
'on_error': lambda e: print(f"API处理失败: {e}"),
'on_end': lambda: print("API处理结束"),
}
return StrategyTemplate(steps, hooks)
api_template = create_api_template()
result = api_template.execute(data={'name': '测试'})22.6.2 组合模板方法
from typing import Protocol, Any, Callable
from dataclasses import dataclass, field
class TemplateStep(Protocol):
"""模板步骤协议"""
def execute(self, context: dict) -> Any:
...
@dataclass
class SimpleStep:
"""简单步骤"""
name: str
action: Callable
required: bool = True
def execute(self, context: dict) -> Any:
return self.action(context)
@dataclass
class ConditionalStep:
"""条件步骤"""
name: str
condition: Callable[[dict], bool]
true_step: TemplateStep
false_step: TemplateStep | None = None
def execute(self, context: dict) -> Any:
if self.condition(context):
return self.true_step.execute(context)
elif self.false_step:
return self.false_step.execute(context)
return None
@dataclass
class LoopStep:
"""循环步骤"""
name: str
get_items: Callable[[dict], list]
step: TemplateStep
def execute(self, context: dict) -> list:
items = self.get_items(context)
results = []
for item in items:
context['current_item'] = item
result = self.step.execute(context)
results.append(result)
return results
@dataclass
class CompositeTemplate:
"""组合模板"""
name: str
steps: list[TemplateStep] = field(default_factory=list)
def add_step(self, step: TemplateStep) -> 'CompositeTemplate':
self.steps.append(step)
return self
def execute(self, initial_context: dict | None = None) -> dict:
context = initial_context or {}
context['results'] = {}
for step in self.steps:
result = step.execute(context)
context['results'][step.name] = result
context['last_result'] = result
return context
def create_order_processing_template() -> CompositeTemplate:
"""创建订单处理组合模板"""
def validate_order(ctx: dict) -> bool:
order = ctx.get('order', {})
return order.get('items') and order.get('customer')
def check_inventory(ctx: dict) -> dict:
order = ctx['order']
available = []
for item in order['items']:
available.append({'sku': item['sku'], 'available': True})
return {'items': available}
def reserve_inventory(ctx: dict) -> dict:
return {'reserved': True}
def calculate_total(ctx: dict) -> float:
order = ctx['order']
return sum(item.get('price', 0) * item.get('quantity', 1) for item in order['items'])
def process_payment(ctx: dict) -> dict:
return {'paid': True, 'transaction_id': 'TXN001'}
def confirm_order(ctx: dict) -> dict:
return {'confirmed': True, 'order_id': 'ORD001'}
def notify_customer(ctx: dict) -> dict:
return {'notified': True}
template = CompositeTemplate('OrderProcessing')
template.add_step(SimpleStep('validate', lambda ctx: validate_order(ctx)))
template.add_step(SimpleStep('check_inventory', check_inventory))
template.add_step(SimpleStep('reserve_inventory', reserve_inventory))
template.add_step(SimpleStep('calculate_total', calculate_total))
template.add_step(SimpleStep('process_payment', process_payment))
template.add_step(SimpleStep('confirm_order', confirm_order))
template.add_step(SimpleStep('notify_customer', notify_customer))
return template
order_template = create_order_processing_template()
order_data = {
'order': {
'customer': 'CUST001',
'items': [
{'sku': 'ITEM001', 'price': 100, 'quantity': 2},
{'sku': 'ITEM002', 'price': 50, 'quantity': 1},
]
}
}
result = order_template.execute(order_data)
print(f"订单处理结果: {result['results']}")22.6.3 异步模板方法
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional
from datetime import datetime
class AsyncTemplate(ABC):
"""
异步模板方法基类
"""
async def execute(self, *args, **kwargs) -> Any:
"""异步模板方法"""
await self._pre_execute(*args, **kwargs)
try:
result = await self._do_execute(*args, **kwargs)
result = await self._post_execute(result)
return result
except Exception as e:
await self._on_error(e)
raise
@abstractmethod
async def _do_execute(self, *args, **kwargs) -> Any:
"""核心执行逻辑"""
pass
async def _pre_execute(self, *args, **kwargs) -> None:
"""钩子:执行前"""
pass
async def _post_execute(self, result: Any) -> Any:
"""钩子:执行后"""
return result
async def _on_error(self, error: Exception) -> None:
"""钩子:错误处理"""
print(f"执行错误: {error}")
class AsyncDataProcessor(AsyncTemplate):
"""异步数据处理器"""
def __init__(self, name: str):
self.name = name
self._start_time: Optional[datetime] = None
async def _pre_execute(self, *args, **kwargs) -> None:
self._start_time = datetime.now()
print(f"[{self.name}] 开始处理...")
async def _do_execute(self, *args, **kwargs) -> Any:
data = args[0] if args else []
results = []
for item in data:
result = await self._process_item(item)
results.append(result)
return results
async def _process_item(self, item: Any) -> Any:
"""处理单个项目"""
await asyncio.sleep(0.1)
return {'processed': item, 'timestamp': datetime.now().isoformat()}
async def _post_execute(self, result: Any) -> Any:
duration = (datetime.now() - self._start_time).total_seconds()
print(f"[{self.name}] 处理完成,耗时 {duration:.2f}秒")
return result
async def main():
processor = AsyncDataProcessor("AsyncProcessor")
data = [1, 2, 3, 4, 5]
results = await processor.execute(data)
print(f"处理结果: {results}")
asyncio.run(main())22.7 反模式与最佳实践
22.7.1 常见反模式
反模式1:模板方法过于复杂
# ❌ 错误示例:模板方法包含过多逻辑
class BadTemplate(ABC):
def template_method(self, data):
# 模板方法本身包含太多业务逻辑
if data['type'] == 'A':
self.step1()
if data['flag']:
self.step2()
else:
self.step3()
elif data['type'] == 'B':
self.step4()
for item in data['items']:
self.step5(item)
# ... 更多条件分支
pass
# ✅ 正确示例:模板方法简洁,逻辑委托给子类
class GoodTemplate(ABC):
def template_method(self, data):
preprocessed = self.preprocess(data)
result = self.process(preprocessed)
return self.postprocess(result)
@abstractmethod
def process(self, data):
pass反模式2:滥用钩子方法
# ❌ 错误示例:过多可选钩子导致难以理解
class OverHookedTemplate(ABC):
def template_method(self):
self.hook1()
self.hook2()
self.hook3()
self.hook4()
self.hook5()
# 哪些钩子应该覆盖?哪些是可选的?
# ✅ 正确示例:明确区分必需方法和钩子
class CleanTemplate(ABC):
def template_method(self):
self.required_step1() # 必需
self.optional_hook() # 可选(有明确命名)
self.required_step2() # 必需
@abstractmethod
def required_step1(self):
pass
def optional_hook(self): # 钩子有默认空实现
pass
@abstractmethod
def required_step2(self):
pass反模式3:继承层次过深
# ❌ 错误示例:多层继承导致维护困难
class Level1(ABC):
def template_method(self):
self.step1()
class Level2(Level1):
def step1(self):
self.step1a()
self.step1b()
class Level3(Level2):
def step1a(self):
# 哪些方法可以覆盖?哪些会被调用?
pass
# ✅ 正确示例:使用组合代替深层继承
class ComposedTemplate:
def __init__(self, step1_impl, step2_impl):
self.step1 = step1_impl
self.step2 = step2_impl
def execute(self):
self.step1()
self.step2()22.7.2 最佳实践
from abc import ABC, abstractmethod
from typing import Any, Optional, final
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
class BestPracticeTemplate(ABC):
"""
模板方法最佳实践示例
"""
@final
def execute(self, data: Any) -> Any:
"""
模板方法:使用@final防止子类覆盖(Python 3.8+)
如果Python版本不支持@final,应在文档中说明不应覆盖
"""
self._log_start(data)
try:
validated = self._validate(data)
if not validated:
raise ValueError("数据验证失败")
preprocessed = self._preprocess(data)
result = self._process(preprocessed)
postprocessed = self._postprocess(result)
self._log_success(postprocessed)
return postprocessed
except Exception as e:
self._log_error(e)
raise
finally:
self._cleanup()
@abstractmethod
def _process(self, data: Any) -> Any:
"""核心处理逻辑:子类必须实现"""
pass
def _validate(self, data: Any) -> bool:
"""钩子:验证数据(默认通过)"""
return data is not None
def _preprocess(self, data: Any) -> Any:
"""钩子:预处理(默认不处理)"""
return data
def _postprocess(self, result: Any) -> Any:
"""钩子:后处理(默认不处理)"""
return result
def _cleanup(self) -> None:
"""钩子:清理资源(默认不处理)"""
pass
def _log_start(self, data: Any) -> None:
logger.info(f"开始处理: {type(self).__name__}")
def _log_success(self, result: Any) -> None:
logger.info(f"处理成功: {type(self).__name__}")
def _log_error(self, error: Exception) -> None:
logger.error(f"处理失败: {type(self).__name__} - {error}")
class ConcreteProcessor(BestPracticeTemplate):
"""具体处理器示例"""
def _process(self, data: list) -> dict:
return {
'count': len(data),
'sum': sum(data) if data else 0
}
def _validate(self, data: Any) -> bool:
return isinstance(data, list) and all(isinstance(x, (int, float)) for x in data)
def _preprocess(self, data: list) -> list:
return [x for x in data if x > 0]
def _postprocess(self, result: dict) -> dict:
result['status'] = 'completed'
return result
processor = ConcreteProcessor()
result = processor.execute([1, 2, 3, 4, 5, -1, -2])
print(f"结果: {result}")22.8 决策指南
22.8.1 是否使用模板方法模式
┌─────────────────────────────────────────────────────────────────┐
│ 是否使用模板方法模式? │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 是否有多个类具有相似的算法结构? │
│ │ │
│ ├── 否 ──→ 考虑其他模式或直接实现 │
│ │ │
│ └── 是 ──┐ │
│ │ │
│ ▼ │
│ 算法结构是否固定不变? │
│ │ │
│ ├── 否 ──→ 考虑策略模式 │
│ │ │
│ └── 是 ──┐ │
│ │ │
│ ▼ │
│ 是否需要控制扩展点? │
│ │ │
│ ├── 否 ──→ 考虑简单的继承 │
│ │ │
│ └── 是 ──→ ✓ 使用模板方法模式 │
│ │
└─────────────────────────────────────────────────────────────────┘22.8.2 模式选择决策树
┌─────────────────────────────────────────────────────────────────┐
│ 行为型模式选择指南 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 需要封装什么? │
│ │ │
│ ├── 算法整体 ──→ 策略模式 │
│ │ │
│ ├── 算法骨架 ──→ 模板方法模式 ✓ │
│ │ │
│ ├── 对象行为 ──→ 状态模式 │
│ │ │
│ ├── 请求处理 ──→ 责任链模式 │
│ │ │
│ └── 操作结构 ──→ 访问者模式 │
│ │
└─────────────────────────────────────────────────────────────────┘22.8.3 与相关模式的对比
| 特性 | 模板方法 | 策略模式 | 工厂方法 |
|---|---|---|---|
| 意图 | 定义算法骨架 | 封装可互换算法 | 创建对象 |
| 变化点 | 算法步骤 | 整个算法 | 创建逻辑 |
| 实现方式 | 继承 | 组合 | 继承 |
| 运行时改变 | 不支持 | 支持 | 不适用 |
| 控制方向 | 父类控制子类 | 客户端选择 | 父类控制子类 |
| 扩展方式 | 新增子类 | 新增策略类 | 新增产品类 |
22.9 快速参考卡
22.9.1 核心概念速查
┌─────────────────────────────────────────────────────────────────┐
│ 模板方法模式速查卡 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 定义:定义算法骨架,将步骤延迟到子类实现 │
│ │
│ 核心角色: │
│ ├── AbstractClass:定义模板方法和抽象步骤 │
│ └── ConcreteClass:实现抽象步骤 │
│ │
│ 方法类型: │
│ ├── 模板方法:定义算法骨架(不应被覆盖) │
│ ├── 抽象方法:子类必须实现 │
│ ├── 具体方法:父类提供默认实现 │
│ └── 钩子方法:子类可选覆盖 │
│ │
│ 设计原则:好莱坞原则 - "不要调用我们,我们会调用你" │
│ │
│ 适用场景: │
│ ├── 多个类有相似算法结构 │
│ ├── 需要控制子类扩展点 │
│ └── 需要复用公共代码 │
│ │
│ 优点:代码复用、控制扩展、符合开闭原则 │
│ 缺点:限制继承、可能难以维护 │
│ │
└─────────────────────────────────────────────────────────────────┘22.9.2 Python实现要点
from abc import ABC, abstractmethod
from typing import final
class Template(ABC):
@final
def template_method(self):
self.step1()
self.step2()
self.hook()
@abstractmethod
def step1(self): pass
@abstractmethod
def step2(self): pass
def hook(self): pass22.9.3 常见应用场景
| 场景 | 示例 |
|---|---|
| 数据处理 | ETL管道、数据转换 |
| 文档生成 | 报表、合同、证书 |
| 请求处理 | Web框架、API处理 |
| 游戏开发 | AI行为、角色动作 |
| 测试框架 | 单元测试、集成测试 |
22.10 小结
模板方法模式通过定义算法骨架,将可变部分延迟到子类实现,是一种典型的控制反转模式。在Python中,可以利用抽象基类、钩子方法和函数式组合实现灵活的模板方法模式。
关键要点
- 算法骨架固定:模板方法定义了算法的整体结构
- 扩展点明确:抽象方法和钩子方法定义了子类的扩展点
- 好莱坞原则:父类调用子类方法,而非相反
- 代码复用:公共代码在父类中实现,避免重复
实践建议
- 保持模板方法简洁,避免过多条件分支
- 明确区分必需方法和钩子方法
- 考虑使用组合代替深层继承
- 为模板方法添加文档说明不应被覆盖
下一章预告
下一章将介绍访问者模式,它用于在不变的数据结构上定义新的操作,是处理复杂对象结构的利器。