Skip to content

第1章 单例模式

学习目标

完成本章学习后,读者将能够:

  • 深入理解单例模式的理论基础、设计动机与数学本质
  • 掌握Python语言特性下的多种单例实现机制及其内存模型
  • 分析线程安全、进程安全与分布式环境下的单例实现策略
  • 评估单例模式在软件架构中的利弊权衡与替代方案
  • 运用形式化方法验证单例实现的正确性

1.1 理论基础与模式定义

1.1.1 形式化定义

单例模式(Singleton Pattern) 是一种创建型设计模式,其核心约束可形式化表述为:

$$\forall c \in Class, \forall i_1, i_2 \in Instances(c): i_1 = i_2$$

即对于任意类 $c$,其所有实例 $i_1, i_2$ 必须满足同一性关系。该模式同时提供全局访问点 $AccessPoint(c)$,满足:

$$AccessPoint(c) \rightarrow i, \text{其中 } i \text{ 是 } c \text{ 的唯一实例}$$

1.1.2 历史演进与学术背景

单例模式最早由"四人组"(Gang of Four, GoF)在1994年的经典著作《设计模式:可复用面向对象软件的基础》中系统阐述。其设计灵感源于对全局变量的批判性继承——既保留全局访问的便利性,又克服其初始化时机不可控、命名空间污染等缺陷。

从软件工程视角,单例模式体现了以下设计原则:

设计原则单例模式的体现
单一职责原则(SRP)将实例管理职责从业务逻辑中分离(理想状态)
开闭原则(OCP)通过继承扩展行为,但实例管理机制封闭
依赖倒置原则(DIP)客户端依赖抽象接口而非具体单例实现
接口隔离原则(ISP)单例应暴露最小必要接口

1.1.3 模式动机与应用场景

在软件系统架构中,存在一类具有全局唯一性约束的资源:

┌─────────────────────────────────────────────────────────────────┐
│                    单例模式应用场景分类                          │
├─────────────────────────────────────────────────────────────────┤
│  资源管理型    │  数据库连接池 │ 文件系统 │ 设备驱动 │ 线程池   │
├─────────────────────────────────────────────────────────────────┤
│  配置管理型    │  应用配置 │ 环境变量 │ 特性开关 │ 国际化资源 │
├─────────────────────────────────────────────────────────────────┤
│  日志监控型    │  日志记录器 │ 性能监控 │ 审计追踪 │ 告警系统 │
├─────────────────────────────────────────────────────────────────┤
│  缓存服务型    │  内存缓存 │ 会话管理 │ 对象池 │ 连接注册表 │
└─────────────────────────────────────────────────────────────────┘

核心设计动机

  1. 资源约束:某些资源(如数据库连接、硬件设备)在物理或逻辑上只能存在一个实例
  2. 状态一致性:全局状态需要集中管理,避免多实例导致的状态不一致
  3. 性能优化:避免重复创建开销较大的对象
  4. 协调服务:作为系统各组件的协调中心

1.1.4 UML结构模型

┌─────────────────────────────────────────────────────────────────┐
│                      Singleton                                  │
├─────────────────────────────────────────────────────────────────┤
│  - instance: Singleton «static»                                 │
│  - lock: Lock «static» «optional»                               │
├─────────────────────────────────────────────────────────────────┤
│  - __init__() «private»                                         │
│  + get_instance(): Singleton «static»                           │
│  + business_operation()                                         │
└─────────────────────────────────────────────────────────────────┘


              ┌────────────────────────┐
              │    Client Code        │
              │                        │
              │  singleton = Singleton │
              │    .get_instance()     │
              └────────────────────────┘

结构要素解析

  • 私有静态实例变量:持有唯一的实例引用
  • 私有构造函数:防止外部直接实例化
  • 公共静态访问方法:提供全局访问点,负责延迟初始化

1.2 Python实现机制深度解析

Python作为动态语言,其对象模型与类机制为单例实现提供了多种途径。每种方法在语义、性能和可维护性上各有特点。

1.2.1 模块级单例(推荐方案)

理论基础:Python模块系统遵循单次导入原则——模块在首次import时执行初始化,后续导入直接返回已加载的模块对象。这一特性由sys.modules字典保证。

python
"""
singleton_module.py - 模块级单例实现
"""
from typing import Any, Optional
from dataclasses import dataclass, field
import threading
from datetime import datetime

