Skip to content

第22章 模板方法模式

学习目标

完成本章学习后,读者将能够:

  • 理解模板方法模式的核心概念和形式化定义
  • 掌握算法骨架的定义方法与不变性保证
  • 实现可扩展的算法框架与钩子方法机制
  • 识别模板方法模式的适用场景与局限性
  • 应用模板方法模式解决实际软件设计问题

22.1 模式定义

22.1.1 核心定义

模板方法模式(Template Method Pattern) 是一种行为型设计模式,它定义一个操作中的算法骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重定义该算法的某些特定步骤。

22.1.2 形式化定义

从形式化角度,模板方法模式可以定义为一个四元组:

$$\mathcal{T} = \langle A, S, \sigma, \phi \rangle$$

其中:

  • $A$ 是抽象类(Abstract Class),定义算法骨架
  • $S = {s_1, s_2, \ldots, s_n}$ 是步骤集合(Step Set)
  • $\sigma: S \rightarrow {concrete, abstract, hook}$ 是步骤类型函数
  • $\phi: S^* \rightarrow Algorithm$ 是算法组合函数

步骤类型定义

$$\sigma(s) = \begin{cases} concrete & \text{如果 } s \text{ 是具体步骤(父类实现)} \ abstract & \text{如果 } s \text{ 是抽象步骤(子类必须实现)} \ hook & \text{如果 } s \text{ 是钩子步骤(子类可选实现)} \end{cases}$$

算法骨架定义

模板方法 $T$ 是一个有序步骤序列:

$$T = \langle s_1, s_2, \ldots, s_n \rangle$$

其中执行顺序由 $\phi$ 确定,且满足不变性约束:

$$\forall c \in ConcreteClasses: execute(T, c) = \phi(s_1, s_2, \ldots, s_n)$$

扩展点定义

扩展点 $E$ 是子类可以定制的步骤集合:

$$E = {s \in S \mid \sigma(s) \in {abstract, hook}}$$

不变性定理

对于任意具体类 $c_1, c_2$:

$$structure(T, c_1) = structure(T, c_2)$$

即所有具体类的算法结构相同,只有具体步骤的实现不同。

22.1.3 设计原则体现

模板方法模式体现了以下设计原则:

设计原则体现方式
好莱坞原则"不要调用我们,我们会调用你"——父类调用子类方法
开闭原则对扩展开放(新增子类),对修改关闭(算法骨架不变)
单一职责原则父类负责算法结构,子类负责具体步骤实现
里氏替换原则子类可以替换父类而不影响算法执行

22.2 历史背景与演进

22.2.1 历史发展

年代里程碑描述
1970s框架起源框架编程概念出现,模板方法作为核心机制
1985Hollywood PrincipleHollywood原则被正式提出
1994GoF经典《设计模式》将模板方法列为经典模式
1995Hollywood TemplateMartin Fowler提出Hollywood Template概念
2000s框架应用Spring、JUnit等框架大量使用模板方法
2010s函数式变体函数式编程带来新的实现方式
2020s现代实践与依赖注入、组合模式结合使用

22.2.2 理论基础

模板方法模式的理论基础来源于:

  1. 框架理论:框架定义了应用的骨架,开发者填充具体实现
  2. 控制反转(IoC):控制流从框架流向应用代码
  3. 继承与多态:利用继承实现代码复用,多态实现行为变化

22.3 UML结构图

22.3.1 标准结构

┌─────────────────────────────────────┐
│         <<abstract>>                │
│         AbstractClass               │
├─────────────────────────────────────┤
│ + template_method()                 │
│ # primitive_operation1() : abstract │
│ # primitive_operation2() : abstract │
│ # concrete_operation()              │
│ # hook()                            │
└───────────────┬─────────────────────┘

                │ extends

    ┌───────────┴───────────┐
    │                       │
┌───┴───────────┐   ┌───────┴───────┐
│ ConcreteClass1│   │ ConcreteClass2│
├───────────────┤   ├───────────────┤
│ # primitive_  │   │ # primitive_  │
│   operation1()│   │   operation1()│
│ # primitive_  │   │ # primitive_  │
│   operation2()│   │   operation2()│
│ # hook()      │   │               │
└───────────────┘   └───────────────┘

22.3.2 方法类型详解

┌────────────────────────────────────────────────────────────────┐
│                    AbstractClass                               │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  template_method():  ← 算法骨架(final,不可覆盖)             │
│      primitive_operation1()  ← 抽象方法(子类必须实现)        │
│      concrete_operation()    ← 具体方法(子类继承)            │
│      hook()                  ← 钩子方法(子类可选覆盖)        │
│      primitive_operation2()  ← 抽象方法(子类必须实现)        │
│                                                                │
├────────────────────────────────────────────────────────────────┤
│ 方法类型说明:                                                 │
│                                                                │
│ ┌──────────────┬─────────────────────────────────────────────┐ │
│ │ 抽象方法     │ @abstractmethod,子类必须实现               │ │
│ ├──────────────┼─────────────────────────────────────────────┤ │
│ │ 具体方法     │ 父类提供默认实现,子类继承使用               │ │
│ ├──────────────┼─────────────────────────────────────────────┤ │
│ │ 钩子方法     │ 空实现或默认实现,子类可选覆盖               │ │
│ ├──────────────┼─────────────────────────────────────────────┤ │
│ │ 模板方法     │ 调用其他方法组成算法,建议final              │ │
│ └──────────────┴─────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘

22.3.3 执行流程图

┌─────────────────────────────────────────────────────────────────┐
│                     template_method() 执行流程                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Client                                                         │
│    │                                                            │
│    │ template_method()                                          │
│    ▼                                                            │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │ AbstractClass                                             │   │
│  │                                                           │   │
│  │  Step 1: base_operation()      ← 具体方法(父类实现)    │   │
│  │            │                                             │   │
│  │            ▼                                             │   │
│  │  Step 2: required_operation1() ← 抽象方法(子类实现)    │   │
│  │            │                                             │   │
│  │            ▼                                             │   │
│  │  Step 3: hook1()               ← 钩子方法(可选覆盖)    │   │
│  │            │                                             │   │
│  │            ▼                                             │   │
│  │  Step 4: required_operation2() ← 抽象方法(子类实现)    │   │
│  │            │                                             │   │
│  │            ▼                                             │   │
│  │  Step 5: hook2()               ← 钩子方法(可选覆盖)    │   │
│  │                                                           │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

