第1章 单例模式
学习目标
完成本章学习后,读者将能够:
- 深入理解单例模式的理论基础、设计动机与数学本质
- 掌握Python语言特性下的多种单例实现机制及其内存模型
- 分析线程安全、进程安全与分布式环境下的单例实现策略
- 评估单例模式在软件架构中的利弊权衡与替代方案
- 运用形式化方法验证单例实现的正确性
1.1 理论基础与模式定义
1.1.1 形式化定义
单例模式(Singleton Pattern) 是一种创建型设计模式,其核心约束可形式化表述为:
$$\forall c \in Class, \forall i_1, i_2 \in Instances(c): i_1 = i_2$$
即对于任意类 $c$,其所有实例 $i_1, i_2$ 必须满足同一性关系。该模式同时提供全局访问点 $AccessPoint(c)$,满足:
$$AccessPoint(c) \rightarrow i, \text{其中 } i \text{ 是 } c \text{ 的唯一实例}$$
1.1.2 历史演进与学术背景
单例模式最早由"四人组"(Gang of Four, GoF)在1994年的经典著作《设计模式:可复用面向对象软件的基础》中系统阐述。其设计灵感源于对全局变量的批判性继承——既保留全局访问的便利性,又克服其初始化时机不可控、命名空间污染等缺陷。
从软件工程视角,单例模式体现了以下设计原则:
| 设计原则 | 单例模式的体现 |
|---|---|
| 单一职责原则(SRP) | 将实例管理职责从业务逻辑中分离(理想状态) |
| 开闭原则(OCP) | 通过继承扩展行为,但实例管理机制封闭 |
| 依赖倒置原则(DIP) | 客户端依赖抽象接口而非具体单例实现 |
| 接口隔离原则(ISP) | 单例应暴露最小必要接口 |
1.1.3 模式动机与应用场景
在软件系统架构中,存在一类具有全局唯一性约束的资源:
┌─────────────────────────────────────────────────────────────────┐
│ 单例模式应用场景分类 │
├─────────────────────────────────────────────────────────────────┤
│ 资源管理型 │ 数据库连接池 │ 文件系统 │ 设备驱动 │ 线程池 │
├─────────────────────────────────────────────────────────────────┤
│ 配置管理型 │ 应用配置 │ 环境变量 │ 特性开关 │ 国际化资源 │
├─────────────────────────────────────────────────────────────────┤
│ 日志监控型 │ 日志记录器 │ 性能监控 │ 审计追踪 │ 告警系统 │
├─────────────────────────────────────────────────────────────────┤
│ 缓存服务型 │ 内存缓存 │ 会话管理 │ 对象池 │ 连接注册表 │
└─────────────────────────────────────────────────────────────────┘核心设计动机:
- 资源约束:某些资源(如数据库连接、硬件设备)在物理或逻辑上只能存在一个实例
- 状态一致性:全局状态需要集中管理,避免多实例导致的状态不一致
- 性能优化:避免重复创建开销较大的对象
- 协调服务:作为系统各组件的协调中心
1.1.4 UML结构模型
┌─────────────────────────────────────────────────────────────────┐
│ Singleton │
├─────────────────────────────────────────────────────────────────┤
│ - instance: Singleton «static» │
│ - lock: Lock «static» «optional» │
├─────────────────────────────────────────────────────────────────┤
│ - __init__() «private» │
│ + get_instance(): Singleton «static» │
│ + business_operation() │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────┐
│ Client Code │
│ │
│ singleton = Singleton │
│ .get_instance() │
└────────────────────────┘结构要素解析:
- 私有静态实例变量:持有唯一的实例引用
- 私有构造函数:防止外部直接实例化
- 公共静态访问方法:提供全局访问点,负责延迟初始化
1.2 Python实现机制深度解析
Python作为动态语言,其对象模型与类机制为单例实现提供了多种途径。每种方法在语义、性能和可维护性上各有特点。
1.2.1 模块级单例(推荐方案)
理论基础:Python模块系统遵循单次导入原则——模块在首次import时执行初始化,后续导入直接返回已加载的模块对象。这一特性由sys.modules字典保证。
"""
singleton_module.py - 模块级单例实现
"""
from typing import Any, Optional
from dataclasses import dataclass, field
import threading
from datetime import datetime
@dataclass
class AppState:
"""应用状态管理器"""
app_name: str = "MyApplication"
version: str = "1.0.0"
started_at: datetime = field(default_factory=datetime.now)
_settings: dict = field(default_factory=dict)
_lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
def set_setting(self, key: str, value: Any) -> None:
with self._lock:
self._settings[key] = value
def get_setting(self, key: str, default: Any = None) -> Any:
with self._lock:
return self._settings.get(key, default)
def get_uptime(self) -> float:
return (datetime.now() - self.started_at).total_seconds()
state = AppState()使用方式:
from singleton_module import state
state.set_setting("debug", True)
print(f"应用运行时间: {state.get_uptime():.2f}秒")内存模型分析:
┌─────────────────────────────────────────────────────────────────┐
│ sys.modules │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ "singleton_module" → <module object> │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ state = AppState(...) │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │
│ import │ import
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ module_a.py │ │ module_b.py │
│ │ │ │
│ state.set(...) │ │ state.get(...) │
└─────────────────┘ └─────────────────┘
│ │
└──────────────┬───────────────┘
▼
同一个 AppState 实例优势与局限:
| 维度 | 评估 |
|---|---|
| 简洁性 | ★★★★★ 最Pythonic的实现方式 |
| 线程安全 | ★★★★★ 模块导入本身是线程安全的 |
| 可测试性 | ★★★☆☆ 需要重新加载模块来重置状态 |
| 灵活性 | ★★☆☆☆ 难以延迟初始化或参数化 |
| 继承支持 | ★★★★☆ 可通过模块内继承实现 |
1.2.2 装饰器实现
装饰器模式利用Python的闭包机制,在不修改类定义的前提下注入单例行为。
from functools import wraps
from typing import Callable, TypeVar, ParamSpec, Any, Dict, Type
import threading
import weakref
P = ParamSpec('P')
T = TypeVar('T')
def singleton(
cls: Callable[P, T] = None,
*,
thread_safe: bool = True,
weak: bool = False
) -> Callable[P, T]:
"""
单例装饰器工厂
参数:
thread_safe: 是否启用线程安全保护
weak: 是否使用弱引用(允许垃圾回收)
返回:
装饰后的类或装饰器函数
"""
def decorator(target_cls: Callable[P, T]) -> Callable[P, T]:
instances: Dict[Type, Any] = {}
lock = threading.RLock() if thread_safe else None
@wraps(target_cls)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
instance = instances.get(target_cls)
if instance is not None and weak:
instance = instance() if isinstance(instance, weakref.ref) else instance
if instance is None:
del instances[target_cls]
if target_cls not in instances:
create_instance = lambda: target_cls(*args, **kwargs)
if thread_safe:
with lock:
if target_cls not in instances:
instance = create_instance()
instances[target_cls] = weakref.ref(instance) if weak else instance
else:
instance = create_instance()
instances[target_cls] = weakref.ref(instance) if weak else instance
return instances[target_cls] if weak else instances.get(target_cls)
wrapper._singleton_instances = instances
wrapper._singleton_reset = lambda: instances.clear()
return wrapper
if cls is not None:
return decorator(cls)
return decorator
@singleton
class DatabaseConnection:
"""数据库连接单例"""
def __init__(self, connection_string: str = "sqlite:///:memory:"):
self._connection_string = connection_string
self._connected = False
print(f"[DatabaseConnection] 初始化: {connection_string}")
def connect(self) -> None:
if not self._connected:
self._connected = True
print(f"[DatabaseConnection] 已连接")
def disconnect(self) -> None:
if self._connected:
self._connected = False
print("[DatabaseConnection] 已断开")
def execute(self, query: str) -> str:
if not self._connected:
raise RuntimeError("数据库未连接")
return f"执行查询: {query}"
@singleton(thread_safe=True, weak=False)
class CacheManager:
"""线程安全缓存管理器"""
def __init__(self, max_size: int = 1000):
self._cache: Dict[str, Any] = {}
self._max_size = max_size
self._lock = threading.RLock()
def get(self, key: str) -> Any:
with self._lock:
return self._cache.get(key)
def set(self, key: str, value: Any) -> None:
with self._lock:
if len(self._cache) >= self._max_size:
oldest = next(iter(self._cache))
del self._cache[oldest]
self._cache[key] = value
db1 = DatabaseConnection("mysql://localhost")
db2 = DatabaseConnection("postgres://remote")
print(f"同一实例: {db1 is db2}")
print(f"连接字符串: {db1._connection_string}")装饰器实现的内存语义:
┌─────────────────────────────────────────────────────────────────┐
│ singleton 装饰器作用域 │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ instances = {} ← 闭包变量 │ │
│ │ lock = RLock() │ │
│ │ │ │
│ │ def wrapper(*args, **kwargs): │ │
│ │ if cls not in instances: │ │
│ │ instances[cls] = cls(*args, **kwargs) │ │
│ │ return instances[cls] │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ @singleton
▼
┌─────────────────────────────────────────────────────────────────┐
│ DatabaseConnection │
│ (被 wrapper 替代,但保留原类的 __name__, __doc__ 等属性) │
└─────────────────────────────────────────────────────────────────┘1.2.3 __new__ 方法实现
__new__ 是Python对象创建的第一阶段,负责分配内存空间。通过重写该方法,可在对象构造层面控制实例化行为。
from typing import Any, Optional, ClassVar
import threading
from datetime import datetime
class SingletonMeta(type):
"""支持继承的单例元类基类"""
pass
class Singleton:
"""
基于 __new__ 的单例基类
特性:
- 线程安全的实例创建
- 支持初始化参数检测
- 提供重置接口用于测试
"""
_instance: ClassVar[Optional['Singleton']] = None
_lock: ClassVar[threading.RLock] = threading.RLock()
_initialized: ClassVar[bool] = False
def __new__(cls, *args: Any, **kwargs: Any) -> 'Singleton':
if cls._instance is None:
with cls._lock:
if cls._instance is None:
instance = super().__new__(cls)
cls._instance = instance
print(f"[{cls.__name__}] 创建新实例")
return cls._instance
def __init__(self, *args: Any, **kwargs: Any):
if self._initialized:
if args or kwargs:
print(f"[{self.__class__.__name__}] 警告: 忽略重复初始化参数")
return
with self._lock:
if not self._initialized:
self._do_init(*args, **kwargs)
self._initialized = True
def _do_init(self, *args: Any, **kwargs: Any) -> None:
"""子类应重写此方法进行初始化"""
pass
@classmethod
def reset(cls) -> None:
"""重置单例状态(仅用于测试)"""
with cls._lock:
if cls._instance is not None:
cls._instance = None
cls._initialized = False
print(f"[{cls.__name__}] 单例已重置")
class Application(Singleton):
"""应用管理器单例"""
def _do_init(self, name: str = "MyApp", version: str = "1.0.0") -> None:
self._name = name
self._version = version
self._started_at = datetime.now()
self._components: dict = {}
print(f"[Application] 初始化: {name} v{version}")
@property
def name(self) -> str:
return self._name
@property
def uptime(self) -> float:
return (datetime.now() - self._started_at).total_seconds()
def register_component(self, name: str, component: Any) -> None:
self._components[name] = component
def get_component(self, name: str) -> Optional[Any]:
return self._components.get(name)
app1 = Application("WebServer", "2.0.0")
app2 = Application("APIGateway", "3.0.0")
print(f"同一实例: {app1 is app2}")
print(f"应用名称: {app1.name}")
print(f"运行时间: {app1.uptime:.2f}秒")__new__ 与 __init__ 的协作机制:
┌─────────────────────────────────────────────────────────────────┐
│ 对象创建流程 │
│ │
│ Application("WebServer", "2.0.0") │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ __new__(cls, *args, **kwargs) │ │
│ │ │ │
│ │ 1. 检查 _instance 是否为 None │ │
│ │ 2. 若为 None,获取锁并再次检查(双重检查锁定) │ │
│ │ 3. 调用 super().__new__(cls) 分配内存 │ │
│ │ 4. 将新实例赋值给 _instance │ │
│ │ 5. 返回 _instance │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ __init__(self, *args, **kwargs) │ │
│ │ │ │
│ │ 1. 检查 _initialized 标志 │ │
│ │ 2. 若已初始化,忽略参数并返回 │ │
│ │ 3. 若未初始化,获取锁并调用 _do_init │ │
│ │ 4. 设置 _initialized = True │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘1.2.4 元类实现
元类是Python中控制类创建行为的最高层机制。通过自定义元类,可在类定义层面强制实施单例约束。
from typing import Any, Dict, Type, Optional
import threading
from abc import ABCMeta
class SingletonMeta(type):
"""
线程安全的单例元类
特性:
- 在类创建时注入单例逻辑
- 支持继承(子类也是单例,但与父类独立)
- 线程安全的实例创建
- 可配置的初始化行为
"""
_instances: Dict[Type, Any] = {}
_lock: threading.RLock = threading.RLock()
def __call__(cls, *args: Any, **kwargs: Any) -> Any:
if cls not in cls._instances:
with cls._lock:
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
print(f"[{cls.__name__}] 元类创建实例")
return cls._instances[cls]
@classmethod
def reset_instance(mcs, cls: Type) -> None:
"""重置指定类的单例实例"""
with mcs._lock:
if cls in mcs._instances:
del mcs._instances[cls]
@classmethod
def get_all_instances(mcs) -> Dict[Type, Any]:
"""获取所有单例实例(用于调试)"""
return mcs._instances.copy()
class AbstractSingletonMeta(SingletonMeta, ABCMeta):
"""
结合抽象基类的单例元类
允许定义抽象单例接口
"""
pass
class Logger(metaclass=SingletonMeta):
"""日志管理器单例"""
def __init__(self, level: str = "INFO", output: str = "console"):
self._level = level
self._output = output
self._logs: list = []
print(f"[Logger] 初始化: level={level}, output={output}")
def log(self, message: str, level: str = None) -> None:
entry_level = level or self._level
entry = f"[{entry_level}] {message}"
self._logs.append(entry)
print(entry)
def get_logs(self) -> list:
return self._logs.copy()
def clear(self) -> None:
self._logs.clear()
class MetricsCollector(metaclass=SingletonMeta):
"""指标收集器单例"""
def __init__(self, namespace: str = "default"):
self._namespace = namespace
self._metrics: Dict[str, list] = {}
self._lock = threading.RLock()
def record(self, name: str, value: float) -> None:
with self._lock:
if name not in self._metrics:
self._metrics[name] = []
self._metrics[name].append(value)
def get_average(self, name: str) -> Optional[float]:
with self._lock:
values = self._metrics.get(name, [])
return sum(values) / len(values) if values else None
logger1 = Logger("DEBUG", "file")
logger2 = Logger("ERROR", "console")
print(f"同一实例: {logger1 is logger2}")
logger1.log("系统启动")
metrics = MetricsCollector("production")
metrics.record("response_time", 0.125)
metrics.record("response_time", 0.150)
print(f"平均响应时间: {metrics.get_average('response_time'):.3f}s")元类继承与实例隔离:
class BaseService(metaclass=SingletonMeta):
"""服务基类"""
def __init__(self, name: str):
self._name = name
@property
def name(self) -> str:
return self._name
class DatabaseService(BaseService):
"""数据库服务"""
pass
class CacheService(BaseService):
"""缓存服务"""
pass
db = DatabaseService("MySQL")
cache = CacheService("Redis")
print(f"db 与 cache 是同一实例: {db is cache}")
print(f"db 名称: {db.name}")
print(f"cache 名称: {cache.name}")
print(f"\n所有单例实例: {[cls.__name__ for cls in SingletonMeta.get_all_instances()]}")1.2.5 实现方式对比分析
| 实现方式 | 线程安全 | 可继承性 | 延迟初始化 | 参数化 | 复杂度 | 推荐场景 |
|---|---|---|---|---|---|---|
| 模块级 | ✓ | △ | ✗ | ✗ | 低 | 简单单例 |
| 装饰器 | 可配置 | △ | ✓ | ✓ | 中 | 需要灵活配置 |
__new__ | 需实现 | ✓ | ✓ | ✓ | 中 | 需要继承 |
| 元类 | ✓ | ✓ | ✓ | ✓ | 高 | 复杂场景、框架开发 |
1.3 并发环境下的单例实现
1.3.1 线程安全分析
在多线程环境中,单例创建存在竞态条件(Race Condition)。考虑以下非线程安全实现:
class UnsafeSingleton:
_instance = None
@classmethod
def get_instance(cls) -> 'UnsafeSingleton':
if cls._instance is None:
import time
time.sleep(0.001)
cls._instance = cls()
return cls._instance竞态条件时序图:
时间轴 线程A 线程B _instance
────────────────────────────────────────────────────────────────────
t1 检查 None ✓
t2 检查 None ✓ None
t3 创建实例 A
t4 创建实例 B A
t5 返回 A
t6 返回 B B (覆盖A!)
────────────────────────────────────────────────────────────────────
结果: 线程A获得实例A,线程B获得实例B,违反单例约束1.3.2 双重检查锁定模式
双重检查锁定是一种优化模式,在保证线程安全的同时减少锁竞争开销。
from typing import ClassVar, Optional, Any
import threading
from functools import wraps
class DoubleCheckedLockingSingleton:
"""
双重检查锁定单例
优化原理:
- 第一次检查避免不必要的锁获取
- 第二次检查确保只创建一个实例
- 使用 RLock 支持递归调用
"""
_instance: ClassVar[Optional['DoubleCheckedLockingSingleton']] = None
_lock: ClassVar[threading.RLock] = threading.RLock()
_initialized: ClassVar[bool] = False
def __new__(cls, *args: Any, **kwargs: Any):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
instance = super().__new__(cls)
cls._instance = instance
return cls._instance
def __init__(self, value: str = "default"):
if DoubleCheckedLockingSingleton._initialized:
return
with DoubleCheckedLockingSingleton._lock:
if not DoubleCheckedLockingSingleton._initialized:
self._value = value
DoubleCheckedLockingSingleton._initialized = True
@property
def value(self) -> str:
return self._value
def demonstrate_thread_safety():
"""演示线程安全的单例创建"""
import concurrent.futures
results = []
def create_singleton():
instance = DoubleCheckedLockingSingleton(f"thread-{threading.current_thread().name}")
return id(instance)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(create_singleton) for _ in range(100)]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
unique_ids = set(results)
print(f"创建实例数: {len(results)}")
print(f"唯一实例数: {len(unique_ids)}")
print(f"单例验证: {'通过' if len(unique_ids) == 1 else '失败'}")
demonstrate_thread_safety()1.3.3 线程局部单例
某些场景需要每个线程拥有独立的单例实例:
from typing import Any, Callable, TypeVar, ParamSpec
import threading
from functools import wraps
P = ParamSpec('P')
T = TypeVar('T')
def thread_local_singleton(cls: Callable[P, T]) -> Callable[P, T]:
"""
线程局部单例装饰器
每个线程拥有独立的单例实例
"""
local_data = threading.local()
lock = threading.Lock()
@wraps(cls)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
if not hasattr(local_data, 'instance'):
with lock:
if not hasattr(local_data, 'instance'):
local_data.instance = cls(*args, **kwargs)
return local_data.instance
return wrapper
@thread_local_singleton
class RequestContext:
"""请求上下文(线程局部单例)"""
def __init__(self):
self._request_id = id(threading.current_thread())
self._data: dict = {}
print(f"[RequestContext] 线程 {threading.current_thread().name} 创建上下文")
def set(self, key: str, value: Any) -> None:
self._data[key] = value
def get(self, key: str) -> Any:
return self._data.get(key)
@property
def request_id(self) -> int:
return self._request_id
def process_request(request_num: int):
ctx = RequestContext()
ctx.set("request_num", request_num)
print(f"线程 {threading.current_thread().name}: "
f"request_id={ctx.request_id}, request_num={ctx.get('request_num')}")
threads = [threading.Thread(target=process_request, args=(i,)) for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()1.3.4 进程安全与分布式单例
在多进程或分布式环境中,单例约束需要跨进程/节点协调:
import os
import fcntl
from typing import Optional, Any
from pathlib import Path
from contextlib import contextmanager
class FileLockSingleton:
"""
基于文件锁的进程安全单例
原理:
- 使用操作系统文件锁(fcntl)确保跨进程互斥
- 适用于单机多进程环境
"""
_instance: Optional['FileLockSingleton'] = None
_lock_file: Optional[int] = None
def __new__(cls, *args: Any, **kwargs: Any):
if cls._instance is None:
lock_path = Path("/tmp") / f"{cls.__name__}.lock"
try:
cls._lock_file = os.open(str(lock_path), os.O_CREAT | os.O_RDWR)
fcntl.flock(cls._lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
if cls._instance is None:
cls._instance = super().__new__(cls)
except (IOError, OSError):
if cls._lock_file is not None:
os.close(cls._lock_file)
raise RuntimeError(f"另一个 {cls.__name__} 实例正在运行")
return cls._instance
def __del__(self):
if self._lock_file is not None:
fcntl.flock(self._lock_file, fcntl.LOCK_UN)
os.close(self._lock_file)
class DistributedSingleton:
"""
分布式单例(概念实现)
实际生产环境应使用:
- Redis 分布式锁
- ZooKeeper 临时节点
- etcd 选举机制
"""
def __init__(self, redis_client=None, lock_key: str = "singleton:lock"):
self._redis = redis_client
self._lock_key = lock_key
self._lock_token = None
@contextmanager
def distributed_lock(self, timeout: int = 10):
"""获取分布式锁"""
import uuid
token = str(uuid.uuid4())
try:
if self._redis:
acquired = self._redis.set(
self._lock_key, token,
nx=True, ex=timeout
)
if not acquired:
raise RuntimeError("获取分布式锁失败")
self._lock_token = token
yield self
finally:
if self._redis and self._lock_token:
self._release_lock()
def _release_lock(self):
"""释放分布式锁(Lua脚本保证原子性)"""
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self._redis.eval(lua_script, 1, self._lock_key, self._lock_token)1.4 单例模式的变体与扩展
1.4.1 延迟初始化单例
延迟初始化将实例创建推迟到首次访问时,优化启动性能和内存占用。
from typing import Callable, TypeVar, Generic, Optional
import threading
T = TypeVar('T')
class LazySingleton(Generic[T]):
"""
泛型延迟初始化单例
特性:
- 类型安全的延迟初始化
- 支持自定义工厂函数
- 线程安全的实例创建
"""
def __init__(self, factory: Callable[[], T]):
self._factory = factory
self._instance: Optional[T] = None
self._lock = threading.RLock()
def get(self) -> T:
if self._instance is None:
with self._lock:
if self._instance is None:
self._instance = self._factory()
return self._instance
def is_initialized(self) -> bool:
return self._instance is not None
def reset(self) -> None:
with self._lock:
self._instance = None
class HeavyResource:
"""重量级资源"""
def __init__(self, config: dict):
self._config = config
print(f"[HeavyResource] 初始化,配置: {config}")
def process(self, data: str) -> str:
return f"处理: {data}"
resource_singleton = LazySingleton(lambda: HeavyResource({"timeout": 30}))
print("延迟单例已创建,资源尚未初始化")
print(f"资源初始化状态: {resource_singleton.is_initialized()}")
resource = resource_singleton.get()
print(f"资源初始化状态: {resource_singleton.is_initialized()}")
print(resource.process("测试数据"))1.4.2 多例模式
多例模式允许存在有限数量的实例,每个实例通过唯一键标识。
from typing import Dict, Any, Optional, Callable, TypeVar
import threading
from dataclasses import dataclass
@dataclass
class InstanceInfo:
"""实例信息"""
instance: Any
created_at: float
access_count: int = 0
class Multiton:
"""
多例模式实现
特性:
- 按键管理多个单例实例
- 支持实例上限和淘汰策略
- 线程安全的实例管理
"""
_instances: Dict[str, InstanceInfo] = {}
_lock = threading.RLock()
_max_instances: int = 10
@classmethod
def get_instance(cls, key: str, factory: Callable[[], Any] = None) -> Any:
with cls._lock:
if key in cls._instances:
cls._instances[key].access_count += 1
return cls._instances[key].instance
if len(cls._instances) >= cls._max_instances:
cls._evict_lru()
if factory is None:
raise ValueError(f"实例 '{key}' 不存在且未提供工厂函数")
import time
instance = factory()
cls._instances[key] = InstanceInfo(
instance=instance,
created_at=time.time(),
access_count=1
)
return instance
@classmethod
def _evict_lru(cls) -> None:
"""淘汰最少使用的实例"""
lru_key = min(cls._instances.keys(),
key=lambda k: cls._instances[k].access_count)
del cls._instances[lru_key]
print(f"[Multiton] 淘汰实例: {lru_key}")
@classmethod
def get_all_keys(cls) -> list:
return list(cls._instances.keys())
@classmethod
def get_stats(cls) -> Dict[str, Dict]:
return {
key: {
"created_at": info.created_at,
"access_count": info.access_count
}
for key, info in cls._instances.items()
}
class DatabaseConnection:
"""数据库连接"""
def __init__(self, db_type: str, host: str):
self.db_type = db_type
self.host = host
print(f"[DatabaseConnection] 创建 {db_type} 连接到 {host}")
mysql = Multiton.get_instance("mysql", lambda: DatabaseConnection("MySQL", "localhost"))
postgres = Multiton.get_instance("postgres", lambda: DatabaseConnection("PostgreSQL", "remote"))
mysql_again = Multiton.get_instance("mysql")
print(f"mysql 与 mysql_again 是同一实例: {mysql is mysql_again}")
print(f"所有实例键: {Multiton.get_all_keys()}")1.4.3 Borg模式(共享状态单例)
Borg模式允许多个实例存在,但所有实例共享同一状态。
from typing import Any, Dict, ClassVar
import threading
class Borg:
"""
Borg模式基类
原理:
- 所有实例共享 __dict__ 字典
- 实例身份不同,但状态相同
- 比传统单例更灵活
"""
_shared_state: ClassVar[Dict[str, Any]] = {}
_lock: ClassVar[threading.RLock] = threading.RLock()
def __new__(cls, *args: Any, **kwargs: Any):
instance = super().__new__(cls)
instance.__dict__ = cls._shared_state
return instance
class Configuration(Borg):
"""配置管理器(Borg模式)"""
def __init__(self, env: str = "development"):
with self._lock:
if not hasattr(self, '_initialized'):
self._env = env
self._settings: Dict[str, Any] = {}
self._initialized = True
print(f"[Configuration] 初始化环境: {env}")
def set(self, key: str, value: Any) -> None:
with self._lock:
self._settings[key] = value
def get(self, key: str, default: Any = None) -> Any:
return self._settings.get(key, default)
def get_env(self) -> str:
return self._env
config1 = Configuration("production")
config1.set("debug", False)
config1.set("max_connections", 100)
config2 = Configuration("development")
print(f"config1 is config2: {config1 is config2}")
print(f"config1.get_env(): {config1.get_env()}")
print(f"config2.get('debug'): {config2.get('debug')}")
print(f"config1._settings is config2._settings: {config1._settings is config2._settings}")Borg模式与传统单例对比:
| 特性 | 传统单例 | Borg模式 |
|---|---|---|
| 实例数量 | 严格唯一 | 可多个 |
| 状态共享 | 隐式(同一实例) | 显式(共享字典) |
| 子类支持 | 复杂 | 自然支持 |
| 内存模型 | 单对象 | 多对象共享字典 |
| 适用场景 | 严格唯一约束 | 状态共享需求 |
1.5 实际应用案例
1.5.1 企业级日志管理器
import logging
import logging.handlers
from typing import Optional, Dict, Any
from pathlib import Path
from datetime import datetime
import threading
import json
class LogManager(metaclass=SingletonMeta):
"""
企业级日志管理器
特性:
- 多输出目标(控制台、文件、远程)
- 结构化日志支持
- 日志级别动态调整
- 性能监控集成
"""
def __init__(
self,
app_name: str = "Application",
level: int = logging.INFO,
log_dir: str = "logs"
):
self._app_name = app_name
self._level = level
self._log_dir = Path(log_dir)
self._log_dir.mkdir(parents=True, exist_ok=True)
self._logger = logging.getLogger(app_name)
self._logger.setLevel(level)
self._lock = threading.RLock()
self._setup_handlers()
self._metrics = {"info": 0, "warning": 0, "error": 0, "debug": 0}
def _setup_handlers(self) -> None:
"""配置日志处理器"""
if self._logger.handlers:
return
formatter = logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_handler.setFormatter(formatter)
self._logger.addHandler(console_handler)
log_file = self._log_dir / f"{self._app_name}_{datetime.now():%Y%m%d}.log"
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024,
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
self._logger.addHandler(file_handler)
def _log(self, level: int, message: str, **context: Any) -> None:
with self._lock:
level_name = logging.getLevelName(level).lower()
self._metrics[level_name] = self._metrics.get(level_name, 0) + 1
if context:
structured_msg = json.dumps({
"message": message,
"context": context
}, ensure_ascii=False)
self._logger.log(level, structured_msg)
else:
self._logger.log(level, message)
def info(self, message: str, **context: Any) -> None:
self._log(logging.INFO, message, **context)
def warning(self, message: str, **context: Any) -> None:
self._log(logging.WARNING, message, **context)
def error(self, message: str, exc_info: bool = False, **context: Any) -> None:
with self._lock:
self._metrics["error"] = self._metrics.get("error", 0) + 1
self._logger.error(message, exc_info=exc_info, extra=context)
def debug(self, message: str, **context: Any) -> None:
self._log(logging.DEBUG, message, **context)
def get_metrics(self) -> Dict[str, int]:
return self._metrics.copy()
def set_level(self, level: int) -> None:
self._level = level
self._logger.setLevel(level)
logger = LogManager("MyApp", logging.DEBUG)
logger.info("应用启动", version="1.0.0", env="production")
logger.warning("配置文件不存在,使用默认配置")
logger.error("数据库连接失败", exc_info=False, host="localhost", port=3306)
print(f"日志统计: {logger.get_metrics()}")1.5.2 高性能数据库连接池
from typing import Optional, Any, Generator, ContextManager
from dataclasses import dataclass, field
from queue import Queue, Empty
from contextlib import contextmanager
import threading
import time
from abc import ABC, abstractmethod
@dataclass
class ConnectionConfig:
"""连接配置"""
host: str = "localhost"
port: int = 3306
database: str = "test"
username: str = "root"
password: str = ""
timeout: float = 30.0
pool_size: int = 10
max_overflow: int = 5
class Connection(ABC):
"""连接抽象基类"""
def __init__(self, conn_id: int, config: ConnectionConfig):
self.id = conn_id
self.config = config
self._connected = False
self._last_used = time.time()
self._use_count = 0
@abstractmethod
def connect(self) -> None:
pass
@abstractmethod
def disconnect(self) -> None:
pass
@abstractmethod
def execute(self, query: str) -> Any:
pass
def is_alive(self) -> bool:
return self._connected
def mark_used(self) -> None:
self._last_used = time.time()
self._use_count += 1
class MySQLConnection(Connection):
"""MySQL连接实现"""
def connect(self) -> None:
print(f"[MySQL-{self.id}] 连接到 {self.config.host}:{self.config.port}")
self._connected = True
def disconnect(self) -> None:
print(f"[MySQL-{self.id}] 断开连接")
self._connected = False
def execute(self, query: str) -> str:
if not self._connected:
raise RuntimeError("连接未建立")
self.mark_used()
return f"[MySQL-{self.id}] 执行: {query}"
class ConnectionPool(metaclass=SingletonMeta):
"""
高性能数据库连接池
特性:
- 单例模式确保全局唯一连接池
- 连接复用减少创建开销
- 动态扩容支持突发流量
- 连接健康检查与自动回收
"""
def __init__(self, config: ConnectionConfig = None):
if hasattr(self, '_initialized') and self._initialized:
return
self._config = config or ConnectionConfig()
self._pool: Queue = Queue(maxsize=self._config.pool_size + self._config.max_overflow)
self._lock = threading.RLock()
self._connection_count = 0
self._active_connections: set = set()
self._stats = {
"created": 0,
"borrowed": 0,
"returned": 0,
"overflow": 0
}
self._initialize_pool()
self._start_health_checker()
self._initialized = True
def _initialize_pool(self) -> None:
"""初始化连接池"""
for i in range(self._config.pool_size):
conn = self._create_connection(i)
self._pool.put(conn)
def _create_connection(self, conn_id: int = None) -> Connection:
"""创建新连接"""
with self._lock:
if conn_id is None:
self._connection_count += 1
conn_id = self._connection_count
conn = MySQLConnection(conn_id, self._config)
conn.connect()
self._stats["created"] += 1
return conn
def get_connection(self, timeout: float = None) -> Optional[Connection]:
"""获取连接"""
timeout = timeout or self._config.timeout
try:
conn = self._pool.get(block=True, timeout=timeout)
if not conn.is_alive():
conn = self._create_connection(conn.id)
with self._lock:
self._active_connections.add(conn.id)
self._stats["borrowed"] += 1
return conn
except Empty:
if self._connection_count < self._config.pool_size + self._config.max_overflow:
with self._lock:
conn = self._create_connection()
self._active_connections.add(conn.id)
self._stats["overflow"] += 1
self._stats["borrowed"] += 1
return conn
raise RuntimeError("连接池已满,无法获取连接")
def release_connection(self, conn: Connection) -> None:
"""释放连接"""
with self._lock:
self._active_connections.discard(conn.id)
self._stats["returned"] += 1
if conn.is_alive():
self._pool.put(conn)
else:
with self._lock:
self._connection_count -= 1
@contextmanager
def connection(self, timeout: float = None) -> Generator[Optional[Connection], None, None]:
"""上下文管理器方式获取连接"""
conn = None
try:
conn = self.get_connection(timeout)
yield conn
finally:
if conn:
self.release_connection(conn)
def _start_health_checker(self) -> None:
"""启动健康检查线程"""
def health_check():
while True:
time.sleep(60)
self._check_connections()
thread = threading.Thread(target=health_check, daemon=True)
thread.start()
def _check_connections(self) -> None:
"""检查连接健康状态"""
temp_conns = []
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
if conn.is_alive() and time.time() - conn._last_used < 300:
temp_conns.append(conn)
else:
conn.disconnect()
with self._lock:
self._connection_count -= 1
except Empty:
break
for conn in temp_conns:
self._pool.put(conn)
def get_stats(self) -> dict:
return {
**self._stats,
"pool_size": self._pool.qsize(),
"active": len(self._active_connections),
"total_created": self._connection_count
}
pool = ConnectionPool(ConnectionConfig(pool_size=3, max_overflow=2))
with pool.connection() as conn:
if conn:
result = conn.execute("SELECT * FROM users")
print(result)
print(f"连接池统计: {pool.get_stats()}")1.6 反模式与最佳实践
1.6.1 常见反模式
反模式1:滥用单例
class UserService:
"""错误示例:用户服务不应是单例"""
_instance = None
def __init__(self):
self._current_user = None
def set_user(self, user: dict):
self._current_user = user
def get_user(self):
return self._current_user
service1 = UserService()
service1.set_user({"id": 1, "name": "Alice"})
service2 = UserService()
print(service2.get_user())问题分析:用户服务代表当前用户上下文,不同请求应拥有独立实例。使用单例会导致状态污染。
正确做法:
class UserService:
"""正确示例:每次请求创建新实例"""
def __init__(self, user: dict = None):
self._user = user
def set_user(self, user: dict):
self._user = user
def get_user(self):
return self._user
def handle_request(user_id: int):
user = fetch_user(user_id)
service = UserService(user)
return service.process()反模式2:单例中持有可变状态
class BadCache:
"""错误示例:单例中持有大量可变状态"""
def __init__(self):
self._data = {}
def set(self, key, value):
self._data[key] = value
def get(self, key):
return self._data.get(key)
def clear(self):
self._data.clear()问题分析:单例的生命周期与应用相同,长期持有可变状态会导致内存泄漏和数据不一致。
正确做法:
class GoodCache:
"""正确示例:限制缓存大小,提供过期机制"""
def __init__(self, max_size: int = 1000, ttl: int = 3600):
self._data = {}
self._timestamps = {}
self._max_size = max_size
self._ttl = ttl
self._lock = threading.RLock()
def set(self, key, value):
with self._lock:
if len(self._data) >= self._max_size:
self._evict_expired()
if len(self._data) >= self._max_size:
self._evict_lru()
self._data[key] = value
self._timestamps[key] = time.time()
def get(self, key):
with self._lock:
if key in self._data:
if time.time() - self._timestamps[key] > self._ttl:
del self._data[key]
del self._timestamps[key]
return None
return self._data[key]
return None
def _evict_expired(self):
current = time.time()
expired = [k for k, t in self._timestamps.items()
if current - t > self._ttl]
for k in expired:
del self._data[k]
del self._timestamps[k]
def _evict_lru(self):
lru_key = min(self._timestamps, key=self._timestamps.get)
del self._data[lru_key]
del self._timestamps[lru_key]1.6.2 测试策略
单例模式在测试中常带来挑战,需要特殊处理:
import unittest
import threading
class SingletonTestCase(unittest.TestCase):
"""单例测试基类"""
def setUp(self):
"""每个测试前重置单例"""
Singleton.reset()
def tearDown(self):
"""测试后清理"""
Singleton.reset()
class TestSingleton(SingletonTestCase):
def test_single_instance(self):
s1 = Singleton()
s2 = Singleton()
self.assertIs(s1, s2)
def test_thread_safety(self):
instances = []
def create_instance():
instances.append(Singleton())
threads = [threading.Thread(target=create_instance) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
first = instances[0]
for instance in instances[1:]:
self.assertIs(first, instance)
class TestableSingleton:
"""可测试的单例实现"""
_instance = None
_lock = threading.RLock()
@classmethod
def get_instance(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
@classmethod
def reset(cls):
"""重置单例(仅用于测试)"""
with cls._lock:
cls._instance = None
@classmethod
def inject_instance(cls, instance):
"""注入测试替身"""
with cls._lock:
cls._instance = instance1.6.3 依赖注入替代方案
在现代软件架构中,依赖注入(Dependency Injection)通常是更好的选择:
from typing import Protocol, runtime_checkable
@runtime_checkable
class ILogger(Protocol):
"""日志接口协议"""
def info(self, message: str) -> None: ...
def error(self, message: str) -> None: ...
class ConsoleLogger:
"""控制台日志实现"""
def info(self, message: str) -> None:
print(f"[INFO] {message}")
def error(self, message: str) -> None:
print(f"[ERROR] {message}")
class FileLogger:
"""文件日志实现"""
def __init__(self, file_path: str):
self._file_path = file_path
def info(self, message: str) -> None:
with open(self._file_path, 'a') as f:
f.write(f"[INFO] {message}\n")
def error(self, message: str) -> None:
with open(self._file_path, 'a') as f:
f.write(f"[ERROR] {message}\n")
class UserService:
"""用户服务(依赖注入方式)"""
def __init__(self, logger: ILogger):
self._logger = logger
def create_user(self, username: str) -> None:
self._logger.info(f"创建用户: {username}")
class ServiceContainer:
"""服务容器(简单的IoC容器)"""
def __init__(self):
self._services = {}
self._singletons = {}
def register_singleton(self, interface: type, implementation: type) -> None:
self._services[interface] = ('singleton', implementation)
def register_transient(self, interface: type, implementation: type) -> None:
self._services[interface] = ('transient', implementation)
def resolve(self, interface: type) -> Any:
if interface not in self._services:
raise ValueError(f"服务 {interface} 未注册")
scope, impl = self._services[interface]
if scope == 'singleton':
if interface not in self._singletons:
self._singletons[interface] = impl()
return self._singletons[interface]
else:
return impl()
container = ServiceContainer()
container.register_singleton(ILogger, ConsoleLogger)
logger = container.resolve(ILogger)
user_service = UserService(logger)
user_service.create_user("Alice")1.7 模式评估与决策指南
1.7.1 适用性检查清单
使用以下检查清单评估是否应使用单例模式:
| 检查项 | 是 | 否 | 说明 |
|---|---|---|---|
| 资源是否具有物理唯一性? | ✓ 使用 | ✗ 评估 | 如设备驱动、端口 |
| 是否需要全局协调? | ✓ 使用 | ✗ 评估 | 如事件总线、调度器 |
| 实例创建开销是否很大? | ✓ 使用 | ✗ 评估 | 如重量级配置加载 |
| 是否需要延迟初始化? | ✓ 使用 | ✗ 评估 | 按需创建优化启动 |
| 是否需要严格控制实例数量? | ✓ 使用 | ✗ 评估 | 如许可证限制 |
| 是否在测试中需要隔离? | ✗ 评估 | ✓ 使用 | 考虑依赖注入 |
1.7.2 替代方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 单例模式 | 简单、全局访问 | 测试困难、隐藏依赖 | 真正的全局资源 |
| 模块级变量 | 最简单 | 不够灵活 | 简单配置 |
| 依赖注入 | 可测试、灵活 | 增加复杂度 | 企业级应用 |
| 服务定位器 | 解耦 | 仍是全局状态 | 框架开发 |
| Borg模式 | 灵活继承 | 状态共享不明显 | 需要多实例但共享状态 |
1.8 小结
单例模式作为最简单却也最易滥用的设计模式,其核心价值在于控制实例数量和提供全局访问点。在Python中,实现方式多样,各有优劣:
┌─────────────────────────────────────────────────────────────────┐
│ 单例实现选择决策树 │
│ │
│ 需要单例吗? │
│ │ │
│ ┌──────────┴──────────┐ │
│ ▼ ▼ │
│ 是的 不确定 │
│ │ │ │
│ ▼ ▼ │
│ 需要继承吗? 考虑依赖注入 │
│ │ │
│ ┌───────┴───────┐ │
│ ▼ ▼ │
│ 是 否 │
│ │ │ │
│ ▼ ▼ │
│ 元类实现 模块级单例 │
│ __new__实现 装饰器实现 │
└─────────────────────────────────────────────────────────────────┘关键要点:
- 谨慎使用:单例模式应仅用于真正需要全局唯一实例的场景
- 线程安全:多线程环境必须使用双重检查锁定或元类实现
- 可测试性:提供重置机制,或考虑依赖注入替代
- 文档清晰:明确说明单例的生命周期和线程安全特性
- 遵循原则:避免单例类承担过多职责
在下一章中,我们将探讨工厂方法模式,学习如何在不指定具体类的情况下创建对象。