@dataclass
class AppState:
    """应用状态管理器"""
    app_name: str = "MyApplication"
    version: str = "1.0.0"
    started_at: datetime = field(default_factory=datetime.now)
    _settings: dict = field(default_factory=dict)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
    
    def set_setting(self, key: str, value: Any) -> None:
        with self._lock:
            self._settings[key] = value
    
    def get_setting(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._settings.get(key, default)
    
    def get_uptime(self) -> float:
        return (datetime.now() - self.started_at).total_seconds()

state = AppState()

使用方式

python
from singleton_module import state

state.set_setting("debug", True)
print(f"应用运行时间: {state.get_uptime():.2f}秒")

内存模型分析

┌─────────────────────────────────────────────────────────────────┐
│                      sys.modules                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ "singleton_module" → <module object>                    │   │
│  │                          │                               │   │
│  │                          ▼                               │   │
│  │                    state = AppState(...)                │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
         │                              │
         │ import                       │ import
         ▼                              ▼
┌─────────────────┐            ┌─────────────────┐
│   module_a.py   │            │   module_b.py   │
│                 │            │                 │
│ state.set(...)  │            │ state.get(...)  │
└─────────────────┘            └─────────────────┘
         │                              │
         └──────────────┬───────────────┘

              同一个 AppState 实例

优势与局限

维度评估
简洁性★★★★★ 最Pythonic的实现方式
线程安全★★★★★ 模块导入本身是线程安全的
可测试性★★★☆☆ 需要重新加载模块来重置状态
灵活性★★☆☆☆ 难以延迟初始化或参数化
继承支持★★★★☆ 可通过模块内继承实现

1.2.2 装饰器实现

装饰器模式利用Python的闭包机制,在不修改类定义的前提下注入单例行为。

python
from functools import wraps
from typing import Callable, TypeVar, ParamSpec, Any, Dict, Type
import threading
import weakref

P = ParamSpec('P')
T = TypeVar('T')

def singleton(
    cls: Callable[P, T] = None,
    *,
    thread_safe: bool = True,
    weak: bool = False
) -> Callable[P, T]:
    """
    单例装饰器工厂
    
    参数:
        thread_safe: 是否启用线程安全保护
        weak: 是否使用弱引用(允许垃圾回收)
    
    返回:
        装饰后的类或装饰器函数
    """
    def decorator(target_cls: Callable[P, T]) -> Callable[P, T]:
        instances: Dict[Type, Any] = {}
        lock = threading.RLock() if thread_safe else None
        
        @wraps(target_cls)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            instance = instances.get(target_cls)
            
            if instance is not None and weak:
                instance = instance() if isinstance(instance, weakref.ref) else instance
                if instance is None:
                    del instances[target_cls]
            
            if target_cls not in instances:
                create_instance = lambda: target_cls(*args, **kwargs)
                
                if thread_safe:
                    with lock:
                        if target_cls not in instances:
                            instance = create_instance()
                            instances[target_cls] = weakref.ref(instance) if weak else instance
                else:
                    instance = create_instance()
                    instances[target_cls] = weakref.ref(instance) if weak else instance
            
            return instances[target_cls] if weak else instances.get(target_cls)
        
        wrapper._singleton_instances = instances
        wrapper._singleton_reset = lambda: instances.clear()
        
        return wrapper
    
    if cls is not None:
        return decorator(cls)
    return decorator


@singleton
class DatabaseConnection:
    """数据库连接单例"""
    
    def __init__(self, connection_string: str = "sqlite:///:memory:"):
        self._connection_string = connection_string
        self._connected = False
        print(f"[DatabaseConnection] 初始化: {connection_string}")
    
    def connect(self) -> None:
        if not self._connected:
            self._connected = True
            print(f"[DatabaseConnection] 已连接")
    
    def disconnect(self) -> None:
        if self._connected:
            self._connected = False
            print("[DatabaseConnection] 已断开")
    
    def execute(self, query: str) -> str:
        if not self._connected:
            raise RuntimeError("数据库未连接")
        return f"执行查询: {query}"


@singleton(thread_safe=True, weak=False)
class CacheManager:
    """线程安全缓存管理器"""
    
    def __init__(self, max_size: int = 1000):
        self._cache: Dict[str, Any] = {}
        self._max_size = max_size
        self._lock = threading.RLock()
    
    def get(self, key: str) -> Any:
        with self._lock:
            return self._cache.get(key)
    
    def set(self, key: str, value: Any) -> None:
        with self._lock:
            if len(self._cache) >= self._max_size:
                oldest = next(iter(self._cache))
                del self._cache[oldest]
            self._cache[key] = value


db1 = DatabaseConnection("mysql://localhost")
db2 = DatabaseConnection("postgres://remote")
print(f"同一实例: {db1 is db2}")
print(f"连接字符串: {db1._connection_string}")

装饰器实现的内存语义

┌─────────────────────────────────────────────────────────────────┐
│                    singleton 装饰器作用域                        │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │  instances = {}  ← 闭包变量                              │   │
│  │  lock = RLock()                                         │   │
│  │                                                         │   │
│  │  def wrapper(*args, **kwargs):                          │   │
│  │      if cls not in instances:                           │   │
│  │          instances[cls] = cls(*args, **kwargs)          │   │
│  │      return instances[cls]                              │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

                              │ @singleton

┌─────────────────────────────────────────────────────────────────┐
│                    DatabaseConnection                           │
│  (被 wrapper 替代,但保留原类的 __name__, __doc__ 等属性)        │
└─────────────────────────────────────────────────────────────────┘

1.2.3 __new__ 方法实现

__new__ 是Python对象创建的第一阶段,负责分配内存空间。通过重写该方法,可在对象构造层面控制实例化行为。

python
from typing import Any, Optional, ClassVar
import threading
from datetime import datetime

class SingletonMeta(type):
    """支持继承的单例元类基类"""
    pass

class Singleton:
    """
    基于 __new__ 的单例基类
    
    特性:
        - 线程安全的实例创建
        - 支持初始化参数检测
        - 提供重置接口用于测试
    """
    
    _instance: ClassVar[Optional['Singleton']] = None
    _lock: ClassVar[threading.RLock] = threading.RLock()
    _initialized: ClassVar[bool] = False
    
    def __new__(cls, *args: Any, **kwargs: Any) -> 'Singleton':
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    cls._instance = instance
                    print(f"[{cls.__name__}] 创建新实例")
        return cls._instance
    
    def __init__(self, *args: Any, **kwargs: Any):
        if self._initialized:
            if args or kwargs:
                print(f"[{self.__class__.__name__}] 警告: 忽略重复初始化参数")
            return
        
        with self._lock:
            if not self._initialized:
                self._do_init(*args, **kwargs)
                self._initialized = True
    
    def _do_init(self, *args: Any, **kwargs: Any) -> None:
        """子类应重写此方法进行初始化"""
        pass
    
    @classmethod
    def reset(cls) -> None:
        """重置单例状态(仅用于测试)"""
        with cls._lock:
            if cls._instance is not None:
                cls._instance = None
                cls._initialized = False
                print(f"[{cls.__name__}] 单例已重置")


class Application(Singleton):
    """应用管理器单例"""
    
    def _do_init(self, name: str = "MyApp", version: str = "1.0.0") -> None:
        self._name = name
        self._version = version
        self._started_at = datetime.now()
        self._components: dict = {}
        print(f"[Application] 初始化: {name} v{version}")
    
    @property
    def name(self) -> str:
        return self._name
    
    @property
    def uptime(self) -> float:
        return (datetime.now() - self._started_at).total_seconds()
    
    def register_component(self, name: str, component: Any) -> None:
        self._components[name] = component
    
    def get_component(self, name: str) -> Optional[Any]:
        return self._components.get(name)


app1 = Application("WebServer", "2.0.0")
app2 = Application("APIGateway", "3.0.0")

print(f"同一实例: {app1 is app2}")
print(f"应用名称: {app1.name}")
print(f"运行时间: {app1.uptime:.2f}秒")

__new____init__ 的协作机制

┌─────────────────────────────────────────────────────────────────┐
│                    对象创建流程                                  │
│                                                                 │
│  Application("WebServer", "2.0.0")                              │
│              │                                                  │
│              ▼                                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │  __new__(cls, *args, **kwargs)                          │   │
│  │                                                         │   │
│  │  1. 检查 _instance 是否为 None                          │   │
│  │  2. 若为 None,获取锁并再次检查(双重检查锁定)          │   │
│  │  3. 调用 super().__new__(cls) 分配内存                  │   │
│  │  4. 将新实例赋值给 _instance                            │   │
│  │  5. 返回 _instance                                      │   │
│  └─────────────────────────────────────────────────────────┘   │
│              │                                                  │
│              ▼                                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │  __init__(self, *args, **kwargs)                        │   │
│  │                                                         │   │
│  │  1. 检查 _initialized 标志                              │   │
│  │  2. 若已初始化,忽略参数并返回                          │   │
│  │  3. 若未初始化,获取锁并调用 _do_init                   │   │
│  │  4. 设置 _initialized = True                            │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

1.2.4 元类实现

元类是Python中控制类创建行为的最高层机制。通过自定义元类,可在类定义层面强制实施单例约束。

python
from typing import Any, Dict, Type, Optional
import threading
from abc import ABCMeta

class SingletonMeta(type):
    """
    线程安全的单例元类
    
    特性:
        - 在类创建时注入单例逻辑
        - 支持继承(子类也是单例,但与父类独立)
        - 线程安全的实例创建
        - 可配置的初始化行为
    """
    
    _instances: Dict[Type, Any] = {}
    _lock: threading.RLock = threading.RLock()
    
    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        if cls not in cls._instances:
            with cls._lock:
                if cls not in cls._instances:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
                    print(f"[{cls.__name__}] 元类创建实例")
        return cls._instances[cls]
    
    @classmethod
    def reset_instance(mcs, cls: Type) -> None:
        """重置指定类的单例实例"""
        with mcs._lock:
            if cls in mcs._instances:
                del mcs._instances[cls]
    
    @classmethod
    def get_all_instances(mcs) -> Dict[Type, Any]:
        """获取所有单例实例(用于调试)"""
        return mcs._instances.copy()


class AbstractSingletonMeta(SingletonMeta, ABCMeta):
    """
    结合抽象基类的单例元类
    允许定义抽象单例接口
    """
    pass


class Logger(metaclass=SingletonMeta):
    """日志管理器单例"""
    
    def __init__(self, level: str = "INFO", output: str = "console"):
        self._level = level
        self._output = output
        self._logs: list = []
        print(f"[Logger] 初始化: level={level}, output={output}")
    
    def log(self, message: str, level: str = None) -> None:
        entry_level = level or self._level
        entry = f"[{entry_level}] {message}"
        self._logs.append(entry)
        print(entry)
    
    def get_logs(self) -> list:
        return self._logs.copy()
    
    def clear(self) -> None:
        self._logs.clear()


class MetricsCollector(metaclass=SingletonMeta):
    """指标收集器单例"""
    
    def __init__(self, namespace: str = "default"):
        self._namespace = namespace
        self._metrics: Dict[str, list] = {}
        self._lock = threading.RLock()
    
    def record(self, name: str, value: float) -> None:
        with self._lock:
            if name not in self._metrics:
                self._metrics[name] = []
            self._metrics[name].append(value)
    
    def get_average(self, name: str) -> Optional[float]:
        with self._lock:
            values = self._metrics.get(name, [])
            return sum(values) / len(values) if values else None


logger1 = Logger("DEBUG", "file")
logger2 = Logger("ERROR", "console")

print(f"同一实例: {logger1 is logger2}")
logger1.log("系统启动")

metrics = MetricsCollector("production")
metrics.record("response_time", 0.125)
metrics.record("response_time", 0.150)
print(f"平均响应时间: {metrics.get_average('response_time'):.3f}s")

元类继承与实例隔离

python
class BaseService(metaclass=SingletonMeta):
    """服务基类"""
    
    def __init__(self, name: str):
        self._name = name
    
    @property
    def name(self) -> str:
        return self._name


class DatabaseService(BaseService):
    """数据库服务"""
    pass


class CacheService(BaseService):
    """缓存服务"""
    pass


db = DatabaseService("MySQL")
cache = CacheService("Redis")

print(f"db 与 cache 是同一实例: {db is cache}")
print(f"db 名称: {db.name}")
print(f"cache 名称: {cache.name}")

print(f"\n所有单例实例: {[cls.__name__ for cls in SingletonMeta.get_all_instances()]}")

1.2.5 实现方式对比分析

实现方式线程安全可继承性延迟初始化参数化复杂度推荐场景
模块级简单单例
装饰器可配置需要灵活配置
__new__需实现需要继承
元类复杂场景、框架开发

1.3 并发环境下的单例实现

1.3.1 线程安全分析

在多线程环境中,单例创建存在竞态条件(Race Condition)。考虑以下非线程安全实现:

python
class UnsafeSingleton:
    _instance = None
    
    @classmethod
    def get_instance(cls) -> 'UnsafeSingleton':
        if cls._instance is None:
            import time
            time.sleep(0.001)
            cls._instance = cls()
        return cls._instance

竞态条件时序图

时间轴    线程A                    线程B                    _instance
────────────────────────────────────────────────────────────────────
  t1     检查 None ✓
  t2                              检查 None ✓              None
  t3     创建实例 A
  t4                              创建实例 B                A
  t5     返回 A
  t6                              返回 B                    B (覆盖A!)
────────────────────────────────────────────────────────────────────
结果: 线程A获得实例A,线程B获得实例B,违反单例约束

1.3.2 双重检查锁定模式

双重检查锁定是一种优化模式,在保证线程安全的同时减少锁竞争开销。

python
from typing import ClassVar, Optional, Any
import threading
from functools import wraps

class DoubleCheckedLockingSingleton:
    """
    双重检查锁定单例
    
    优化原理:
        - 第一次检查避免不必要的锁获取
        - 第二次检查确保只创建一个实例
        - 使用 RLock 支持递归调用
    """
    
    _instance: ClassVar[Optional['DoubleCheckedLockingSingleton']] = None
    _lock: ClassVar[threading.RLock] = threading.RLock()
    _initialized: ClassVar[bool] = False
    
    def __new__(cls, *args: Any, **kwargs: Any):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    cls._instance = instance
        return cls._instance
    
    def __init__(self, value: str = "default"):
        if DoubleCheckedLockingSingleton._initialized:
            return
        with DoubleCheckedLockingSingleton._lock:
            if not DoubleCheckedLockingSingleton._initialized:
                self._value = value
                DoubleCheckedLockingSingleton._initialized = True
    
    @property
    def value(self) -> str:
        return self._value


def demonstrate_thread_safety():
    """演示线程安全的单例创建"""
    import concurrent.futures
    
    results = []
    
    def create_singleton():
        instance = DoubleCheckedLockingSingleton(f"thread-{threading.current_thread().name}")
        return id(instance)
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(create_singleton) for _ in range(100)]
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
    
    unique_ids = set(results)
    print(f"创建实例数: {len(results)}")
    print(f"唯一实例数: {len(unique_ids)}")
    print(f"单例验证: {'通过' if len(unique_ids) == 1 else '失败'}")