22.4 Python实现

22.4.1 基础实现

python
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from functools import wraps

class AbstractClass(ABC):
    """
    抽象类:定义算法骨架和基本方法
    """
    
    def template_method(self) -> None:
        """
        模板方法:定义算法骨架
        子类不应覆盖此方法(Python中无法强制final,但约定不覆盖)
        """
        self.base_operation1()
        self.required_operation1()
        self.base_operation2()
        self.hook1()
        self.required_operation2()
        self.base_operation3()
        self.hook2()
    
    def base_operation1(self) -> None:
        """具体方法:基础操作1"""
        print("AbstractClass: 基础操作1")
    
    def base_operation2(self) -> None:
        """具体方法:基础操作2"""
        print("AbstractClass: 基础操作2")
    
    def base_operation3(self) -> None:
        """具体方法:基础操作3"""
        print("AbstractClass: 基础操作3")
    
    @abstractmethod
    def required_operation1(self) -> None:
        """抽象方法:必须由子类实现"""
        pass
    
    @abstractmethod
    def required_operation2(self) -> None:
        """抽象方法:必须由子类实现"""
        pass
    
    def hook1(self) -> None:
        """钩子方法:子类可选覆盖"""
        pass
    
    def hook2(self) -> None:
        """钩子方法:子类可选覆盖"""
        pass


class ConcreteClass1(AbstractClass):
    """具体类1:实现所有抽象方法"""
    
    def required_operation1(self) -> None:
        print("ConcreteClass1: 实现操作1")
    
    def required_operation2(self) -> None:
        print("ConcreteClass1: 实现操作2")


class ConcreteClass2(AbstractClass):
    """具体类2:实现抽象方法并覆盖钩子"""
    
    def required_operation1(self) -> None:
        print("ConcreteClass2: 实现操作1")
    
    def required_operation2(self) -> None:
        print("ConcreteClass2: 实现操作2")
    
    def hook1(self) -> None:
        print("ConcreteClass2: 覆盖钩子1")
    
    def hook2(self) -> None:
        print("ConcreteClass2: 覆盖钩子2")


print("=== ConcreteClass1 执行 ===")
obj1 = ConcreteClass1()
obj1.template_method()

print("\n=== ConcreteClass2 执行 ===")
obj2 = ConcreteClass2()
obj2.template_method()

22.4.2 带返回值的模板方法

python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, List, Any

T = TypeVar('T')
R = TypeVar('R')

class TemplateWithResult(ABC, Generic[T, R]):
    """
    支持返回值的模板方法
    """
    
    def execute(self, data: T) -> R:
        """
        模板方法:处理数据并返回结果
        """
        preprocessed = self.preprocess(data)
        validated = self.validate(preprocessed)
        processed = self.process(validated)
        result = self.postprocess(processed)
        self.cleanup()
        return result
    
    def preprocess(self, data: T) -> T:
        """预处理:默认不做任何处理"""
        return data
    
    @abstractmethod
    def validate(self, data: T) -> T:
        """验证:子类必须实现"""
        pass
    
    @abstractmethod
    def process(self, data: T) -> R:
        """处理:子类必须实现"""
        pass
    
    def postprocess(self, result: R) -> R:
        """后处理:默认不做任何处理"""
        return result
    
    def cleanup(self) -> None:
        """清理:默认不做任何处理"""
        pass


class NumberProcessor(TemplateWithResult[List[int], int]):
    """数字列表处理器:计算总和"""
    
    def validate(self, data: List[int]) -> List[int]:
        if not data:
            raise ValueError("数据列表不能为空")
        return data
    
    def process(self, data: List[int]) -> int:
        return sum(data)
    
    def postprocess(self, result: int) -> int:
        print(f"计算完成,结果: {result}")
        return result


class StringProcessor(TemplateWithResult[str, dict]):
    """字符串处理器:统计字符频率"""
    
    def preprocess(self, data: str) -> str:
        return data.lower().strip()
    
    def validate(self, data: str) -> str:
        if not data:
            raise ValueError("字符串不能为空")
        return data
    
    def process(self, data: str) -> dict:
        freq = {}
        for char in data:
            freq[char] = freq.get(char, 0) + 1
        return freq
    
    def cleanup(self) -> None:
        print("字符串处理完成,资源已释放")


num_processor = NumberProcessor()
result = num_processor.execute([1, 2, 3, 4, 5])
print(f"最终结果: {result}")

print()

str_processor = StringProcessor()
freq = str_processor.execute("  Hello World  ")
print(f"字符频率: {freq}")

22.4.3 函数式模板方法

python
from typing import Callable, TypeVar, Any, Optional
from functools import wraps
from dataclasses import dataclass

T = TypeVar('T')
R = TypeVar('R')

@dataclass
class TemplateStep:
    """模板步骤定义"""
    name: str
    required: bool = True
    default: Optional[Callable] = None


class FunctionalTemplate:
    """
    函数式模板方法:使用组合代替继承
    """
    
    def __init__(
        self,
        steps: list[TemplateStep],
        implementations: Optional[dict[str, Callable]] = None
    ):
        self.steps = steps
        self.implementations = implementations or {}
        
        self._validate_implementations()
    
    def _validate_implementations(self) -> None:
        """验证所有必需步骤都有实现"""
        for step in self.steps:
            if step.required:
                if step.name not in self.implementations:
                    if step.default is None:
                        raise ValueError(
                            f"必需步骤 '{step.name}' 缺少实现"
                        )
    
    def execute(self, *args, **kwargs) -> Any:
        """执行模板方法"""
        result = None
        context = {'args': args, 'kwargs': kwargs, 'result': None}
        
        for step in self.steps:
            impl = self.implementations.get(step.name, step.default)
            if impl is not None:
                result = impl(context)
                context['result'] = result
                context[step.name] = result
        
        return result
    
    def with_implementation(
        self, 
        step_name: str, 
        implementation: Callable
    ) -> 'FunctionalTemplate':
        """添加步骤实现(返回新实例)"""
        new_impls = self.implementations.copy()
        new_impls[step_name] = implementation
        return FunctionalTemplate(self.steps, new_impls)


