第6章 适配器模式
学习目标
完成本章学习后,读者将能够:
- 理解适配器模式的核心概念、形式化定义与理论基础
- 掌握对象适配器与类适配器的语义差异与实现机制
- 设计双向适配器、可插拔适配器等高级变体
- 运用Protocol实现类型安全的适配器
- 识别适配器模式与外观模式、装饰器模式的边界
- 评估适配器模式在系统集成中的权衡取舍
6.1 模式定义
6.1.1 形式化定义
适配器模式(Adapter Pattern) 将一个类的接口转换成客户希望的另一个接口。适配器模式使得原本由于接口不兼容而不能一起工作的那些类可以一起工作。
从形式化角度,适配器模式可定义为接口映射函数:
$$ \text{Adapter}: \mathcal{I}{\text{Adaptee}} \xrightarrow{\text{transform}} \mathcal{I}{\text{Target}} $$
其中:
- $\mathcal{I}_{\text{Adaptee}}$ 为被适配者接口(Adaptee Interface)
- $\mathcal{I}_{\text{Target}}$ 为目标接口(Target Interface)
- $\text{transform}$ 为接口转换函数
接口兼容性约束:
$$ \forall m \in \mathcal{I}{\text{Target}}.\text{methods}, \exists m' \in \mathcal{I}{\text{Adaptee}}.\text{methods}: \text{Adapter}.m \Rightarrow \text{Adaptee}.m' $$
即目标接口的每个方法必须映射到被适配者接口的某个方法(或方法组合)。
适配器的行为等价性:
$$ \text{Adapter}.\text{request}() \equiv f(\text{Adaptee}.\text{specific_request}()) $$
其中 $f$ 为语义保持的转换函数。
6.1.2 历史背景与学术渊源
适配器模式的概念源于多个工程领域:
1. 电气工程隐喻
术语"Adapter"最直观的来源是电源适配器——将一种电压/插头标准转换为另一种。这种"接口转换"的思想被软件工程借鉴:
电气适配器:220V AC → 5V DC
软件适配器:LegacyAPI → ModernAPI2. GoF分类(1994)
Gamma等人在《Design Patterns》中将适配器模式归类为结构型模式,并明确区分了两种实现形式:
- 类适配器(Class Adapter):通过多重继承实现
- 对象适配器(Object Adapter):通过组合实现
3. 接口隔离原则的影响
适配器模式体现了接口隔离原则(ISP)的思想:客户端不应被迫依赖它不使用的接口。适配器提供精确的接口转换,隔离了客户端与被适配者的耦合。
4. 现代演进
现代编程语言对适配器模式的支持呈现新特点:
- 鸭子类型:Python等语言通过鸭子类型简化适配器实现
- Protocol类型:Python 3.8+ 的Protocol提供结构化类型支持
- 隐式适配:某些语言特性(如Python的
__iter__)提供隐式适配机制
6.1.3 模式动机
适配器模式解决以下核心问题:
问题1:遗留系统集成
class LegacySystem:
def old_operation(self, data: str) -> dict:
return {"result": data.upper()}
class NewSystem:
def process(self, data: bytes) -> str:
pass
class LegacyAdapter(NewSystem):
def __init__(self, legacy: LegacySystem):
self._legacy = legacy
def process(self, data: bytes) -> str:
result = self._legacy.old_operation(data.decode())
return result["result"]问题2:第三方库接口不兼容
class ThirdPartyAPI:
def fetch_data(self, endpoint: str, params: dict) -> list:
return [{"id": 1}, {"id": 2}]
class StandardRepository:
def find_all(self, filters: dict) -> Iterator[dict]:
pass
class ThirdPartyAdapter(StandardRepository):
def __init__(self, api: ThirdPartyAPI):
self._api = api
def find_all(self, filters: dict) -> Iterator[dict]:
data = self._api.fetch_data("/items", filters)
return iter(data)问题3:统一多个相似但不兼容的接口
class PaymentAdapter(Protocol):
def charge(self, amount: Decimal) -> str: ...
class StripeAdapter:
def charge(self, amount: Decimal) -> str:
return f"Stripe: {amount}"
class PayPalAdapter:
def charge(self, amount: Decimal) -> str:
return f"PayPal: {amount}"6.1.4 UML结构模型
┌─────────────────────────────────────────────────────────────┐
│ 适配器模式结构 │
│ 对象适配器 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────┐
│ <<interface>> │
│ Target │
├─────────────────────────┤
│ + request(): Result │◄─────────── 客户端期望的接口
│ + operation(): void │
└────────────△────────────┘
│
│ implements
│
┌────────────┴────────────┐ ┌─────────────────────────┐
│ Adapter │ │ Adaptee │
├─────────────────────────┤ ├─────────────────────────┤
│ - adaptee: Adaptee │───────│ + specific_request() │
├─────────────────────────┤ 组合 │ + another_operation() │
│ + request(): Result │ └─────────────────────────┘
│ + operation(): void │ ▲
└─────────────────────────┘ │
│
现有不兼容接口
┌─────────────────────────────────────────────────────────────┐
│ 类适配器 │
│ (多重继承) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────┐ ┌─────────────────────────┐
│ Target │ │ Adaptee │
├─────────────────────────┤ ├─────────────────────────┤
│ + request(): Result │ │ + specific_request() │
└────────────△────────────┘ └────────────△────────────┘
│ │
│ 多重继承 │
└──────────────┬───────────────────┘
│
┌─────────────┴─────────────┐
│ Adapter │
├───────────────────────────┤
│ + request(): Result │
│ return specific_request()│
└───────────────────────────┘参与者职责:
| 参与者 | 职责 |
|---|---|
| Target | 定义客户端使用的特定领域接口 |
| Adaptee | 定义已存在的接口,需要被适配 |
| Adapter | 将Adaptee接口转换为Target接口 |
| Client | 与符合Target接口的对象协作 |
6.2 Python实现机制
6.2.1 对象适配器(推荐方式)
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Optional
T = TypeVar('T')
class Target(ABC):
"""目标接口:客户端期望的接口"""
@abstractmethod
def request(self) -> str:
"""执行请求操作"""
pass
@abstractmethod
def get_info(self) -> dict:
"""获取信息"""
pass
class Adaptee:
"""
被适配者:具有不兼容接口的现有类
可能是第三方库、遗留系统或任何接口不匹配的类
"""
def specific_request(self) -> str:
"""特定的请求方法,接口与Target不兼容"""
return "Adaptee: 特定请求的响应"
def another_specific_method(self) -> dict:
"""另一个特定方法"""
return {"source": "Adaptee", "status": "active"}
def legacy_operation(self, data: str) -> str:
"""遗留操作方法"""
return f"处理: {data}"
class Adapter(Target):
"""
对象适配器:通过组合实现接口转换
优势:
- 灵活性高,可在运行时切换被适配对象
- 符合组合优于继承原则
- 可以适配多个被适配者
"""
def __init__(self, adaptee: Adaptee):
self._adaptee = adaptee
def request(self) -> str:
"""
将Target.request转换为Adaptee.specific_request
可能包含:
- 参数转换
- 返回值转换
- 异常转换
"""
result = self._adaptee.specific_request()
return f"Adapter: {result}"
def get_info(self) -> dict:
"""将Target.get_info映射到Adaptee的方法组合"""
info = self._adaptee.another_specific_method()
info["adapter_type"] = "ObjectAdapter"
return info
def client_code(target: Target) -> None:
"""客户端代码:仅依赖Target接口"""
print(f"请求结果: {target.request()}")
print(f"信息: {target.get_info()}")
def demo_object_adapter():
adaptee = Adaptee()
adapter = Adapter(adaptee)
client_code(adapter)
demo_object_adapter()6.2.2 类适配器(多重继承)
from abc import ABC, abstractmethod
from typing import override
class Target(ABC):
"""目标接口"""
@abstractmethod
def request(self) -> str:
pass
class Adaptee:
"""被适配者"""
def specific_request(self) -> str:
return ".eetpadA eht fo roivaheb laicepS"
def helper_method(self) -> str:
return "辅助方法"
class ClassAdapter(Target, Adaptee):
"""
类适配器:通过多重继承实现
特点:
- 同时继承Target和Adaptee
- 可以直接访问Adaptee的protected成员
- 不需要持有Adaptee引用
限制:
- Python不支持真正的多继承接口实现
- 灵活性较低,编译时确定
- 如果Adaptee是final类则无法使用
"""
@override
def request(self) -> str:
"""
直接调用继承的方法
可以访问Adaptee的所有方法,无需通过引用
"""
reversed_text = self.specific_request()[::-1]
helper = self.helper_method()
return f"ClassAdapter: {reversed_text} ({helper})"
def demo_class_adapter():
adapter = ClassAdapter()
print(adapter.request())
print(f"\n可以直接访问Adaptee方法:")
print(f" specific_request: {adapter.specific_request()}")
print(f" helper_method: {adapter.helper_method()}")
demo_class_adapter()6.2.3 基于Protocol的现代实现
from typing import Protocol, runtime_checkable, TypeVar, Any
from dataclasses import dataclass
from decimal import Decimal
@runtime_checkable
class PaymentProcessor(Protocol):
"""
支付处理器协议
使用Protocol定义结构化类型:
- 任何实现这些方法的类都自动满足协议
- 运行时可检查协议一致性
- 支持静态类型检查
"""
def charge(self, amount: Decimal, currency: str) -> str:
"""扣款操作"""
...
def refund(self, transaction_id: str) -> bool:
"""退款操作"""
...
@dataclass
class StripeAPI:
"""Stripe支付API(第三方接口)"""
api_key: str
def create_charge(self, amount_cents: int, currency: str) -> dict:
return {
"id": "ch_stripe_123",
"status": "succeeded",
"amount": amount_cents,
"currency": currency
}
def create_refund(self, charge_id: str) -> dict:
return {"id": "re_stripe_456", "status": "succeeded"}
class StripeAdapter:
"""
Stripe适配器:隐式实现PaymentProcessor协议
无需显式继承Protocol,只需实现协议方法
"""
def __init__(self, api_key: str):
self._api = StripeAPI(api_key)
def charge(self, amount: Decimal, currency: str) -> str:
amount_cents = int(amount * 100)
result = self._api.create_charge(amount_cents, currency)
return result["id"] if result["status"] == "succeeded" else ""
def refund(self, transaction_id: str) -> bool:
result = self._api.create_refund(transaction_id)
return result["status"] == "succeeded"
@dataclass
class PayPalAPI:
"""PayPal支付API(第三方接口)"""
client_id: str
secret: str
def execute_payment(self, total: str, currency_code: str) -> dict:
return {
"payment_id": "PAY_123",
"state": "approved",
"amount": total
}
def execute_refund(self, payment_id: str) -> dict:
return {"refund_id": "REFUND_456", "state": "completed"}
class PayPalAdapter:
"""PayPal适配器:隐式实现PaymentProcessor协议"""
def __init__(self, client_id: str, secret: str):
self._api = PayPalAPI(client_id, secret)
def charge(self, amount: Decimal, currency: str) -> str:
result = self._api.execute_payment(str(amount), currency)
return result["payment_id"] if result["state"] == "approved" else ""
def refund(self, transaction_id: str) -> bool:
result = self._api.execute_refund(transaction_id)
return result["state"] == "completed"
def process_payment(processor: PaymentProcessor, amount: Decimal) -> str:
"""
泛型支付处理函数
接受任何实现PaymentProcessor协议的对象
"""
return processor.charge(amount, "USD")
def demo_protocol_adapter():
stripe = StripeAdapter("sk_test_123")
paypal = PayPalAdapter("client_id", "secret")
print(f"Stripe是否实现协议: {isinstance(stripe, PaymentProcessor)}")
print(f"PayPal是否实现协议: {isinstance(paypal, PaymentProcessor)}")
amount = Decimal("99.99")
print(f"\nStripe扣款: {process_payment(stripe, amount)}")
print(f"PayPal扣款: {process_payment(paypal, amount)}")
demo_protocol_adapter()6.2.4 双向适配器
from abc import ABC, abstractmethod
from typing import Optional
class SystemA(ABC):
"""系统A的接口"""
@abstractmethod
def operation_a(self, data: str) -> str:
pass
@abstractmethod
def get_status_a(self) -> dict:
pass
class SystemB(ABC):
"""系统B的接口"""
@abstractmethod
def operation_b(self, value: int) -> dict:
pass
@abstractmethod
def get_status_b(self) -> str:
pass
class ConcreteSystemA(SystemA):
"""系统A的具体实现"""
def operation_a(self, data: str) -> str:
return f"SystemA处理: {data}"
def get_status_a(self) -> dict:
return {"system": "A", "status": "active"}
class ConcreteSystemB(SystemB):
"""系统B的具体实现"""
def operation_b(self, value: int) -> dict:
return {"system": "B", "result": value * 2}
def get_status_b(self) -> str:
return "B:active"
class BidirectionalAdapter(SystemA, SystemB):
"""
双向适配器
功能:
- 将SystemA接口适配为SystemB接口
- 将SystemB接口适配为SystemA接口
适用场景:
- 两个系统需要互相调用
- 系统迁移过程中的双向兼容
"""
def __init__(
self,
system_a: Optional[SystemA] = None,
system_b: Optional[SystemB] = None
):
self._system_a = system_a
self._system_b = system_b
def operation_a(self, data: str) -> str:
"""实现SystemA接口"""
if self._system_a:
return self._system_a.operation_a(data)
elif self._system_b:
result = self._system_b.operation_b(len(data))
return f"Adapted from B: {result}"
return "No system available"
def get_status_a(self) -> dict:
if self._system_a:
return self._system_a.get_status_a()
elif self._system_b:
status = self._system_b.get_status_b()
return {"adapted_from": "B", "status": status}
return {}
def operation_b(self, value: int) -> dict:
"""实现SystemB接口"""
if self._system_b:
return self._system_b.operation_b(value)
elif self._system_a:
result = self._system_a.operation_a(str(value))
return {"adapted_from": "A", "result": result}
return {}
def get_status_b(self) -> str:
if self._system_b:
return self._system_b.get_status_b()
elif self._system_a:
status = self._system_a.get_status_a()
return f"Adapted:{status['system']}:{status['status']}"
return "No system"
def demo_bidirectional_adapter():
system_a = ConcreteSystemA()
system_b = ConcreteSystemB()
adapter_a_to_b = BidirectionalAdapter(system_a=system_a)
adapter_b_to_a = BidirectionalAdapter(system_b=system_b)
print("A适配为B接口:")
print(f" operation_b: {adapter_a_to_b.operation_b(10)}")
print(f" get_status_b: {adapter_a_to_b.get_status_b()}")
print("\nB适配为A接口:")
print(f" operation_a: {adapter_b_to_a.operation_a('test')}")
print(f" get_status_a: {adapter_b_to_a.get_status_a()}")
demo_bidirectional_adapter()6.2.5 可插拔适配器
from abc import ABC, abstractmethod
from typing import Callable, Dict, Any, TypeVar, Generic
from dataclasses import dataclass
T = TypeVar('T')
@dataclass
class AdapterConfig:
"""适配器配置"""
name: str
parameter_mapping: Dict[str, str]
result_mapping: Dict[str, str]
preprocessor: Callable[[Dict], Dict]
postprocessor: Callable[[Dict], Dict]
class PluggableAdapter(ABC, Generic[T]):
"""
可插拔适配器
特点:
- 通过配置定义适配规则
- 支持参数映射和结果映射
- 支持预处理和后处理钩子
"""
def __init__(
self,
adaptee: T,
config: AdapterConfig
):
self._adaptee = adaptee
self._config = config
def _map_parameters(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""参数映射"""
mapped = {}
for target_key, source_key in self._config.parameter_mapping.items():
if source_key in params:
mapped[target_key] = params[source_key]
return mapped
def _map_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
"""结果映射"""
mapped = {}
for target_key, source_key in self._config.result_mapping.items():
if source_key in result:
mapped[target_key] = result[source_key]
return mapped
def execute(self, method_name: str, **params) -> Dict[str, Any]:
"""
执行适配后的方法调用
流程:
1. 预处理参数
2. 映射参数
3. 调用被适配者方法
4. 映射结果
5. 后处理结果
"""
preprocessed = self._config.preprocessor(params)
mapped_params = self._map_parameters(preprocessed)
method = getattr(self._adaptee, method_name)
result = method(**mapped_params)
if isinstance(result, dict):
mapped_result = self._map_result(result)
return self._config.postprocessor(mapped_result)
return {"result": result}
class LegacyAPI:
"""遗留API示例"""
def fetch_user(self, user_id: int, include_details: bool = True) -> dict:
return {
"user_id": user_id,
"user_name": "张三",
"user_email": "zhangsan@example.com",
"details": {"age": 25} if include_details else None
}
def update_user(self, uid: int, uname: str, uemail: str) -> dict:
return {
"success": True,
"user_id": uid,
"user_name": uname,
"user_email": uemail
}
def create_legacy_adapter() -> PluggableAdapter[LegacyAPI]:
"""创建遗留API适配器"""
def preprocess(params: Dict) -> Dict:
if 'include_details' not in params:
params['include_details'] = False
return params
def postprocess(result: Dict) -> Dict:
result['adapter_version'] = '1.0'
return result
config = AdapterConfig(
name="LegacyUserAPI",
parameter_mapping={
"user_id": "id",
"include_details": "include_details",
"uname": "name",
"uemail": "email"
},
result_mapping={
"id": "user_id",
"name": "user_name",
"email": "user_email"
},
preprocessor=preprocess,
postprocessor=postprocess
)
return PluggableAdapter(LegacyAPI(), config)
def demo_pluggable_adapter():
adapter = create_legacy_adapter()
result = adapter.execute("fetch_user", id=1)
print(f"获取用户: {result}")
result = adapter.execute("update_user", id=1, name="李四", email="lisi@example.com")
print(f"更新用户: {result}")
demo_pluggable_adapter()6.3 高级适配技术
6.3.1 缺省适配器
from abc import ABC, abstractmethod
from typing import Optional, Any
from dataclasses import dataclass
class WindowListener(ABC):
"""
窗口监听器接口
定义了窗口事件的所有回调方法
客户端可能只关心部分事件
"""
@abstractmethod
def on_open(self) -> None:
"""窗口打开"""
pass
@abstractmethod
def on_close(self) -> None:
"""窗口关闭"""
pass
@abstractmethod
def on_resize(self, width: int, height: int) -> None:
"""窗口大小改变"""
pass
@abstractmethod
def on_focus(self) -> None:
"""窗口获得焦点"""
pass
@abstractmethod
def on_blur(self) -> None:
"""窗口失去焦点"""
pass
@abstractmethod
def on_key_press(self, key: str) -> None:
"""按键按下"""
pass
class WindowAdapter(WindowListener):
"""
缺省适配器:提供所有方法的空实现
客户端可以继承此类,只重写需要的方法
避免实现所有接口方法的繁琐
"""
def on_open(self) -> None:
pass
def on_close(self) -> None:
pass
def on_resize(self, width: int, height: int) -> None:
pass
def on_focus(self) -> None:
pass
def on_blur(self) -> None:
pass
def on_key_press(self, key: str) -> None:
pass
class CloseHandler(WindowAdapter):
"""只关心关闭事件的处理器"""
def __init__(self, on_close_callback: callable):
self._callback = on_close_callback
def on_close(self) -> None:
self._callback()
class ResizeHandler(WindowAdapter):
"""只关心大小改变的处理器"""
def __init__(self):
self._last_size = (0, 0)
def on_resize(self, width: int, height: int) -> None:
self._last_size = (width, height)
print(f"窗口大小: {width}x{height}")
@dataclass
class Window:
"""窗口类"""
title: str
width: int = 800
height: int = 600
_listeners: list = None
def __post_init__(self):
self._listeners = []
def add_listener(self, listener: WindowListener) -> None:
self._listeners.append(listener)
def open(self) -> None:
for listener in self._listeners:
listener.on_open()
def close(self) -> None:
for listener in self._listeners:
listener.on_close()
def resize(self, width: int, height: int) -> None:
self.width = width
self.height = height
for listener in self._listeners:
listener.on_resize(width, height)
def demo_default_adapter():
window = Window("主窗口")
window.add_listener(CloseHandler(lambda: print("窗口已关闭")))
window.add_listener(ResizeHandler())
window.open()
window.resize(1024, 768)
window.close()
demo_default_adapter()6.3.2 异步适配器
import asyncio
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Awaitable, Callable
from dataclasses import dataclass
from functools import wraps
T = TypeVar('T')
class AsyncRepository(ABC):
"""异步仓库接口"""
@abstractmethod
async def find_by_id(self, id: int) -> dict:
pass
@abstractmethod
async def find_all(self) -> list:
pass
@abstractmethod
async def save(self, entity: dict) -> int:
pass
class SyncLegacyRepository:
"""同步遗留仓库"""
def __init__(self, delay: float = 0.1):
self._delay = delay
self._data = {
1: {"id": 1, "name": "张三"},
2: {"id": 2, "name": "李四"}
}
def get_by_id(self, id: int) -> dict:
import time
time.sleep(self._delay)
return self._data.get(id)
def get_all(self) -> list:
import time
time.sleep(self._delay)
return list(self._data.values())
def insert(self, entity: dict) -> int:
import time
time.sleep(self._delay)
new_id = max(self._data.keys()) + 1
entity["id"] = new_id
self._data[new_id] = entity
return new_id
class AsyncAdapter(AsyncRepository):
"""
异步适配器:将同步API适配为异步API
使用asyncio.to_thread在线程池中执行同步操作
避免阻塞事件循环
"""
def __init__(self, sync_repo: SyncLegacyRepository):
self._sync_repo = sync_repo
async def find_by_id(self, id: int) -> dict:
"""将同步get_by_id适配为异步"""
return await asyncio.to_thread(self._sync_repo.get_by_id, id)
async def find_all(self) -> list:
"""将同步get_all适配为异步"""
return await asyncio.to_thread(self._sync_repo.get_all)
async def save(self, entity: dict) -> int:
"""将同步insert适配为异步"""
return await asyncio.to_thread(self._sync_repo.insert, entity)
class AsyncService:
"""异步服务"""
def __init__(self, repo: AsyncRepository):
self._repo = repo
async def get_user(self, user_id: int) -> dict:
return await self._repo.find_by_id(user_id)
async def get_all_users(self) -> list:
return await self._repo.find_all()
async def create_user(self, name: str) -> dict:
entity = {"name": name}
user_id = await self._repo.save(entity)
return {"id": user_id, "name": name}
async def demo_async_adapter():
sync_repo = SyncLegacyRepository(delay=0.05)
async_repo = AsyncAdapter(sync_repo)
service = AsyncService(async_repo)
print("并发获取多个用户:")
tasks = [
service.get_user(1),
service.get_user(2),
service.get_all_users()
]
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f" 结果{i+1}: {result}")
new_user = await service.create_user("王五")
print(f"\n创建用户: {new_user}")
asyncio.run(demo_async_adapter())6.3.3 缓存适配器
from abc import ABC, abstractmethod
from typing import Any, Optional, Dict, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import hashlib
import json
class CacheBackend(ABC):
"""缓存后端接口"""
@abstractmethod
def get(self, key: str) -> Optional[Any]:
pass
@abstractmethod
def set(self, key: str, value: Any, ttl: int = None) -> None:
pass
@abstractmethod
def delete(self, key: str) -> None:
pass
@abstractmethod
def exists(self, key: str) -> bool:
pass
@dataclass
class MemoryCache:
"""内存缓存实现"""
_cache: Dict[str, Any] = field(default_factory=dict)
_expiry: Dict[str, datetime] = field(default_factory=dict)
def get(self, key: str) -> Optional[Any]:
if key in self._expiry and datetime.now() > self._expiry[key]:
del self._cache[key]
del self._expiry[key]
return None
return self._cache.get(key)
def set(self, key: str, value: Any, ttl: int = None) -> None:
self._cache[key] = value
if ttl:
self._expiry[key] = datetime.now() + timedelta(seconds=ttl)
def delete(self, key: str) -> None:
self._cache.pop(key, None)
self._expiry.pop(key, None)
def exists(self, key: str) -> bool:
return self.get(key) is not None
class CachedRepositoryAdapter:
"""
缓存适配器:为任何仓库添加缓存层
装饰器模式的变体:透明地添加缓存功能
"""
def __init__(
self,
repository: Any,
cache: CacheBackend,
ttl: int = 300,
key_prefix: str = ""
):
self._repository = repository
self._cache = cache
self._ttl = ttl
self._key_prefix = key_prefix
def _make_key(self, method: str, *args, **kwargs) -> str:
"""生成缓存键"""
key_data = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
key_hash = hashlib.md5(key_data.encode()).hexdigest()[:8]
return f"{self._key_prefix}:{method}:{key_hash}"
def __getattr__(self, name: str) -> Any:
"""
动态代理:拦截所有方法调用
对于查询方法(以find/get开头),添加缓存
对于修改方法,清除相关缓存
"""
attr = getattr(self._repository, name)
if not callable(attr):
return attr
@wraps(attr)
def wrapper(*args, **kwargs):
if name.startswith(('find', 'get', 'list', 'fetch')):
return self._cached_call(name, attr, *args, **kwargs)
elif name.startswith(('save', 'update', 'delete', 'create')):
return self._invalidate_and_call(name, attr, *args, **kwargs)
else:
return attr(*args, **kwargs)
return wrapper
def _cached_call(self, method: str, func: Callable, *args, **kwargs) -> Any:
"""带缓存的调用"""
cache_key = self._make_key(method, *args, **kwargs)
cached = self._cache.get(cache_key)
if cached is not None:
print(f" [缓存命中] {method}")
return cached
print(f" [缓存未命中] {method}")
result = func(*args, **kwargs)
self._cache.set(cache_key, result, self._ttl)
return result
def _invalidate_and_call(self, method: str, func: Callable, *args, **kwargs) -> Any:
"""清除缓存并调用"""
pattern = f"{self._key_prefix}:"
self._cache.delete(pattern)
print(f" [缓存清除] {method}")
return func(*args, **kwargs)
class UserRepository:
"""用户仓库"""
def __init__(self):
self._users = {
1: {"id": 1, "name": "张三", "email": "zhang@example.com"},
2: {"id": 2, "name": "李四", "email": "li@example.com"}
}
def find_by_id(self, user_id: int) -> dict:
print(f" [数据库查询] find_by_id({user_id})")
return self._users.get(user_id)
def find_all(self) -> list:
print(" [数据库查询] find_all()")
return list(self._users.values())
def save(self, user: dict) -> int:
print(f" [数据库写入] save({user})")
user_id = user.get("id", max(self._users.keys()) + 1)
self._users[user_id] = user
return user_id
def demo_cache_adapter():
repo = UserRepository()
cache = MemoryCache()
cached_repo = CachedRepositoryAdapter(repo, cache, ttl=60, key_prefix="user")
print("第一次查询:")
user = cached_repo.find_by_id(1)
print(f" 结果: {user}")
print("\n第二次查询(应命中缓存):")
user = cached_repo.find_by_id(1)
print(f" 结果: {user}")
print("\n查询所有用户:")
users = cached_repo.find_all()
print(f" 结果: {users}")
print("\n保存用户(应清除缓存):")
cached_repo.save({"id": 1, "name": "张三", "email": "zhang_new@example.com"})
print("\n再次查询(缓存已清除):")
user = cached_repo.find_by_id(1)
print(f" 结果: {user}")
demo_cache_adapter()6.4 企业级应用示例
6.4.1 多数据库适配器
from abc import ABC, abstractmethod
from typing import Any, List, Dict, Optional, Iterator
from dataclasses import dataclass, field
from enum import Enum
from contextlib import contextmanager
class DatabaseType(Enum):
MYSQL = "mysql"
POSTGRESQL = "postgresql"
SQLITE = "sqlite"
MONGODB = "mongodb"
@dataclass
class QueryResult:
"""查询结果"""
rows: List[Dict[str, Any]]
affected_rows: int = 0
last_insert_id: Optional[int] = None
execution_time: float = 0.0
class DatabaseAdapter(ABC):
"""数据库适配器接口"""
@abstractmethod
def connect(self) -> bool:
"""建立连接"""
pass
@abstractmethod
def disconnect(self) -> None:
"""断开连接"""
pass
@abstractmethod
def execute(self, query: str, params: tuple = ()) -> QueryResult:
"""执行SQL"""
pass
@abstractmethod
def fetch_all(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
"""查询所有"""
pass
@abstractmethod
def fetch_one(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
"""查询单条"""
pass
@abstractmethod
def begin_transaction(self) -> None:
"""开始事务"""
pass
@abstractmethod
def commit(self) -> None:
"""提交事务"""
pass
@abstractmethod
def rollback(self) -> None:
"""回滚事务"""
pass
@contextmanager
def transaction(self):
"""事务上下文管理器"""
self.begin_transaction()
try:
yield self
self.commit()
except Exception:
self.rollback()
raise
class MySQLDriver:
"""MySQL驱动模拟"""
def __init__(self, host: str, port: int, database: str, user: str, password: str):
self.config = {"host": host, "port": port, "database": database}
self._connected = False
self._in_transaction = False
def mysql_connect(self) -> bool:
self._connected = True
return True
def mysql_close(self) -> None:
self._connected = False
def mysql_query(self, sql: str, params: tuple) -> List[Dict]:
return [{"id": 1, "name": "MySQL用户"}]
def mysql_execute(self, sql: str, params: tuple) -> int:
return 1
def mysql_begin(self) -> None:
self._in_transaction = True
def mysql_commit(self) -> None:
self._in_transaction = False
def mysql_rollback(self) -> None:
self._in_transaction = False
class MySQLAdapter(DatabaseAdapter):
"""MySQL适配器"""
def __init__(self, host: str, port: int, database: str, user: str, password: str):
self._driver = MySQLDriver(host, port, database, user, password)
def connect(self) -> bool:
return self._driver.mysql_connect()
def disconnect(self) -> None:
self._driver.mysql_close()
def execute(self, query: str, params: tuple = ()) -> QueryResult:
affected = self._driver.mysql_execute(query, params)
return QueryResult(rows=[], affected_rows=affected)
def fetch_all(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
return self._driver.mysql_query(query, params)
def fetch_one(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
results = self._driver.mysql_query(query, params)
return results[0] if results else None
def begin_transaction(self) -> None:
self._driver.mysql_begin()
def commit(self) -> None:
self._driver.mysql_commit()
def rollback(self) -> None:
self._driver.mysql_rollback()
class PostgreSQLDriver:
"""PostgreSQL驱动模拟"""
def __init__(self, host: str, port: int, database: str, user: str, password: str):
self.config = {"host": host, "port": port, "database": database}
self._connected = False
def pg_connect(self) -> bool:
self._connected = True
return True
def pg_close(self) -> None:
self._connected = False
def pg_query(self, sql: str, params: tuple) -> List[Dict]:
return [{"id": 1, "name": "PostgreSQL用户"}]
def pg_execute(self, sql: str, params: tuple) -> str:
return "INSERT 1"
def pg_begin(self) -> None:
pass
def pg_commit(self) -> None:
pass
def pg_rollback(self) -> None:
pass
class PostgreSQLAdapter(DatabaseAdapter):
"""PostgreSQL适配器"""
def __init__(self, host: str, port: int, database: str, user: str, password: str):
self._driver = PostgreSQLDriver(host, port, database, user, password)
def connect(self) -> bool:
return self._driver.pg_connect()
def disconnect(self) -> None:
self._driver.pg_close()
def execute(self, query: str, params: tuple = ()) -> QueryResult:
self._driver.pg_execute(query, params)
return QueryResult(rows=[], affected_rows=1)
def fetch_all(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
return self._driver.pg_query(query, params)
def fetch_one(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
results = self._driver.pg_query(query, params)
return results[0] if results else None
def begin_transaction(self) -> None:
self._driver.pg_begin()
def commit(self) -> None:
self._driver.pg_commit()
def rollback(self) -> None:
self._driver.pg_rollback()
class DatabaseAdapterFactory:
"""数据库适配器工厂"""
@staticmethod
def create(db_type: DatabaseType, **config) -> DatabaseAdapter:
adapters = {
DatabaseType.MYSQL: MySQLAdapter,
DatabaseType.POSTGRESQL: PostgreSQLAdapter,
}
adapter_class = adapters.get(db_type)
if not adapter_class:
raise ValueError(f"不支持的数据库类型: {db_type}")
return adapter_class(**config)
class Repository:
"""通用仓库"""
def __init__(self, db: DatabaseAdapter, table: str):
self._db = db
self._table = table
def find_all(self) -> List[Dict[str, Any]]:
return self._db.fetch_all(f"SELECT * FROM {self._table}")
def find_by_id(self, id: int) -> Optional[Dict[str, Any]]:
return self._db.fetch_one(
f"SELECT * FROM {self._table} WHERE id = %s",
(id,)
)
def save(self, entity: Dict[str, Any]) -> int:
with self._db.transaction():
result = self._db.execute(
f"INSERT INTO {self._table} (name) VALUES (%s)",
(entity.get("name"),)
)
return result.affected_rows
def demo_database_adapter():
mysql = DatabaseAdapterFactory.create(
DatabaseType.MYSQL,
host="localhost",
port=3306,
database="app",
user="root",
password="secret"
)
mysql.connect()
pg = DatabaseAdapterFactory.create(
DatabaseType.POSTGRESQL,
host="localhost",
port=5432,
database="app",
user="postgres",
password="secret"
)
pg.connect()
user_repo_mysql = Repository(mysql, "users")
user_repo_pg = Repository(pg, "users")
print("MySQL查询:")
print(f" {user_repo_mysql.find_all()}")
print("\nPostgreSQL查询:")
print(f" {user_repo_pg.find_all()}")
mysql.disconnect()
pg.disconnect()
demo_database_adapter()6.4.2 消息队列适配器
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional, Dict
from dataclasses import dataclass, field
from enum import Enum
import json
import queue
class MessageQueueType(Enum):
RABBITMQ = "rabbitmq"
KAFKA = "kafka"
REDIS = "redis"
SQS = "sqs"
@dataclass
class Message:
"""消息"""
id: str
body: Any
headers: Dict[str, str] = field(default_factory=dict)
timestamp: float = 0.0
@dataclass
class PublishResult:
"""发布结果"""
success: bool
message_id: Optional[str] = None
error: Optional[str] = None
class MessageQueueAdapter(ABC):
"""消息队列适配器接口"""
@abstractmethod
def connect(self) -> bool:
"""建立连接"""
pass
@abstractmethod
def disconnect(self) -> None:
"""断开连接"""
pass
@abstractmethod
def publish(self, queue_name: str, message: Any, **options) -> PublishResult:
"""发布消息"""
pass
@abstractmethod
def subscribe(self, queue_name: str, callback: Callable[[Message], None]) -> None:
"""订阅消息"""
pass
@abstractmethod
def acknowledge(self, message: Message) -> None:
"""确认消息"""
pass
@abstractmethod
def reject(self, message: Message, requeue: bool = False) -> None:
"""拒绝消息"""
pass
class RabbitMQDriver:
"""RabbitMQ驱动模拟"""
def __init__(self, host: str, port: int, username: str, password: str):
self.config = {"host": host, "port": port}
self._queues: Dict[str, queue.Queue] = {}
self._connected = False
def rabbit_connect(self) -> bool:
self._connected = True
return True
def rabbit_close(self) -> None:
self._connected = False
def rabbit_publish(self, queue: str, body: bytes, properties: dict) -> str:
if queue not in self._queues:
self._queues[queue] = queue.Queue()
message_id = f"msg_{len(self._queues[queue].queue)}"
self._queues[queue].put({"id": message_id, "body": body})
return message_id
def rabbit_consume(self, queue: str) -> Optional[dict]:
if queue in self._queues and not self._queues[queue].empty():
return self._queues[queue].get()
return None
def rabbit_ack(self, delivery_tag: str) -> None:
pass
def rabbit_nack(self, delivery_tag: str, requeue: bool) -> None:
pass
class RabbitMQAdapter(MessageQueueAdapter):
"""RabbitMQ适配器"""
def __init__(self, host: str, port: int, username: str, password: str):
self._driver = RabbitMQDriver(host, port, username, password)
def connect(self) -> bool:
return self._driver.rabbit_connect()
def disconnect(self) -> None:
self._driver.rabbit_close()
def publish(self, queue_name: str, message: Any, **options) -> PublishResult:
body = json.dumps(message).encode() if not isinstance(message, bytes) else message
message_id = self._driver.rabbit_publish(queue_name, body, options)
return PublishResult(success=True, message_id=message_id)
def subscribe(self, queue_name: str, callback: Callable[[Message], None]) -> None:
while True:
msg = self._driver.rabbit_consume(queue_name)
if msg:
body = json.loads(msg["body"])
message = Message(id=msg["id"], body=body)
callback(message)
def acknowledge(self, message: Message) -> None:
self._driver.rabbit_ack(message.id)
def reject(self, message: Message, requeue: bool = False) -> None:
self._driver.rabbit_nack(message.id, requeue)
class KafkaDriver:
"""Kafka驱动模拟"""
def __init__(self, brokers: list, group_id: str):
self.brokers = brokers
self.group_id = group_id
self._topics: Dict[str, queue.Queue] = {}
def kafka_connect(self) -> bool:
return True
def kafka_close(self) -> None:
pass
def kafka_send(self, topic: str, key: bytes, value: bytes) -> int:
if topic not in self._topics:
self._topics[topic] = queue.Queue()
offset = len(self._topics[topic].queue)
self._topics[topic].put({"key": key, "value": value, "offset": offset})
return offset
def kafka_poll(self, topic: str) -> Optional[dict]:
if topic in self._topics and not self._topics[topic].empty():
return self._topics[topic].get()
return None
def kafka_commit(self) -> None:
pass
class KafkaAdapter(MessageQueueAdapter):
"""Kafka适配器"""
def __init__(self, brokers: list, group_id: str):
self._driver = KafkaDriver(brokers, group_id)
def connect(self) -> bool:
return self._driver.kafka_connect()
def disconnect(self) -> None:
self._driver.kafka_close()
def publish(self, queue_name: str, message: Any, **options) -> PublishResult:
key = options.get("key", b"").encode() if isinstance(options.get("key"), str) else options.get("key", b"")
value = json.dumps(message).encode() if not isinstance(message, bytes) else message
offset = self._driver.kafka_send(queue_name, key, value)
return PublishResult(success=True, message_id=f"{queue_name}:{offset}")
def subscribe(self, queue_name: str, callback: Callable[[Message], None]) -> None:
while True:
msg = self._driver.kafka_poll(queue_name)
if msg:
body = json.loads(msg["value"])
message = Message(id=str(msg["offset"]), body=body)
callback(message)
def acknowledge(self, message: Message) -> None:
self._driver.kafka_commit()
def reject(self, message: Message, requeue: bool = False) -> None:
pass
class MessageQueueFactory:
"""消息队列工厂"""
@staticmethod
def create(mq_type: MessageQueueType, **config) -> MessageQueueAdapter:
adapters = {
MessageQueueType.RABBITMQ: RabbitMQAdapter,
MessageQueueType.KAFKA: KafkaAdapter,
}
adapter_class = adapters.get(mq_type)
if not adapter_class:
raise ValueError(f"不支持的消息队列类型: {mq_type}")
return adapter_class(**config)
def demo_message_queue_adapter():
rabbitmq = MessageQueueFactory.create(
MessageQueueType.RABBITMQ,
host="localhost",
port=5672,
username="guest",
password="guest"
)
rabbitmq.connect()
result = rabbitmq.publish("orders", {"order_id": 1, "amount": 99.99})
print(f"RabbitMQ发布: {result}")
kafka = MessageQueueFactory.create(
MessageQueueType.KAFKA,
brokers=["localhost:9092"],
group_id="consumer-1"
)
kafka.connect()
result = kafka.publish("events", {"event": "user_created", "user_id": 1})
print(f"Kafka发布: {result}")
rabbitmq.disconnect()
kafka.disconnect()
demo_message_queue_adapter()6.5 与其他结构型模式比较
6.5.1 模式对比分析
┌─────────────────────────────────────────────────────────────┐
│ 结构型模式对比矩阵 │
└─────────────────────────────────────────────────────────────┘
模式 │ 核心目的 │ 接口变化 │ 对象数量
──────────────┼───────────────┼───────────────┼─────────────────
适配器模式 │ 接口转换 │ 改变接口 │ 1:1
装饰器模式 │ 功能增强 │ 保持接口 │ 1:1+
外观模式 │ 简化接口 │ 简化接口 │ N:1
代理模式 │ 访问控制 │ 保持接口 │ 1:1
组合模式 │ 树形结构 │ 统一接口 │ N:N
桥接模式 │ 分离维度 │ 分离抽象 │ N:M6.5.2 适配器 vs 装饰器 vs 外观
from abc import ABC, abstractmethod
from typing import Optional
class DataSource(ABC):
"""数据源接口"""
@abstractmethod
def read(self) -> str:
pass
@abstractmethod
def write(self, data: str) -> None:
pass
class FileDataSource(DataSource):
"""文件数据源"""
def __init__(self, filename: str):
self._filename = filename
self._data = ""
def read(self) -> str:
return self._data
def write(self, data: str) -> None:
self._data = data
class LegacyDataSource:
"""遗留数据源(不兼容接口)"""
def load_data(self) -> bytes:
return b"legacy data"
def save_data(self, data: bytes) -> None:
pass
class AdapterExample(DataSource):
"""
适配器示例:将LegacyDataSource适配为DataSource
目的:使不兼容接口能够一起工作
特点:改变接口
"""
def __init__(self, legacy: LegacyDataSource):
self._legacy = legacy
def read(self) -> str:
return self._legacy.load_data().decode()
def write(self, data: str) -> None:
self._legacy.save_data(data.encode())
class DecoratorExample(DataSource):
"""
装饰器示例:为DataSource添加加密功能
目的:在不改变接口的情况下增强功能
特点:保持接口,添加行为
"""
def __init__(self, source: DataSource):
self._source = source
def read(self) -> str:
data = self._source.read()
return self._decrypt(data)
def write(self, data: str) -> None:
encrypted = self._encrypt(data)
self._source.write(encrypted)
def _encrypt(self, data: str) -> str:
return f"encrypted({data})"
def _decrypt(self, data: str) -> str:
return data.replace("encrypted(", "").rstrip(")")
class SubsystemA:
def operation_a(self) -> str:
return "A"
class SubsystemB:
def operation_b(self) -> str:
return "B"
class SubsystemC:
def operation_c(self) -> str:
return "C"
class FacadeExample:
"""
外观示例:简化多个子系统的使用
目的:为复杂子系统提供简单接口
特点:简化接口,隐藏复杂性
"""
def __init__(self):
self._a = SubsystemA()
self._b = SubsystemB()
self._c = SubsystemC()
def simple_operation(self) -> str:
return f"{self._a.operation_a()}-{self._b.operation_b()}-{self._c.operation_c()}"
def demo_pattern_comparison():
print("=== 适配器模式 ===")
legacy = LegacyDataSource()
adapter = AdapterExample(legacy)
print(f" 读取: {adapter.read()}")
print("\n=== 装饰器模式 ===")
source = FileDataSource("test.txt")
decorated = DecoratorExample(source)
decorated.write("hello")
print(f" 写入后读取: {decorated.read()}")
print("\n=== 外观模式 ===")
facade = FacadeExample()
print(f" 简化操作: {facade.simple_operation()}")
demo_pattern_comparison()6.6 反模式与最佳实践
6.6.1 常见反模式
from typing import Any
class AdapterAntiPatterns:
"""适配器模式反模式示例"""
@staticmethod
def anti_pattern_1_god_adapter():
"""
反模式:上帝适配器
问题:一个适配器适配太多不相关的方法
"""
class GodAdapter:
def __init__(self, adaptee: Any):
self._adaptee = adaptee
def method1(self): pass
def method2(self): pass
def method3(self): pass
def method4(self): pass
def method5(self): pass
print("反模式1:上帝适配器")
print(" 问题:适配器承担过多职责,违反单一职责原则")
@staticmethod
def anti_pattern_2_over_adapting():
"""
反模式:过度适配
问题:为简单接口创建不必要的适配器
"""
class SimpleClass:
def do_something(self) -> str:
return "done"
class UnnecessaryAdapter:
def __init__(self, simple: SimpleClass):
self._simple = simple
def execute(self) -> str:
return self._simple.do_something()
print("\n反模式2:过度适配")
print(" 问题:简单场景不需要适配器,增加不必要的复杂度")
@staticmethod
def anti_pattern_3_interface_pollution():
"""
反模式:接口污染
问题:适配器暴露被适配者的实现细节
"""
class BadAdapter:
def __init__(self, adaptee: Any):
self.adaptee = adaptee
def get_adaptee(self) -> Any:
return self.adaptee
print("\n反模式3:接口污染")
print(" 问题:暴露内部实现,破坏封装性")
def demo_anti_patterns():
AdapterAntiPatterns.anti_pattern_1_god_adapter()
AdapterAntiPatterns.anti_pattern_2_over_adapting()
AdapterAntiPatterns.anti_pattern_3_interface_pollution()
demo_anti_patterns()6.6.2 最佳实践清单
BEST_PRACTICE_CHECKLIST = """
适配器模式最佳实践清单
======================
□ 接口设计
├─ 定义清晰的Target接口
├─ 适配器仅实现Target接口方法
└─ 不暴露Adaptee的实现细节
□ 适配器选择
├─ 优先使用对象适配器(组合)
├─ 仅在特殊情况下使用类适配器
└─ 考虑使用Protocol实现类型安全
□ 职责边界
├─ 适配器只负责接口转换
├─ 不在适配器中添加业务逻辑
└─ 保持适配器的单一职责
□ 错误处理
├─ 转换Adaptee的异常为Target的异常
├─ 处理接口不匹配的边界情况
└─ 提供有意义的错误信息
□ 命名规范
├─ 适配器类名以Adapter结尾
├─ 方法名遵循Target接口约定
└─ 文档化适配行为
□ 测试策略
├─ 针对Target接口编写测试
├─ 验证接口转换的正确性
└─ 测试异常处理逻辑
"""6.7 决策指南
6.7.1 使用决策树
┌─────────────────────────────────────────────────────────────┐
│ 适配器模式使用决策树 │
└─────────────────────────────────────────────────────────────┘
需要集成现有代码?
│
▼
┌────────────────────────┐
│ 接口是否兼容? │
└────────────┬───────────┘
│ │
是 否
│ │
▼ ▼
┌─────────────┐ ┌─────────────────┐
│ 直接使用 │ │ 是否需要修改 │
└─────────────┘ │ 现有代码? │
└────────┬────────┘
│
┌────────┴────────┐
是 否
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ 考虑重构 │ │ 使用适配器 │
└─────────────┘ └─────────────┘
│
▼
┌────────────────────────┐
│ 选择适配器类型 │
└────────────┬───────────┘
│ │
需要适配多个 只需适配一个
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ 对象适配器 │ │ 类适配器或 │
│ (推荐) │ │ 对象适配器 │
└─────────────┘ └─────────────┘6.7.2 快速参考卡
ADAPTER_QUICK_REFERENCE = """
适配器模式快速参考
==================
┌─────────────────────────────────────────────────────────────┐
│ 核心概念 │
├─────────────────────────────────────────────────────────────┤
│ 将一个类的接口转换成客户端期望的另一个接口 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 实现方式 │
├─────────────────────────────────────────────────────────────┤
│ 对象适配器(推荐): │
│ class Adapter(Target): │
│ def __init__(self, adaptee): │
│ self._adaptee = adaptee │
│ def request(self): │
│ return self._adaptee.specific_request() │
│ │
│ 类适配器: │
│ class Adapter(Target, Adaptee): │
│ def request(self): │
│ return self.specific_request() │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 适用场景 │
├─────────────────────────────────────────────────────────────┤
│ ✓ 集成第三方库或遗留代码 │
│ ✓ 统一多个相似但不兼容的接口 │
│ ✓ 需要复用现有类但接口不匹配 │
│ ✓ 系统迁移过程中的接口兼容 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 注意事项 │
├─────────────────────────────────────────────────────────────┤
│ ⚠ 不要过度使用,简单场景不需要适配器 │
│ ⚠ 适配器应保持单一职责 │
│ ⚠ 考虑使用Protocol实现类型安全 │
│ ⚠ 处理好异常转换 │
└─────────────────────────────────────────────────────────────┘
"""6.8 小结
适配器模式是解决接口不兼容问题的核心方案。本章从形式化定义、历史渊源、实现机制、高级技术、企业级应用等多个维度进行了深入分析。
核心要点:
形式化定义:适配器模式可形式化为接口映射函数 $\text{Adapter}: \mathcal{I}{\text{Adaptee}} \xrightarrow{\text{transform}} \mathcal{I}{\text{Target}}$,实现接口的透明转换。
实现方式:对象适配器通过组合实现,更灵活;类适配器通过多重继承实现,更直接但灵活性较低。
高级技术:双向适配器、可插拔适配器、缺省适配器、异步适配器等扩展了模式的应用范围。
Protocol支持:Python 3.8+ 的Protocol提供结构化类型支持,实现类型安全的适配器。
企业级应用:数据库适配器、消息队列适配器、支付网关适配器等展示了模式的实际价值。
最佳实践:优先使用对象适配器、保持单一职责、处理好异常转换、使用Protocol实现类型安全。
适配器模式特别适用于集成第三方库、遗留系统或任何接口不兼容的场景,是系统集成和代码复用的重要工具。