Skip to content

第13章 装饰器

学习目标

完成本章学习后,读者应能够:

  1. 理解闭包机制:掌握词法作用域、自由变量与闭包的底层实现原理,理解__closure__属性与cell对象的运作机制
  2. 精通装饰器模式:从函数装饰器到类装饰器,从无参装饰器到带参装饰器,全面掌握装饰器的实现范式
  3. 掌握元数据保留:深入理解functools.wraps__wrapped__与函数内省机制
  4. 运用标准库工具:熟练使用lru_cachesingledispatchcached_property等高级装饰器
  5. 设计装饰器架构:能够为实际项目设计可组合、可测试、可维护的装饰器体系
  6. 理解装饰器局限性:认知装饰器在调试、序列化、类型检查等方面的挑战与应对策略

13.1 闭包:装饰器的基石

13.1.1 作用域与自由变量

闭包(Closure)是装饰器的理论基础。理解闭包需要先理解Python的作用域规则与自由变量机制。

Python采用词法作用域(Lexical Scoping),函数定义时即确定了其可以访问的变量范围。LEGB规则定义了名称查找顺序:Local → Enclosing → Global → Built-in。

python
def demonstrate_scope():
    x = 10

    def inner():
        y = 20
        print(f"Local y: {y}")
        print(f"Enclosing x: {x}")

    inner()
    try:
        print(y)
    except NameError as e:
        print(f"Cannot access inner's local: {e}")

demonstrate_scope()

当一个函数引用了其外层作用域中的变量,且该变量既不是全局变量也不是局部变量时,它被称为自由变量(Free Variable)。Python编译器在编译时检测自由变量,并通过cell对象在运行时实现跨作用域的变量绑定。

python
def make_counter():
    count = 0

    def increment():
        nonlocal count
        count += 1
        return count

    return increment

counter = make_counter()
print(counter())
print(counter())
print(counter())

print(f"闭包自由变量: {counter.__code__.co_freevars}")
print(f"闭包单元格: {counter.__closure__}")
print(f"单元格值: {counter.__closure__[0].cell_contents}")

__closure__属性是一个cell对象元组,每个cell对象持有一个自由变量的引用。nonlocal声明告诉解释器该变量是自由变量,应从外层作用域查找而非创建新的局部绑定。

13.1.2 闭包的语义与实现

闭包是一个函数对象,它记住了定义时的词法环境,即使该环境在函数调用时已经不可达。闭包 = 函数 + 环境。

python
def make_multiplier(factor):
    def multiply(x):
        return x * factor
    return multiply

double = make_multiplier(2)
triple = make_multiplier(3)

print(double(5))
print(triple(5))

print(f"double的自由变量: {double.__code__.co_freevars}")
print(f"double的闭包值: {double.__closure__[0].cell_contents}")
print(f"triple的闭包值: {triple.__closure__[0].cell_contents}")

关键理解:doubletriple共享相同的函数代码,但各自持有不同的闭包环境。factor作为自由变量被绑定到不同的cell对象中。

闭包的延迟绑定陷阱:闭包中的自由变量是引用绑定而非值绑定,这意味着在闭包被调用时才解析变量的当前值。

python
def create_functions():
    functions = []
    for i in range(5):
        def func():
            return i
        functions.append(func)
    return functions

funcs = create_functions()
print([f() for f in funcs])

def create_functions_fixed():
    functions = []
    for i in range(5):
        def func(i=i):
            return i
        functions.append(func)
    return functions

funcs = create_functions_fixed()
print([f() for f in funcs])

延迟绑定发生的原因是闭包捕获的是变量的引用,而非变量在定义时的值。当循环结束后,i的值为4,所有闭包都引用同一个i。通过默认参数i=i可以在定义时将当前值绑定到参数上。

13.1.3 闭包的实际应用

可变状态封装

python
def make_averager():
    total = 0
    count = 0

    def averager(new_value):
        nonlocal total, count
        total += new_value
        count += 1
        return total / count

    return averager

avg = make_averager()
print(avg(10))
print(avg(20))
print(avg(30))
print(f"当前闭包状态: total={avg.__closure__[0].cell_contents}, count={avg.__closure__[1].cell_contents}")