def create_data_processing_template() -> FunctionalTemplate:
    """创建数据处理模板"""
    steps = [
        TemplateStep('parse', required=True),
        TemplateStep('validate', required=False, default=lambda ctx: ctx['result']),
        TemplateStep('transform', required=True),
        TemplateStep('output', required=False, default=lambda ctx: str(ctx['result'])),
    ]
    return FunctionalTemplate(steps)


def parse_json(context: dict) -> dict:
    import json
    return json.loads(context['args'][0])

def validate_data(context: dict) -> dict:
    data = context['result']
    if not data:
        raise ValueError("数据为空")
    return data

def transform_upper(context: dict) -> dict:
    data = context['result']
    return {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}

def output_json(context: dict) -> str:
    import json
    return json.dumps(context['result'], ensure_ascii=False)


json_template = (
    create_data_processing_template()
    .with_implementation('parse', parse_json)
    .with_implementation('validate', validate_data)
    .with_implementation('transform', transform_upper)
    .with_implementation('output', output_json)
)

result = json_template.execute('{"name": "张三", "city": "北京"}')
print(f"处理结果: {result}")

22.4.4 钩子方法详解

python
from abc import ABC, abstractmethod
from typing import Optional, List, Any
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TemplateWithHooks(ABC):
    """
    展示各种钩子方法的使用
    """
    
    def execute(self, *args, **kwargs) -> Any:
        """
        主模板方法,包含完整的生命周期钩子
        """
        self._on_start()
        
        try:
            if self._should_execute(*args, **kwargs):
                self._pre_execute(*args, **kwargs)
                result = self._do_execute(*args, **kwargs)
                result = self._post_execute(result)
                self._on_success(result)
                return result
            else:
                self._on_skip()
                return None
                
        except Exception as e:
            self._on_error(e)
            raise
        finally:
            self._on_end()
    
    @abstractmethod
    def _do_execute(self, *args, **kwargs) -> Any:
        """核心执行逻辑:子类必须实现"""
        pass
    
    def _should_execute(self, *args, **kwargs) -> bool:
        """钩子:是否执行(默认True)"""
        return True
    
    def _on_start(self) -> None:
        """钩子:开始时调用"""
        logger.info(f"[{datetime.now()}] 开始执行")
    
    def _pre_execute(self, *args, **kwargs) -> None:
        """钩子:执行前调用"""
        pass
    
    def _post_execute(self, result: Any) -> Any:
        """钩子:执行后调用"""
        return result
    
    def _on_success(self, result: Any) -> None:
        """钩子:成功时调用"""
        logger.info(f"[{datetime.now()}] 执行成功")
    
    def _on_error(self, error: Exception) -> None:
        """钩子:错误时调用"""
        logger.error(f"[{datetime.now()}] 执行失败: {error}")
    
    def _on_skip(self) -> None:
        """钩子:跳过时调用"""
        logger.info(f"[{datetime.now()}] 执行被跳过")
    
    def _on_end(self) -> None:
        """钩子:结束时调用(无论成功或失败)"""
        logger.info(f"[{datetime.now()}] 执行结束")


class ConditionalProcessor(TemplateWithHooks):
    """
    条件处理器:演示条件钩子
    """
    
    def __init__(self, threshold: int = 10):
        self.threshold = threshold
        self._skip = False
    
    def _should_execute(self, *args, **kwargs) -> bool:
        """当数据量小于阈值时跳过"""
        data = args[0] if args else []
        self._skip = len(data) < self.threshold
        return not self._skip
    
    def _do_execute(self, *args, **kwargs) -> Any:
        data = args[0]
        return sum(data) / len(data)
    
    def _on_skip(self) -> None:
        print(f"数据量不足 {self.threshold} 条,跳过处理")


class RetryProcessor(TemplateWithHooks):
    """
    重试处理器:演示错误处理钩子
    """
    
    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.retry_count = 0
    
    def _do_execute(self, *args, **kwargs) -> Any:
        import random
        if random.random() < 0.7:
            raise RuntimeError("随机失败")
        return "处理成功"
    
    def _on_error(self, error: Exception) -> None:
        self.retry_count += 1
        if self.retry_count <= self.max_retries:
            print(f"第 {self.retry_count} 次重试...")
            try:
                result = self._do_execute(*self._last_args, **self._last_kwargs)
                self._on_success(result)
                return result
            except Exception as e:
                self._on_error(e)
        else:
            print(f"超过最大重试次数 {self.max_retries}")
    
    def _pre_execute(self, *args, **kwargs) -> None:
        self._last_args = args
        self._last_kwargs = kwargs


print("=== 条件处理器测试 ===")
processor = ConditionalProcessor(threshold=5)
processor.execute([1, 2, 3])
processor.execute([1, 2, 3, 4, 5, 6, 7])

22.5 企业级应用示例

22.5.1 ETL数据处理管道

python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Any, Optional, Iterator
from dataclasses import dataclass, field
from datetime import datetime
import logging
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

T = TypeVar('T')
U = TypeVar('U')