demonstrate_thread_safety()

1.3.3 线程局部单例

某些场景需要每个线程拥有独立的单例实例:

python
from typing import Any, Callable, TypeVar, ParamSpec
import threading
from functools import wraps

P = ParamSpec('P')
T = TypeVar('T')

def thread_local_singleton(cls: Callable[P, T]) -> Callable[P, T]:
    """
    线程局部单例装饰器
    
    每个线程拥有独立的单例实例
    """
    local_data = threading.local()
    lock = threading.Lock()
    
    @wraps(cls)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        if not hasattr(local_data, 'instance'):
            with lock:
                if not hasattr(local_data, 'instance'):
                    local_data.instance = cls(*args, **kwargs)
        return local_data.instance
    
    return wrapper


@thread_local_singleton
class RequestContext:
    """请求上下文(线程局部单例)"""
    
    def __init__(self):
        self._request_id = id(threading.current_thread())
        self._data: dict = {}
        print(f"[RequestContext] 线程 {threading.current_thread().name} 创建上下文")
    
    def set(self, key: str, value: Any) -> None:
        self._data[key] = value
    
    def get(self, key: str) -> Any:
        return self._data.get(key)
    
    @property
    def request_id(self) -> int:
        return self._request_id


def process_request(request_num: int):
    ctx = RequestContext()
    ctx.set("request_num", request_num)
    print(f"线程 {threading.current_thread().name}: "
          f"request_id={ctx.request_id}, request_num={ctx.get('request_num')}")