记忆化缓存

python
def make_cache():
    cache = {}

    def get(key, compute):
        if key not in cache:
            cache[key] = compute(key)
        return cache[key]

    def clear():
        cache.clear()

    get.clear = clear
    get.cache = cache
    return get

cached = make_cache()

def expensive_sqrt(n):
    print(f"  Computing sqrt({n})...")
    return n ** 0.5

print(cached(16, expensive_sqrt))
print(cached(16, expensive_sqrt))
print(cached(25, expensive_sqrt))
print(f"缓存内容: {cached.cache}")
cached.clear()
print(f"清空后: {cached.cache}")

函数工厂与策略模式

python
def make_validator(field_name, min_val, max_val):
    def validate(value):
        if not isinstance(value, (int, float)):
            raise TypeError(f"{field_name} must be numeric")
        if value < min_val:
            raise ValueError(f"{field_name} must be >= {min_val}")
        if value > max_val:
            raise ValueError(f"{field_name} must be <= {max_val}")
        return True

    validate.field_name = field_name
    validate.min_val = min_val
    validate.max_val = max_val
    return validate

validate_age = make_validator("age", 0, 150)
validate_score = make_validator("score", 0, 100)
validate_temp = make_validator("temperature", -273.15, float("inf"))

print(validate_age(25))
print(validate_score(85))
print(validate_temp(36.5))

try:
    validate_age(-5)
except ValueError as e:
    print(e)

13.1.4 闭包与类的对比

闭包和类都可以封装状态与行为,但各有适用场景:

特性闭包
状态封装__closure__ + nonlocal实例属性
接口暴露仅暴露返回的函数通过属性和方法
继承不支持支持
内省困难(需访问__closure__方便(属性访问)
内存较轻量较重(__dict__等)
可读性简单场景更简洁复杂场景更清晰
python
def counter_closure(start=0):
    count = start

    def increment(step=1):
        nonlocal count
        count += step
        return count

    def reset():
        nonlocal count
        count = start

    increment.reset = reset
    return increment

class CounterClass:
    def __init__(self, start=0):
        self._count = start
        self._start = start

    def increment(self, step=1):
        self._count += step
        return self._count

    def reset(self):
        self._count = self._start

c1 = counter_closure(10)
print(c1())
print(c1())
c1.reset()
print(c1())

c2 = CounterClass(10)
print(c2.increment())
print(c2.increment())
c2.reset()
print(c2.increment())

13.2 函数装饰器

13.2.1 装饰器的本质

装饰器(Decorator)是一种语法糖,本质是一个高阶函数,接受一个函数作为参数并返回一个新函数。@decorator语法等价于func = decorator(func)

python
def trace(func):
    def wrapper(*args, **kwargs):
        print(f"→ 调用 {func.__name__}({args}, {kwargs})")
        result = func(*args, **kwargs)
        print(f"← 返回 {func.__name__} => {result}")
        return result
    return wrapper

@trace
def add(a, b):
    return a + b

print(add(3, 5))

add_equivalent = trace(add)
print(add is add_equivalent)

理解装饰器的关键在于:装饰器在模块加载时执行,而非在函数调用时。这意味着装饰器是声明式的——它在函数定义时就已经生效。

python
registry = []

def register(func):
    print(f"注册函数: {func.__name__}")
    registry.append(func)
    return func

@register
def func_a():
    return "A"

@register
def func_b():
    return "B"

print(f"注册表: {[f.__name__ for f in registry]}")

13.2.2 通用装饰器模板

一个健壮的函数装饰器应遵循以下模板:

python
from functools import wraps

def robust_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        return result
    return wrapper

关键要素:

  • *args, **kwargs:接受任意参数,确保装饰器适用于所有函数签名
  • @wraps(func):保留原函数的元数据(__name____doc____module__等)
  • 返回result:确保原函数的返回值不被丢弃

13.2.3 常用装饰器模式

计时装饰器

python
import time
from functools import wraps

def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 执行耗时: {elapsed:.6f}秒")
        return result
    return wrapper

@timer
def compute_sum(n):
    return sum(range(n))

result = compute_sum(1_000_000)
print(f"结果: {result}")

日志装饰器

python
import logging
from functools import wraps

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def log_calls(level=logging.DEBUG):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            args_repr = [repr(a) for a in args]
            kwargs_repr = [f"{k}={v!r}" for k, v in kwargs.items()]
            signature = ", ".join(args_repr + kwargs_repr)
            logger.log(level, f"调用 {func.__name__}({signature})")
            try:
                result = func(*args, **kwargs)
                logger.log(level, f"{func.__name__} 返回 {result!r}")
                return result
            except Exception as e:
                logger.error(f"{func.__name__} 抛出 {type(e).__name__}: {e}")
                raise
        return wrapper
    return decorator

@log_calls(level=logging.INFO)
def divide(a, b):
    return a / b

print(divide(10, 3))
try:
    divide(10, 0)
except ZeroDivisionError:
    pass

类型检查装饰器

python
from functools import wraps
import inspect

def typecheck(func):
    sig = inspect.signature(func)
    
    @wraps(func)
    def wrapper(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        bound.apply_defaults()
        
        for name, value in bound.arguments.items():
            param = sig.parameters[name]
            if param.annotation is not param.empty:
                if not isinstance(value, param.annotation):
                    raise TypeError(
                        f"参数 '{name}' 期望类型 {param.annotation.__name__}, "
                        f"实际类型 {type(value).__name__}"
                    )
        
        result = func(*args, **kwargs)
        
        if sig.return_annotation is not sig.empty:
            if not isinstance(result, sig.return_annotation):
                raise TypeError(
                    f"返回值期望类型 {sig.return_annotation.__name__}, "
                    f"实际类型 {type(result).__name__}"
                )
        
        return result
    return wrapper

@typecheck
def greet(name: str, times: int) -> str:
    return (f"Hello, {name}! " * times).strip()

print(greet("Alice", 2))

try:
    greet(123, 2)
except TypeError as e:
    print(e)

重试装饰器

python
import time
import random
from functools import wraps

def retry(
    max_attempts=3,
    delay=1.0,
    backoff=2.0,
    exceptions=(Exception,),
    on_retry=None
):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        raise
                    if on_retry:
                        on_retry(attempt, e)
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator

@retry(
    max_attempts=5,
    delay=0.1,
    backoff=1.5,
    exceptions=(ConnectionError, TimeoutError),
    on_retry=lambda attempt, err: print(f"第{attempt}次重试: {err}")
)
def fetch_url(url):
    if random.random() < 0.7:
        raise ConnectionError(f"连接失败: {url}")
    return f"内容来自 {url}"

try:
    result = fetch_url("https://example.com")
    print(result)
except ConnectionError as e:
    print(f"最终失败: {e}")

速率限制装饰器

python
import time
from functools import wraps
from collections import deque

def rate_limit(calls_per_second=1):
    min_interval = 1.0 / calls_per_second
    
    def decorator(func):
        timestamps = deque()
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            while timestamps and now - timestamps[0] >= 1.0:
                timestamps.popleft()
            
            if len(timestamps) >= calls_per_second:
                sleep_time = min_interval - (now - timestamps[-1])
                if sleep_time > 0:
                    time.sleep(sleep_time)
            
            timestamps.append(time.monotonic())
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(calls_per_second=3)
def api_call(endpoint):
    print(f"调用 {endpoint} @ {time.strftime('%H:%M:%S')}")
    return f"响应: {endpoint}"

for i in range(6):
    api_call(f"/api/data/{i}")

13.2.4 保留函数元数据

functools.wraps不仅仅是复制__name____doc__,它还处理了多个关键属性:

python
from functools import wraps

def show_wraps_behavior(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@show_wraps_behavior
def example(x: int, y: str = "default") -> str:
    """这是一个示例函数。"""
    return f"{x}: {y}"

print(f"__name__: {example.__name__}")
print(f"__doc__: {example.__doc__}")
print(f"__module__: {example.__module__}")
print(f"__wrapped__: {example.__wrapped__}")
print(f"__annotations__: {example.__annotations__}")
print(f"__qualname__: {example.__qualname__}")

__wrapped__属性特别重要——它保存了对原始函数的引用,使得装饰后的函数仍可访问原始函数。这在调试、内省和装饰器叠加时至关重要。

python
import inspect

original = example.__wrapped__
print(f"原始函数签名: {inspect.signature(original)}")
print(f"原始函数源码:\n{inspect.getsource(original)}")

13.3 带参数的装饰器

13.3.1 三层嵌套模式

带参数的装饰器需要三层嵌套:最外层接收装饰器参数,中间层接收被装饰函数,最内层是实际包装函数。

python
from functools import wraps

def repeat(times=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            result = None
            for _ in range(times):
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator

@repeat(times=3)
def greet(name):
    print(f"Hello, {name}!")
    return f"Greeted {name}"

greet("Alice")

执行流程:repeat(times=3) → 返回decorator@decorator应用于greet → 返回wrapper

13.3.2 可选参数装饰器

当装饰器既需要支持@decorator(无参数)又需要支持@decorator(option=value)(有参数)时,需要特殊处理:

python
from functools import wraps

def flexible_decorator(func=None, *, option="default"):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            print(f"[{option}] 调用 {f.__name__}")
            return f(*args, **kwargs)
        return wrapper
    
    if func is not None:
        return decorator(func)
    return decorator

@flexible_decorator
def func_a():
    return "A"

@flexible_decorator(option="custom")
def func_b():
    return "B"

print(func_a())
print(func_b())

@flexible_decorator不带括号时,func接收被装饰函数;当带括号时,funcNone,返回真正的装饰器。

13.3.3 基于类的参数化装饰器

使用类实现带参数的装饰器通常更清晰:

python
from functools import wraps

class Retry:
    def __init__(self, max_attempts=3, delay=1.0, backoff=2.0, exceptions=(Exception,)):
        self.max_attempts = max_attempts
        self.delay = delay
        self.backoff = backoff
        self.exceptions = exceptions

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = self.delay
            for attempt in range(1, self.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except self.exceptions as e:
                    if attempt == self.max_attempts:
                        raise
                    import time
                    time.sleep(current_delay)
                    current_delay *= self.backoff
        return wrapper

@Retry(max_attempts=3, delay=0.5, backoff=2.0, exceptions=(ValueError,))
def parse_int(value):
    return int(value)

try:
    print(parse_int("42"))
    parse_int("abc")
except ValueError:
    print("解析失败")

13.4 类装饰器

13.4.1 用类实现函数装饰器

类装饰器通过__call__方法使实例可调用,适合需要维护状态的场景:

python
from functools import wraps

class CallStats:
    def __init__(self, func):
        wraps(func)(self)
        self.func = func
        self.call_count = 0
        self.total_time = 0.0
        self.last_result = None

    def __call__(self, *args, **kwargs):
        import time
        start = time.perf_counter()
        self.last_result = self.func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        self.call_count += 1
        self.total_time += elapsed
        return self.last_result

    def stats(self):
        avg = self.total_time / self.call_count if self.call_count else 0
        return {
            "function": self.func.__name__,
            "calls": self.call_count,
            "total_time": self.total_time,
            "avg_time": avg,
        }

    def reset(self):
        self.call_count = 0
        self.total_time = 0.0

@CallStats
def compute(n):
    return sum(i ** 2 for i in range(n))

compute(100_000)
compute(200_000)
compute(50_000)

import json
print(json.dumps(compute.stats(), indent=2))
compute.reset()
print(f"重置后调用次数: {compute.call_count}")

13.4.2 装饰类

类装饰器可以修改类的行为、添加方法或改变实例创建过程:

添加方法

python
def add_repr(cls):
    def __repr__(self):
        fields = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items()
        )
        return f"{cls.__name__}({fields})"
    cls.__repr__ = __repr__
    return cls

@add_repr
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

p = Point(3, 5)
print(p)

单例模式

python
def singleton(cls):
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    get_instance.__wrapped__ = cls
    get_instance._instances = instances
    return get_instance

@singleton
class DatabaseConnection:
    def __init__(self, host="localhost"):
        self.host = host
        print(f"连接数据库: {host}")

db1 = DatabaseConnection("localhost")
db2 = DatabaseConnection("remote-host")
print(f"db1 is db2: {db1 is db2}")
print(f"原始类: {DatabaseConnection.__wrapped__}")

数据验证装饰器

python
from functools import wraps

def validated(cls):
    original_init = cls.__init__

    @wraps(original_init)
    def new_init(self, *args, **kwargs):
        annotations = cls.__annotations__
        bound = original_init.__code__

        varnames = bound.co_varnames[1:bound.co_argcount]
        for i, name in enumerate(varnames):
            if i < len(args):
                value = args[i]
            elif name in kwargs:
                value = kwargs[name]
            else:
                continue

            if name in annotations:
                expected = annotations[name]
                if not isinstance(value, expected):
                    raise TypeError(
                        f"{cls.__name__}.{name} 期望 {expected.__name__}, "
                        f"得到 {type(value).__name__}"
                    )

        original_init(self, *args, **kwargs)

    cls.__init__ = new_init
    return cls

@validated
class User:
    name: str
    age: int

    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

u1 = User("Alice", 25)
print(f"{u1.name}, {u1.age}")

try:
    User(123, "twenty")
except TypeError as e:
    print(e)

13.5 装饰器叠加

13.5.1 执行顺序

多个装饰器按从下到上的顺序应用,但执行时按从外到内的顺序:

python
from functools import wraps

def decorator_a(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("A: 函数调用前")
        result = func(*args, **kwargs)
        print("A: 函数调用后")
        return result
    return wrapper

def decorator_b(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("B: 函数调用前")
        result = func(*args, **kwargs)
        print("B: 函数调用后")
        return result
    return wrapper

@decorator_a
@decorator_b
def greet(name):
    print(f"Hello, {name}!")

greet("Alice")

输出顺序:

A: 函数调用前
B: 函数调用前
Hello, Alice!
B: 函数调用后
A: 函数调用后

等价于:greet = decorator_a(decorator_b(greet)),形成洋葱式调用栈。

13.5.2 装饰器组合器

为了管理复杂的装饰器组合,可以设计组合器:

python
from functools import wraps

def compose(*decorators):
    def decorator(func):
        for dec in reversed(decorators):
            func = dec(func)
        return func
    return decorator

def trace(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"[TRACE] {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        import time
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"[TIMER] {func.__name__}: {time.perf_counter() - start:.6f}s")
        return result
    return wrapper

def validate_positive(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        if any(arg < 0 for arg in args if isinstance(arg, (int, float))):
            raise ValueError("参数不能为负数")
        return func(*args, **kwargs)
    return wrapper

@compose(trace, timer, validate_positive)
def compute(a, b):
    return a ** b

print(compute(2, 10))

13.6 functools模块高级工具

13.6.1 lru_cache:最近最少使用缓存

lru_cache是Python标准库中最实用的装饰器之一,实现了LRU(Least Recently Used)缓存淘汰策略:

python
from functools import lru_cache
import time

@lru_cache(maxsize=128)
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

start = time.perf_counter()
print(f"fib(100) = {fibonacci(100)}")
print(f"耗时: {time.perf_counter() - start:.6f}秒")

print(fibonacci.cache_info())

fibonacci.cache_clear()
print(f"清空后: {fibonacci.cache_info()}")

缓存参数选择

  • maxsize=None:无限制缓存,适用于结果集有限且可全部缓存的情况
  • maxsize=128:默认值,适合大多数场景
  • maxsize=0:禁用缓存(实际上只是禁用了缓存效果,但仍保留装饰器接口)

缓存键的计算lru_cache使用参数的值作为缓存键。参数必须是可哈希的(hashable),这意味着列表、字典等不可哈希类型不能直接作为参数。

python
@lru_cache(maxsize=32)
def process_data(data_hash: int, threshold: float) -> list:
    print(f"  处理数据: hash={data_hash}, threshold={threshold}")
    return [data_hash * threshold]

print(process_data(100, 0.5))
print(process_data(100, 0.5))
print(process_data.cache_info())

typed参数typed=True时,不同类型的参数会产生不同的缓存条目:

python
@lru_cache(maxsize=128, typed=True)
def typed_cache(x):
    print(f"  计算: {x} (type={type(x).__name__})")
    return x * 2

typed_cache(1)
typed_cache(1.0)
print(typed_cache.cache_info())

13.6.2 singledispatch:单分派泛型函数

singledispatch实现了基于第一个参数类型的运行时多态(单分派),是Python实现访问者模式的优雅方式:

python
from functools import singledispatch
from collections.abc import Sequence
from typing import Union

@singledispatch
def to_json(obj) -> str:
    raise TypeError(f"不支持的类型: {type(obj)}")

@to_json.register(str)
def _(obj):
    return f'"{obj}"'

@to_json.register(int)
def _(obj):
    return str(obj)

@to_json.register(float)
def _(obj):
    return str(obj)

@to_json.register(bool)
def _(obj):
    return "true" if obj else "false"

@to_json.register(type(None))
def _(obj):
    return "null"

@to_json.register(dict)
def _(obj):
    pairs = ", ".join(
        f"{to_json(k)}: {to_json(v)}" for k, v in obj.items()
    )
    return "{" + pairs + "}"

@to_json.register(list)
def _(obj):
    items = ", ".join(to_json(item) for item in obj)
    return "[" + items + "]"

print(to_json("hello"))
print(to_json(42))
print(to_json(3.14))
print(to_json(True))
print(to_json(None))
print(to_json({"name": "Alice", "age": 25}))
print(to_json([1, "two", None, True]))

继承与分派singledispatch支持继承关系,子类会使用最近父类的实现:

python
@singledispatch
def process(obj):
    return f"默认处理: {type(obj).__name__}"

@process.register(int)
def _(obj):
    return f"整数处理: {obj}"

@process.register(float)
def _(obj):
    return f"浮点处理: {obj:.2f}"

class SpecialInt(int):
    pass

print(process(42))
print(process(3.14))
print(process("hello"))
print(process(SpecialInt(7)))

13.6.3 cached_property与cached_method

Python 3.8+引入了cached_property,3.9+引入了cached_method

python
from functools import cached_property
import math

class Circle:
    def __init__(self, radius):
        self.radius = radius

    @cached_property
    def area(self):
        print("  计算面积...")
        return math.pi * self.radius ** 2

    @cached_property
    def circumference(self):
        print("  计算周长...")
        return 2 * math.pi * self.radius

c = Circle(5)
print(f"面积: {c.area}")
print(f"面积(缓存): {c.area}")
print(f"周长: {c.circumference}")
print(f"周长(缓存): {c.circumference}")

c.radius = 10
print(f"面积(未更新): {c.area}")

del c.area
print(f"面积(重新计算): {c.area}")

注意:cached_property在实例的__dict__中存储缓存值。修改radius不会自动使缓存失效,需要手动删除缓存属性。

13.6.4 partial与partialmethod

python
from functools import partial

def power(base, exp):
    return base ** exp

square = partial(power, exp=2)
cube = partial(power, exp=3)

print(square(5))
print(cube(5))
print(f"partial属性: func={square.func}, keywords={square.keywords}")

from functools import partialmethod

class Cell:
    def __init__(self):
        self._alive = False

    def set_state(self, state):
        self._alive = state

    set_alive = partialmethod(set_state, True)
    set_dead = partialmethod(set_state, False)

cell = Cell()
cell.set_alive()
print(f"存活: {cell._alive}")
cell.set_dead()
print(f"存活: {cell._alive}")

13.6.5 total_ordering

total_ordering装饰器根据已定义的部分比较方法自动补全其余比较方法:

python
from functools import total_ordering

@total_ordering
class Student:
    def __init__(self, name, grade):
        self.name = name
        self.grade = grade

    def __eq__(self, other):
        if not isinstance(other, Student):
            return NotImplemented
        return self.grade == other.grade

    def __lt__(self, other):
        if not isinstance(other, Student):
            return NotImplemented
        return self.grade < other.grade

students = [
    Student("Alice", 90),
    Student("Bob", 85),
    Student("Charlie", 92),
]

for s in sorted(students):
    print(f"{s.name}: {s.grade}")

print(Student("A", 90) <= Student("B", 90))
print(Student("A", 85) > Student("B", 90))

13.7 装饰器设计模式与最佳实践

13.7.1 装饰器与设计模式

装饰器模式(Decorator Pattern)是GoF设计模式之一,Python的装饰器语法是该模式的一种实现。但两者有区别:

  • GoF装饰器模式:运行时动态添加职责,通过对象组合实现
  • Python装饰器:定义时静态添加行为,通过函数包装实现
python
from functools import wraps

class TextProcessor:
    def process(self, text: str) -> str:
        return text

class TrimDecorator:
    def __init__(self, processor: TextProcessor):
        self._processor = processor

    def process(self, text: str) -> str:
        return self._processor.process(text.strip())

class UpperDecorator:
    def __init__(self, processor: TextProcessor):
        self._processor = processor

    def process(self, text: str) -> str:
        return self._processor.process(text).upper()

class ReverseDecorator:
    def __init__(self, processor: TextProcessor):
        self._processor = processor

    def process(self, text: str) -> str:
        return self._processor.process(text)[::-1]

processor = ReverseDecorator(UpperDecorator(TrimDecorator(TextProcessor())))
print(processor.process("  hello world  "))

13.7.2 装饰器可测试性

装饰器应该是可独立测试的:

python
from functools import wraps
import time

def timer(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        wrapper.last_elapsed = time.perf_counter() - start
        return result
    wrapper.last_elapsed = 0.0
    return wrapper

@timer
def slow_add(a, b):
    time.sleep(0.01)
    return a + b

result = slow_add(1, 2)
print(f"结果: {result}, 耗时: {slow_add.last_elapsed:.6f}秒")

13.7.3 装饰器与类型提示

Python 3.10+提供了ParamSpecConcatenate用于精确描述装饰器的类型签名:

python
from functools import wraps
from typing import TypeVar, ParamSpec, Callable

P = ParamSpec("P")
R = TypeVar("R")

def trace(func: Callable[P, R]) -> Callable[P, R]:
    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        print(f"调用 {func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@trace
def add(a: int, b: int) -> int:
    return a + b

result: int = add(1, 2)

13.7.4 装饰器的局限性

调试困难:装饰器改变了函数的调用栈,可能导致traceback不直观。使用@wraps__wrapped__可以缓解。

序列化问题:被装饰的函数可能无法被pickle:

python
import pickle
from functools import wraps

def my_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@my_decorator
def greet(name):
    return f"Hello, {name}"

try:
    pickled = pickle.dumps(greet)
    print("序列化成功")
except (pickle.PicklingError, AttributeError) as e:
    print(f"序列化失败: {e}")

import copyreg
import types

def pickle_wrapper(func):
    return unpickle_wrapper, (func.__wrapped__,)

def unpickle_wrapper(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

copyreg.pickle(types.FunctionType, pickle_wrapper)

装饰器顺序敏感:不同装饰器的叠加顺序可能产生不同效果,需要仔细设计。

性能开销:每层装饰器增加一次函数调用开销,在高频调用场景下需要考虑。

13.8 前沿技术动态

13.8.1 Python 3.12+的改进

Python 3.12对装饰器语法进行了增强,允许更灵活的装饰器表达式:

python
# Python 3.12+ 支持更灵活的装饰器语法
# 允许在装饰器位置使用任意表达式

decorators = [trace, timer]

# 传统写法
@trace
@timer
def func_v1():
    pass

# Python 3.12+ 可以动态选择装饰器
# @decorators[0]  # 新语法(示例,需3.12+环境)

13.8.2 基于装饰器的依赖注入

现代Python框架广泛使用装饰器实现依赖注入:

python
from functools import wraps
from typing import get_type_hints

class Container:
    _instances = {}
    _factories = {}

    @classmethod
    def register(cls, interface):
        def decorator(implementation):
            cls._factories[interface] = implementation
            return implementation
        return decorator

    @classmethod
    def get(cls, interface):
        if interface not in cls._instances:
            if interface in cls._factories:
                cls._instances[interface] = cls._factories[interface]()
            else:
                raise KeyError(f"未注册的类型: {interface}")
        return cls._instances[interface]

    @classmethod
    def inject(cls, func):
        hints = get_type_hints(func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            for name, hint in hints.items():
                if name not in kwargs and hint in cls._factories:
                    kwargs[name] = cls.get(hint)
            return func(*args, **kwargs)
        return wrapper

class Database:
    pass

@Container.register(Database)
class PostgresDB(Database):
    def query(self, sql):
        return f"PostgreSQL: {sql}"

@Container.inject
def get_data(db: Database):
    return db.query("SELECT * FROM users")

print(get_data())

13.8.3 装饰器与AST转换

高级用法是利用装饰器触发AST(抽象语法树)转换,实现编译时优化:

python
import ast
import inspect
import textwrap

def optimize(func):
    source = inspect.getsource(func)
    source = textwrap.dedent(source)
    tree = ast.parse(source)
    
    class ConstantFolder(ast.NodeTransformer):
        def visit_BinOp(self, node):
            self.generic_visit(node)
            if isinstance(node.left, ast.Constant) and isinstance(node.right, ast.Constant):
                if isinstance(node.op, ast.Add):
                    return ast.Constant(value=node.left.value + node.right.value)
                if isinstance(node.op, ast.Mult):
                    return ast.Constant(value=node.left.value * node.right.value)
            return node
    
    optimized = ConstantFolder().visit(tree)
    ast.fix_missing_locations(optimized)
    
    code = compile(optimized, func.__code__.co_filename, "exec")
    namespace = {}
    exec(code, func.__globals__, namespace)
    return namespace[func.__name__]

@optimize
def compute():
    return 2 + 3

print(compute())

13.9 本章小结

本章深入探讨了Python装饰器机制:

  1. 闭包基础:词法作用域、自由变量、__closure__cell对象、延迟绑定陷阱
  2. 函数装饰器:装饰器本质、通用模板、常用模式(计时、日志、类型检查、重试、速率限制)
  3. 带参装饰器:三层嵌套、可选参数、基于类的参数化
  4. 类装饰器__call__协议、装饰类、单例模式、数据验证
  5. 装饰器叠加:执行顺序、组合器模式
  6. functools工具lru_cachesingledispatchcached_propertypartialtotal_ordering
  7. 设计实践:装饰器模式对比、可测试性、类型提示、局限性与应对

13.10 习题与项目练习

基础习题

  1. 闭包分析:分析以下代码的输出,并解释原因:

    python
    funcs = [lambda: i for i in range(5)]
    print([f() for f in funcs])

    如何修改使其输出[0, 1, 2, 3, 4]

  2. 计时装饰器:实现一个@timeit装饰器,记录函数执行时间并支持配置输出格式(秒/毫秒/微秒)。

  3. 缓存装饰器:不使用functools.lru_cache,手动实现一个带TTL(生存时间)的缓存装饰器。

进阶习题

  1. 权限检查装饰器:实现一个@require_permission(permission)装饰器,用于Web框架中的权限控制。要求支持权限组合(AND/OR逻辑)。

  2. 异步装饰器:实现一个适用于async函数的重试装饰器@async_retry,支持指数退避策略。

  3. 装饰器注册系统:设计一个基于装饰器的插件注册系统,支持:

    • 自动发现和注册插件
    • 按名称/标签查询插件
    • 插件依赖声明

项目练习

  1. AOP框架:使用装饰器实现一个轻量级面向切面编程(AOP)框架,支持:

    • Before通知:方法调用前执行
    • After通知:方法调用后执行
    • Around通知:包裹方法调用
    • AfterThrowing通知:异常时执行
    • 切点(Pointcut)表达式匹配
  2. 性能监控中间件:实现一个基于装饰器的性能监控系统,包括:

    • 函数级耗时统计
    • 调用频率统计
    • 内存使用监控
    • 统计数据导出(JSON/CSV)
    • 可配置的告警阈值

13.11 延伸阅读

13.11.1 装饰器理论

  • PEP 318 — Decorators for Functions and Methods (https://peps.python.org/pep-0318/) — 装饰器语法的设计决策
  • 《Fluent Python》第7章 — 函数装饰器和闭包深度解析
  • 《Python Cookbook》第9章 — 元编程实用技巧

13.11.2 functools模块

13.11.3 类型提示与装饰器

13.11.4 实践案例


下一章:第14章 生成器与迭代器

青少年创意编程 - 高中Python组 - 江苏省宿城中等专业学校