@dataclass
class ETLContext:
    """ETL执行上下文"""
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    records_processed: int = 0
    records_failed: int = 0
    errors: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class ETLPipeline(ABC, Generic[T, U]):
    """
    ETL管道模板:Extract-Transform-Load
    """
    
    def __init__(self, name: str):
        self.name = name
        self._context = ETLContext()
    
    def run(self, source: Any) -> list[U]:
        """
        执行ETL管道
        """
        logger.info(f"[{self.name}] 开始ETL管道")
        self._context = ETLContext()
        
        results = []
        
        try:
            self._pre_extract(source)
            
            for record in self._extract(source):
                try:
                    if self._validate(record):
                        transformed = self._transform(record)
                        self._load(transformed)
                        results.append(transformed)
                        self._context.records_processed += 1
                    else:
                        self._context.records_failed += 1
                except Exception as e:
                    self._context.records_failed += 1
                    self._context.errors.append(str(e))
                    self._on_record_error(record, e)
            
            self._post_load()
            
        except Exception as e:
            self._context.errors.append(f"Pipeline error: {e}")
            raise
        finally:
            self._context.end_time = datetime.now()
            self._log_summary()
        
        return results
    
    @abstractmethod
    def _extract(self, source: Any) -> Iterator[T]:
        """提取数据:子类必须实现"""
        pass
    
    @abstractmethod
    def _transform(self, record: T) -> U:
        """转换数据:子类必须实现"""
        pass
    
    @abstractmethod
    def _load(self, record: U) -> None:
        """加载数据:子类必须实现"""
        pass
    
    def _pre_extract(self, source: Any) -> None:
        """钩子:提取前准备"""
        pass
    
    def _validate(self, record: T) -> bool:
        """钩子:验证记录(默认通过)"""
        return True
    
    def _post_load(self) -> None:
        """钩子:加载后处理"""
        pass
    
    def _on_record_error(self, record: T, error: Exception) -> None:
        """钩子:记录处理错误"""
        logger.warning(f"记录处理失败: {error}")
    
    def _log_summary(self) -> None:
        """记录执行摘要"""
        duration = (self._context.end_time - self._context.start_time).total_seconds()
        logger.info(
            f"[{self.name}] 完成: "
            f"处理 {self._context.records_processed} 条, "
            f"失败 {self._context.records_failed} 条, "
            f"耗时 {duration:.2f}秒"
        )


@dataclass
class UserRecord:
    """用户记录"""
    user_id: str
    name: str
    email: str
    created_at: str


@dataclass
class UserDocument:
    """用户文档(转换后)"""
    id: str
    display_name: str
    contact_email: str
    registration_date: datetime
    status: str


class UserETLPipeline(ETLPipeline[UserRecord, UserDocument]):
    """
    用户数据ETL管道
    """
    
    def __init__(self):
        super().__init__("UserETL")
        self._loaded_count = 0
    
    def _extract(self, source: list[dict]) -> Iterator[UserRecord]:
        """从源数据提取用户记录"""
        for item in source:
            yield UserRecord(
                user_id=item.get('id', ''),
                name=item.get('name', ''),
                email=item.get('email', ''),
                created_at=item.get('created_at', '')
            )
    
    def _validate(self, record: UserRecord) -> bool:
        """验证用户记录"""
        if not record.user_id or not record.email:
            return False
        if '@' not in record.email:
            return False
        return True
    
    def _transform(self, record: UserRecord) -> UserDocument:
        """转换用户记录"""
        return UserDocument(
            id=record.user_id,
            display_name=record.name.title(),
            contact_email=record.email.lower(),
            registration_date=datetime.fromisoformat(record.created_at),
            status='active'
        )
    
    def _load(self, document: UserDocument) -> None:
        """加载用户文档"""
        self._loaded_count += 1
        logger.debug(f"加载用户: {document.id}")
    
    def _post_load(self) -> None:
        """加载后处理"""
        logger.info(f"共加载 {self._loaded_count} 个用户文档")


source_data = [
    {'id': '001', 'name': 'john doe', 'email': 'JOHN@EXAMPLE.COM', 'created_at': '2024-01-15'},
    {'id': '002', 'name': 'jane smith', 'email': 'jane@example.com', 'created_at': '2024-02-20'},
    {'id': '', 'name': 'invalid', 'email': 'no-email', 'created_at': '2024-03-01'},
    {'id': '003', 'name': 'bob wilson', 'email': 'bob@test.com', 'created_at': '2024-03-10'},
]

pipeline = UserETLPipeline()
results = pipeline.run(source_data)
print(f"\n成功处理 {len(results)} 条记录")
for doc in results:
    print(f"  - {doc.display_name} ({doc.contact_email})")

22.5.2 Web请求处理器

python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
import time

T = TypeVar('T')
R = TypeVar('R')

class HttpMethod(Enum):
    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"


@dataclass
class HttpRequest:
    """HTTP请求"""
    method: HttpMethod
    path: str
    headers: dict[str, str]
    body: Optional[str] = None
    query_params: dict[str, str] = None
    
    def __post_init__(self):
        if self.query_params is None:
            self.query_params = {}


@dataclass
class HttpResponse:
    """HTTP响应"""
    status_code: int
    body: str
    headers: dict[str, str] = None
    
    def __post_init__(self):
        if self.headers is None:
            self.headers = {}


class RequestHandler(ABC, Generic[T, R]):
    """
    请求处理器模板
    """
    
    def handle(self, request: HttpRequest) -> HttpResponse:
        """
        处理请求的模板方法
        """
        start_time = time.time()
        
        try:
            if not self._authenticate(request):
                return self._unauthorized_response()
            
            if not self._authorize(request):
                return self._forbidden_response()
            
            parsed = self._parse_request(request)
            
            validated = self._validate(parsed)
            if not validated:
                return self._bad_request_response("验证失败")
            
            result = self._process(parsed)
            
            response = self._build_response(result)
            
            self._log_request(request, response, time.time() - start_time)
            
            return response
            
        except ValueError as e:
            return self._bad_request_response(str(e))
        except Exception as e:
            return self._internal_error_response(str(e))
    
    @abstractmethod
    def _parse_request(self, request: HttpRequest) -> T:
        """解析请求:子类实现"""
        pass
    
    @abstractmethod
    def _process(self, data: T) -> R:
        """处理业务:子类实现"""
        pass
    
    def _authenticate(self, request: HttpRequest) -> bool:
        """钩子:认证(默认通过)"""
        auth_header = request.headers.get('Authorization', '')
        return auth_header.startswith('Bearer ')
    
    def _authorize(self, request: HttpRequest) -> bool:
        """钩子:授权(默认通过)"""
        return True
    
    def _validate(self, data: T) -> bool:
        """钩子:验证(默认通过)"""
        return True
    
    def _build_response(self, result: R) -> HttpResponse:
        """构建响应"""
        return HttpResponse(
            status_code=200,
            body=json.dumps(result, ensure_ascii=False, default=str),
            headers={'Content-Type': 'application/json'}
        )
    
    def _log_request(
        self, 
        request: HttpRequest, 
        response: HttpResponse, 
        duration: float
    ) -> None:
        """钩子:记录请求日志"""
        print(
            f"[{request.method.value}] {request.path} "
            f"-> {response.status_code} ({duration*1000:.2f}ms)"
        )
    
    def _unauthorized_response(self) -> HttpResponse:
        return HttpResponse(401, json.dumps({'error': '未授权'}))
    
    def _forbidden_response(self) -> HttpResponse:
        return HttpResponse(403, json.dumps({'error': '禁止访问'}))
    
    def _bad_request_response(self, message: str) -> HttpResponse:
        return HttpResponse(400, json.dumps({'error': message}))
    
    def _internal_error_response(self, message: str) -> HttpResponse:
        return HttpResponse(500, json.dumps({'error': '服务器错误'}))