threads = [threading.Thread(target=process_request, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

1.3.4 进程安全与分布式单例

在多进程或分布式环境中,单例约束需要跨进程/节点协调:

python
import os
import fcntl
from typing import Optional, Any
from pathlib import Path
from contextlib import contextmanager

class FileLockSingleton:
    """
    基于文件锁的进程安全单例
    
    原理:
        - 使用操作系统文件锁(fcntl)确保跨进程互斥
        - 适用于单机多进程环境
    """
    
    _instance: Optional['FileLockSingleton'] = None
    _lock_file: Optional[int] = None
    
    def __new__(cls, *args: Any, **kwargs: Any):
        if cls._instance is None:
            lock_path = Path("/tmp") / f"{cls.__name__}.lock"
            
            try:
                cls._lock_file = os.open(str(lock_path), os.O_CREAT | os.O_RDWR)
                fcntl.flock(cls._lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
                
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    
            except (IOError, OSError):
                if cls._lock_file is not None:
                    os.close(cls._lock_file)
                raise RuntimeError(f"另一个 {cls.__name__} 实例正在运行")
        
        return cls._instance
    
    def __del__(self):
        if self._lock_file is not None:
            fcntl.flock(self._lock_file, fcntl.LOCK_UN)
            os.close(self._lock_file)


class DistributedSingleton:
    """
    分布式单例(概念实现)
    
    实际生产环境应使用:
        - Redis 分布式锁
        - ZooKeeper 临时节点
        - etcd 选举机制
    """
    
    def __init__(self, redis_client=None, lock_key: str = "singleton:lock"):
        self._redis = redis_client
        self._lock_key = lock_key
        self._lock_token = None
    
    @contextmanager
    def distributed_lock(self, timeout: int = 10):
        """获取分布式锁"""
        import uuid
        token = str(uuid.uuid4())
        
        try:
            if self._redis:
                acquired = self._redis.set(
                    self._lock_key, token, 
                    nx=True, ex=timeout
                )
                if not acquired:
                    raise RuntimeError("获取分布式锁失败")
                self._lock_token = token
            
            yield self
            
        finally:
            if self._redis and self._lock_token:
                self._release_lock()
    
    def _release_lock(self):
        """释放分布式锁(Lua脚本保证原子性)"""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self._redis.eval(lua_script, 1, self._lock_key, self._lock_token)

1.4 单例模式的变体与扩展

1.4.1 延迟初始化单例

延迟初始化将实例创建推迟到首次访问时,优化启动性能和内存占用。

python
from typing import Callable, TypeVar, Generic, Optional
import threading

T = TypeVar('T')

class LazySingleton(Generic[T]):
    """
    泛型延迟初始化单例
    
    特性:
        - 类型安全的延迟初始化
        - 支持自定义工厂函数
        - 线程安全的实例创建
    """
    
    def __init__(self, factory: Callable[[], T]):
        self._factory = factory
        self._instance: Optional[T] = None
        self._lock = threading.RLock()
    
    def get(self) -> T:
        if self._instance is None:
            with self._lock:
                if self._instance is None:
                    self._instance = self._factory()
        return self._instance
    
    def is_initialized(self) -> bool:
        return self._instance is not None
    
    def reset(self) -> None:
        with self._lock:
            self._instance = None


class HeavyResource:
    """重量级资源"""
    
    def __init__(self, config: dict):
        self._config = config
        print(f"[HeavyResource] 初始化,配置: {config}")
    
    def process(self, data: str) -> str:
        return f"处理: {data}"


resource_singleton = LazySingleton(lambda: HeavyResource({"timeout": 30}))

print("延迟单例已创建,资源尚未初始化")
print(f"资源初始化状态: {resource_singleton.is_initialized()}")

resource = resource_singleton.get()
print(f"资源初始化状态: {resource_singleton.is_initialized()}")
print(resource.process("测试数据"))

1.4.2 多例模式

多例模式允许存在有限数量的实例,每个实例通过唯一键标识。

python
from typing import Dict, Any, Optional, Callable, TypeVar
import threading
from dataclasses import dataclass

@dataclass
class InstanceInfo:
    """实例信息"""
    instance: Any
    created_at: float
    access_count: int = 0


class Multiton:
    """
    多例模式实现
    
    特性:
        - 按键管理多个单例实例
        - 支持实例上限和淘汰策略
        - 线程安全的实例管理
    """
    
    _instances: Dict[str, InstanceInfo] = {}
    _lock = threading.RLock()
    _max_instances: int = 10
    
    @classmethod
    def get_instance(cls, key: str, factory: Callable[[], Any] = None) -> Any:
        with cls._lock:
            if key in cls._instances:
                cls._instances[key].access_count += 1
                return cls._instances[key].instance
            
            if len(cls._instances) >= cls._max_instances:
                cls._evict_lru()
            
            if factory is None:
                raise ValueError(f"实例 '{key}' 不存在且未提供工厂函数")
            
            import time
            instance = factory()
            cls._instances[key] = InstanceInfo(
                instance=instance,
                created_at=time.time(),
                access_count=1
            )
            return instance
    
    @classmethod
    def _evict_lru(cls) -> None:
        """淘汰最少使用的实例"""
        lru_key = min(cls._instances.keys(), 
                     key=lambda k: cls._instances[k].access_count)
        del cls._instances[lru_key]
        print(f"[Multiton] 淘汰实例: {lru_key}")
    
    @classmethod
    def get_all_keys(cls) -> list:
        return list(cls._instances.keys())
    
    @classmethod
    def get_stats(cls) -> Dict[str, Dict]:
        return {
            key: {
                "created_at": info.created_at,
                "access_count": info.access_count
            }
            for key, info in cls._instances.items()
        }


class DatabaseConnection:
    """数据库连接"""
    
    def __init__(self, db_type: str, host: str):
        self.db_type = db_type
        self.host = host
        print(f"[DatabaseConnection] 创建 {db_type} 连接到 {host}")


mysql = Multiton.get_instance("mysql", lambda: DatabaseConnection("MySQL", "localhost"))
postgres = Multiton.get_instance("postgres", lambda: DatabaseConnection("PostgreSQL", "remote"))
mysql_again = Multiton.get_instance("mysql")

print(f"mysql 与 mysql_again 是同一实例: {mysql is mysql_again}")
print(f"所有实例键: {Multiton.get_all_keys()}")

1.4.3 Borg模式(共享状态单例)

Borg模式允许多个实例存在,但所有实例共享同一状态。

python
from typing import Any, Dict, ClassVar
import threading

class Borg:
    """
    Borg模式基类
    
    原理:
        - 所有实例共享 __dict__ 字典
        - 实例身份不同,但状态相同
        - 比传统单例更灵活
    """
    
    _shared_state: ClassVar[Dict[str, Any]] = {}
    _lock: ClassVar[threading.RLock] = threading.RLock()
    
    def __new__(cls, *args: Any, **kwargs: Any):
        instance = super().__new__(cls)
        instance.__dict__ = cls._shared_state
        return instance


class Configuration(Borg):
    """配置管理器(Borg模式)"""
    
    def __init__(self, env: str = "development"):
        with self._lock:
            if not hasattr(self, '_initialized'):
                self._env = env
                self._settings: Dict[str, Any] = {}
                self._initialized = True
                print(f"[Configuration] 初始化环境: {env}")
    
    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._settings[key] = value
    
    def get(self, key: str, default: Any = None) -> Any:
        return self._settings.get(key, default)
    
    def get_env(self) -> str:
        return self._env


config1 = Configuration("production")
config1.set("debug", False)
config1.set("max_connections", 100)

config2 = Configuration("development")

print(f"config1 is config2: {config1 is config2}")
print(f"config1.get_env(): {config1.get_env()}")
print(f"config2.get('debug'): {config2.get('debug')}")
print(f"config1._settings is config2._settings: {config1._settings is config2._settings}")

Borg模式与传统单例对比

特性传统单例Borg模式
实例数量严格唯一可多个
状态共享隐式(同一实例)显式(共享字典)
子类支持复杂自然支持
内存模型单对象多对象共享字典
适用场景严格唯一约束状态共享需求

1.5 实际应用案例

1.5.1 企业级日志管理器

python
import logging
import logging.handlers
from typing import Optional, Dict, Any
from pathlib import Path
from datetime import datetime
import threading
import json

class LogManager(metaclass=SingletonMeta):
    """
    企业级日志管理器
    
    特性:
        - 多输出目标(控制台、文件、远程)
        - 结构化日志支持
        - 日志级别动态调整
        - 性能监控集成
    """
    
    def __init__(
        self,
        app_name: str = "Application",
        level: int = logging.INFO,
        log_dir: str = "logs"
    ):
        self._app_name = app_name
        self._level = level
        self._log_dir = Path(log_dir)
        self._log_dir.mkdir(parents=True, exist_ok=True)
        
        self._logger = logging.getLogger(app_name)
        self._logger.setLevel(level)
        self._lock = threading.RLock()
        
        self._setup_handlers()
        self._metrics = {"info": 0, "warning": 0, "error": 0, "debug": 0}
    
    def _setup_handlers(self) -> None:
        """配置日志处理器"""
        if self._logger.handlers:
            return
        
        formatter = logging.Formatter(
            '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(formatter)
        self._logger.addHandler(console_handler)
        
        log_file = self._log_dir / f"{self._app_name}_{datetime.now():%Y%m%d}.log"
        file_handler = logging.handlers.RotatingFileHandler(
            log_file,
            maxBytes=10 * 1024 * 1024,
            backupCount=5,
            encoding='utf-8'
        )
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        self._logger.addHandler(file_handler)
    
    def _log(self, level: int, message: str, **context: Any) -> None:
        with self._lock:
            level_name = logging.getLevelName(level).lower()
            self._metrics[level_name] = self._metrics.get(level_name, 0) + 1
            
            if context:
                structured_msg = json.dumps({
                    "message": message,
                    "context": context
                }, ensure_ascii=False)
                self._logger.log(level, structured_msg)
            else:
                self._logger.log(level, message)
    
    def info(self, message: str, **context: Any) -> None:
        self._log(logging.INFO, message, **context)
    
    def warning(self, message: str, **context: Any) -> None:
        self._log(logging.WARNING, message, **context)
    
    def error(self, message: str, exc_info: bool = False, **context: Any) -> None:
        with self._lock:
            self._metrics["error"] = self._metrics.get("error", 0) + 1
            self._logger.error(message, exc_info=exc_info, extra=context)
    
    def debug(self, message: str, **context: Any) -> None:
        self._log(logging.DEBUG, message, **context)
    
    def get_metrics(self) -> Dict[str, int]:
        return self._metrics.copy()
    
    def set_level(self, level: int) -> None:
        self._level = level
        self._logger.setLevel(level)


logger = LogManager("MyApp", logging.DEBUG)
logger.info("应用启动", version="1.0.0", env="production")
logger.warning("配置文件不存在,使用默认配置")
logger.error("数据库连接失败", exc_info=False, host="localhost", port=3306)
print(f"日志统计: {logger.get_metrics()}")

1.5.2 高性能数据库连接池

python
from typing import Optional, Any, Generator, ContextManager
from dataclasses import dataclass, field
from queue import Queue, Empty
from contextlib import contextmanager
import threading
import time
from abc import ABC, abstractmethod

@dataclass
class ConnectionConfig:
    """连接配置"""
    host: str = "localhost"
    port: int = 3306
    database: str = "test"
    username: str = "root"
    password: str = ""
    timeout: float = 30.0
    pool_size: int = 10
    max_overflow: int = 5


class Connection(ABC):
    """连接抽象基类"""
    
    def __init__(self, conn_id: int, config: ConnectionConfig):
        self.id = conn_id
        self.config = config
        self._connected = False
        self._last_used = time.time()
        self._use_count = 0
    
    @abstractmethod
    def connect(self) -> None:
        pass
    
    @abstractmethod
    def disconnect(self) -> None:
        pass
    
    @abstractmethod
    def execute(self, query: str) -> Any:
        pass
    
    def is_alive(self) -> bool:
        return self._connected
    
    def mark_used(self) -> None:
        self._last_used = time.time()
        self._use_count += 1


class MySQLConnection(Connection):
    """MySQL连接实现"""
    
    def connect(self) -> None:
        print(f"[MySQL-{self.id}] 连接到 {self.config.host}:{self.config.port}")
        self._connected = True
    
    def disconnect(self) -> None:
        print(f"[MySQL-{self.id}] 断开连接")
        self._connected = False
    
    def execute(self, query: str) -> str:
        if not self._connected:
            raise RuntimeError("连接未建立")
        self.mark_used()
        return f"[MySQL-{self.id}] 执行: {query}"


class ConnectionPool(metaclass=SingletonMeta):
    """
    高性能数据库连接池
    
    特性:
        - 单例模式确保全局唯一连接池
        - 连接复用减少创建开销
        - 动态扩容支持突发流量
        - 连接健康检查与自动回收
    """
    
    def __init__(self, config: ConnectionConfig = None):
        if hasattr(self, '_initialized') and self._initialized:
            return
        
        self._config = config or ConnectionConfig()
        self._pool: Queue = Queue(maxsize=self._config.pool_size + self._config.max_overflow)
        self._lock = threading.RLock()
        self._connection_count = 0
        self._active_connections: set = set()
        self._stats = {
            "created": 0,
            "borrowed": 0,
            "returned": 0,
            "overflow": 0
        }
        
        self._initialize_pool()
        self._start_health_checker()
        self._initialized = True
    
    def _initialize_pool(self) -> None:
        """初始化连接池"""
        for i in range(self._config.pool_size):
            conn = self._create_connection(i)
            self._pool.put(conn)
    
    def _create_connection(self, conn_id: int = None) -> Connection:
        """创建新连接"""
        with self._lock:
            if conn_id is None:
                self._connection_count += 1
                conn_id = self._connection_count
            
            conn = MySQLConnection(conn_id, self._config)
            conn.connect()
            self._stats["created"] += 1
            return conn
    
    def get_connection(self, timeout: float = None) -> Optional[Connection]:
        """获取连接"""
        timeout = timeout or self._config.timeout
        
        try:
            conn = self._pool.get(block=True, timeout=timeout)
            if not conn.is_alive():
                conn = self._create_connection(conn.id)
            
            with self._lock:
                self._active_connections.add(conn.id)
                self._stats["borrowed"] += 1
            
            return conn
            
        except Empty:
            if self._connection_count < self._config.pool_size + self._config.max_overflow:
                with self._lock:
                    conn = self._create_connection()
                    self._active_connections.add(conn.id)
                    self._stats["overflow"] += 1
                    self._stats["borrowed"] += 1
                return conn
            
            raise RuntimeError("连接池已满,无法获取连接")
    
    def release_connection(self, conn: Connection) -> None:
        """释放连接"""
        with self._lock:
            self._active_connections.discard(conn.id)
            self._stats["returned"] += 1
        
        if conn.is_alive():
            self._pool.put(conn)
        else:
            with self._lock:
                self._connection_count -= 1
    
    @contextmanager
    def connection(self, timeout: float = None) -> Generator[Optional[Connection], None, None]:
        """上下文管理器方式获取连接"""
        conn = None
        try:
            conn = self.get_connection(timeout)
            yield conn
        finally:
            if conn:
                self.release_connection(conn)
    
    def _start_health_checker(self) -> None:
        """启动健康检查线程"""
        def health_check():
            while True:
                time.sleep(60)
                self._check_connections()
        
        thread = threading.Thread(target=health_check, daemon=True)
        thread.start()
    
    def _check_connections(self) -> None:
        """检查连接健康状态"""
        temp_conns = []
        while not self._pool.empty():
            try:
                conn = self._pool.get_nowait()
                if conn.is_alive() and time.time() - conn._last_used < 300:
                    temp_conns.append(conn)
                else:
                    conn.disconnect()
                    with self._lock:
                        self._connection_count -= 1
            except Empty:
                break
        
        for conn in temp_conns:
            self._pool.put(conn)
    
    def get_stats(self) -> dict:
        return {
            **self._stats,
            "pool_size": self._pool.qsize(),
            "active": len(self._active_connections),
            "total_created": self._connection_count
        }


pool = ConnectionPool(ConnectionConfig(pool_size=3, max_overflow=2))

with pool.connection() as conn:
    if conn:
        result = conn.execute("SELECT * FROM users")
        print(result)

print(f"连接池统计: {pool.get_stats()}")

1.6 反模式与最佳实践

1.6.1 常见反模式

反模式1:滥用单例

python
class UserService:
    """错误示例:用户服务不应是单例"""
    _instance = None
    
    def __init__(self):
        self._current_user = None
    
    def set_user(self, user: dict):
        self._current_user = user
    
    def get_user(self):
        return self._current_user


service1 = UserService()
service1.set_user({"id": 1, "name": "Alice"})

service2 = UserService()
print(service2.get_user())

问题分析:用户服务代表当前用户上下文,不同请求应拥有独立实例。使用单例会导致状态污染。

正确做法

python
class UserService:
    """正确示例:每次请求创建新实例"""
    
    def __init__(self, user: dict = None):
        self._user = user
    
    def set_user(self, user: dict):
        self._user = user
    
    def get_user(self):
        return self._user


def handle_request(user_id: int):
    user = fetch_user(user_id)
    service = UserService(user)
    return service.process()

反模式2:单例中持有可变状态

python
class BadCache:
    """错误示例:单例中持有大量可变状态"""
    
    def __init__(self):
        self._data = {}
    
    def set(self, key, value):
        self._data[key] = value
    
    def get(self, key):
        return self._data.get(key)
    
    def clear(self):
        self._data.clear()

问题分析:单例的生命周期与应用相同,长期持有可变状态会导致内存泄漏和数据不一致。

正确做法

python
class GoodCache:
    """正确示例:限制缓存大小,提供过期机制"""
    
    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        self._data = {}
        self._timestamps = {}
        self._max_size = max_size
        self._ttl = ttl
        self._lock = threading.RLock()
    
    def set(self, key, value):
        with self._lock:
            if len(self._data) >= self._max_size:
                self._evict_expired()
                if len(self._data) >= self._max_size:
                    self._evict_lru()
            
            self._data[key] = value
            self._timestamps[key] = time.time()
    
    def get(self, key):
        with self._lock:
            if key in self._data:
                if time.time() - self._timestamps[key] > self._ttl:
                    del self._data[key]
                    del self._timestamps[key]
                    return None
                return self._data[key]
            return None
    
    def _evict_expired(self):
        current = time.time()
        expired = [k for k, t in self._timestamps.items() 
                   if current - t > self._ttl]
        for k in expired:
            del self._data[k]
            del self._timestamps[k]
    
    def _evict_lru(self):
        lru_key = min(self._timestamps, key=self._timestamps.get)
        del self._data[lru_key]
        del self._timestamps[lru_key]

1.6.2 测试策略

单例模式在测试中常带来挑战,需要特殊处理:

python
import unittest
import threading

class SingletonTestCase(unittest.TestCase):
    """单例测试基类"""
    
    def setUp(self):
        """每个测试前重置单例"""
        Singleton.reset()
    
    def tearDown(self):
        """测试后清理"""
        Singleton.reset()


class TestSingleton(SingletonTestCase):
    
    def test_single_instance(self):
        s1 = Singleton()
        s2 = Singleton()
        self.assertIs(s1, s2)
    
    def test_thread_safety(self):
        instances = []
        
        def create_instance():
            instances.append(Singleton())
        
        threads = [threading.Thread(target=create_instance) for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        
        first = instances[0]
        for instance in instances[1:]:
            self.assertIs(first, instance)


class TestableSingleton:
    """可测试的单例实现"""
    
    _instance = None
    _lock = threading.RLock()
    
    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance
    
    @classmethod
    def reset(cls):
        """重置单例(仅用于测试)"""
        with cls._lock:
            cls._instance = None
    
    @classmethod
    def inject_instance(cls, instance):
        """注入测试替身"""
        with cls._lock:
            cls._instance = instance

1.6.3 依赖注入替代方案

在现代软件架构中,依赖注入(Dependency Injection)通常是更好的选择:

python
from typing import Protocol, runtime_checkable

@runtime_checkable
class ILogger(Protocol):
    """日志接口协议"""
    
    def info(self, message: str) -> None: ...
    def error(self, message: str) -> None: ...


class ConsoleLogger:
    """控制台日志实现"""
    
    def info(self, message: str) -> None:
        print(f"[INFO] {message}")
    
    def error(self, message: str) -> None:
        print(f"[ERROR] {message}")


class FileLogger:
    """文件日志实现"""
    
    def __init__(self, file_path: str):
        self._file_path = file_path
    
    def info(self, message: str) -> None:
        with open(self._file_path, 'a') as f:
            f.write(f"[INFO] {message}\n")
    
    def error(self, message: str) -> None:
        with open(self._file_path, 'a') as f:
            f.write(f"[ERROR] {message}\n")


class UserService:
    """用户服务(依赖注入方式)"""
    
    def __init__(self, logger: ILogger):
        self._logger = logger
    
    def create_user(self, username: str) -> None:
        self._logger.info(f"创建用户: {username}")


class ServiceContainer:
    """服务容器(简单的IoC容器)"""
    
    def __init__(self):
        self._services = {}
        self._singletons = {}
    
    def register_singleton(self, interface: type, implementation: type) -> None:
        self._services[interface] = ('singleton', implementation)
    
    def register_transient(self, interface: type, implementation: type) -> None:
        self._services[interface] = ('transient', implementation)
    
    def resolve(self, interface: type) -> Any:
        if interface not in self._services:
            raise ValueError(f"服务 {interface} 未注册")
        
        scope, impl = self._services[interface]
        
        if scope == 'singleton':
            if interface not in self._singletons:
                self._singletons[interface] = impl()
            return self._singletons[interface]
        else:
            return impl()


container = ServiceContainer()
container.register_singleton(ILogger, ConsoleLogger)

logger = container.resolve(ILogger)
user_service = UserService(logger)
user_service.create_user("Alice")

1.7 模式评估与决策指南

1.7.1 适用性检查清单

使用以下检查清单评估是否应使用单例模式:

检查项说明
资源是否具有物理唯一性?✓ 使用✗ 评估如设备驱动、端口
是否需要全局协调?✓ 使用✗ 评估如事件总线、调度器
实例创建开销是否很大?✓ 使用✗ 评估如重量级配置加载
是否需要延迟初始化?✓ 使用✗ 评估按需创建优化启动
是否需要严格控制实例数量?✓ 使用✗ 评估如许可证限制
是否在测试中需要隔离?✗ 评估✓ 使用考虑依赖注入

1.7.2 替代方案对比

方案优点缺点适用场景
单例模式简单、全局访问测试困难、隐藏依赖真正的全局资源
模块级变量最简单不够灵活简单配置
依赖注入可测试、灵活增加复杂度企业级应用
服务定位器解耦仍是全局状态框架开发
Borg模式灵活继承状态共享不明显需要多实例但共享状态

1.8 小结

单例模式作为最简单却也最易滥用的设计模式,其核心价值在于控制实例数量提供全局访问点。在Python中,实现方式多样,各有优劣:

┌─────────────────────────────────────────────────────────────────┐
│                    单例实现选择决策树                            │
│                                                                 │
│                    需要单例吗?                                  │
│                         │                                       │
│              ┌──────────┴──────────┐                            │
│              ▼                     ▼                            │
│            是的                   不确定                         │
│              │                     │                            │
│              ▼                     ▼                            │
│        需要继承吗?           考虑依赖注入                        │
│              │                                                  │
│      ┌───────┴───────┐                                          │
│      ▼               ▼                                          │
│     是              否                                           │
│      │               │                                          │
│      ▼               ▼                                          │
│   元类实现      模块级单例                                        │
│   __new__实现  装饰器实现                                        │
└─────────────────────────────────────────────────────────────────┘

关键要点

  1. 谨慎使用:单例模式应仅用于真正需要全局唯一实例的场景
  2. 线程安全:多线程环境必须使用双重检查锁定或元类实现
  3. 可测试性:提供重置机制,或考虑依赖注入替代
  4. 文档清晰:明确说明单例的生命周期和线程安全特性
  5. 遵循原则:避免单例类承担过多职责

在下一章中,我们将探讨工厂方法模式,学习如何在不指定具体类的情况下创建对象。

Python技术丛书 - 江苏省宿城中等专业学校