第12章 代理模式
学习目标
完成本章学习后,读者将能够:
- 理解代理模式的核心概念与形式化定义
- 掌握多种代理类型的实现技术与适用场景
- 设计虚拟代理、保护代理、缓存代理等变体
- 实现动态代理与描述符代理
- 分析代理模式与装饰器模式的本质区别
- 识别代理模式的适用场景与反模式
12.1 模式定义
12.1.1 正式定义
代理模式(Proxy Pattern) 为其他对象提供一种代理以控制对这个对象的访问。代理对象充当客户端与真实对象之间的中介,可以在不改变真实对象接口的情况下添加额外的控制逻辑。
$$\text{Proxy}: \mathcal{P} \xrightarrow{\text{control}} \mathcal{R} \text{ where } \mathcal{P} \sim \mathcal{R}$$
其中:
- $\mathcal{P}$ 表示代理对象(Proxy)
- $\mathcal{R}$ 表示真实对象(Real Subject)
- $\mathcal{P} \sim \mathcal{R}$ 表示代理与真实对象具有相同接口
代理控制函数:
$$\text{Control}(r, o, c) = \begin{cases} r(o) & \text{if } \text{authorize}(c) \land \text{available}(o) \ \text{error} & \text{otherwise} \end{cases}$$
其中:
- $r$ 为请求操作
- $o$ 为目标对象
- $c$ 为客户端上下文
代理开销公式:
$$\text{Overhead} = T_{\text{pre-processing}} + T_{\text{post-processing}} + T_{\text{indirection}}$$
12.1.2 历史背景与学术脉络
| 时期 | 发展阶段 | 关键贡献 | 代表人物/文献 |
|---|---|---|---|
| 1987 | 概念萌芽 | Smalltalk中的代理概念 | ParcPlace Systems |
| 1994 | GoF正式定义 | 《设计模式》收录为结构型模式 | Gamma, Helm, Johnson, Vlissides |
| 1995 | 分布式计算 | CORBA/DCOM远程代理 | OMG, Microsoft |
| 1998 | Java RMI | 远程方法调用的代理实现 | Sun Microsystems |
| 2000 | AOP兴起 | 动态代理与面向切面编程 | AspectJ, Spring AOP |
| 2005 | Web服务 | SOAP/REST API代理模式 | W3C |
| 2010 | 云计算 | 微服务API网关代理 | Netflix Zuul |
| 2015 | 容器化 | Kubernetes Service代理 | |
| 2020 | 现代演进 | Python __getattr__动态代理 | Python Community |
12.1.3 模式动机
问题场景:在软件系统中,直接访问对象可能存在以下问题:
- 创建开销大:对象初始化需要大量资源
- 访问控制:需要验证客户端权限
- 远程访问:对象位于不同地址空间
- 额外操作:需要在访问前后执行额外逻辑
解决方案:引入代理对象作为中介:
┌─────────────────────────────────────────────────────────────────────────┐
│ 代理模式架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Client │────────▶│ Proxy │────────▶│ RealSubject │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 控制逻辑层 │ │
│ ├─────────────────┤ │
│ │ • 访问控制 │ │
│ │ • 延迟加载 │ │
│ │ • 缓存管理 │ │
│ │ • 日志记录 │ │
│ │ • 性能监控 │ │
│ │ • 远程转发 │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘12.1.4 代理类型分类
| 代理类型 | 用途 | 典型场景 |
|---|---|---|
| 虚拟代理(Virtual Proxy) | 延迟创建开销大的对象 | 图片懒加载、数据库连接 |
| 保护代理(Protection Proxy) | 控制访问权限 | 权限验证、访问控制 |
| 远程代理(Remote Proxy) | 为远程对象提供本地代理 | RPC、Web服务调用 |
| 缓存代理(Cache Proxy) | 缓存结果减少重复计算 | API缓存、数据库查询缓存 |
| 智能引用(Smart Reference) | 访问时执行额外操作 | 引用计数、日志记录 |
| 防火墙代理(Firewall Proxy) | 保护网络资源 | 网络安全过滤 |
| 同步代理(Synchronization Proxy) | 提供线程安全访问 | 多线程环境 |
12.2 理论基础
12.2.1 UML结构模型
┌─────────────────────────────────────────────────────────────────────────┐
│ 代理模式结构图 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ ┌─────────────────────────────┐ │
│ │ <<interface>> │ │ Proxy │ │
│ │ Subject │ ├─────────────────────────────┤ │
│ ├─────────────────────┤ │ - real_subject: RealSubject │ │
│ │ + request() │◄────────│ - _cache: Dict │ │
│ │ + operation() │ implements│ - _lock: Lock │ │
│ └─────────────────────┘ │ + request() │ │
│ △ │ + operation() │ │
│ │ │ - check_access(): bool │ │
│ │ │ - log_access() │ │
│ ┌─────────┴──────────┐ └─────────────────────────────┘ │
│ │ │ │ delegates │
│ │ RealSubject │ ▼ │
│ ├────────────────────┤ ┌─────────────────────────────┐ │
│ │ - _state: Any │ │ Client Code │ │
│ │ + request() │◄─────────│ │ │
│ │ + operation() │ uses │ subject.request() │ │
│ └────────────────────┘ └─────────────────────────────┘ │
│ │
│ 代理模式关键关系: │
│ • Proxy和RealSubject实现相同接口Subject │
│ • Proxy持有RealSubject的引用 │
│ • Client通过Subject接口与Proxy交互 │
│ │
└─────────────────────────────────────────────────────────────────────────┘12.2.2 参与者职责
| 参与者 | 职责 | Python实现要点 |
|---|---|---|
| Subject | 声明真实主题和代理的共同接口 | ABC或Protocol |
| RealSubject | 定义代理所代表的真实对象 | 业务逻辑实现 |
| Proxy | 控制对RealSubject的访问 | 持有引用、添加控制逻辑 |
| Client | 通过Subject接口与对象交互 | 依赖注入 |
12.2.3 代理与装饰器的本质区别
┌─────────────────────────────────────────────────────────────────────────┐
│ 代理模式 vs 装饰器模式 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 代理模式(Proxy) 装饰器模式(Decorator) │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Proxy │ │ Decorator │ │
│ ├──────────────────┤ ├──────────────────┤ │
│ │ 目的: 控制访问 │ │ 目的: 增强功能 │ │
│ │ 关系: 1:1 │ │ 关系: N:1 │ │
│ │ 生命周期: 管理 │ │ 生命周期: 不管理 │ │
│ │ 对象创建: 可以 │ │ 对象创建: 不可以 │ │
│ │ 透明性: 完全透明 │ │ 透明性: 可叠加 │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
│ 代理模式示例: 装饰器模式示例: │
│ • 虚拟代理(延迟加载) • 日志装饰器 │
│ • 保护代理(权限控制) • 缓存装饰器 │
│ • 远程代理(网络代理) • 重试装饰器 │
│ │
│ 关键区别: │
│ • 代理控制访问,装饰器增强功能 │
│ • 代理可以决定是否创建真实对象,装饰器不创建对象 │
│ • 代理通常在对象创建前工作,装饰器在对象创建后工作 │
│ │
└─────────────────────────────────────────────────────────────────────────┘12.3 Python实现
12.3.1 标准ABC实现
from abc import ABC, abstractmethod
from typing import Optional
import time
class Subject(ABC):
"""主题接口:定义真实对象和代理的共同接口"""
@abstractmethod
def request(self) -> str:
"""执行请求"""
pass
@abstractmethod
def operation(self, data: str) -> str:
"""执行操作"""
pass
class RealSubject(Subject):
"""真实主题:定义代理所代表的真实对象"""
def __init__(self, name: str):
self._name = name
self._state = "initialized"
print(f"RealSubject '{self._name}' 创建完成")
def request(self) -> str:
return f"RealSubject[{self._name}]: 处理请求"
def operation(self, data: str) -> str:
self._state = f"processed_{data}"
return f"RealSubject[{self._name}]: 操作完成 - {self._state}"
class Proxy(Subject):
"""代理:控制对RealSubject的访问"""
def __init__(self, name: str):
self._name = name
self._real_subject: Optional[RealSubject] = None
self._access_count = 0
self._last_access_time: Optional[float] = None
def _ensure_subject(self) -> RealSubject:
"""确保真实对象已创建(延迟加载)"""
if self._real_subject is None:
print(f"Proxy: 延迟创建 RealSubject '{self._name}'")
self._real_subject = RealSubject(self._name)
return self._real_subject
def _check_access(self) -> bool:
"""检查访问权限"""
print(f"Proxy: 检查访问权限...")
return True
def _log_access(self) -> None:
"""记录访问日志"""
self._access_count += 1
self._last_access_time = time.time()
print(f"Proxy: 访问次数={self._access_count}")
def request(self) -> str:
if self._check_access():
result = self._ensure_subject().request()
self._log_access()
return result
return "Proxy: 访问被拒绝"
def operation(self, data: str) -> str:
if self._check_access():
result = self._ensure_subject().operation(data)
self._log_access()
return result
return "Proxy: 操作被拒绝"
def get_stats(self) -> dict:
"""获取代理统计信息"""
return {
'name': self._name,
'access_count': self._access_count,
'last_access': self._last_access_time,
'subject_created': self._real_subject is not None
}
def client_code(subject: Subject) -> None:
"""客户端代码:通过Subject接口与对象交互"""
print(subject.request())
print(subject.operation("test_data"))
if __name__ == "__main__":
print("=== 直接访问 RealSubject ===")
real = RealSubject("direct")
client_code(real)
print("\n=== 通过 Proxy 访问 ===")
proxy = Proxy("proxied")
client_code(proxy)
print(f"\n代理统计: {proxy.get_stats()}")12.3.2 Protocol实现(结构化类型)
from typing import Protocol, Any, Optional, runtime_checkable
from dataclasses import dataclass
from functools import wraps
import time
@runtime_checkable
class SubjectProtocol(Protocol):
"""主题协议:定义接口契约"""
def request(self) -> str: ...
def operation(self, data: str) -> str: ...
@dataclass
class ImageMetadata:
"""图片元数据"""
filename: str
width: int
height: int
size_bytes: int
class ImageService:
"""图片服务:真实主题"""
def __init__(self, filename: str):
self._filename = filename
self._metadata: Optional[ImageMetadata] = None
self._loaded = False
self._load_image()
def _load_image(self) -> None:
"""模拟加载图片"""
print(f"ImageService: 加载图片 '{self._filename}'...")
time.sleep(0.1)
self._metadata = ImageMetadata(
filename=self._filename,
width=1920,
height=1080,
size_bytes=1024 * 1024 * 2
)
self._loaded = True
print(f"ImageService: 图片加载完成")
def request(self) -> str:
return f"ImageService: 图片 '{self._filename}' 已就绪"
def operation(self, data: str) -> str:
return f"ImageService: 对图片执行操作 '{data}'"
def get_metadata(self) -> Optional[ImageMetadata]:
return self._metadata
class ImageProxy:
"""图片代理:虚拟代理实现延迟加载"""
def __init__(self, filename: str):
self._filename = filename
self._image_service: Optional[ImageService] = None
self._metadata_cache: Optional[ImageMetadata] = None
def _ensure_loaded(self) -> ImageService:
"""确保图片已加载"""
if self._image_service is None:
self._image_service = ImageService(self._filename)
return self._image_service
def request(self) -> str:
return self._ensure_loaded().request()
def operation(self, data: str) -> str:
return self._ensure_loaded().operation(data)
def get_metadata(self) -> Optional[ImageMetadata]:
"""获取元数据(可能需要加载)"""
if self._metadata_cache:
return self._metadata_cache
service = self._ensure_loaded()
self._metadata_cache = service.get_metadata()
return self._metadata_cache
def get_filename(self) -> str:
"""获取文件名(无需加载)"""
return self._filename
def is_loaded(self) -> bool:
"""检查是否已加载"""
return self._image_service is not None
class ImageGallery:
"""图片画廊:管理多个图片代理"""
def __init__(self):
self._images: dict[str, ImageProxy] = {}
def add_image(self, filename: str) -> ImageProxy:
proxy = ImageProxy(filename)
self._images[filename] = proxy
return proxy
def display_all(self) -> None:
for filename, proxy in self._images.items():
status = "已加载" if proxy.is_loaded() else "未加载"
print(f" {filename}: {status}")
def load_image(self, filename: str) -> None:
if filename in self._images:
self._images[filename].request()
if __name__ == "__main__":
gallery = ImageGallery()
gallery.add_image("photo1.jpg")
gallery.add_image("photo2.jpg")
gallery.add_image("photo3.jpg")
print("=== 初始状态 ===")
gallery.display_all()
print("\n=== 加载第一张图片 ===")
gallery.load_image("photo1.jpg")
gallery.display_all()
print("\n=== 再次访问第一张图片(已缓存)===")
gallery.load_image("photo1.jpg")12.3.3 动态代理(__getattr__实现)
from typing import Any, Callable, Optional, Dict
from functools import wraps
import time
import threading
class DynamicProxy:
"""动态代理:通过__getattr__自动转发方法调用"""
def __init__(self, subject: Any):
self._subject = subject
self._method_cache: Dict[str, Callable] = {}
self._call_stats: Dict[str, int] = {}
self._lock = threading.Lock()
def __getattr__(self, name: str) -> Any:
"""动态获取属性和方法"""
attr = getattr(self._subject, name)
if callable(attr):
if name not in self._method_cache:
@wraps(attr)
def wrapper(*args, **kwargs):
return self._proxy_call(name, attr, *args, **kwargs)
self._method_cache[name] = wrapper
return self._method_cache[name]
return attr
def _proxy_call(
self,
method_name: str,
method: Callable,
*args,
**kwargs
) -> Any:
"""代理方法调用"""
start = time.time()
try:
result = method(*args, **kwargs)
elapsed = time.time() - start
with self._lock:
self._call_stats[method_name] = \
self._call_stats.get(method_name, 0) + 1
print(f"[Proxy] {method_name}() 执行时间: {elapsed:.4f}s")
return result
except Exception as e:
print(f"[Proxy] {method_name}() 异常: {e}")
raise
def get_stats(self) -> Dict[str, Any]:
"""获取调用统计"""
return {
'call_counts': dict(self._call_stats),
'total_calls': sum(self._call_stats.values())
}
class APIClient:
"""示例:API客户端"""
def get_users(self) -> list:
time.sleep(0.05)
return [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
def get_user(self, user_id: int) -> dict:
time.sleep(0.02)
return {"id": user_id, "name": f"User{user_id}"}
def create_user(self, name: str) -> dict:
time.sleep(0.03)
return {"id": 3, "name": name}
@property
def base_url(self) -> str:
return "https://api.example.com"
if __name__ == "__main__":
client = APIClient()
proxy = DynamicProxy(client)
print("=== 通过动态代理调用方法 ===")
users = proxy.get_users()
print(f"用户列表: {users}")
user = proxy.get_user(1)
print(f"用户: {user}")
new_user = proxy.create_user("Charlie")
print(f"新用户: {new_user}")
print(f"\n代理统计: {proxy.get_stats()}")
print(f"访问属性: base_url = {proxy.base_url}")12.3.4 描述符代理
from typing import Any, Optional, Callable, TypeVar, Generic
from dataclasses import dataclass
from weakref import WeakKeyDictionary
import threading
T = TypeVar('T')
class LazyProxy(Generic[T]):
"""延迟加载描述符代理"""
def __init__(self, factory: Callable[[], T]):
self._factory = factory
self._instances: WeakKeyDictionary = WeakKeyDictionary()
self._lock = threading.Lock()
def __get__(self, obj: Any, objtype: Any = None) -> T:
if obj is None:
return self
if obj not in self._instances:
with self._lock:
if obj not in self._instances:
print(f"LazyProxy: 创建实例...")
self._instances[obj] = self._factory()
return self._instances[obj]
class CachedProperty:
"""缓存属性代理"""
def __init__(self, func: Callable):
self._func = func
self._cache: WeakKeyDictionary = WeakKeyDictionary()
self.__doc__ = func.__doc__
def __get__(self, obj: Any, objtype: Any = None) -> Any:
if obj is None:
return self
if obj not in self._cache:
print(f"CachedProperty: 计算 '{self._func.__name__}'...")
self._cache[obj] = self._func(obj)
return self._cache[obj]
class ValidatedProxy:
"""验证代理描述符"""
def __init__(
self,
name: str,
validator: Callable[[Any], bool],
error_message: str = "验证失败"
):
self._name = name
self._validator = validator
self._error_message = error_message
self._data: WeakKeyDictionary = WeakKeyDictionary()
def __get__(self, obj: Any, objtype: Any = None) -> Any:
if obj is None:
return self
return self._data.get(obj, None)
def __set__(self, obj: Any, value: Any) -> None:
if not self._validator(value):
raise ValueError(f"{self._name}: {self._error_message}")
self._data[obj] = value
@dataclass
class DatabaseConnection:
"""数据库连接(模拟)"""
host: str
port: int
def connect(self) -> str:
return f"Connected to {self.host}:{self.port}"
def query(self, sql: str) -> list:
return [{"result": sql}]
class DataService:
"""数据服务:使用描述符代理"""
_db = LazyProxy[DatabaseConnection](
lambda: DatabaseConnection("localhost", 5432)
)
def __init__(self, name: str):
self.name = name
@CachedProperty
def expensive_computation(self) -> int:
"""耗时计算"""
import time
time.sleep(0.1)
return 42
age = ValidatedProxy(
"age",
lambda x: isinstance(x, int) and 0 <= x <= 150,
"年龄必须是0-150之间的整数"
)
def get_data(self) -> str:
return self._db.connect()
def query(self, sql: str) -> list:
return self._db.query(sql)
if __name__ == "__main__":
service = DataService("test")
print("=== 延迟加载数据库连接 ===")
print(service.get_data())
print(service.query("SELECT * FROM users"))
print("\n=== 缓存属性 ===")
print(f"第一次计算: {service.expensive_computation}")
print(f"第二次访问(缓存): {service.expensive_computation}")
print("\n=== 验证代理 ===")
service.age = 25
print(f"年龄: {service.age}")
try:
service.age = -5
except ValueError as e:
print(f"验证错误: {e}")12.3.5 泛型保护代理
from typing import TypeVar, Generic, Dict, Set, Callable, Any, Optional
from dataclasses import dataclass
from enum import Enum, auto
from functools import wraps
from abc import ABC, abstractmethod
class Permission(Enum):
"""权限枚举"""
READ = auto()
WRITE = auto()
DELETE = auto()
ADMIN = auto()
@dataclass(frozen=True)
class User:
"""用户"""
username: str
permissions: Set[Permission]
def has_permission(self, permission: Permission) -> bool:
return permission in self.permissions or Permission.ADMIN in self.permissions
T = TypeVar('T')
class ProtectionProxy(Generic[T]):
"""保护代理:控制访问权限"""
def __init__(
self,
subject: T,
user: User,
permission_map: Optional[Dict[str, Set[Permission]]] = None
):
self._subject = subject
self._user = user
self._permission_map = permission_map or {}
self._access_log: list = []
def _check_permission(self, method_name: str) -> bool:
"""检查方法调用权限"""
required = self._permission_map.get(method_name, set())
if not required:
return True
has_access = any(
self._user.has_permission(p) for p in required
)
self._access_log.append({
'method': method_name,
'user': self._user.username,
'allowed': has_access,
'required': required
})
return has_access
def __getattr__(self, name: str) -> Any:
attr = getattr(self._subject, name)
if callable(attr):
@wraps(attr)
def wrapper(*args, **kwargs):
if not self._check_permission(name):
required = self._permission_map.get(name, set())
raise PermissionError(
f"用户 '{self._user.username}' 无权限执行 '{name}' "
f"(需要: {required})"
)
return attr(*args, **kwargs)
return wrapper
return attr
def get_access_log(self) -> list:
return list(self._access_log)
class SecureDocument:
"""安全文档"""
def __init__(self, title: str, content: str = ""):
self._title = title
self._content = content
def read(self) -> str:
return f"[{self._title}]\n{self._content}"
def write(self, content: str) -> None:
self._content = content
def delete(self) -> None:
self._content = ""
self._title = "[已删除]"
def get_title(self) -> str:
return self._title
DOCUMENT_PERMISSIONS = {
'read': {Permission.READ},
'write': {Permission.WRITE},
'delete': {Permission.DELETE},
'get_title': set(),
}
class DocumentService:
"""文档服务工厂"""
@staticmethod
def create_secure_document(
title: str,
content: str,
user: User
) -> ProtectionProxy[SecureDocument]:
doc = SecureDocument(title, content)
return ProtectionProxy(doc, user, DOCUMENT_PERMISSIONS)
if __name__ == "__main__":
admin = User("admin", {Permission.ADMIN})
reader = User("reader", {Permission.READ})
writer = User("writer", {Permission.READ, Permission.WRITE})
doc = SecureDocument("机密文档", "这是机密内容")
print("=== 管理员访问 ===")
admin_proxy = ProtectionProxy(doc, admin, DOCUMENT_PERMISSIONS)
print(admin_proxy.read())
admin_proxy.write("管理员修改的内容")
print("\n=== 读者访问 ===")
reader_proxy = ProtectionProxy(doc, reader, DOCUMENT_PERMISSIONS)
print(reader_proxy.read())
try:
reader_proxy.write("读者尝试修改")
except PermissionError as e:
print(f"权限错误: {e}")
print("\n=== 写者访问 ===")
writer_proxy = ProtectionProxy(doc, writer, DOCUMENT_PERMISSIONS)
print(writer_proxy.read())
writer_proxy.write("写者修改的内容")
try:
writer_proxy.delete()
except PermissionError as e:
print(f"权限错误: {e}")
print(f"\n访问日志: {writer_proxy.get_access_log()}")12.4 企业级应用示例
12.4.1 数据库连接池代理
from typing import Optional, List, Dict, Any, Callable
from dataclasses import dataclass, field
from contextlib import contextmanager
from datetime import datetime
import threading
import time
import queue
@dataclass
class ConnectionStats:
"""连接统计"""
total_created: int = 0
total_reused: int = 0
total_errors: int = 0
active_connections: int = 0
peak_connections: int = 0
@dataclass
class ConnectionConfig:
"""连接配置"""
host: str
port: int
database: str
user: str
password: str
max_connections: int = 10
min_connections: int = 2
connection_timeout: float = 30.0
idle_timeout: float = 300.0
class Connection:
"""数据库连接(模拟)"""
def __init__(self, conn_id: int, config: ConnectionConfig):
self._id = conn_id
self._config = config
self._created_at = datetime.now()
self._last_used = datetime.now()
self._is_closed = False
self._query_count = 0
@property
def id(self) -> int:
return self._id
@property
def is_alive(self) -> bool:
return not self._is_closed
@property
def is_idle_expired(self) -> bool:
idle_time = (datetime.now() - self._last_used).total_seconds()
return idle_time > self._config.idle_timeout
def execute(self, query: str) -> List[Dict[str, Any]]:
if self._is_closed:
raise RuntimeError("连接已关闭")
self._last_used = datetime.now()
self._query_count += 1
time.sleep(0.01)
return [{"id": 1, "result": f"Query: {query[:30]}..."}]
def close(self) -> None:
self._is_closed = True
def __repr__(self) -> str:
status = "closed" if self._is_closed else "active"
return f"Connection(id={self._id}, status={status}, queries={self._query_count})"
class ConnectionPoolProxy:
"""连接池代理:管理数据库连接"""
def __init__(self, config: ConnectionConfig):
self._config = config
self._pool: queue.Queue = queue.Queue(maxsize=config.max_connections)
self._all_connections: Dict[int, Connection] = {}
self._stats = ConnectionStats()
self._lock = threading.Lock()
self._next_id = 0
self._initialize_pool()
def _initialize_pool(self) -> None:
"""初始化最小连接数"""
for _ in range(self._config.min_connections):
conn = self._create_connection()
self._pool.put(conn)
def _create_connection(self) -> Connection:
"""创建新连接"""
with self._lock:
self._next_id += 1
conn = Connection(self._next_id, self._config)
self._all_connections[conn.id] = conn
self._stats.total_created += 1
self._stats.active_connections += 1
self._stats.peak_connections = max(
self._stats.peak_connections,
self._stats.active_connections
)
return conn
def acquire(self, timeout: Optional[float] = None) -> Connection:
"""获取连接"""
timeout = timeout or self._config.connection_timeout
try:
conn = self._pool.get(block=True, timeout=timeout)
if not conn.is_alive or conn.is_idle_expired:
self._release_connection(conn)
return self.acquire(timeout)
self._stats.total_reused += 1
return conn
except queue.Empty:
if self._stats.active_connections < self._config.max_connections:
return self._create_connection()
raise RuntimeError("连接池已耗尽")
def release(self, conn: Connection) -> None:
"""释放连接"""
if not conn.is_alive:
self._release_connection(conn)
return
try:
self._pool.put(conn, block=False)
except queue.Full:
self._release_connection(conn)
def _release_connection(self, conn: Connection) -> None:
"""真正释放连接"""
conn.close()
with self._lock:
if conn.id in self._all_connections:
del self._all_connections[conn.id]
self._stats.active_connections -= 1
@contextmanager
def connection(self):
"""上下文管理器"""
conn = self.acquire()
try:
yield conn
finally:
self.release(conn)
def execute(self, query: str) -> List[Dict[str, Any]]:
"""便捷执行方法"""
with self.connection() as conn:
return conn.execute(query)
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
'total_created': self._stats.total_created,
'total_reused': self._stats.total_reused,
'total_errors': self._stats.total_errors,
'active_connections': self._stats.active_connections,
'peak_connections': self._stats.peak_connections,
'pool_size': self._pool.qsize(),
'reuse_rate': (
self._stats.total_reused /
max(1, self._stats.total_created + self._stats.total_reused)
)
}
def close_all(self) -> None:
"""关闭所有连接"""
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
conn.close()
except queue.Empty:
break
for conn in list(self._all_connections.values()):
conn.close()
self._all_connections.clear()
self._stats.active_connections = 0
class DatabaseService:
"""数据库服务"""
_pools: Dict[str, ConnectionPoolProxy] = {}
_lock = threading.Lock()
@classmethod
def get_pool(cls, config: ConnectionConfig) -> ConnectionPoolProxy:
key = f"{config.host}:{config.port}:{config.database}"
if key not in cls._pools:
with cls._lock:
if key not in cls._pools:
cls._pools[key] = ConnectionPoolProxy(config)
return cls._pools[key]
@classmethod
def close_all(cls) -> None:
for pool in cls._pools.values():
pool.close_all()
cls._pools.clear()
if __name__ == "__main__":
config = ConnectionConfig(
host="localhost",
port=5432,
database="mydb",
user="admin",
password="secret",
max_connections=5,
min_connections=2
)
pool = DatabaseService.get_pool(config)
print("=== 执行查询 ===")
for i in range(5):
result = pool.execute(f"SELECT * FROM users WHERE id = {i}")
print(f"查询 {i+1}: {result[0]['result']}")
print(f"\n连接池统计: {pool.get_stats()}")
print("\n=== 并发测试 ===")
def worker(worker_id: int):
with pool.connection() as conn:
result = conn.execute(f"SELECT * FROM orders WHERE worker = {worker_id}")
print(f"Worker {worker_id}: {result[0]['result']}")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"\n最终统计: {pool.get_stats()}")
DatabaseService.close_all()12.4.2 API网关代理
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from abc import ABC, abstractmethod
import time
import hashlib
import threading
from functools import wraps
class HttpMethod(Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
@dataclass
class Request:
"""HTTP请求"""
method: HttpMethod
path: str
headers: Dict[str, str] = field(default_factory=dict)
params: Dict[str, Any] = field(default_factory=dict)
body: Any = None
client_id: str = "anonymous"
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class Response:
"""HTTP响应"""
status_code: int
headers: Dict[str, str] = field(default_factory=dict)
body: Any = None
message: str = ""
latency_ms: float = 0.0
class Middleware(ABC):
"""中间件接口"""
@abstractmethod
def process(self, request: Request, next_handler: Callable) -> Response:
pass
class RateLimitMiddleware(Middleware):
"""限流中间件"""
def __init__(
self,
max_requests: int = 100,
window_seconds: int = 60
):
self._max_requests = max_requests
self._window = timedelta(seconds=window_seconds)
self._requests: Dict[str, List[datetime]] = {}
self._lock = threading.Lock()
def process(self, request: Request, next_handler: Callable) -> Response:
client_id = request.client_id
now = datetime.now()
with self._lock:
if client_id not in self._requests:
self._requests[client_id] = []
self._requests[client_id] = [
t for t in self._requests[client_id]
if now - t < self._window
]
if len(self._requests[client_id]) >= self._max_requests:
return Response(
status_code=429,
message="请求过于频繁,请稍后再试"
)
self._requests[client_id].append(now)
return next_handler(request)
class CacheMiddleware(Middleware):
"""缓存中间件"""
def __init__(self, ttl_seconds: int = 60):
self._ttl = timedelta(seconds=ttl_seconds)
self._cache: Dict[str, tuple] = {}
self._lock = threading.Lock()
def _make_key(self, request: Request) -> str:
key_data = f"{request.method.value}:{request.path}:{str(request.params)}"
return hashlib.md5(key_data.encode()).hexdigest()
def process(self, request: Request, next_handler: Callable) -> Response:
if request.method != HttpMethod.GET:
return next_handler(request)
key = self._make_key(request)
now = datetime.now()
with self._lock:
if key in self._cache:
response, timestamp = self._cache[key]
if now - timestamp < self._ttl:
response.headers["X-Cache"] = "HIT"
return response
response = next_handler(request)
if response.status_code == 200:
with self._lock:
self._cache[key] = (response, now)
response.headers["X-Cache"] = "MISS"
return response
def clear(self) -> None:
with self._lock:
self._cache.clear()
class AuthMiddleware(Middleware):
"""认证中间件"""
def __init__(self, api_keys: Dict[str, List[str]]):
self._api_keys = api_keys
def process(self, request: Request, next_handler: Callable) -> Response:
api_key = request.headers.get("X-API-Key", "")
if not api_key:
return Response(
status_code=401,
message="缺少API密钥"
)
client_id = self._api_keys.get(api_key)
if not client_id:
return Response(
status_code=403,
message="无效的API密钥"
)
request.client_id = client_id[0]
return next_handler(request)
class LoggingMiddleware(Middleware):
"""日志中间件"""
def process(self, request: Request, next_handler: Callable) -> Response:
start = time.time()
response = next_handler(request)
elapsed = (time.time() - start) * 1000
print(
f"[{datetime.now().isoformat()}] "
f"{request.method.value} {request.path} "
f"-> {response.status_code} ({elapsed:.2f}ms)"
)
response.latency_ms = elapsed
return response
class BackendService:
"""后端服务(模拟)"""
def __init__(self, name: str, base_url: str):
self._name = name
self._base_url = base_url
def handle(self, request: Request) -> Response:
time.sleep(0.02)
return Response(
status_code=200,
body={
"service": self._name,
"path": request.path,
"params": request.params
},
message="成功"
)
class APIGateway:
"""API网关代理"""
def __init__(self):
self._services: Dict[str, BackendService] = {}
self._middlewares: List[Middleware] = []
self._routes: Dict[str, str] = {}
def register_service(
self,
prefix: str,
service: BackendService
) -> None:
self._services[prefix] = service
def add_middleware(self, middleware: Middleware) -> None:
self._middlewares.append(middleware)
def _build_handler_chain(self) -> Callable:
def final_handler(request: Request) -> Response:
path_parts = request.path.strip("/").split("/")
prefix = f"/{path_parts[0]}" if path_parts else "/"
if prefix not in self._services:
return Response(
status_code=404,
message=f"服务未找到: {prefix}"
)
return self._services[prefix].handle(request)
handler = final_handler
for middleware in reversed(self._middlewares):
current_handler = handler
current_middleware = middleware
handler = lambda req, h=current_handler, m=current_middleware: m.process(req, h)
return handler
def handle(self, request: Request) -> Response:
handler = self._build_handler_chain()
return handler(request)
def get(
self,
path: str,
params: Dict[str, Any] = None,
headers: Dict[str, str] = None
) -> Response:
request = Request(
method=HttpMethod.GET,
path=path,
params=params or {},
headers=headers or {}
)
return self.handle(request)
def post(
self,
path: str,
body: Any = None,
headers: Dict[str, str] = None
) -> Response:
request = Request(
method=HttpMethod.POST,
path=path,
body=body,
headers=headers or {}
)
return self.handle(request)
if __name__ == "__main__":
gateway = APIGateway()
gateway.register_service("/users", BackendService("user-service", "http://users:8001"))
gateway.register_service("/orders", BackendService("order-service", "http://orders:8002"))
gateway.register_service("/products", BackendService("product-service", "http://products:8003"))
gateway.add_middleware(LoggingMiddleware())
gateway.add_middleware(RateLimitMiddleware(max_requests=10, window_seconds=60))
gateway.add_middleware(CacheMiddleware(ttl_seconds=30))
print("=== GET请求 ===")
response = gateway.get("/users", params={"id": 1})
print(f"响应: {response.status_code}, 缓存: {response.headers.get('X-Cache')}")
print("\n=== 重复请求(缓存命中)===")
response = gateway.get("/users", params={"id": 1})
print(f"响应: {response.status_code}, 缓存: {response.headers.get('X-Cache')}")
print("\n=== POST请求 ===")
response = gateway.post("/orders", body={"product_id": 123, "quantity": 2})
print(f"响应: {response.status_code}")
print("\n=== 不存在的服务 ===")
response = gateway.get("/unknown")
print(f"响应: {response.status_code}, 消息: {response.message}")12.4.3 分布式缓存代理
from typing import Dict, Any, Optional, List, Callable, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
from enum import Enum
import threading
import time
import hashlib
import json
T = TypeVar('T')
class CacheLevel(Enum):
L1 = "L1"
L2 = "L2"
L3 = "L3"
@dataclass
class CacheEntry(Generic[T]):
"""缓存条目"""
key: str
value: T
created_at: datetime = field(default_factory=datetime.now)
expires_at: Optional[datetime] = None
hits: int = 0
size_bytes: int = 0
level: CacheLevel = CacheLevel.L1
def is_expired(self) -> bool:
if self.expires_at is None:
return False
return datetime.now() > self.expires_at
def touch(self) -> None:
self.hits += 1
class CacheBackend(ABC, Generic[T]):
"""缓存后端接口"""
@abstractmethod
def get(self, key: str) -> Optional[CacheEntry[T]]:
pass
@abstractmethod
def set(self, entry: CacheEntry[T]) -> bool:
pass
@abstractmethod
def delete(self, key: str) -> bool:
pass
@abstractmethod
def clear(self) -> None:
pass
@abstractmethod
def stats(self) -> Dict[str, Any]:
pass
class MemoryCacheBackend(CacheBackend[T]):
"""内存缓存后端"""
def __init__(self, max_size: int = 1000, max_memory_mb: float = 100):
self._cache: Dict[str, CacheEntry[T]] = {}
self._max_size = max_size
self._max_memory = max_memory_mb * 1024 * 1024
self._current_memory = 0
self._lock = threading.Lock()
self._stats = {'hits': 0, 'misses': 0, 'evictions': 0}
def get(self, key: str) -> Optional[CacheEntry[T]]:
with self._lock:
if key in self._cache:
entry = self._cache[key]
if not entry.is_expired():
entry.touch()
self._stats['hits'] += 1
return entry
else:
self._evict(key)
self._stats['misses'] += 1
return None
def set(self, entry: CacheEntry[T]) -> bool:
with self._lock:
if entry.key in self._cache:
old = self._cache[entry.key]
self._current_memory -= old.size_bytes
while (
len(self._cache) >= self._max_size or
self._current_memory + entry.size_bytes > self._max_memory
):
if not self._evict_lru():
return False
self._cache[entry.key] = entry
self._current_memory += entry.size_bytes
return True
def delete(self, key: str) -> bool:
with self._lock:
return self._evict(key)
def _evict(self, key: str) -> bool:
if key in self._cache:
entry = self._cache.pop(key)
self._current_memory -= entry.size_bytes
self._stats['evictions'] += 1
return True
return False
def _evict_lru(self) -> bool:
if not self._cache:
return False
lru_key = min(self._cache.keys(), key=lambda k: self._cache[k].hits)
return self._evict(lru_key)
def clear(self) -> None:
with self._lock:
self._cache.clear()
self._current_memory = 0
def stats(self) -> Dict[str, Any]:
total = self._stats['hits'] + self._stats['misses']
return {
**self._stats,
'size': len(self._cache),
'memory_bytes': self._current_memory,
'hit_rate': self._stats['hits'] / max(1, total)
}
class CacheProxy(Generic[T]):
"""缓存代理:多级缓存"""
def __init__(
self,
backends: Dict[CacheLevel, CacheBackend[T]],
default_ttl: timedelta = timedelta(minutes=5)
):
self._backends = backends
self._default_ttl = default_ttl
self._lock = threading.Lock()
def _estimate_size(self, value: T) -> int:
try:
return len(json.dumps(value))
except:
return 64
def get(self, key: str) -> Optional[T]:
for level in [CacheLevel.L1, CacheLevel.L2, CacheLevel.L3]:
if level in self._backends:
entry = self._backends[level].get(key)
if entry:
if level != CacheLevel.L1 and CacheLevel.L1 in self._backends:
self._backends[CacheLevel.L1].set(entry)
return entry.value
return None
def set(
self,
key: str,
value: T,
ttl: Optional[timedelta] = None
) -> bool:
ttl = ttl or self._default_ttl
expires_at = datetime.now() + ttl
entry = CacheEntry(
key=key,
value=value,
expires_at=expires_at,
size_bytes=self._estimate_size(value)
)
success = True
for level in [CacheLevel.L1, CacheLevel.L2, CacheLevel.L3]:
if level in self._backends:
entry.level = level
if not self._backends[level].set(entry):
success = False
return success
def delete(self, key: str) -> bool:
results = []
for backend in self._backends.values():
results.append(backend.delete(key))
return any(results)
def clear(self) -> None:
for backend in self._backends.values():
backend.clear()
def stats(self) -> Dict[str, Any]:
return {
level.value: backend.stats()
for level, backend in self._backends.items()
}
class DataService:
"""数据服务:使用缓存代理"""
def __init__(self, cache: CacheProxy):
self._cache = cache
self._query_count = 0
def _fetch_from_db(self, key: str) -> Dict[str, Any]:
self._query_count += 1
time.sleep(0.05)
return {"key": key, "data": f"value_for_{key}", "query_num": self._query_count}
def get(self, key: str) -> Dict[str, Any]:
cached = self._cache.get(key)
if cached:
print(f"缓存命中: {key}")
return cached
print(f"缓存未命中,查询数据库: {key}")
data = self._fetch_from_db(key)
self._cache.set(key, data)
return data
def invalidate(self, key: str) -> None:
self._cache.delete(key)
print(f"缓存失效: {key}")
if __name__ == "__main__":
l1_cache = MemoryCacheBackend[Dict](max_size=100, max_memory_mb=10)
l2_cache = MemoryCacheBackend[Dict](max_size=1000, max_memory_mb=100)
cache = CacheProxy[Dict]({
CacheLevel.L1: l1_cache,
CacheLevel.L2: l2_cache
})
service = DataService(cache)
print("=== 第一次查询 ===")
result1 = service.get("user:1")
print(f"结果: {result1}")
print("\n=== 第二次查询(缓存命中)===")
result2 = service.get("user:1")
print(f"结果: {result2}")
print("\n=== 缓存统计 ===")
for level, stats in cache.stats().items():
print(f"{level}: {stats}")
print("\n=== 缓存失效后重新查询 ===")
service.invalidate("user:1")
result3 = service.get("user:1")
print(f"结果: {result3}")12.5 模式变体与扩展
12.5.1 智能引用代理
from typing import Any, Optional, Callable, Dict
from dataclasses import dataclass, field
from weakref import WeakKeyDictionary, ref
import threading
import time
@dataclass
class ReferenceStats:
"""引用统计"""
access_count: int = 0
last_access: Optional[float] = None
created_at: float = field(default_factory=time.time)
modifications: int = 0
class SmartReferenceProxy:
"""智能引用代理:自动引用计数和生命周期管理"""
_instances: Dict[int, 'SmartReferenceProxy'] = {}
_lock = threading.Lock()
def __init__(self, subject: Any, on_finalize: Callable = None):
self._subject = subject
self._on_finalize = on_finalize
self._stats = ReferenceStats()
self._observers: list = []
self._id = id(self)
with self._lock:
SmartReferenceProxy._instances[self._id] = self
def __getattr__(self, name: str) -> Any:
attr = getattr(self._subject, name)
if callable(attr):
def wrapper(*args, **kwargs):
self._stats.access_count += 1
self._stats.last_access = time.time()
result = attr(*args, **kwargs)
self._notify_observers('call', name, result)
return result
return wrapper
self._stats.access_count += 1
self._stats.last_access = time.time()
return attr
def __setattr__(self, name: str, value: Any) -> None:
if name.startswith('_'):
object.__setattr__(self, name, value)
return
setattr(self._subject, name, value)
self._stats.modifications += 1
self._notify_observers('set', name, value)
def __del__(self):
with self._lock:
SmartReferenceProxy._instances.pop(self._id, None)
if self._on_finalize:
self._on_finalize(self._subject)
def add_observer(self, callback: Callable) -> None:
self._observers.append(callback)
def _notify_observers(self, event_type: str, name: str, value: Any) -> None:
for observer in self._observers:
observer(event_type, name, value)
def get_stats(self) -> ReferenceStats:
return self._stats
@classmethod
def get_active_count(cls) -> int:
return len(cls._instances)
class Resource:
"""资源类"""
def __init__(self, name: str):
self.name = name
self.data = {}
def process(self, data: str) -> str:
return f"Processed: {data}"
def cleanup(self) -> None:
print(f"Cleaning up resource: {self.name}")
if __name__ == "__main__":
def on_finalize(resource):
resource.cleanup()
def observer(event_type, name, value):
print(f"[Observer] {event_type}: {name} = {value}")
resource = Resource("test_resource")
proxy = SmartReferenceProxy(resource, on_finalize)
proxy.add_observer(observer)
print("=== 访问属性 ===")
print(f"名称: {proxy.name}")
print("\n=== 调用方法 ===")
result = proxy.process("test_data")
print(f"结果: {result}")
print("\n=== 修改属性 ===")
proxy.data["key"] = "value"
print(f"\n统计: {proxy.get_stats()}")
print(f"活跃代理数: {SmartReferenceProxy.get_active_count()}")12.5.2 同步代理
from typing import Any, Callable, Dict
from functools import wraps
import threading
from dataclasses import dataclass
class SynchronizationProxy:
"""同步代理:提供线程安全访问"""
def __init__(self, subject: Any, lock: threading.Lock = None):
self._subject = subject
self._lock = lock or threading.RLock()
self._method_locks: Dict[str, threading.Lock] = {}
def __getattr__(self, name: str) -> Any:
attr = getattr(self._subject, name)
if callable(attr):
if name not in self._method_locks:
self._method_locks[name] = threading.Lock()
method_lock = self._method_locks[name]
@wraps(attr)
def synchronized_method(*args, **kwargs):
with self._lock:
with method_lock:
return attr(*args, **kwargs)
return synchronized_method
with self._lock:
return attr
def __setattr__(self, name: str, value: Any) -> None:
if name.startswith('_'):
object.__setattr__(self, name, value)
return
with self._lock:
setattr(self._subject, name, value)
def acquire(self) -> bool:
return self._lock.acquire()
def release(self) -> None:
self._lock.release()
@property
def locked(self) -> bool:
return self._lock.locked()
class SharedCounter:
"""共享计数器"""
def __init__(self, initial: int = 0):
self._value = initial
self._history: list = []
def increment(self, amount: int = 1) -> int:
self._value += amount
self._history.append(('increment', amount, self._value))
return self._value
def decrement(self, amount: int = 1) -> int:
self._value -= amount
self._history.append(('decrement', amount, self._value))
return self._value
def get_value(self) -> int:
return self._value
def get_history(self) -> list:
return list(self._history)
if __name__ == "__main__":
counter = SharedCounter(0)
safe_counter = SynchronizationProxy(counter)
results = []
errors = []
def worker(worker_id: int, operations: int):
try:
for i in range(operations):
if i % 2 == 0:
val = safe_counter.increment()
else:
val = safe_counter.decrement()
results.append((worker_id, val))
except Exception as e:
errors.append((worker_id, str(e)))
threads = [
threading.Thread(target=worker, args=(i, 100))
for i in range(5)
]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终值: {safe_counter.get_value()}")
print(f"操作历史数: {len(safe_counter.get_history())}")
print(f"错误数: {len(errors)}")12.5.3 远程代理模拟
from typing import Any, Dict, Optional, Callable
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import json
import time
@dataclass
class RemoteRequest:
"""远程请求"""
service: str
method: str
args: tuple = ()
kwargs: Dict[str, Any] = field(default_factory=dict)
request_id: str = ""
@dataclass
class RemoteResponse:
"""远程响应"""
request_id: str
success: bool
result: Any = None
error: Optional[str] = None
latency_ms: float = 0.0
class RemoteService(ABC):
"""远程服务接口"""
@abstractmethod
def call(self, request: RemoteRequest) -> RemoteResponse:
pass
class MockRemoteService(RemoteService):
"""模拟远程服务"""
def __init__(self, host: str, port: int):
self._host = host
self._port = port
self._services: Dict[str, Any] = {}
def register_service(self, name: str, service: Any) -> None:
self._services[name] = service
def call(self, request: RemoteRequest) -> RemoteResponse:
start = time.time()
try:
if request.service not in self._services:
return RemoteResponse(
request_id=request.request_id,
success=False,
error=f"服务未找到: {request.service}"
)
service = self._services[request.service]
method = getattr(service, request.method, None)
if not method:
return RemoteResponse(
request_id=request.request_id,
success=False,
error=f"方法未找到: {request.method}"
)
time.sleep(0.02)
result = method(*request.args, **request.kwargs)
return RemoteResponse(
request_id=request.request_id,
success=True,
result=result,
latency_ms=(time.time() - start) * 1000
)
except Exception as e:
return RemoteResponse(
request_id=request.request_id,
success=False,
error=str(e),
latency_ms=(time.time() - start) * 1000
)
class RemoteProxy:
"""远程代理"""
def __init__(
self,
service_name: str,
remote_service: RemoteService,
timeout: float = 30.0
):
self._service_name = service_name
self._remote = remote_service
self._timeout = timeout
self._request_counter = 0
def _make_request_id(self) -> str:
self._request_counter += 1
return f"{self._service_name}_{self._request_counter}"
def __getattr__(self, name: str) -> Any:
def remote_method(*args, **kwargs):
request = RemoteRequest(
service=self._service_name,
method=name,
args=args,
kwargs=kwargs,
request_id=self._make_request_id()
)
response = self._remote.call(request)
if not response.success:
raise RuntimeError(f"远程调用失败: {response.error}")
return response.result
return remote_method
class UserService:
"""用户服务"""
def get_user(self, user_id: int) -> Dict[str, Any]:
return {"id": user_id, "name": f"User{user_id}", "email": f"user{user_id}@example.com"}
def list_users(self, page: int = 1, size: int = 10) -> list:
return [
{"id": i, "name": f"User{i}"}
for i in range((page - 1) * size + 1, page * size + 1)
]
def create_user(self, name: str, email: str) -> Dict[str, Any]:
return {"id": 999, "name": name, "email": email}
if __name__ == "__main__":
remote = MockRemoteService("api.example.com", 8080)
remote.register_service("user", UserService())
user_proxy = RemoteProxy("user", remote)
print("=== 获取用户 ===")
user = user_proxy.get_user(1)
print(f"用户: {user}")
print("\n=== 列出用户 ===")
users = user_proxy.list_users(page=1, size=5)
print(f"用户列表: {users}")
print("\n=== 创建用户 ===")
new_user = user_proxy.create_user("Alice", "alice@example.com")
print(f"新用户: {new_user}")12.6 反模式与最佳实践
12.6.1 常见反模式
反模式1:代理泄漏
from typing import Any
class LeakyProxy:
"""错误示例:代理泄漏真实对象"""
def __init__(self, subject: Any):
self._subject = subject
def get_subject(self) -> Any:
return self._subject
def __getattr__(self, name: str) -> Any:
return getattr(self._subject, name)
class SecureProxy:
"""正确示例:保护真实对象"""
def __init__(self, subject: Any):
self._subject = subject
def __getattr__(self, name: str) -> Any:
attr = getattr(self._subject, name)
if callable(attr):
def safe_wrapper(*args, **kwargs):
print(f"[Proxy] 调用方法: {name}")
return attr(*args, **kwargs)
return safe_wrapper
return attr
class SensitiveData:
def __init__(self):
self._secret = "password123"
def get_info(self) -> str:
return "Public info"
def _get_secret(self) -> str:
return self._secret
if __name__ == "__main__":
data = SensitiveData()
leaky = LeakyProxy(data)
real = leaky.get_subject()
print(f"泄漏真实对象: {real._get_secret()}")
secure = SecureProxy(data)
print(f"安全代理: {secure.get_info()}")反模式2:过度代理
from typing import Any, Callable
from functools import wraps
class OverProxied:
"""过度代理:每层都添加日志"""
def __init__(self, subject: Any, name: str):
self._subject = subject
self._name = name
def __getattr__(self, name: str) -> Any:
attr = getattr(self._subject, name)
if callable(attr):
@wraps(attr)
def wrapper(*args, **kwargs):
print(f"[{self._name}] 调用 {name}")
return attr(*args, **kwargs)
return wrapper
return attr
class SimpleProxy:
"""简洁代理:只在必要处添加逻辑"""
def __init__(self, subject: Any):
self._subject = subject
def __getattr__(self, name: str) -> Any:
return getattr(self._subject, name)
def sensitive_operation(self, *args, **kwargs) -> Any:
print("[Proxy] 敏感操作审计")
return self._subject.sensitive_operation(*args, **kwargs)
class Service:
def operation_a(self) -> str:
return "A"
def operation_b(self) -> str:
return "B"
def sensitive_operation(self) -> str:
return "Sensitive"
if __name__ == "__main__":
service = Service()
p1 = OverProxied(service, "Layer1")
p2 = OverProxied(p1, "Layer2")
p3 = OverProxied(p2, "Layer3")
print("=== 过度代理 ===")
result = p3.operation_a()
print("\n=== 简洁代理 ===")
simple = SimpleProxy(service)
print(simple.operation_a())
print(simple.sensitive_operation())12.6.2 最佳实践清单
| 实践 | 说明 | 代码示例 |
|---|---|---|
| 接口一致性 | 代理与真实对象实现相同接口 | class Proxy(Subject): |
| 延迟初始化 | 虚拟代理延迟创建真实对象 | if self._real is None: self._real = Real() |
| 透明性 | 客户端不应感知代理存在 | 通过Subject接口交互 |
| 单一职责 | 每个代理只负责一种控制逻辑 | 保护代理只做权限控制 |
| 线程安全 | 共享代理需要同步机制 | with self._lock: |
| 资源清理 | 代理负责清理真实对象资源 | __del__ 或上下文管理器 |
| 日志审计 | 记录代理操作便于调试 | self._log_access() |
| 错误处理 | 代理应优雅处理真实对象异常 | try-except包装 |
12.7 决策指南
12.7.1 代理类型选择
┌─────────────────────────────────────────────────────────────┐
│ 代理类型选择决策树 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 问题:是否需要控制对象创建时机? │
│ │ │
│ ├── 是 ──→ 虚拟代理(延迟加载) │
│ │ │
│ └── 否 │
│ │ │
│ ▼ │
│ 问题:是否需要控制访问权限? │
│ │ │
│ ├── 是 ──→ 保护代理(权限控制) │
│ │ │
│ └── 否 │
│ │ │
│ ▼ │
│ 问题:对象是否在远程地址空间? │
│ │ │
│ ├── 是 ──→ 远程代理(网络通信) │
│ │ │
│ └── 否 │
│ │ │
│ ▼ │
│ 问题:是否需要缓存结果? │
│ │ │
│ ├── 是 ──→ 缓存代理 │
│ │ │
│ └── 否 │
│ │ │
│ ▼ │
│ 问题:是否需要额外操作(日志、计数等)? │
│ │ │
│ ├── 是 ──→ 智能引用代理 │
│ │ │
│ └── 否 ──→ 考虑是否需要代理 │
│ │
└─────────────────────────────────────────────────────────────┘12.7.2 实现技术选择
| 场景 | 推荐实现 | 理由 |
|---|---|---|
| 静态代理 | ABC + 显式实现 | 类型安全、IDE支持好 |
| 动态代理 | __getattr__ | 灵活、减少样板代码 |
| 属性代理 | 描述符 | 精细控制属性访问 |
| 方法代理 | 装饰器 | 可组合、可重用 |
| 远程代理 | Protocol + 序列化 | 类型安全、跨进程 |
12.7.3 与其他模式的关系
| 模式 | 关系 | 协作方式 |
|---|---|---|
| 装饰器 | 相似但目的不同 | 代理控制访问,装饰器增强功能 |
| 适配器 | 接口转换 | 适配器改变接口,代理保持接口 |
| 外观 | 简化接口 | 外观简化复杂系统,代理控制访问 |
| 享元 | 共享对象 | 享元共享状态,代理控制访问 |
| 抽象工厂 | 创建对象 | 工厂可创建代理对象 |
12.8 快速参考卡片
┌─────────────────────────────────────────────────────────────────────────┐
│ 代理模式速查表 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 定义: 为其他对象提供代理以控制对这个对象的访问 │
│ │
│ 核心公式: │
│ Control(r, o, c) = r(o) if authorize(c) ∧ available(o) else error │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 代理类型: │
│ • 虚拟代理 → 延迟加载开销大的对象 │
│ • 保护代理 → 控制访问权限 │
│ • 远程代理 → 为远程对象提供本地代理 │
│ • 缓存代理 → 缓存结果减少重复计算 │
│ • 智能引用 → 访问时执行额外操作 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 参与者: │
│ • Subject → 声明共同接口 │
│ • RealSubject → 定义真实对象 │
│ • Proxy → 控制对RealSubject的访问 │
│ • Client → 通过Subject接口交互 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ Python实现要点: │
│ class Proxy(Subject): │
│ def __init__(self, subject: RealSubject): │
│ self._subject = subject │
│ │
│ def request(self): │
│ if self._check_access(): │
│ result = self._subject.request() │
│ self._log_access() │
│ return result │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 代理 vs 装饰器: │
│ 代理: 控制访问、管理生命周期、1:1关系 │
│ 装饰器: 增强功能、不管理生命周期、N:1关系 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 适用场景: │
│ ✓ 需要延迟创建开销大的对象 │
│ ✓ 需要控制对对象的访问权限 │
│ ✓ 需要为远程对象提供本地代理 │
│ ✓ 需要在访问对象时执行额外操作 │
│ ✓ 需要缓存计算结果 │
│ │
│ 不适用场景: │
│ ✗ 简单的属性访问 │
│ ✗ 不需要任何控制逻辑 │
│ ✗ 性能敏感的热点代码 │
│ │
│ ───────────────────────────────────────────────────────────────────── │
│ │
│ 经典案例: │
│ • 数据库连接池 │
│ • API网关 │
│ • 图片懒加载 │
│ • RPC远程调用 │
│ • 缓存系统 │
│ │
└─────────────────────────────────────────────────────────────────────────┘12.9 思考与实践
12.9.1 思考题
概念辨析:代理模式与装饰器模式在实现上非常相似,如何从设计意图上区分它们?什么情况下两者可以结合使用?
性能权衡:代理模式引入了额外的间接层,如何量化评估代理带来的性能开销?在什么情况下这种开销是可以接受的?
线程安全:在设计保护代理时,如何确保权限检查和实际操作之间的原子性?如果权限状态在检查后发生变化怎么办?
动态代理:Python的
__getattr__动态代理有什么局限性?如何处理特殊方法(如__str__、__eq__)的代理?远程代理:在设计远程代理时,如何处理网络延迟、超时和异常?如何保证调用的幂等性?
12.9.2 实践练习
练习1:实现图片懒加载代理
设计一个图片加载系统,支持:
- 延迟加载大图片
- 预览缩略图(立即加载)
- 加载进度回调
- 内存管理和缓存
练习2:实现多级缓存代理
设计一个支持L1/L2/L3三级缓存的代理系统:
- L1:进程内内存缓存(最快)
- L2:Redis缓存(中等)
- L3:数据库(最慢)
- 实现缓存穿透、击穿、雪崩的防护
练习3:实现RPC远程代理
设计一个简化的RPC框架:
- 客户端通过代理调用远程方法
- 支持序列化/反序列化
- 实现超时和重试机制
- 支持负载均衡
12.10 小结
代理模式是一种控制对象访问的结构型模式,通过引入代理对象作为中介,可以在不修改真实对象的情况下添加访问控制、延迟加载、缓存等额外功能。本章深入探讨了:
- 理论基础:代理模式的形式化定义、代理类型分类、与装饰器模式的本质区别
- 实现技术:ABC、Protocol、动态代理、描述符代理、保护代理等多种Python实现方式
- 企业应用:数据库连接池代理、API网关代理、分布式缓存代理等实际案例
- 模式变体:智能引用代理、同步代理、远程代理等扩展形式
- 最佳实践:避免代理泄漏、过度代理等反模式,掌握线程安全设计
代理模式的核心价值在于:通过引入间接层,实现对对象访问的精细控制,同时保持接口的透明性和一致性。
参考资料
- Gamma, E., et al. Design Patterns: Elements of Reusable Object-Oriented Software. Addison-Wesley, 1994.
- Schmidt, D., et al. Pattern-Oriented Software Architecture. Wiley, 1996.
- Python Documentation. Data Model: Customizing attribute access. https://docs.python.org/3/reference/datamodel.html
- PEP 544. Protocols: Structural subtyping. https://www.python.org/dev/peps/pep-0544/
- Fowler, M. Patterns of Enterprise Application Architecture. Addison-Wesley, 2002.