@dataclass
class UserData:
    name: str
    email: str
    age: int


class UserCreateHandler(RequestHandler[UserData, dict]):
    """用户创建处理器"""
    
    def _parse_request(self, request: HttpRequest) -> UserData:
        if not request.body:
            raise ValueError("请求体不能为空")
        data = json.loads(request.body)
        return UserData(
            name=data['name'],
            email=data['email'],
            age=data['age']
        )
    
    def _validate(self, data: UserData) -> bool:
        if not data.name or len(data.name) < 2:
            return False
        if '@' not in data.email:
            return False
        if data.age < 0 or data.age > 150:
            return False
        return True
    
    def _process(self, data: UserData) -> dict:
        return {
            'id': 'user_001',
            'name': data.name,
            'email': data.email,
            'age': data.age,
            'created': True
        }


handler = UserCreateHandler()

valid_request = HttpRequest(
    method=HttpMethod.POST,
    path='/api/users',
    headers={
        'Authorization': 'Bearer token123',
        'Content-Type': 'application/json'
    },
    body='{"name": "张三", "email": "zhangsan@example.com", "age": 25}'
)

response = handler.handle(valid_request)
print(f"状态码: {response.status_code}")
print(f"响应: {response.body}")

22.5.3 文档生成器

python
from abc import ABC, abstractmethod
from typing import Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class DocumentFormat(Enum):
    PDF = "pdf"
    HTML = "html"
    MARKDOWN = "markdown"
    DOCX = "docx"


@dataclass
class DocumentMetadata:
    """文档元数据"""
    title: str
    author: str
    created_at: datetime = field(default_factory=datetime.now)
    version: str = "1.0"
    keywords: list[str] = field(default_factory=list)


@dataclass
class Document:
    """生成的文档"""
    metadata: DocumentMetadata
    content: str
    format: DocumentFormat
    size_bytes: int = 0


class DocumentGenerator(ABC):
    """
    文档生成器模板
    """
    
    def generate(
        self, 
        data: Any, 
        metadata: DocumentMetadata,
        format: DocumentFormat = DocumentFormat.PDF
    ) -> Document:
        """
        生成文档的模板方法
        """
        self._validate_data(data)
        
        content_parts = []
        
        content_parts.append(self._generate_header(metadata))
        content_parts.append(self._generate_toc(data))
        content_parts.append(self._generate_body(data))
        content_parts.append(self._generate_footer(metadata))
        
        full_content = self._combine_content(content_parts)
        
        formatted_content = self._format_output(full_content, format)
        
        document = Document(
            metadata=metadata,
            content=formatted_content,
            format=format,
            size_bytes=len(formatted_content.encode('utf-8'))
        )
        
        self._post_generate(document)
        
        return document
    
    @abstractmethod
    def _generate_body(self, data: Any) -> str:
        """生成正文:子类必须实现"""
        pass
    
    def _validate_data(self, data: Any) -> None:
        """钩子:验证数据"""
        if data is None:
            raise ValueError("数据不能为空")
    
    def _generate_header(self, metadata: DocumentMetadata) -> str:
        """钩子:生成页眉"""
        return f"# {metadata.title}\n\n"
    
    def _generate_toc(self, data: Any) -> str:
        """钩子:生成目录(默认空)"""
        return ""
    
    def _generate_footer(self, metadata: DocumentMetadata) -> str:
        """钩子:生成页脚"""
        return f"\n\n---\n生成时间: {metadata.created_at.strftime('%Y-%m-%d %H:%M:%S')}\n作者: {metadata.author}"
    
    def _combine_content(self, parts: list[str]) -> str:
        """组合内容"""
        return "\n".join(parts)
    
    def _format_output(self, content: str, format: DocumentFormat) -> str:
        """钩子:格式化输出"""
        return content
    
    def _post_generate(self, document: Document) -> None:
        """钩子:生成后处理"""
        print(f"文档生成完成: {document.metadata.title} ({document.size_bytes} bytes)")


@dataclass
class ReportData:
    """报表数据"""
    sections: list[dict]
    summary: dict


class ReportGenerator(DocumentGenerator):
    """报表生成器"""
    
    def _generate_toc(self, data: ReportData) -> str:
        toc = "## 目录\n\n"
        for i, section in enumerate(data.sections, 1):
            toc += f"{i}. {section['title']}\n"
        return toc + "\n"
    
    def _generate_body(self, data: ReportData) -> str:
        body = ""
        for section in data.sections:
            body += f"## {section['title']}\n\n"
            body += f"{section['content']}\n\n"
            
            if 'items' in section:
                for item in section['items']:
                    body += f"- {item}\n"
                body += "\n"
        
        body += "## 总结\n\n"
        for key, value in data.summary.items():
            body += f"- **{key}**: {value}\n"
        
        return body
    
    def _format_output(self, content: str, format: DocumentFormat) -> str:
        if format == DocumentFormat.HTML:
            lines = content.split('\n')
            html_lines = []
            for line in lines:
                if line.startswith('# '):
                    html_lines.append(f'<h1>{line[2:]}</h1>')
                elif line.startswith('## '):
                    html_lines.append(f'<h2>{line[3:]}</h2>')
                elif line.startswith('- '):
                    html_lines.append(f'<li>{line[2:]}</li>')
                else:
                    html_lines.append(f'<p>{line}</p>')
            return '\n'.join(html_lines)
        return content


report_data = ReportData(
    sections=[
        {
            'title': '项目概述',
            'content': '本项目旨在提升系统性能。',
            'items': ['性能优化', '代码重构', '测试覆盖']
        },
        {
            'title': '实施计划',
            'content': '分三个阶段实施。',
            'items': ['第一阶段:分析', '第二阶段:开发', '第三阶段:部署']
        }
    ],
    summary={
        '预计工期': '3个月',
        '预算': '50万元',
        '风险等级': '中'
    }
)

metadata = DocumentMetadata(
    title="项目实施报告",
    author="项目经理",
    keywords=["项目", "报告", "实施"]
)

generator = ReportGenerator()
doc = generator.generate(report_data, metadata, DocumentFormat.MARKDOWN)
print("\n" + "="*50)
print(doc.content)

22.6 模式变体与扩展

22.6.1 策略化模板方法

python
from abc import ABC, abstractmethod
from typing import Callable, Any, Optional
from dataclasses import dataclass

@dataclass
class StepConfig:
    """步骤配置"""
    name: str
    strategy: Callable
    is_required: bool = True


class StrategyTemplate:
    """
    策略化模板方法:将步骤实现作为策略注入
    """
    
    def __init__(
        self,
        steps: list[StepConfig],
        hooks: Optional[dict[str, Callable]] = None
    ):
        self.steps = steps
        self.hooks = hooks or {}
    
    def execute(self, *args, **kwargs) -> Any:
        """执行模板"""
        self._call_hook('on_start')
        
        context = {'args': args, 'kwargs': kwargs, 'results': {}}
        result = None
        
        try:
            for step in self.steps:
                step_result = step.strategy(context)
                context['results'][step.name] = step_result
                context['last_result'] = step_result
                result = step_result
            
            self._call_hook('on_success', result)
            return result
            
        except Exception as e:
            self._call_hook('on_error', e)
            raise
        finally:
            self._call_hook('on_end')
    
    def _call_hook(self, hook_name: str, *args) -> None:
        if hook_name in self.hooks:
            self.hooks[hook_name](*args)


def create_api_template() -> StrategyTemplate:
    """创建API处理模板"""
    
    def parse_request(ctx: dict) -> dict:
        return {'data': ctx['kwargs'].get('data', {})}
    
    def validate_data(ctx: dict) -> dict:
        data = ctx['last_result']['data']
        if not data:
            raise ValueError("数据为空")
        return data
    
    def process_data(ctx: dict) -> dict:
        data = ctx['last_result']
        return {'processed': True, 'data': data}
    
    def format_response(ctx: dict) -> dict:
        return {
            'status': 'success',
            'result': ctx['last_result']
        }
    
    steps = [
        StepConfig('parse', parse_request),
        StepConfig('validate', validate_data),
        StepConfig('process', process_data),
        StepConfig('format', format_response),
    ]
    
    hooks = {
        'on_start': lambda: print("API处理开始"),
        'on_success': lambda r: print(f"API处理成功: {r}"),
        'on_error': lambda e: print(f"API处理失败: {e}"),
        'on_end': lambda: print("API处理结束"),
    }
    
    return StrategyTemplate(steps, hooks)


api_template = create_api_template()
result = api_template.execute(data={'name': '测试'})

22.6.2 组合模板方法

python
from typing import Protocol, Any, Callable
from dataclasses import dataclass, field


class TemplateStep(Protocol):
    """模板步骤协议"""
    
    def execute(self, context: dict) -> Any:
        ...


@dataclass
class SimpleStep:
    """简单步骤"""
    name: str
    action: Callable
    required: bool = True
    
    def execute(self, context: dict) -> Any:
        return self.action(context)


@dataclass
class ConditionalStep:
    """条件步骤"""
    name: str
    condition: Callable[[dict], bool]
    true_step: TemplateStep
    false_step: TemplateStep | None = None
    
    def execute(self, context: dict) -> Any:
        if self.condition(context):
            return self.true_step.execute(context)
        elif self.false_step:
            return self.false_step.execute(context)
        return None


@dataclass
class LoopStep:
    """循环步骤"""
    name: str
    get_items: Callable[[dict], list]
    step: TemplateStep
    
    def execute(self, context: dict) -> list:
        items = self.get_items(context)
        results = []
        for item in items:
            context['current_item'] = item
            result = self.step.execute(context)
            results.append(result)
        return results


@dataclass
class CompositeTemplate:
    """组合模板"""
    name: str
    steps: list[TemplateStep] = field(default_factory=list)
    
    def add_step(self, step: TemplateStep) -> 'CompositeTemplate':
        self.steps.append(step)
        return self
    
    def execute(self, initial_context: dict | None = None) -> dict:
        context = initial_context or {}
        context['results'] = {}
        
        for step in self.steps:
            result = step.execute(context)
            context['results'][step.name] = result
            context['last_result'] = result
        
        return context


def create_order_processing_template() -> CompositeTemplate:
    """创建订单处理组合模板"""
    
    def validate_order(ctx: dict) -> bool:
        order = ctx.get('order', {})
        return order.get('items') and order.get('customer')
    
    def check_inventory(ctx: dict) -> dict:
        order = ctx['order']
        available = []
        for item in order['items']:
            available.append({'sku': item['sku'], 'available': True})
        return {'items': available}
    
    def reserve_inventory(ctx: dict) -> dict:
        return {'reserved': True}
    
    def calculate_total(ctx: dict) -> float:
        order = ctx['order']
        return sum(item.get('price', 0) * item.get('quantity', 1) for item in order['items'])
    
    def process_payment(ctx: dict) -> dict:
        return {'paid': True, 'transaction_id': 'TXN001'}
    
    def confirm_order(ctx: dict) -> dict:
        return {'confirmed': True, 'order_id': 'ORD001'}
    
    def notify_customer(ctx: dict) -> dict:
        return {'notified': True}
    
    template = CompositeTemplate('OrderProcessing')
    
    template.add_step(SimpleStep('validate', lambda ctx: validate_order(ctx)))
    template.add_step(SimpleStep('check_inventory', check_inventory))
    template.add_step(SimpleStep('reserve_inventory', reserve_inventory))
    template.add_step(SimpleStep('calculate_total', calculate_total))
    template.add_step(SimpleStep('process_payment', process_payment))
    template.add_step(SimpleStep('confirm_order', confirm_order))
    template.add_step(SimpleStep('notify_customer', notify_customer))
    
    return template


order_template = create_order_processing_template()
order_data = {
    'order': {
        'customer': 'CUST001',
        'items': [
            {'sku': 'ITEM001', 'price': 100, 'quantity': 2},
            {'sku': 'ITEM002', 'price': 50, 'quantity': 1},
        ]
    }
}

result = order_template.execute(order_data)
print(f"订单处理结果: {result['results']}")

22.6.3 异步模板方法

python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional
from datetime import datetime


class AsyncTemplate(ABC):
    """
    异步模板方法基类
    """
    
    async def execute(self, *args, **kwargs) -> Any:
        """异步模板方法"""
        await self._pre_execute(*args, **kwargs)
        
        try:
            result = await self._do_execute(*args, **kwargs)
            result = await self._post_execute(result)
            return result
        except Exception as e:
            await self._on_error(e)
            raise
    
    @abstractmethod
    async def _do_execute(self, *args, **kwargs) -> Any:
        """核心执行逻辑"""
        pass
    
    async def _pre_execute(self, *args, **kwargs) -> None:
        """钩子:执行前"""
        pass
    
    async def _post_execute(self, result: Any) -> Any:
        """钩子:执行后"""
        return result
    
    async def _on_error(self, error: Exception) -> None:
        """钩子:错误处理"""
        print(f"执行错误: {error}")


class AsyncDataProcessor(AsyncTemplate):
    """异步数据处理器"""
    
    def __init__(self, name: str):
        self.name = name
        self._start_time: Optional[datetime] = None
    
    async def _pre_execute(self, *args, **kwargs) -> None:
        self._start_time = datetime.now()
        print(f"[{self.name}] 开始处理...")
    
    async def _do_execute(self, *args, **kwargs) -> Any:
        data = args[0] if args else []
        
        results = []
        for item in data:
            result = await self._process_item(item)
            results.append(result)
        
        return results
    
    async def _process_item(self, item: Any) -> Any:
        """处理单个项目"""
        await asyncio.sleep(0.1)
        return {'processed': item, 'timestamp': datetime.now().isoformat()}
    
    async def _post_execute(self, result: Any) -> Any:
        duration = (datetime.now() - self._start_time).total_seconds()
        print(f"[{self.name}] 处理完成,耗时 {duration:.2f}秒")
        return result


async def main():
    processor = AsyncDataProcessor("AsyncProcessor")
    data = [1, 2, 3, 4, 5]
    results = await processor.execute(data)
    print(f"处理结果: {results}")


asyncio.run(main())

22.7 反模式与最佳实践

22.7.1 常见反模式

反模式1:模板方法过于复杂

python
# ❌ 错误示例:模板方法包含过多逻辑
class BadTemplate(ABC):
    def template_method(self, data):
        # 模板方法本身包含太多业务逻辑
        if data['type'] == 'A':
            self.step1()
            if data['flag']:
                self.step2()
            else:
                self.step3()
        elif data['type'] == 'B':
            self.step4()
            for item in data['items']:
                self.step5(item)
        # ... 更多条件分支
        pass


# ✅ 正确示例:模板方法简洁,逻辑委托给子类
class GoodTemplate(ABC):
    def template_method(self, data):
        preprocessed = self.preprocess(data)
        result = self.process(preprocessed)
        return self.postprocess(result)
    
    @abstractmethod
    def process(self, data):
        pass

反模式2:滥用钩子方法

python
# ❌ 错误示例:过多可选钩子导致难以理解
class OverHookedTemplate(ABC):
    def template_method(self):
        self.hook1()
        self.hook2()
        self.hook3()
        self.hook4()
        self.hook5()
        # 哪些钩子应该覆盖?哪些是可选的?


# ✅ 正确示例:明确区分必需方法和钩子
class CleanTemplate(ABC):
    def template_method(self):
        self.required_step1()    # 必需
        self.optional_hook()      # 可选(有明确命名)
        self.required_step2()    # 必需
    
    @abstractmethod
    def required_step1(self):
        pass
    
    def optional_hook(self):    # 钩子有默认空实现
        pass
    
    @abstractmethod
    def required_step2(self):
        pass

反模式3:继承层次过深

python
# ❌ 错误示例:多层继承导致维护困难
class Level1(ABC):
    def template_method(self):
        self.step1()

class Level2(Level1):
    def step1(self):
        self.step1a()
        self.step1b()

class Level3(Level2):
    def step1a(self):
        # 哪些方法可以覆盖?哪些会被调用?
        pass


# ✅ 正确示例:使用组合代替深层继承
class ComposedTemplate:
    def __init__(self, step1_impl, step2_impl):
        self.step1 = step1_impl
        self.step2 = step2_impl
    
    def execute(self):
        self.step1()
        self.step2()

22.7.2 最佳实践

python
from abc import ABC, abstractmethod
from typing import Any, Optional, final
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


class BestPracticeTemplate(ABC):
    """
    模板方法最佳实践示例
    """
    
    @final
    def execute(self, data: Any) -> Any:
        """
        模板方法:使用@final防止子类覆盖(Python 3.8+)
        如果Python版本不支持@final,应在文档中说明不应覆盖
        """
        self._log_start(data)
        
        try:
            validated = self._validate(data)
            if not validated:
                raise ValueError("数据验证失败")
            
            preprocessed = self._preprocess(data)
            result = self._process(preprocessed)
            postprocessed = self._postprocess(result)
            
            self._log_success(postprocessed)
            return postprocessed
            
        except Exception as e:
            self._log_error(e)
            raise
        finally:
            self._cleanup()
    
    @abstractmethod
    def _process(self, data: Any) -> Any:
        """核心处理逻辑:子类必须实现"""
        pass
    
    def _validate(self, data: Any) -> bool:
        """钩子:验证数据(默认通过)"""
        return data is not None
    
    def _preprocess(self, data: Any) -> Any:
        """钩子:预处理(默认不处理)"""
        return data
    
    def _postprocess(self, result: Any) -> Any:
        """钩子:后处理(默认不处理)"""
        return result
    
    def _cleanup(self) -> None:
        """钩子:清理资源(默认不处理)"""
        pass
    
    def _log_start(self, data: Any) -> None:
        logger.info(f"开始处理: {type(self).__name__}")
    
    def _log_success(self, result: Any) -> None:
        logger.info(f"处理成功: {type(self).__name__}")
    
    def _log_error(self, error: Exception) -> None:
        logger.error(f"处理失败: {type(self).__name__} - {error}")


class ConcreteProcessor(BestPracticeTemplate):
    """具体处理器示例"""
    
    def _process(self, data: list) -> dict:
        return {
            'count': len(data),
            'sum': sum(data) if data else 0
        }
    
    def _validate(self, data: Any) -> bool:
        return isinstance(data, list) and all(isinstance(x, (int, float)) for x in data)
    
    def _preprocess(self, data: list) -> list:
        return [x for x in data if x > 0]
    
    def _postprocess(self, result: dict) -> dict:
        result['status'] = 'completed'
        return result


processor = ConcreteProcessor()
result = processor.execute([1, 2, 3, 4, 5, -1, -2])
print(f"结果: {result}")

22.8 决策指南

22.8.1 是否使用模板方法模式

┌─────────────────────────────────────────────────────────────────┐
│                    是否使用模板方法模式?                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  是否有多个类具有相似的算法结构?                               │
│         │                                                       │
│         ├── 否 ──→ 考虑其他模式或直接实现                       │
│         │                                                       │
│         └── 是 ──┐                                              │
│                    │                                            │
│                    ▼                                            │
│         算法结构是否固定不变?                                  │
│         │                                                       │
│         ├── 否 ──→ 考虑策略模式                                 │
│         │                                                       │
│         └── 是 ──┐                                              │
│                    │                                            │
│                    ▼                                            │
│         是否需要控制扩展点?                                    │
│         │                                                       │
│         ├── 否 ──→ 考虑简单的继承                               │
│         │                                                       │
│         └── 是 ──→ ✓ 使用模板方法模式                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

22.8.2 模式选择决策树

┌─────────────────────────────────────────────────────────────────┐
│                    行为型模式选择指南                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  需要封装什么?                                                 │
│         │                                                       │
│         ├── 算法整体 ──→ 策略模式                               │
│         │                                                       │
│         ├── 算法骨架 ──→ 模板方法模式 ✓                         │
│         │                                                       │
│         ├── 对象行为 ──→ 状态模式                               │
│         │                                                       │
│         ├── 请求处理 ──→ 责任链模式                             │
│         │                                                       │
│         └── 操作结构 ──→ 访问者模式                             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

22.8.3 与相关模式的对比

特性模板方法策略模式工厂方法
意图定义算法骨架封装可互换算法创建对象
变化点算法步骤整个算法创建逻辑
实现方式继承组合继承
运行时改变不支持支持不适用
控制方向父类控制子类客户端选择父类控制子类
扩展方式新增子类新增策略类新增产品类

22.9 快速参考卡

22.9.1 核心概念速查

┌─────────────────────────────────────────────────────────────────┐
│                    模板方法模式速查卡                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  定义:定义算法骨架,将步骤延迟到子类实现                       │
│                                                                 │
│  核心角色:                                                     │
│  ├── AbstractClass:定义模板方法和抽象步骤                      │
│  └── ConcreteClass:实现抽象步骤                                │
│                                                                 │
│  方法类型:                                                     │
│  ├── 模板方法:定义算法骨架(不应被覆盖)                       │
│  ├── 抽象方法:子类必须实现                                     │
│  ├── 具体方法:父类提供默认实现                                 │
│  └── 钩子方法:子类可选覆盖                                     │
│                                                                 │
│  设计原则:好莱坞原则 - "不要调用我们,我们会调用你"            │
│                                                                 │
│  适用场景:                                                     │
│  ├── 多个类有相似算法结构                                       │
│  ├── 需要控制子类扩展点                                         │
│  └── 需要复用公共代码                                           │
│                                                                 │
│  优点:代码复用、控制扩展、符合开闭原则                         │
│  缺点:限制继承、可能难以维护                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

22.9.2 Python实现要点

python
from abc import ABC, abstractmethod
from typing import final

class Template(ABC):
    @final
    def template_method(self):
        self.step1()
        self.step2()
        self.hook()
    
    @abstractmethod
    def step1(self): pass
    
    @abstractmethod
    def step2(self): pass
    
    def hook(self): pass

22.9.3 常见应用场景

场景示例
数据处理ETL管道、数据转换
文档生成报表、合同、证书
请求处理Web框架、API处理
游戏开发AI行为、角色动作
测试框架单元测试、集成测试

22.10 小结

模板方法模式通过定义算法骨架,将可变部分延迟到子类实现,是一种典型的控制反转模式。在Python中,可以利用抽象基类、钩子方法和函数式组合实现灵活的模板方法模式。

关键要点

  1. 算法骨架固定:模板方法定义了算法的整体结构
  2. 扩展点明确:抽象方法和钩子方法定义了子类的扩展点
  3. 好莱坞原则:父类调用子类方法,而非相反
  4. 代码复用:公共代码在父类中实现,避免重复

实践建议

  1. 保持模板方法简洁,避免过多条件分支
  2. 明确区分必需方法和钩子方法
  3. 考虑使用组合代替深层继承
  4. 为模板方法添加文档说明不应被覆盖

下一章预告

下一章将介绍访问者模式,它用于在不变的数据结构上定义新的操作,是处理复杂对象结构的利器。

Python技术丛书 - 江苏省宿城中等专业学校