
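Performance Optimization Patterns

Learning Objectives

  • Understand the core principles and mathematical foundations of Python performance optimization
  • Learn the formal definitions of optimization patterns such as caching, lazy loading, and object pooling
  • Use profiling tools to locate bottlenecks and quantify them
  • Choose optimization strategies for different scenarios and weigh their trade-offs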

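Historical Background and Theoretical Foundations

A Brief History of Performance Optimization

Performance optimization dates back to the early days of computer science: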

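Period | Milestone                   | Representative contributions
1960s  | Algorithm analysis          | Knuth, The Art of Computer Programming; standardization of Big-O notation
1970s  | Data structure optimization | Aho, Hopcroft, and Ullman's theory of algorithms and data structures
1980s  | Cache theory                | CPU cache hierarchies; formalization of the locality principle
1990s  | Concurrency optimization    | Multithreaded programming models; lock optimization theory
2000s  | Web performance             | CDN caching, database connection pools, object-relational mapping optimization
2010s  | Big-data performance        | MapReduce, in-memory computing, stream processing optimization
2020s  | Cloud-native performance    | Container optimization, service meshes, edge computing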

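Python Performance Evolution

1991 ──────────────────────────────────────────────────► 2024
  │                                                      │
  ├─ Python 1.0: interpreted execution, basic optimizations
  ├─ Python 2.0: list comprehensions (the iterator protocol followed in 2.2)
  ├─ Python 2.4: decorator syntax, generator expressions
  ├─ Python 3.0: Unicode strings by default
  ├─ Python 3.2: functools.lru_cache
  ├─ Python 3.4: asyncio asynchronous I/O
  ├─ Python 3.7: dataclasses; dict insertion order guaranteed (compact dict since CPython 3.6)
  ├─ Python 3.10: structural pattern matching
  └─ Python 3.11: significant speedups (10-60% faster than 3.10)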

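Formal Definitions

Performance System Definition

Definition 32.1 (Performance system) A performance optimization system is a 5-tuple:

$$\mathcal{P} = \langle \mathcal{R}, \mathcal{C}, \mathcal{O}, \mathcal{M}, \mathcal{S} \rangle$$

where:

  • $\mathcal{R}$: the set of resources (CPU, memory, I/O, network)
  • $\mathcal{C}$: the constraints (time limits, memory limits, throughput requirements)
  • $\mathcal{O}$: the set of optimization operations (caching, parallelism, lazy loading, and so on)
  • $\mathcal{M}$: the metric function $M: Operation \rightarrow \mathbb{R}^+$
  • $\mathcal{S}$: the state space describing the system's current performance state

Definition 32.2 (Time complexity) The running time $T(n)$ of an algorithm has tight asymptotic bound $f(n)$ when: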

$$T(n) = \Theta(f(n)) \iff \exists c_1, c_2, n_0 > 0: \forall n \geq n_0, c_1 \cdot f(n) \leq T(n) \leq c_2 \cdot f(n)$$
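
As an illustration of this definition, the sketch below counts the steps of a simple pairwise loop and checks the two-sided bound numerically. The function `count_pair_steps` and the witnesses $c_1 = 1/4$, $c_2 = 1/2$, $n_0 = 2$ are assumptions chosen for this example; a finite check like this illustrates the bound but does not prove it.

```python
import math


def count_pair_steps(n: int) -> int:
    """Count the inner-loop steps of a loop over all pairs i < j, which is n(n-1)/2."""
    steps = 0
    for i in range(n):
        for _ in range(i + 1, n):
            steps += 1
    return steps


# Witnesses for the definition with f(n) = n^2 (chosen for this example).
C1, C2, N0 = 0.25, 0.5, 2

# Check c1 * f(n) <= T(n) <= c2 * f(n) on a finite range of n >= n0.
bounds_hold = all(
    C1 * n * n <= count_pair_steps(n) <= C2 * n * n
    for n in range(N0, 200)
)

# Doubling n multiplies a Theta(n^2) cost by about 4,
# so log2 of the ratio should be close to 2.
exponent = math.log2(count_pair_steps(1000) / count_pair_steps(500))

print(f"bounds hold on the tested range: {bounds_hold}")
print(f"estimated growth exponent: {exponent:.2f}")
```

Counting steps instead of timing keeps the result deterministic; wall-clock measurements would add noise from the interpreter and the machine.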

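Definition 32.3 (Space complexity) The space complexity $S(n)$ of an algorithm is the peak amount of memory in use at any point of its execution:

$$S(n) = \max\{\, space(p) : p \text{ is a point in execution} \,\}$$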

Cache Theory

Definition 32.4 (Cache hit rate) The cache hit rate $h$ is defined as:

$$h = \frac{\text{cache hits}}{\text{total accesses}} = \frac{N_{hit}}{N_{hit} + N_{miss}}$$

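Definition 32.5 (Cache efficiency) The cache efficiency $E$ is measured as the expected time per access, so lower is better. It combines the hit time $t_{hit}$ with the total time of a missed access $t_{miss}$ (lookup plus miss penalty):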

$$E = h \cdot t_{hit} + (1-h) \cdot t_{miss}$$
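
A short worked example may make the two formulas concrete. The numbers below (900 hits, 100 misses, 1 ms per hit, 100 ms per miss) are hypothetical values chosen for illustration, and the helper names are not part of the original text.

```python
def hit_rate(hits: int, misses: int) -> float:
    """h = N_hit / (N_hit + N_miss); defined as 0.0 when there were no accesses."""
    total = hits + misses
    return hits / total if total else 0.0


def expected_access_time(h: float, t_hit: float, t_miss: float) -> float:
    """E = h * t_hit + (1 - h) * t_miss, in the same unit as t_hit and t_miss."""
    return h * t_hit + (1.0 - h) * t_miss


# Hypothetical workload: 900 hits and 100 misses, times in milliseconds.
h = hit_rate(hits=900, misses=100)
e = expected_access_time(h, t_hit=1.0, t_miss=100.0)
print(f"h = {h:.2f}, E = {e:.1f} ms")

# Raising the hit rate to 0.99 cuts the expected time sharply,
# because the miss term dominates.
e_better = expected_access_time(0.99, t_hit=1.0, t_miss=100.0)
print(f"h = 0.99, E = {e_better:.2f} ms")
```

With these assumed numbers, moving from $h = 0.9$ to $h = 0.99$ reduces $E$ from about 10.9 ms to about 1.99 ms, which is why small hit-rate gains matter when misses are expensive.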

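Theorem 32.1 (LRU competitiveness) For a cache of fixed size $k$, LRU is $k$-competitive against the optimal offline policy, and no deterministic online policy achieves a better competitive ratio (Sleator and Tarjan, 1985):

$$C_{LRU} \leq k \cdot C_{OPT} + c$$

where $k$ is the cache size, $C$ is the number of cache misses on a request sequence, and $c$ is a constant that does not depend on the sequence.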
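
The bound can be observed on a small adversarial input. The sketch below, written for this section and not taken from the original text, compares LRU with the offline optimum (Belady's farthest-in-future rule) on a sequence that cycles through $k + 1$ distinct keys, which is the classic worst case for LRU.

```python
from collections import OrderedDict
from typing import Hashable, List


def lru_misses(requests: List[Hashable], k: int) -> int:
    """Count misses of an LRU cache of size k on the request sequence."""
    cache: "OrderedDict[Hashable, None]" = OrderedDict()
    misses = 0
    for key in requests:
        if key in cache:
            cache.move_to_end(key)          # mark as most recently used
        else:
            misses += 1
            if len(cache) >= k:
                cache.popitem(last=False)   # evict the least recently used key
            cache[key] = None
    return misses


def opt_misses(requests: List[Hashable], k: int) -> int:
    """Count misses of the offline optimum: evict the key whose next use is farthest away."""
    cache = set()
    misses = 0
    for i, key in enumerate(requests):
        if key in cache:
            continue
        misses += 1
        if len(cache) >= k:
            def next_use(resident: Hashable) -> float:
                for j in range(i + 1, len(requests)):
                    if requests[j] == resident:
                        return j
                return float("inf")         # never used again
            cache.remove(max(cache, key=next_use))
        cache.add(key)
    return misses


k = 3
requests = list(range(k + 1)) * 5   # 0, 1, 2, 3 repeated five times
lru = lru_misses(requests, k)
opt = opt_misses(requests, k)
print(f"LRU misses: {lru}, OPT misses: {opt}, ratio: {lru / opt:.2f}")
```

On this sequence LRU misses on every request, while the offline policy misses only about once every $k$ requests after warm-up, so the ratio approaches $k$ as the sequence grows.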

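Lazy Evaluation Theory

Definition 32.6 (Lazy evaluation) Lazy evaluation is an evaluation strategy:

$$eval: Expression \rightarrow (Environment \rightarrow Value)$$

subject to the condition that an expression is evaluated only when its value is needed.

Definition 32.7 (Thunk) A thunk is the basic unit of deferred computation:

$$Thunk = \langle f, \sigma, forced \rangle$$

where $f$ is the function to execute, $\sigma$ is the environment, and $forced$ records whether the thunk has already been evaluated.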

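Object Pool Theory

Definition 32.8 (Object pool) An object pool is a resource management system:

$$Pool = \langle C, A, F, V \rangle$$

where:

  • $C$: the creation function $create: \emptyset \rightarrow Object$
  • $A$: the set of available objects $Available \subseteq Object$
  • $F$: the acquire/release operations $acquire, release$
  • $V$: the validation function $validate: Object \rightarrow \{True, False\}$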
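
The four components of this definition map directly onto code. The following is a minimal sketch under stated assumptions: the class name `ObjectPool`, the `borrow` helper, the `max_size` parameter, and the `created` counter are all hypothetical names introduced for this example, and the counter is for illustration only (it is not updated atomically).

```python
import queue
from contextlib import contextmanager
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


class ObjectPool(Generic[T]):
    """Pool = <C, A, F, V>: create function, available set, acquire/release, validate."""

    def __init__(self, create: Callable[[], T],
                 validate: Callable[[T], bool] = lambda obj: True,
                 max_size: int = 4):
        self._create = create                                  # C
        self._available: "queue.LifoQueue[T]" = queue.LifoQueue(maxsize=max_size)  # A
        self._validate = validate                              # V
        self.created = 0                                       # illustration only

    def acquire(self) -> T:                                    # F: acquire
        while True:
            try:
                obj = self._available.get_nowait()
            except queue.Empty:
                self.created += 1
                return self._create()
            if self._validate(obj):
                return obj
            # An object that fails validation is dropped; try the next one.

    def release(self, obj: T) -> None:                         # F: release
        try:
            self._available.put_nowait(obj)
        except queue.Full:
            pass  # pool already full: let the object be garbage collected

    @contextmanager
    def borrow(self) -> Iterator[T]:
        """Acquire an object and release it back to the pool on exit."""
        obj = self.acquire()
        try:
            yield obj
        finally:
            self.release(obj)


pool = ObjectPool(create=lambda: bytearray(1024), max_size=2)
with pool.borrow() as buf:
    first_id = id(buf)
with pool.borrow() as buf:
    reused = id(buf) == first_id
print(f"reused: {reused}, objects created: {pool.created}")
```

A LIFO queue is used here so that the most recently released object, which is most likely still warm in CPU caches, is handed out first; a FIFO queue would spread use evenly instead.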

Layered Architecture of Performance Optimization

┌─────────────────────────────────────────────────────────────────┐
│                 Performance Optimization Layers                 │
├─────────────────────────────────────────────────────────────────┤
│  Algorithm     │  Time O(f(n)) │ Space │ Data structure choice  │
├─────────────────────────────────────────────────────────────────┤
│  Architecture  │  Caching │ Async │ Parallelism │ Lazy loading  │
├─────────────────────────────────────────────────────────────────┤
│  Code          │  Built-ins │ Generators │ List comprehensions  │
├─────────────────────────────────────────────────────────────────┤
│  Memory        │  Object pools │ __slots__ │ Weak references    │
├─────────────────────────────────────────────────────────────────┤
│  Tooling       │  Cython │ PyPy │ numba │ C extensions          │
└─────────────────────────────────────────────────────────────────┘
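
Two of these layers are easy to observe directly. The sketch below is an illustrative comparison, not a benchmark: it contrasts a list with a generator (code layer) and a regular class with a `__slots__` class (memory layer). The class names and the constant `N` are assumptions made for this example, and exact byte counts vary between Python versions.

```python
import sys

N = 100_000

# Code layer: a list comprehension materializes every element up front,
# while a generator expression produces them one at a time.
squares_list = [n * n for n in range(N)]
squares_gen = (n * n for n in range(N))
list_bytes = sys.getsizeof(squares_list)   # the list object itself, not its elements
gen_bytes = sys.getsizeof(squares_gen)     # small and independent of N
same_total = sum(squares_gen) == sum(squares_list)


# Memory layer: __slots__ removes the per-instance __dict__.
class PointWithDict:
    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y


class PointWithSlots:
    __slots__ = ("x", "y")

    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y


print(f"list: {list_bytes} bytes, generator: {gen_bytes} bytes")
print(f"same total: {same_total}")
print(f"regular instance has __dict__: {hasattr(PointWithDict(1, 2), '__dict__')}")
print(f"slotted instance has __dict__: {hasattr(PointWithSlots(1, 2), '__dict__')}")
```

The generator saves memory only when the values are consumed once; if the data is traversed repeatedly, the list is usually the better choice.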

Caching Patterns

UML Class Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                       Cache Pattern Class Diagram                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────┐         ┌─────────────────┐                       │
│  │ <<interface>>   │         │  CacheEntry     │                       │
│  │ CacheStrategy   │         ├─────────────────┤                       │
│  ├─────────────────┤         │ - value: Any    │                       │
│  │ + get(key): V   │         │ - timestamp: float│                      │
│  │ + put(key, val) │         │ - ttl: float    │                       │
│  │ + remove(key)   │         │ - hits: int     │                       │
│  └────────┬────────┘         └─────────────────┘                       │
│           │                                                             │
│     ┌─────┴─────┬─────────────┐                                        │
│     │           │             │                                        │
│  ┌──▼───┐   ┌───▼───┐   ┌────▼────┐                                   │
│  │ LRU  │   │  LFU  │   │  FIFO  │                                   │
│  │Cache │   │ Cache │   │ Cache  │                                   │
│  └──────┘   └───────┘   └────────┘                                   │
│                                                                         │
│  ┌─────────────────────────────────────────────────────────┐           │
│  │                 MultiLevelCache                         │           │
│  ├─────────────────────────────────────────────────────────┤           │
│  │ - levels: List[MemoryCache]                             │           │
│  │ + get(key): Any                                         │           │
│  │ + set(key, value, ttl)                                  │           │
│  │ + get_stats(): List[Dict]                               │           │
│  └─────────────────────────────────────────────────────────┘           │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Function Result Caching

python
from typing import Callable, Any, Dict, Tuple, Optional, TypeVar, ParamSpec, Generic, List
from functools import wraps, lru_cache
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import time
import hashlib
import pickle
import threading
from collections import OrderedDict
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor, Future
import gc

P = ParamSpec('P')
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

@dataclass
class CacheEntry(Generic[V]):
    value: V
    timestamp: float
    ttl: float
    hits: int = 0
    size: int = 0
    
    def is_expired(self) -> bool:
        return time.time() - self.timestamp >= self.ttl

@dataclass
class CacheStats:
    hits: int = 0
    misses: int = 0
    evictions: int = 0
    size: int = 0
    max_size: int = 0
    
    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0
    
    @property
    def miss_rate(self) -> float:
        return 1.0 - self.hit_rate

class CacheStrategy(ABC, Generic[K, V]):
    @abstractmethod
    def get(self, key: K) -> Optional[V]:
        pass
    
    @abstractmethod
    def put(self, key: K, value: V) -> Optional[K]:
        """Returns evicted key if any"""
        pass
    
    @abstractmethod
    def remove(self, key: K) -> bool:
        pass
    
    @abstractmethod
    def clear(self) -> None:
        pass
    
    @abstractmethod
    def size(self) -> int:
        pass

class LRUCache(CacheStrategy[K, V]):
    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be positive")
        self._capacity = capacity
        self._cache: OrderedDict[K, V] = OrderedDict()
        self._lock = threading.RLock()
    
    def get(self, key: K) -> Optional[V]:
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
                return self._cache[key]
            return None
    
    def put(self, key: K, value: V) -> Optional[K]:
        evicted_key = None
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            else:
                if len(self._cache) >= self._capacity:
                    evicted_key, _ = self._cache.popitem(last=False)
            self._cache[key] = value
        return evicted_key
    
    def remove(self, key: K) -> bool:
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False
    
    def clear(self) -> None:
        with self._lock:
            self._cache.clear()
    
    def size(self) -> int:
        with self._lock:
            return len(self._cache)

class LFUCache(CacheStrategy[K, V]):
    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be positive")
        self._capacity = capacity
        self._cache: Dict[K, V] = {}
        self._freq: Dict[K, int] = {}
        self._lock = threading.RLock()
    
    def get(self, key: K) -> Optional[V]:
        with self._lock:
            if key in self._cache:
                self._freq[key] = self._freq.get(key, 0) + 1
                return self._cache[key]
            return None
    
    def put(self, key: K, value: V) -> Optional[K]:
        evicted_key = None
        with self._lock:
            if key not in self._cache and len(self._cache) >= self._capacity:
                evicted_key = min(self._freq, key=self._freq.get)
                del self._cache[evicted_key]
                del self._freq[evicted_key]
            
            self._cache[key] = value
            self._freq[key] = self._freq.get(key, 0) + 1
        return evicted_key
    
    def remove(self, key: K) -> bool:
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                del self._freq[key]
                return True
            return False
    
    def clear(self) -> None:
        with self._lock:
            self._cache.clear()
            self._freq.clear()
    
    def size(self) -> int:
        with self._lock:
            return len(self._cache)

class ARCache(CacheStrategy[K, V]):
    """Adaptive Replacement Cache: adapts between recency (LRU-like) and frequency (LFU-like)."""
    
    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be positive")
        self._capacity = capacity
        self._p = 0  # target size of T1, adjusted on ghost hits
        self._t1: OrderedDict[K, V] = OrderedDict()     # resident, seen once recently
        self._t2: OrderedDict[K, V] = OrderedDict()     # resident, seen at least twice
        self._b1: OrderedDict[K, None] = OrderedDict()  # ghost keys evicted from T1
        self._b2: OrderedDict[K, None] = OrderedDict()  # ghost keys evicted from T2
        self._lock = threading.RLock()
    
    def get(self, key: K) -> Optional[V]:
        with self._lock:
            if key in self._t1:
                # Second access: promote from the recency list to the frequency list
                self._t2[key] = self._t1.pop(key)
                return self._t2[key]
            elif key in self._t2:
                self._t2.move_to_end(key)
                return self._t2[key]
            return None
    
    def put(self, key: K, value: V) -> Optional[K]:
        evicted_key = None
        with self._lock:
            if key in self._t1 or key in self._t2:
                # Resident key: update the value and treat it as a repeated access
                self._t1.pop(key, None)
                self._t2[key] = value
                self._t2.move_to_end(key)
            elif key in self._b1:
                # Ghost hit in B1: recency is undervalued, so grow T1's target
                delta = max(1, len(self._b2) // len(self._b1))
                self._p = min(self._capacity, self._p + delta)
                evicted_key = self._replace(in_b2=False)
                del self._b1[key]
                self._t2[key] = value
            elif key in self._b2:
                # Ghost hit in B2: frequency is undervalued, so shrink T1's target
                delta = max(1, len(self._b1) // len(self._b2))
                self._p = max(0, self._p - delta)
                evicted_key = self._replace(in_b2=True)
                del self._b2[key]
                self._t2[key] = value
            else:
                # Completely new key
                if len(self._t1) + len(self._b1) >= self._capacity:
                    if len(self._t1) < self._capacity:
                        self._b1.popitem(last=False)
                        evicted_key = self._replace(in_b2=False)
                    else:
                        # T1 alone fills the cache: drop its LRU entry outright
                        evicted_key, _ = self._t1.popitem(last=False)
                else:
                    total = (len(self._t1) + len(self._t2) +
                             len(self._b1) + len(self._b2))
                    if total >= self._capacity:
                        if total >= 2 * self._capacity and self._b2:
                            self._b2.popitem(last=False)
                        evicted_key = self._replace(in_b2=False)
                self._t1[key] = value
        return evicted_key
    
    def _replace(self, in_b2: bool) -> Optional[K]:
        """Evict one resident entry into its ghost list; no-op while there is room."""
        if len(self._t1) + len(self._t2) < self._capacity:
            return None
        if self._t1 and (len(self._t1) > self._p or
                         (in_b2 and len(self._t1) == self._p) or
                         not self._t2):
            evicted_key, _ = self._t1.popitem(last=False)
            self._b1[evicted_key] = None
        else:
            evicted_key, _ = self._t2.popitem(last=False)
            self._b2[evicted_key] = None
        return evicted_key
    
    def remove(self, key: K) -> bool:
        with self._lock:
            removed = False
            for cache in [self._t1, self._t2]:
                if key in cache:
                    del cache[key]
                    removed = True
            for ghost in [self._b1, self._b2]:
                if key in ghost:
                    del ghost[key]
            return removed
    
    def clear(self) -> None:
        with self._lock:
            self._t1.clear()
            self._t2.clear()
            self._b1.clear()
            self._b2.clear()
            self._p = 0
    
    def size(self) -> int:
        with self._lock:
            return len(self._t1) + len(self._t2)

class MemoryCache(Generic[K, V]):
    def __init__(self, default_ttl: float = 300.0, max_size: int = 1000,
                 strategy: str = 'lru'):
        self._strategy: CacheStrategy[K, CacheEntry[V]]
        if strategy == 'lru':
            self._strategy = LRUCache(max_size)
        elif strategy == 'lfu':
            self._strategy = LFUCache(max_size)
        elif strategy == 'arc':
            self._strategy = ARCache(max_size)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
        
        self._default_ttl = default_ttl
        self._stats = CacheStats(max_size=max_size)
        self._lock = threading.RLock()
    
    def get(self, key: K) -> Optional[V]:
        with self._lock:
            entry = self._strategy.get(key)
            if entry is not None:
                if not entry.is_expired():
                    entry.hits += 1
                    self._stats.hits += 1
                    return entry.value
                else:
                    self._strategy.remove(key)
            self._stats.misses += 1
            return None
    
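    def set(self, key: K, value: V, ttl: Optional[float] = None):
        with self._lock:
            entry = CacheEntry(
                value=value,
                timestamp=time.time(),
                # Compare against None so that an explicit ttl of 0 is honored
                ttl=ttl if ttl is not None else self._default_ttl
            )
            evicted = self._strategy.put(key, entry)
            # Evicted keys may be falsy (0, ""), so test for None explicitly
            if evicted is not None:
                self._stats.evictions += 1
            self._stats.size = self._strategy.size()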
    
    def delete(self, key: K) -> bool:
        with self._lock:
            result = self._strategy.remove(key)
            self._stats.size = self._strategy.size()
            return result
    
    def clear(self):
        with self._lock:
            self._strategy.clear()
            self._stats.size = 0
    
    def get_stats(self) -> CacheStats:
        with self._lock:
            return CacheStats(
                hits=self._stats.hits,
                misses=self._stats.misses,
                evictions=self._stats.evictions,
                size=self._stats.size,
                max_size=self._stats.max_size
            )

def cached(cache: MemoryCache = None, key_func: Callable = None, 
          ttl: float = None):
    if cache is None:
        cache = MemoryCache()
    
    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
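            if key_func:
                key = key_func(*args, **kwargs)
            else:
                # Default key: function name plus the repr of the arguments.
                # Keyword arguments are sorted so their order does not change the key.
                key = f"{func.__name__}:{args!r}:{sorted(kwargs.items())!r}"
            
            # MemoryCache.get returns None on a miss, so a None result cannot be
            # told apart from a miss and is recomputed on every call.
            result = cache.get(key)
            if result is not None:
                return result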
            
            result = func(*args, **kwargs)
            cache.set(key, result, ttl)
            return result
        
        wrapper.cache = cache
        wrapper.cache_clear = cache.clear
        wrapper.cache_stats = cache.get_stats
        return wrapper
    
    return decorator

class MultiLevelCache(Generic[K, V]):
    def __init__(self, levels: List[Tuple[int, float]] = None):
        if levels is None:
            levels = [(100, 60.0), (1000, 300.0), (10000, 3600.0)]
        
        self._levels: List[MemoryCache[K, V]] = [
            MemoryCache(max_size=size, default_ttl=ttl)
            for size, ttl in levels
        ]
        self._lock = threading.RLock()
    
    def get(self, key: K) -> Optional[V]:
        with self._lock:
            for i, cache in enumerate(self._levels):
                value = cache.get(key)
                if value is not None:
                    for j in range(i):
                        self._levels[j].set(key, value)
                    return value
        return None
    
    def set(self, key: K, value: V, ttl: float = None):
        with self._lock:
            for cache in self._levels:
                cache.set(key, value, ttl)
    
    def get_stats(self) -> List[CacheStats]:
        return [cache.get_stats() for cache in self._levels]

@lru_cache(maxsize=128)
def fibonacci_lru(n: int) -> int:
    if n < 2:
        return n
    return fibonacci_lru(n - 1) + fibonacci_lru(n - 2)

@cached(ttl=60.0)
def expensive_computation(n: int) -> int:
    print(f"Computing... n={n}")
    time.sleep(0.1)
    return n * n

print(f"fibonacci(30) = {fibonacci_lru(30)}")
print(f"Cache info: {fibonacci_lru.cache_info()}")

print(expensive_computation(10))  # computed, then stored
print(expensive_computation(10))  # served from the cache
print(f"Cache stats: {expensive_computation.cache_stats()}")
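
The `cached` decorator above uses `None` to signal a miss, so a function that legitimately returns `None` is recomputed on every call. One common way around this is a private sentinel object. The sketch below is a simplified, dictionary-backed illustration of that idea; `memoize`, `_MISSING`, and `find_user` are hypothetical names for this example, and it has no TTL, size limit, or locking.

```python
from functools import wraps
from typing import Any, Callable, Dict, List, Tuple

_MISSING = object()  # unique sentinel: cannot collide with any real return value


def memoize(func: Callable[..., Any]) -> Callable[..., Any]:
    """Cache results by argument tuple, including results that are None."""
    store: Dict[Tuple, Any] = {}

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Arguments must be hashable; keyword arguments are sorted by name.
        key = (args, tuple(sorted(kwargs.items())))
        result = store.get(key, _MISSING)
        if result is _MISSING:          # identity check, not a truthiness check
            result = func(*args, **kwargs)
            store[key] = result
        return result

    wrapper.store = store
    return wrapper


calls: List[int] = []


@memoize
def find_user(user_id: int):
    calls.append(user_id)
    return None  # "not found" is a valid, cacheable answer


find_user(7)
find_user(7)
print(f"underlying calls: {len(calls)}")
```

The same technique applies to a TTL cache: return the sentinel from the lookup on a miss, and keep `None` available as an ordinary cached value.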

Data Caching Patterns

python
class ReadThroughCache(Generic[K, V]):
    def __init__(self, loader: Callable[[K], V], 
                 strategy: CacheStrategy[K, V],
                 ttl: float = 300.0):
        self._loader = loader
        self._strategy = strategy
        self._ttl = ttl
        self._entries: Dict[K, CacheEntry[V]] = {}
        self._lock = threading.RLock()
    
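    def get(self, key: K) -> V:
        with self._lock:
            entry = self._entries.get(key)
            if entry is not None and not entry.is_expired():
                entry.hits += 1
                # Record the access so the eviction policy sees it
                self._strategy.get(key)
                return entry.value
        
        # Load outside the lock so a slow loader does not block other keys.
        # Concurrent misses on the same key may each call the loader.
        value = self._loader(key)
        
        with self._lock:
            evicted = self._strategy.put(key, value)
            if evicted is not None:
                # Keep the TTL metadata in step with the eviction policy
                self._entries.pop(evicted, None)
            self._entries[key] = CacheEntry(
                value=value,
                timestamp=time.time(),
                ttl=self._ttl
            )
        return value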
    
    def invalidate(self, key: K):
        with self._lock:
            self._strategy.remove(key)
            self._entries.pop(key, None)
    
    def refresh(self, key: K) -> V:
        self.invalidate(key)
        return self.get(key)

class WriteThroughCache(Generic[K, V]):
    def __init__(self, reader: Callable[[K], V],
                 writer: Callable[[K, V], None],
                 strategy: CacheStrategy[K, V]):
        self._reader = reader
        self._writer = writer
        self._strategy = strategy
        self._lock = threading.RLock()
    
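    def get(self, key: K) -> Optional[V]:
        with self._lock:
            value = self._strategy.get(key)
            if value is None:
                # Miss: fall back to the backing store and populate the cache
                value = self._reader(key)
                if value is not None:
                    self._strategy.put(key, value)
            return value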
    
    def put(self, key: K, value: V):
        with self._lock:
            self._writer(key, value)
            self._strategy.put(key, value)
    
    def invalidate(self, key: K):
        self._strategy.remove(key)

class WriteBehindCache(Generic[K, V]):
    def __init__(self, writer: Callable[[K, V], None],
                 strategy: CacheStrategy[K, V],
                 flush_interval: float = 5.0,
                 max_batch_size: int = 100):
        self._writer = writer
        self._strategy = strategy
        self._flush_interval = flush_interval
        self._max_batch_size = max_batch_size
        self._write_queue: Dict[K, V] = {}
        self._lock = threading.RLock()
        self._running = True
        self._start_flusher()
    
    def get(self, key: K) -> Optional[V]:
        return self._strategy.get(key)
    
    def put(self, key: K, value: V):
        with self._lock:
            self._strategy.put(key, value)
            self._write_queue[key] = value
    
    def _start_flusher(self):
        def flusher():
            while self._running:
                time.sleep(self._flush_interval)
                self._flush()
        
        thread = threading.Thread(target=flusher, daemon=True)
        thread.start()
    
    def _flush(self):
        with self._lock:
            if not self._write_queue:
                return
            
            batch = list(self._write_queue.items())[:self._max_batch_size]
            for key, value in batch:
                try:
                    self._writer(key, value)
                    del self._write_queue[key]
                except Exception as e:
                    print(f"Write failed: {key}, error: {e}")
    
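    def shutdown(self):
        self._running = False
        # Drain the queue batch by batch; stop if a pass makes no progress
        # (for example, because the writer keeps failing).
        while True:
            with self._lock:
                pending = len(self._write_queue)
            if pending == 0:
                break
            self._flush()
            with self._lock:
                if len(self._write_queue) >= pending:
                    break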

class CacheAsidePattern(Generic[K, V]):
    def __init__(self, loader: Callable[[K], V],
                 writer: Callable[[K, V], None],
                 deleter: Callable[[K], None],
                 cache: CacheStrategy[K, V]):
        self._loader = loader
        self._writer = writer
        self._deleter = deleter
        self._cache = cache
        self._lock = threading.RLock()
    
    def get(self, key: K) -> V:
        value = self._cache.get(key)
        if value is not None:
            return value
        
        value = self._loader(key)
        with self._lock:
            self._cache.put(key, value)
        return value
    
    def put(self, key: K, value: V):
        with self._lock:
            self._writer(key, value)
            self._cache.put(key, value)
    
    def delete(self, key: K):
        with self._lock:
            self._deleter(key)
            self._cache.remove(key)

def load_user(user_id: int) -> dict:
    print(f"Loading user {user_id} from the database")
    return {"id": user_id, "name": f"User {user_id}"}

lru_strategy = LRUCache[int, dict](capacity=100)
user_cache = ReadThroughCache(loader=load_user, strategy=lru_strategy)

print(user_cache.get(1))
print(user_cache.get(1))
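
In `ReadThroughCache.get` the loader runs outside the lock, so several threads that miss on the same key at the same time may each hit the backing store. One possible mitigation is per-key locking, sometimes called single-flight. The sketch below is a simplified, self-contained illustration with no TTL or eviction; `SingleFlightCache` and `slow_load` are hypothetical names introduced for this example.

```python
import threading
import time
from typing import Callable, Dict, Generic, Hashable, List, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class SingleFlightCache(Generic[K, V]):
    """Ensure at most one concurrent load per key; other callers wait for the result."""

    def __init__(self, loader: Callable[[K], V]):
        self._loader = loader
        self._values: Dict[K, V] = {}
        self._key_locks: Dict[K, threading.Lock] = {}
        self._guard = threading.Lock()   # protects the two dictionaries

    def get(self, key: K) -> V:
        with self._guard:
            if key in self._values:
                return self._values[key]
            lock = self._key_locks.setdefault(key, threading.Lock())
        with lock:                        # only one thread per key gets past here
            with self._guard:
                if key in self._values:   # another thread finished the load
                    return self._values[key]
            value = self._loader(key)     # slow call, made without holding the guard
            with self._guard:
                self._values[key] = value
                self._key_locks.pop(key, None)
            return value


load_calls: List[int] = []


def slow_load(user_id: int) -> dict:
    load_calls.append(user_id)
    time.sleep(0.05)                      # simulate a slow backing store
    return {"id": user_id}


cache = SingleFlightCache(slow_load)
threads = [threading.Thread(target=cache.get, args=(1,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"loader calls for 8 concurrent readers: {len(load_calls)}")
```

Loads for different keys still proceed in parallel, because the global guard is held only for dictionary updates and never during the loader call.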

Lazy Loading Patterns

UML Class Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                    Lazy Loading Pattern Class Diagram                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────┐         ┌─────────────────┐                       │
│  │    Lazy<T>      │         │    Thunk        │                       │
│  ├─────────────────┤         ├─────────────────┤                       │
│  │ - _factory: Func│         │ - _func: Func   │                       │
│  │ - _value: T     │         │ - _forced: bool │                       │
│  │ - _initialized  │         │ - _value: Any   │                       │
│  ├─────────────────┤         ├─────────────────┤                       │
│  │ + get(): T      │         │ + force(): Any  │                       │
│  │ + is_init():bool│         │ + is_forced()   │                       │
│  │ + reset()       │         └─────────────────┘                       │
│  └─────────────────┘                                                   │
│                                                                         │
│  ┌─────────────────┐         ┌─────────────────┐                       │
│  │ LazyAttribute   │         │  VirtualProxy   │                       │
│  ├─────────────────┤         ├─────────────────┤                       │
│  │ - factory: Func │         │ - _factory: Func│                       │
│  │ - name: str     │         │ - _subject: Any │                       │
│  ├─────────────────┤         ├─────────────────┤                       │
│  │ + __get__()     │         │ + __getattr__() │                       │
│  └─────────────────┘         │ + _get_subject()│                       │
│                              └─────────────────┘                       │
│                                                                         │
│  ┌─────────────────────────────────────────────────────────┐           │
│  │                 LazySequence<T>                         │           │
│  ├─────────────────────────────────────────────────────────┤           │
│  │ - _iterable: Iterator<T>                                │           │
│  │ - _cache: List<T>                                       │           │
│  │ - _exhausted: bool                                      │           │
│  ├─────────────────────────────────────────────────────────┤           │
│  │ + __iter__(): Generator<T>                              │           │
│  │ + __getitem__(index): T                                 │           │
│  │ + __len__(): int                                        │           │
│  └─────────────────────────────────────────────────────────┘           │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Lazy Initialization

python
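from typing import Callable, Any, Dict, List, TypeVar, Generic, Optional, Iterable, Generator
from dataclasses import dataclass, field
from functools import wraps
import threading

T = TypeVar('T')
K = TypeVar('K')  # key type used by LazyDict
V = TypeVar('V')  # value type used by LazyDict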

class Lazy(Generic[T]):
    def __init__(self, factory: Callable[[], T]):
        self._factory = factory
        self._value: Optional[T] = None
        self._initialized = False
        self._lock = threading.RLock()
        self._exception: Optional[Exception] = None
    
    def get(self) -> T:
        if not self._initialized:
            with self._lock:
                if not self._initialized:
                    try:
                        self._value = self._factory()
                    except Exception as e:
                        self._exception = e
                        raise
                    finally:
                        self._initialized = True
        
        if self._exception:
            raise self._exception
        
        return self._value
    
    def is_initialized(self) -> bool:
        return self._initialized
    
    def reset(self):
        with self._lock:
            self._value = None
            self._initialized = False
            self._exception = None
    
    def map(self, func: Callable[[T], Any]) -> 'Lazy':
        return Lazy(lambda: func(self.get()))
    
    def flat_map(self, func: Callable[[T], 'Lazy']) -> 'Lazy':
        return Lazy(lambda: func(self.get()).get())
    
    def __repr__(self) -> str:
        if self._initialized:
            return f"Lazy(value={self._value!r})"
        return "Lazy(<not computed>)"

class LazyAttribute:
    def __init__(self, factory: Callable):
        self.factory = factory
        self._name = None
    
    def __set_name__(self, owner, name):
        self._name = name
    
    def __get__(self, obj, owner=None):
        if obj is None:
            return self
        
        if self._name not in obj.__dict__:
            obj.__dict__[self._name] = self.factory(obj)
        
        return obj.__dict__[self._name]
    
    def __delete__(self, obj):
        if self._name in obj.__dict__:
            del obj.__dict__[self._name]

class LazyDict(Generic[K, V]):
    def __init__(self):
        self._data: Dict[K, Lazy[V]] = {}
        self._lock = threading.RLock()
    
    def __setitem__(self, key: K, factory: Callable[[], V]):
        with self._lock:
            if isinstance(factory, Lazy):
                self._data[key] = factory
            else:
                self._data[key] = Lazy(factory)
    
    def __getitem__(self, key: K) -> V:
        with self._lock:
            if key not in self._data:
                raise KeyError(key)
            return self._data[key].get()
    
    def __contains__(self, key: K) -> bool:
        return key in self._data
    
    def get(self, key: K, default: V = None) -> Optional[V]:
        if key in self._data:
            return self._data[key].get()
        return default
    
    def is_loaded(self, key: K) -> bool:
        with self._lock:
            return key in self._data and self._data[key].is_initialized()

class VirtualProxy:
    def __init__(self, subject_factory: Callable):
        self._factory = subject_factory
        self._subject = None
        self._lock = threading.RLock()
    
    def _get_subject(self):
        if self._subject is None:
            with self._lock:
                if self._subject is None:
                    self._subject = self._factory()
        return self._subject
    
    def __getattr__(self, name):
        if name.startswith('_'):
            raise AttributeError(name)
        return getattr(self._get_subject(), name)
    
    def is_loaded(self) -> bool:
        return self._subject is not None

class HeavyResource:
    def __init__(self, name: str):
        print(f"Initializing heavyweight resource: {name}")
        self.name = name
        self.data = list(range(1000000))
    
    def process(self):
        return f"Processing {self.name}"
    
    def close(self):
        print(f"Closing resource: {self.name}")

class DataPipeline:
    source = LazyAttribute(lambda self: self._load_source())
    transformed = LazyAttribute(lambda self: self._transform())
    
    def __init__(self, source_path: str):
        self.source_path = source_path
    
    def _load_source(self):
        print(f"Loading source data: {self.source_path}")
        return list(range(100))
    
    def _transform(self):
        print("Transforming data...")
        return [x * 2 for x in self.source]
    
    def get_result(self):
        return self.transformed

pipeline = DataPipeline("data.csv")
print("Pipeline created; no data loaded yet")
print(f"Result: {pipeline.get_result()[:5]}...")

lazy_resource = VirtualProxy(lambda: HeavyResource("database connection"))
print("Proxy created")
print(lazy_resource.process())  # the first attribute access builds the HeavyResource
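
For the common case of a lazily computed attribute, the standard library already offers `functools.cached_property` (Python 3.8+), which behaves much like the `LazyAttribute` descriptor above. The sketch below shows the equivalent usage; the `Report` class and its `computations` counter are hypothetical and exist only to make the caching visible.

```python
from functools import cached_property
from typing import List


class Report:
    def __init__(self, rows: List[int]):
        self.rows = rows
        self.computations = 0   # counts how often the property body runs

    @cached_property
    def total(self) -> int:
        # Runs on first access; the result is then stored in the instance __dict__.
        self.computations += 1
        return sum(self.rows)


report = Report([1, 2, 3])
first = report.total        # computed
second = report.total       # served from the instance __dict__
del report.total            # clear the cached value
third = report.total        # computed again
print(f"total = {first}, computations = {report.computations}")
```

Because the value lives in the instance `__dict__`, `cached_property` cannot be used on classes that define `__slots__` without a `__dict__` slot.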

Deferred Computation Patterns

python
class Thunk:
    def __init__(self, func: Callable[[], Any]):
        self._func = func
        self._forced = False
        self._value = None
        self._lock = threading.RLock()
    
    def force(self) -> Any:
        if not self._forced:
            with self._lock:
                if not self._forced:
                    self._value = self._func()
                    self._forced = True
        return self._value
    
    def is_forced(self) -> bool:
        return self._forced
    
    def __repr__(self) -> str:
        if self._forced:
            return f"Thunk({self._value!r})"
        return "Thunk(<unevaluated>)"

def lazy(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args, **kwargs):
        return Thunk(lambda: func(*args, **kwargs))
    return wrapper

class LazySequence(Generic[T]):
    def __init__(self, iterable: Iterable[T]):
        self._iterable = iter(iterable)
        self._cache: List[T] = []
        self._exhausted = False
        self._lock = threading.RLock()
    
    def __iter__(self) -> Generator[T, None, None]:
        idx = 0
        while True:
            # Fetch under the lock, but yield outside it so that a suspended
            # generator never keeps the lock held.
            with self._lock:
                if idx >= len(self._cache):
                    if self._exhausted:
                        return
                    try:
                        self._cache.append(next(self._iterable))
                    except StopIteration:
                        self._exhausted = True
                        return
                value = self._cache[idx]
            yield value
            idx += 1
    
    def __getitem__(self, index: int) -> T:
        if index < 0:
            raise IndexError("negative indices are not supported")
        
        with self._lock:
            if index < len(self._cache):
                return self._cache[index]
        
        # Not cached yet: iterate, filling the cache, until the index is reached
        for i, value in enumerate(self):
            if i == index:
                return value
        
        raise IndexError("index out of range")
    
    def __len__(self) -> int:
        if not self._exhausted:
            list(self)
        return len(self._cache)
    
    def is_cached(self, index: int) -> bool:
        with self._lock:
            return index < len(self._cache)
    
    def cache_size(self) -> int:
        with self._lock:
            return len(self._cache)

class LazyRange:
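    def __init__(self, start: int, stop: int = None, step: int = 1):
        # The iteration, length, and membership logic below assumes a positive
        # step; a zero or negative step would otherwise loop forever.
        if step <= 0:
            raise ValueError("step must be a positive integer")
        if stop is None:
            self.start = 0
            self.stop = start
        else:
            self.start = start
            self.stop = stop
        self.step = step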
    
    def __iter__(self) -> Generator[int, None, None]:
        current = self.start
        while current < self.stop:
            yield current
            current += self.step
    
    def __len__(self) -> int:
        return max(0, (self.stop - self.start + self.step - 1) // self.step)
    
    def __getitem__(self, index: int) -> int:
        if index < 0:
            index = len(self) + index
        if index < 0 or index >= len(self):
            raise IndexError("Index out of range")
        return self.start + index * self.step
    
    def __contains__(self, value: int) -> bool:
        if value < self.start or value >= self.stop:
            return False
        return (value - self.start) % self.step == 0

def lazy_map(func: Callable, iterable: Iterable) -> Generator:
    for item in iterable:
        yield func(item)

def lazy_filter(predicate: Callable, iterable: Iterable) -> Generator:
    for item in iterable:
        if predicate(item):
            yield item

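def lazy_zip(*iterables: Iterable) -> Generator:
    iterators = [iter(it) for it in iterables]
    if not iterators:
        return
    while True:
        # Advance each iterator explicitly: a StopIteration raised inside a
        # generator expression is turned into RuntimeError (PEP 479).
        items = []
        for it in iterators:
            try:
                items.append(next(it))
            except StopIteration:
                return
        yield tuple(items)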

def lazy_take(n: int, iterable: Iterable) -> Generator:
    for i, item in enumerate(iterable):
        if i >= n:
            break
        yield item

def lazy_drop(n: int, iterable: Iterable) -> Generator:
    for i, item in enumerate(iterable):
        if i >= n:
            yield item

@lazy
def expensive_computation(x: int) -> int:
    print(f"Computing {x}...")
    return x ** 2

thunk = expensive_computation(10)
print("Thunk created, not yet evaluated")
print(f"Forced evaluation: {thunk.force()}")
print(f"Second access: {thunk.force()}")

lazy_seq = LazySequence(range(1000000))
print(f"First element: {lazy_seq[0]}")
print(f"Element at index 10: {lazy_seq[10]}")
print(f"Cached elements: {lazy_seq.cache_size()}")
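
The generator helpers defined above (`lazy_map`, `lazy_filter`, `lazy_take`) are not exercised by the demo. The following is a minimal sketch of how such helpers might compose into a pipeline over an unbounded source. The helpers are re-declared in simplified form so the block runs on its own (this `lazy_take` variant stops right after the n-th item instead of pulling one more), and `traced_square` and `calls` are hypothetical names used only to make the evaluation order visible.

```python
from itertools import count
from typing import Callable, Iterable, Iterator, List

def lazy_map(func: Callable, iterable: Iterable) -> Iterator:
    for item in iterable:
        yield func(item)

def lazy_filter(predicate: Callable, iterable: Iterable) -> Iterator:
    for item in iterable:
        if predicate(item):
            yield item

def lazy_take(n: int, iterable: Iterable) -> Iterator:
    if n <= 0:
        return
    for i, item in enumerate(iterable, start=1):
        yield item
        if i >= n:
            return  # stop without pulling an extra element from the source

calls: List[int] = []  # hypothetical trace of the inputs that were actually squared

def traced_square(x: int) -> int:
    calls.append(x)
    return x * x

# count(1) is infinite; building the pipeline computes nothing.
pipeline = lazy_take(
    3,
    lazy_filter(lambda v: v % 2 == 0, lazy_map(traced_square, count(1))),
)
print(calls)    # [] (no work done yet)

result = list(pipeline)
print(result)   # [4, 16, 36]
print(calls)    # [1, 2, 3, 4, 5, 6]
```

Only the six inputs needed to produce three even squares are ever evaluated, which is the property that makes lazy pipelines usable on unbounded or very large sources.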

Object Pool Pattern

UML Class Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                    Object Pool Pattern Class Diagram                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────┐         ┌─────────────────┐                       │
│  │ <<interface>>   │         │   PoolConfig    │                       │
│  │ PoolableObject  │         ├─────────────────┤                       │
│  ├─────────────────┤         │ min_size: int   │                       │
│  │ + reset()       │         │ max_size: int   │                       │
│  │ + is_valid()    │         │ max_idle_time   │                       │
│  └────────┬────────┘         │ validation_int  │                       │
│           │                  └─────────────────┘                       │
│           │                                                            │
│  ┌────────▼────────┐         ┌─────────────────┐                       │
│  │  ObjectPool<T>  │◄────────│ ResourceManager │                       │
│  ├─────────────────┤         ├─────────────────┤                       │
│  │ - _pool: Queue  │         │ - pool: ObjectPool│                      │
│  │ - _active: Set  │         │ - stats: Stats  │                       │
│  │ - _factory: Func│         ├─────────────────┤                       │
│  ├─────────────────┤         │ + use(): T      │                       │
│  │ + acquire(): T  │         │ + get_stats()   │                       │
│  │ + release(T)    │         └─────────────────┘                       │
│  │ + get_stats()   │                                                   │
│  └────────┬────────┘                                                   │
│           │                                                            │
│     ┌─────┴─────┐                                                     │
│     │           │                                                     │
│  ┌──▼────────┐ ┌▼──────────────┐                                     │
│  │Connection │ │   ThreadPool  │                                     │
│  │   Pool    │ │               │                                     │
│  └───────────┘ └───────────────┘                                     │
│                                                                        │
└─────────────────────────────────────────────────────────────────────────┘

Generic Object Pool

python
from typing import Generic, TypeVar, Callable, Dict, Optional, List, ContextManager
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
from contextlib import contextmanager
import threading
import time
from queue import Queue, Empty, Full

T = TypeVar('T')

class PoolableObject(ABC):
    @abstractmethod
    def reset(self):
        pass
    
    @abstractmethod
    def is_valid(self) -> bool:
        pass

@dataclass
class PoolConfig:
    min_size: int = 5
    max_size: int = 20
    max_idle_time: float = 300.0
    validation_interval: float = 60.0
    acquire_timeout: float = 30.0

@dataclass
class PoolStats:
    total_created: int = 0
    total_destroyed: int = 0
    active_count: int = 0
    idle_count: int = 0
    acquire_count: int = 0
    release_count: int = 0
    wait_time_total: float = 0.0
    
    @property
    def utilization(self) -> float:
        total = self.active_count + self.idle_count
        return self.active_count / total if total > 0 else 0.0

@dataclass
class PooledObject(Generic[T]):
    obj: T
    created_at: float
    last_used: float
    use_count: int = 0

class ObjectPool(Generic[T]):
    def __init__(self, factory: Callable[[], T], 
                 config: PoolConfig = None,
                 validator: Callable[[T], bool] = None,
                 destroyer: Callable[[T], None] = None):
        self._factory = factory
        self._config = config or PoolConfig()
        self._validator = validator
        self._destroyer = destroyer
        self._pool: Queue[PooledObject[T]] = Queue()
        self._active: Dict[int, PooledObject[T]] = {}
        self._lock = threading.RLock()
        self._stats = PoolStats()
        self._created = 0
        self._running = True
        
        for _ in range(self._config.min_size):
            self._add_object()
        
        self._start_maintenance()
    
    def acquire(self, timeout: float = None) -> T:
        if timeout is None:
            timeout = self._config.acquire_timeout
        
        start_time = time.time()
        pooled_obj = None
        
        while True:
            with self._lock:
                if not self._pool.empty():
                    pooled_obj = self._pool.get_nowait()
                elif self._created < self._config.max_size:
                    pooled_obj = self._create_object()
            
            if pooled_obj is not None:
                break
            
            try:
                pooled_obj = self._pool.get(timeout=0.1)
                break
            except Empty:
                pass
            
            if time.time() - start_time >= timeout:
                raise TimeoutError("Timed out while acquiring an object from the pool")
        
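        if self._validator and not self._validator(pooled_obj.obj):
            # Destroy the invalid object, then create its replacement under the
            # lock so that the creation counters stay consistent.
            if self._destroyer:
                self._destroyer(pooled_obj.obj)
            with self._lock:
                self._created -= 1
                self._stats.total_destroyed += 1
                pooled_obj = self._create_object()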
        
        pooled_obj.last_used = time.time()
        pooled_obj.use_count += 1
        
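        with self._lock:
            self._active[id(pooled_obj.obj)] = pooled_obj
            self._stats.active_count = len(self._active)
            # Refresh the idle count too; otherwise get_stats() keeps reporting
            # the value recorded at the last release.
            self._stats.idle_count = self._pool.qsize()
            self._stats.acquire_count += 1
            self._stats.wait_time_total += time.time() - start_time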
        
        return pooled_obj.obj
    
    def release(self, obj: T):
        with self._lock:
            obj_id = id(obj)
            if obj_id not in self._active:
                return
            
            pooled_obj = self._active.pop(obj_id)
            self._stats.active_count = len(self._active)
            self._stats.release_count += 1
        
        if hasattr(obj, 'reset'):
            try:
                obj.reset()
            except Exception as e:
                print(f"Failed to reset object: {e}")
        
        with self._lock:
            if self._created <= self._config.max_size:
                self._pool.put(pooled_obj)
                self._stats.idle_count = self._pool.qsize()
    
    def _create_object(self) -> PooledObject[T]:
        obj = self._factory()
        self._created += 1
        self._stats.total_created += 1
        return PooledObject(
            obj=obj,
            created_at=time.time(),
            last_used=time.time()
        )
    
    def _add_object(self) -> Optional[PooledObject[T]]:
        if self._created >= self._config.max_size:
            return None
        
        pooled_obj = self._create_object()
        self._pool.put(pooled_obj)
        self._stats.idle_count = self._pool.qsize()
        return pooled_obj
    
    def _start_maintenance(self):
        def maintenance():
            while self._running:
                time.sleep(self._config.validation_interval)
                self._evict_idle()
        
        thread = threading.Thread(target=maintenance, daemon=True)
        thread.start()
    
    def _evict_idle(self):
        now = time.time()
        with self._lock:
            temp_list = []
            while not self._pool.empty():
                try:
                    pooled_obj = self._pool.get_nowait()
                    idle_time = now - pooled_obj.last_used
                    
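                    # Keep objects that are still fresh, and never shrink the
                    # pool below its configured minimum size.
                    if (idle_time < self._config.max_idle_time
                            or self._created <= self._config.min_size):
                        temp_list.append(pooled_obj)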
                    else:
                        if self._destroyer:
                            self._destroyer(pooled_obj.obj)
                        self._created -= 1
                        self._stats.total_destroyed += 1
                except Empty:
                    break
            
            for pooled_obj in temp_list:
                self._pool.put(pooled_obj)
            
            self._stats.idle_count = self._pool.qsize()
    
    def get_stats(self) -> PoolStats:
        with self._lock:
            return PoolStats(
                total_created=self._stats.total_created,
                total_destroyed=self._stats.total_destroyed,
                active_count=self._stats.active_count,
                idle_count=self._stats.idle_count,
                acquire_count=self._stats.acquire_count,
                release_count=self._stats.release_count,
                wait_time_total=self._stats.wait_time_total
            )
    
    def shutdown(self):
        self._running = False
        with self._lock:
            while not self._pool.empty():
                try:
                    pooled_obj = self._pool.get_nowait()
                    if self._destroyer:
                        self._destroyer(pooled_obj.obj)
                except Empty:
                    break

class ConnectionPool(ObjectPool):
    def __init__(self, connection_factory: Callable, 
                 max_connections: int = 10,
                 connection_timeout: float = 30.0):
        config = PoolConfig(
            min_size=2,
            max_size=max_connections,
            acquire_timeout=connection_timeout
        )
        super().__init__(
            factory=connection_factory,
            config=config,
            validator=self._validate_connection,
            destroyer=self._close_connection
        )
    
    def _validate_connection(self, conn) -> bool:
        try:
            if hasattr(conn, 'ping'):
                return conn.ping()
            if hasattr(conn, 'is_connected'):
                return conn.is_connected()
            return True
        except Exception:
            return False
    
    def _close_connection(self, conn):
        try:
            if hasattr(conn, 'close'):
                conn.close()
        except Exception as e:
            print(f"Failed to close connection: {e}")

class ResourceManager(Generic[T]):
    def __init__(self, pool: ObjectPool[T]):
        self._pool = pool
        self._stats = PoolStats()
        self._lock = threading.Lock()
    
    @contextmanager
    def use(self, timeout: float = None) -> T:
        # The manager only counts its own checkouts; idle counts belong to the
        # pool, so they are not adjusted here (doing so produced negative values).
        resource = None
        try:
            resource = self._pool.acquire(timeout)
            with self._lock:
                self._stats.acquire_count += 1
                self._stats.active_count += 1
            yield resource
        finally:
            # Compare against None so that falsy resources are still released.
            if resource is not None:
                self._pool.release(resource)
                with self._lock:
                    self._stats.release_count += 1
                    self._stats.active_count -= 1
    
    def get_stats(self) -> PoolStats:
        with self._lock:
            return PoolStats(
                total_created=self._stats.total_created,
                active_count=self._stats.active_count,
                idle_count=self._stats.idle_count,
                acquire_count=self._stats.acquire_count,
                release_count=self._stats.release_count
            )

class DatabaseConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self._connected = True
        print(f"Creating connection: {connection_string}")
    
    def execute(self, query: str):
        if not self._connected:
            raise Exception("Connection is closed")
        return f"Executed: {query}"
    
    def reset(self):
        self._connected = True
    
    def is_valid(self) -> bool:
        return self._connected
    
    def ping(self) -> bool:
        return self._connected
    
    def close(self):
        self._connected = False
        print("Closing connection")

def create_db_connection():
    return DatabaseConnection("mysql://localhost:3306/db")

pool = ConnectionPool(create_db_connection, max_connections=5)

conn1 = pool.acquire()
conn2 = pool.acquire()
print(f"Pool stats: {pool.get_stats()}")

conn1.execute("SELECT * FROM users")
pool.release(conn1)
print(f"Stats after release: {pool.get_stats()}")

resource_manager = ResourceManager(pool)

with resource_manager.use() as conn:
    conn.execute("SELECT 1")

print(f"Resource manager stats: {resource_manager.get_stats()}")
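
To make the acquire/release contract easier to see in isolation, here is a deliberately small sketch built only on `queue.Queue`. `TinyPool` and `FakeConnection` are hypothetical names and are not part of the classes above. The sketch creates all objects eagerly and omits validation, idle eviction, and statistics, so it illustrates the idea rather than replacing `ObjectPool`.

```python
import queue
import threading
from contextlib import contextmanager
from typing import Callable, Generic, Iterator, List, TypeVar

T = TypeVar("T")

class TinyPool(Generic[T]):
    """Hypothetical fixed-size pool: every object is created up front."""

    def __init__(self, factory: Callable[[], T], size: int):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(factory())

    @contextmanager
    def borrow(self, timeout: float = 5.0) -> Iterator[T]:
        obj = self._idle.get(timeout=timeout)  # raises queue.Empty on timeout
        try:
            yield obj
        finally:
            self._idle.put(obj)  # always hand the object back to the pool

class FakeConnection:
    """Hypothetical stand-in for a resource that is expensive to create."""
    created = 0
    _lock = threading.Lock()

    def __init__(self):
        with FakeConnection._lock:
            FakeConnection.created += 1
            self.ident = FakeConnection.created

pool = TinyPool(FakeConnection, size=2)
seen: List[int] = []
seen_lock = threading.Lock()

def worker() -> None:
    # Blocks until one of the two pooled objects is free.
    with pool.borrow() as conn:
        with seen_lock:
            seen.append(conn.ident)

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"objects created: {FakeConnection.created}, checkouts: {len(seen)}")
# objects created: 2, checkouts: 8
```

Eight workers share two objects, so the construction cost is paid twice rather than eight times. The blocking `get` is what turns the pool size into a limit on concurrent use.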

Memory Optimization Patterns

__slots__ Optimization

python
from typing import Any, Callable, ClassVar, Dict, Optional, Set
from dataclasses import dataclass
from weakref import WeakValueDictionary, WeakKeyDictionary, ref, WeakSet
import sys
import gc
import threading

class Point:
    __slots__ = ('x', 'y')
    
    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y
    
    def distance(self, other: 'Point') -> float:
        return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5
    
    def __repr__(self) -> str:
        return f"Point({self.x}, {self.y})"

class PointNoSlots:
    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y
    
    def distance(self, other: 'PointNoSlots') -> float:
        return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5

class OptimizedClass:
    __slots__ = ('_data', '_cache')
    _class_data: ClassVar[Dict[str, Any]] = {}
    
    def __init__(self):
        self._data = {}
        self._cache = {}
    
    def get(self, key: str) -> Any:
        if key in self._cache:
            return self._cache[key]
        return self._data.get(key)
    
    def set(self, key: str, value: Any):
        self._data[key] = value
        self._cache[key] = value

class SlottedDataClass:
    __slots__ = ('id', 'name', 'value', '_hash')
    
    def __init__(self, id: int, name: str, value: float):
        self.id = id
        self.name = name
        self.value = value
        self._hash = hash((id, name, value))
    
    def __hash__(self):
        return self._hash
    
    def __eq__(self, other):
        if not isinstance(other, SlottedDataClass):
            return False
        return (self.id, self.name, self.value) == (other.id, other.name, other.value)
    
    def __repr__(self) -> str:
        return f"SlottedDataClass({self.id}, {self.name}, {self.value})"

class SlottedContainer:
    __slots__ = ('_items', '_count')
    
    def __init__(self):
        self._items = []
        self._count = 0
    
    def add(self, item):
        self._items.append(item)
        self._count += 1
    
    def __len__(self):
        return self._count

points_slots = [Point(i, i) for i in range(10000)]
points_normal = [PointNoSlots(i, i) for i in range(10000)]

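def instance_size(obj) -> int:
    # sys.getsizeof() is shallow, so count the per-instance __dict__ where one
    # exists. The result is an approximation and varies between CPython versions.
    extra = sys.getsizeof(obj.__dict__) if hasattr(obj, '__dict__') else 0
    return sys.getsizeof(obj) + extra

print(f"With __slots__: {instance_size(points_slots[0])} bytes")
print(f"Without __slots__: {instance_size(points_normal[0])} bytes")

slots_size = sum(instance_size(p) for p in points_slots)
normal_size = sum(instance_size(p) for p in points_normal)
print(f"Total memory saved: {(normal_size - slots_size) / 1024 / 1024:.2f} MB")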
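
Because `sys.getsizeof` reports only the shallow size of an object, a cross-check based on actual allocations can be more convincing. The sketch below uses `tracemalloc` to measure the bytes allocated while building many instances. `WithSlots`, `WithDict`, and `allocated_bytes` are hypothetical names introduced for this illustration, and the absolute numbers depend on the interpreter version and platform.

```python
import tracemalloc

class WithSlots:
    __slots__ = ("x", "y")

    def __init__(self, x, y):
        self.x = x
        self.y = y

class WithDict:
    def __init__(self, x, y):
        self.x = x
        self.y = y

def allocated_bytes(cls, n: int = 10_000) -> int:
    """Bytes allocated while building n instances of cls, as seen by tracemalloc."""
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    objects = [cls(i, i) for i in range(n)]
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    del objects
    return after - before

slots_bytes = allocated_bytes(WithSlots)
dict_bytes = allocated_bytes(WithDict)

print(f"with __slots__:    {slots_bytes / 10_000:.1f} bytes per instance")
print(f"without __slots__: {dict_bytes / 10_000:.1f} bytes per instance")
```

Both measurements include the list and the integer attributes, so the difference between them, not either absolute value, is the figure to compare.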

Weak Reference Patterns

python
class CachedObject:
    def __init__(self, key: str, value: Any):
        self.key = key
        self.value = value
        print(f"Creating object: {key}")
    
    def __del__(self):
        print(f"Destroying object: {self.key}")
    
    def __repr__(self) -> str:
        return f"CachedObject({self.key}, {self.value})"

class WeakCache:
    def __init__(self):
        self._cache: WeakValueDictionary = WeakValueDictionary()
        self._lock = threading.RLock()
    
    def get(self, key: str) -> Optional[CachedObject]:
        with self._lock:
            return self._cache.get(key)
    
    def set(self, key: str, obj: CachedObject):
        with self._lock:
            self._cache[key] = obj
    
    def __contains__(self, key: str) -> bool:
        return key in self._cache
    
    def keys(self) -> Set[str]:
        with self._lock:
            return set(self._cache.keys())

class ObserverPattern:
    def __init__(self):
        self._observers: WeakSet = WeakSet()
        self._lock = threading.Lock()
    
    def add_observer(self, observer: Any):
        with self._lock:
            self._observers.add(observer)
    
    def remove_observer(self, observer: Any):
        with self._lock:
            self._observers.discard(observer)
    
    def notify(self, message: str):
        with self._lock:
            observers = list(self._observers)
        
        for observer in observers:
            if hasattr(observer, 'on_notify'):
                try:
                    observer.on_notify(message)
                except Exception as e:
                    print(f"Notification failed: {e}")

class PropertyTracker:
    def __init__(self):
        self._data: WeakKeyDictionary = WeakKeyDictionary()
        self._lock = threading.RLock()
    
    def track(self, obj: Any, property_name: str, value: Any):
        with self._lock:
            if obj not in self._data:
                self._data[obj] = {}
            self._data[obj][property_name] = value
    
    def get_tracked(self, obj: Any) -> Dict:
        with self._lock:
            return self._data.get(obj, {}).copy()
    
    def get_all_tracked(self) -> Dict:
        with self._lock:
            return {obj: data.copy() for obj, data in self._data.items()}

class FlyweightFactory:
    _instances: Dict[str, 'Flyweight'] = {}
    _weak_instances: WeakValueDictionary = WeakValueDictionary()
    
    @classmethod
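    def get_flyweight(cls, key: str, use_weak: bool = False) -> 'Flyweight':
        cache = cls._weak_instances if use_weak else cls._instances
        # Hold a local strong reference: an object stored only in a
        # WeakValueDictionary can be collected before it is returned.
        instance = cache.get(key)
        if instance is None:
            instance = Flyweight(key)
            cache[key] = instance
        return instance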
    
    @classmethod
    def get_stats(cls) -> Dict:
        return {
            'strong_refs': len(cls._instances),
            'weak_refs': len(cls._weak_instances)
        }

class Flyweight:
    def __init__(self, shared_state: str):
        self.shared_state = shared_state
    
    def operation(self, unique_state: str) -> str:
        return f"Shared: {self.shared_state}, unique: {unique_state}"

class CachedProperty:
    def __init__(self, func: Callable):
        self.func = func
        self.attr_name = None
    
    def __set_name__(self, owner, name):
        self.attr_name = name
    
    def __get__(self, obj, owner=None):
        if obj is None:
            return self
        
        cache = obj.__dict__.setdefault('_cached_props', {})
        
        if self.attr_name not in cache:
            cache[self.attr_name] = self.func(obj)
        
        return cache[self.attr_name]
    
    def __delete__(self, obj):
        cache = getattr(obj, '_cached_props', {})
        if self.attr_name in cache:
            del cache[self.attr_name]

weak_cache = WeakCache()

obj1 = CachedObject("key1", "value1")
weak_cache.set("key1", obj1)

print(f"Fetched object: {weak_cache.get('key1')}")
del obj1
gc.collect()
print(f"After deletion: {weak_cache.get('key1')}")

Concurrency Optimization Patterns

Thread-Safe Cache

python
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
from dataclasses import dataclass, field

class ThreadSafeCache(Generic[K, V]):
    def __init__(self, max_size: int = 1000):
        self._cache: Dict[K, V] = {}
        self._lock = threading.RLock()
        self._max_size = max_size
        self._stats = CacheStats(max_size=max_size)
    
    def get(self, key: K) -> Optional[V]:
        with self._lock:
            value = self._cache.get(key)
            if value is not None:
                self._stats.hits += 1
            else:
                self._stats.misses += 1
            return value
    
    def set(self, key: K, value: V):
        with self._lock:
            if len(self._cache) >= self._max_size and key not in self._cache:
                evicted_key = next(iter(self._cache))
                del self._cache[evicted_key]
            self._cache[key] = value
            self._stats.size = len(self._cache)
    
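    def get_or_set(self, key: K, factory: Callable[[], V]) -> V:
        with self._lock:
            if key in self._cache:
                self._stats.hits += 1
                return self._cache[key]
            
            self._stats.misses += 1
            value = factory()
            # Evict the oldest entry first, as set() does, so the cache stays bounded.
            if len(self._cache) >= self._max_size and key not in self._cache:
                del self._cache[next(iter(self._cache))]
            self._cache[key] = value
            self._stats.size = len(self._cache)
            return value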
    
    def compute_if_absent(self, key: K, 
                         loader: Callable[[], V]) -> V:
        return self.get_or_set(key, loader)
    
    def invalidate(self, key: K):
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                self._stats.size = len(self._cache)
    
    def clear(self):
        with self._lock:
            self._cache.clear()
            self._stats.size = 0
    
    def get_stats(self) -> CacheStats:
        with self._lock:
            return CacheStats(
                hits=self._stats.hits,
                misses=self._stats.misses,
                size=self._stats.size,
                max_size=self._stats.max_size
            )

class AsyncCache(Generic[K, V]):
    def __init__(self, max_workers: int = 10):
        self._cache: Dict[K, V] = {}
        self._futures: Dict[K, Future] = {}
        self._lock = threading.Lock()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._stats = CacheStats()
    
    def get_or_compute(self, key: K, 
                      compute: Callable[[], V]) -> Future:
        with self._lock:
            if key in self._cache:
                future = Future()
                future.set_result(self._cache[key])
                self._stats.hits += 1
                return future
            
            if key in self._futures:
                return self._futures[key]
            
            future = self._executor.submit(self._compute, key, compute)
            self._futures[key] = future
            self._stats.misses += 1
            return future
    
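    def _compute(self, key: K, compute: Callable[[], V]) -> V:
        try:
            result = compute()
            with self._lock:
                self._cache[key] = result
            return result
        finally:
            # Always drop the in-flight entry, even when compute() raised, so a
            # failed computation can be retried instead of being served forever.
            with self._lock:
                self._futures.pop(key, None)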
    
    def invalidate(self, key: K):
        with self._lock:
            self._cache.pop(key, None)
            if key in self._futures:
                future = self._futures.pop(key)
                future.cancel()
    
    def shutdown(self):
        self._executor.shutdown(wait=True)

class BoundedExecutor:
    def __init__(self, max_workers: int, max_queue_size: int):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = threading.Semaphore(max_queue_size)
        self._submitted = 0
        self._completed = 0
        self._lock = threading.Lock()
    
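    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        acquired = self._semaphore.acquire(timeout=30.0)
        if not acquired:
            raise RuntimeError("Queue is full; submission timed out")
        
        def wrapped():
            try:
                return fn(*args, **kwargs)
            finally:
                with self._lock:
                    self._completed += 1
                self._semaphore.release()
        
        with self._lock:
            self._submitted += 1
        
        try:
            return self._executor.submit(wrapped)
        except RuntimeError:
            # The executor rejected the task (for example after shutdown):
            # undo the bookkeeping and free the slot.
            with self._lock:
                self._submitted -= 1
            self._semaphore.release()
            raise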
    
    def get_stats(self) -> Dict:
        with self._lock:
            return {
                'submitted': self._submitted,
                'completed': self._completed,
                'pending': self._submitted - self._completed
            }
    
    def shutdown(self):
        self._executor.shutdown(wait=True)

class ProducerConsumer:
    def __init__(self, num_consumers: int = 3, queue_size: int = 100):
        self._queue: Queue = Queue(maxsize=queue_size)
        self._consumers: List[threading.Thread] = []
        self._running = False
        self._results: Dict[int, Any] = {}
        self._result_lock = threading.Lock()
        self._task_id = 0
        self._num_consumers = num_consumers
    
    def start(self):
        self._running = True
        for i in range(self._num_consumers):
            consumer = threading.Thread(
                target=self._consume,
                args=(i,),
                daemon=True
            )
            consumer.start()
            self._consumers.append(consumer)
    
    def stop(self):
        self._running = False
        for _ in self._consumers:
            try:
                self._queue.put(None, timeout=1.0)
            except Full:
                pass
    
    def produce(self, task: Callable) -> int:
        with self._result_lock:
            task_id = self._task_id
            self._task_id += 1
        
        self._queue.put((task_id, task))
        return task_id
    
    def _consume(self, consumer_id: int):
        while self._running:
            try:
                item = self._queue.get(timeout=0.5)
            except Empty:
                continue
            
            if item is None:
                break
            
            task_id, task = item
            try:
                result = task()
                with self._result_lock:
                    self._results[task_id] = result
            except Exception as e:
                with self._result_lock:
                    self._results[task_id] = e
    
    def get_result(self, task_id: int, timeout: float = None) -> Any:
        start = time.time()
        while True:
            with self._result_lock:
                if task_id in self._results:
                    return self._results.pop(task_id)
            
            if timeout and time.time() - start > timeout:
                raise TimeoutError("Timed out waiting for the result")
            time.sleep(0.01)

async_cache = AsyncCache[str, str]()

def slow_computation():
    time.sleep(1)
    return "computed result"

future = async_cache.get_or_compute("key1", slow_computation)
print("Computation submitted...")
result = future.result()
print(f"Result: {result}")
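
The benefit of tracking in-flight futures is that concurrent requests for the same key share one computation. The sketch below isolates that behavior. `SingleFlight`, `slow_load`, and `calls` are hypothetical names, and unlike `AsyncCache` this simplified version does not keep the result once the computation has finished.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict, List

class SingleFlight:
    """Hypothetical helper: overlapping requests for one key share one Future."""

    def __init__(self, max_workers: int = 4):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._inflight: Dict[str, Future] = {}
        self._lock = threading.Lock()

    def submit(self, key: str, compute: Callable[[], object]) -> Future:
        with self._lock:
            future = self._inflight.get(key)
            if future is None:
                future = self._executor.submit(self._run, key, compute)
                self._inflight[key] = future
            return future

    def _run(self, key: str, compute: Callable[[], object]) -> object:
        try:
            return compute()
        finally:
            # Forget the key whether the work succeeded or failed.
            with self._lock:
                self._inflight.pop(key, None)

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)

calls: List[int] = []  # hypothetical trace: one entry per real computation

def slow_load() -> str:
    calls.append(1)
    time.sleep(0.2)
    return "payload"

flight = SingleFlight()
futures = [flight.submit("report", slow_load) for _ in range(5)]
results = [f.result() for f in futures]
flight.shutdown()

print(len(calls), results[0])  # 1 payload
```

All five requests arrive while the first computation is still sleeping, so they receive the same `Future` and `slow_load` runs once.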

Performance Analysis Tools

Profiler

python
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass, field
from functools import wraps
import time
import sys
import threading
from collections import defaultdict
import cProfile
import pstats
import io
import tracemalloc
from contextlib import contextmanager

@dataclass
class TimingResult:
    name: str
    total_time: float
    call_count: int
    avg_time: float
    min_time: float
    max_time: float
    std_dev: float = 0.0
    
    @property
    def ops_per_second(self) -> float:
        return self.call_count / self.total_time if self.total_time > 0 else 0.0

@dataclass
class MemorySnapshot:
    label: str
    timestamp: float
    current_memory: int
    peak_memory: int

class PerformanceProfiler:
    def __init__(self):
        self._timings: Dict[str, List[float]] = defaultdict(list)
        self._enabled = True
        self._lock = threading.Lock()
    
    def enable(self):
        self._enabled = True
    
    def disable(self):
        self._enabled = False
    
    def record(self, name: str, duration: float):
        if self._enabled:
            with self._lock:
                self._timings[name].append(duration)
    
    def get_results(self) -> List[TimingResult]:
        results = []
        with self._lock:
            for name, times in self._timings.items():
                if not times:
                    continue
                avg = sum(times) / len(times)
                variance = sum((t - avg) ** 2 for t in times) / len(times)
                results.append(TimingResult(
                    name=name,
                    total_time=sum(times),
                    call_count=len(times),
                    avg_time=avg,
                    min_time=min(times),
                    max_time=max(times),
                    std_dev=variance ** 0.5
                ))
        return sorted(results, key=lambda r: r.total_time, reverse=True)
    
    def clear(self):
        with self._lock:
            self._timings.clear()
    
    def print_report(self):
        print("\n=== Performance Report ===")
        print(f"{'Function':<30} {'Calls':>10} {'Total (s)':>12} {'Avg (s)':>12} {'Std dev':>12}")
        print("-" * 80)
        for result in self.get_results():
            print(f"{result.name:<30} {result.call_count:>10} "
                  f"{result.total_time:>12.4f} {result.avg_time:>12.6f} {result.std_dev:>12.6f}")

profiler = PerformanceProfiler()

def profile(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            duration = time.perf_counter() - start
            profiler.record(func.__name__, duration)
    return wrapper

class MemoryTracker:
    def __init__(self):
        self._snapshots: List[MemorySnapshot] = []
        self._tracking = False
    
    def start(self):
        tracemalloc.start()
        self._tracking = True
    
    def stop(self):
        if self._tracking:
            tracemalloc.stop()
            self._tracking = False
    
    def snapshot(self, label: str = ""):
        if not self._tracking:
            self.start()
        
        current, peak = tracemalloc.get_traced_memory()
        self._snapshots.append(MemorySnapshot(
            label=label,
            timestamp=time.time(),
            current_memory=current,
            peak_memory=peak
        ))
    
    def get_report(self) -> str:
        if len(self._snapshots) < 2:
            return "At least two snapshots are required"
        
        lines = ["Memory usage report:"]
        prev = None
        for snap in self._snapshots:
            if prev:
                diff = snap.current_memory - prev.current_memory
                lines.append(
                    f"  {snap.label}: {snap.current_memory / 1024 / 1024:.2f}MB "
                    f"(change: {diff / 1024 / 1024:+.2f}MB, peak: {snap.peak_memory / 1024 / 1024:.2f}MB)"
                )
            else:
                lines.append(
                    f"  {snap.label}: {snap.current_memory / 1024 / 1024:.2f}MB"
                )
            prev = snap
        
        return "\n".join(lines)
    
    def get_diff(self, start_idx: int = 0, end_idx: int = -1) -> Dict:
        if not self._snapshots:
            return {}
        
        start = self._snapshots[start_idx]
        end = self._snapshots[end_idx]
        
        return {
            'memory_diff': end.current_memory - start.current_memory,
            'peak_diff': end.peak_memory - start.peak_memory,
            'time_diff': end.timestamp - start.timestamp
        }

class Benchmark:
    def __init__(self, name: str):
        self.name = name
        self._results: List[float] = []
    
    def run(self, func: Callable, iterations: int = 1000, 
           warmup: int = 100) -> Dict:
        for _ in range(warmup):
            func()
        
        self._results = []
        for _ in range(iterations):
            start = time.perf_counter()
            func()
            self._results.append(time.perf_counter() - start)
        
        total = sum(self._results)
        avg = total / len(self._results)
        variance = sum((t - avg) ** 2 for t in self._results) / len(self._results)
        
        return {
            'name': self.name,
            'iterations': iterations,
            'total_time': total,
            'avg_time': avg,
            'min_time': min(self._results),
            'max_time': max(self._results),
            'std_dev': variance ** 0.5,  # population standard deviation
            # Guard against a zero total, which a coarse timer can report for trivial functions
            'ops_per_second': iterations / total if total > 0 else float('inf')
        }
    
    def compare(self, other: 'Benchmark') -> Dict:
        if not self._results or not other._results:
            return {}
        
        self_avg = sum(self._results) / len(self._results)
        other_avg = sum(other._results) / len(other._results)
        
        return {
            'name1': self.name,
            'name2': other.name,
            # speedup > 1 means self (name1) is faster than other (name2)
            'speedup': other_avg / self_avg if self_avg > 0 else 0,
            'time_diff': self_avg - other_avg
        }

@contextmanager
def timeit(name: str = "block"):
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        print(f"{name} took {duration:.6f} s")

@profile
def test_function():
    return sum(range(1000))

for _ in range(100):
    test_function()

profiler.print_report()

benchmark = Benchmark("sum_range")
result = benchmark.run(lambda: sum(range(10000)), iterations=1000)
print(f"\nBenchmark result: {result}")
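
The hand-rolled `Benchmark` class above can be cross-checked against the standard library's `timeit` module. The sketch below is one way to do that, assuming the same `sum(range(10000))` workload; the function name `sum_range` is illustrative and not part of the original code.

```python
import timeit


def sum_range() -> int:
    """Workload under test: sum the first 10,000 integers."""
    return sum(range(10_000))


NUMBER = 1000  # calls per run
REPEAT = 5     # independent runs

# Each entry in `runs` is the total time in seconds for NUMBER calls.
runs = timeit.repeat(sum_range, number=NUMBER, repeat=REPEAT)

# The minimum is usually the least noisy estimate of the achievable time.
best_per_call = min(runs) / NUMBER
print(f"best per-call time: {best_per_call:.9f} s")
```

Taking the minimum over several runs, rather than the mean, reduces the influence of scheduling noise from other processes.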

Enterprise Application Example

High-Performance Data Processing Pipeline

python
from typing import Iterator, Callable, Any, List, Dict, Protocol
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
import threading
from queue import Queue, Empty
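import time

# NOTE: ThreadSafeCache and CacheStats are assumed to be the classes from the
# caching section earlier in this chapter; they are not redefined here.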

class Stage(Protocol):
    def process(self, data: Any) -> Any:
        ...

@dataclass
class PipelineConfig:
    batch_size: int = 100
    num_workers: int = 4
    queue_size: int = 1000
    timeout: float = 30.0
    enable_cache: bool = True
    cache_size: int = 1000

@dataclass
class PipelineStats:
    items_processed: int = 0
    batches_processed: int = 0
    errors: int = 0
    total_time: float = 0.0
    avg_batch_time: float = 0.0

class DataPipeline:
    def __init__(self, config: PipelineConfig = None):
        self.config = config or PipelineConfig()
        self._stages: List[Callable] = []
        self._queue: Queue = Queue(maxsize=self.config.queue_size)
        self._output_queue: Queue = Queue(maxsize=self.config.queue_size)
        self._running = False
        self._stats = PipelineStats()
        self._stats_lock = threading.Lock()
        
        if self.config.enable_cache:
            self._cache = ThreadSafeCache(max_size=self.config.cache_size)
        else:
            self._cache = None
    
    def add_stage(self, stage: Callable) -> 'DataPipeline':
        self._stages.append(stage)
        return self
    
    def process(self, data: Iterator) -> Iterator:
        self._running = True
        self._stats = PipelineStats()
        
        producer = threading.Thread(
            target=self._produce,
            args=(data,),
            daemon=True
        )
        producer.start()
        
        workers = []
        for i in range(self.config.num_workers):
            worker = threading.Thread(
                target=self._worker,
                args=(i,),
                daemon=True
            )
            worker.start()
            workers.append(worker)
        
        active_workers = self.config.num_workers
        while self._running:
            try:
                result = self._output_queue.get(timeout=0.1)
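                if result is None:
                    # Each worker emits one None sentinel when it finishes
                    active_workers -= 1
                    if active_workers == 0:
                        break
                    continue
                # Every queue entry is a batch (list) of results; flatten it so
                # callers receive individual items rather than lists
                yield from result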
            except Empty:
                if not producer.is_alive() and self._queue.empty():
                    break
        
        self._running = False
    
    def _produce(self, data: Iterator):
        batch = []
        for item in data:
            batch.append(item)
            if len(batch) >= self.config.batch_size:
                self._queue.put(batch)
                batch = []
        
        if batch:
            self._queue.put(batch)
        
        for _ in range(self.config.num_workers):
            self._queue.put(None)
    
    def _worker(self, worker_id: int):
        while self._running:
            try:
                batch = self._queue.get(timeout=0.5)
            except Empty:
                continue
            
            if batch is None:
                self._output_queue.put(None)
                break
            
            start_time = time.time()
            results = []
            
            for item in batch:
                try:
                    result = self._process_item(item)
                    results.append(result)
                    
                    with self._stats_lock:
                        self._stats.items_processed += 1
                except Exception:
                    # Count the failure and keep going so one bad item does not abort the batch
                    with self._stats_lock:
                        self._stats.errors += 1
            
            self._output_queue.put(results)
            
            with self._stats_lock:
                self._stats.batches_processed += 1
                self._stats.total_time += time.time() - start_time
        
    def _process_item(self, item: Any) -> Any:
        result = item
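        for index, stage in enumerate(self._stages):
            cache_key = None
            if self._cache is not None:
                # Key on the stage's position rather than stage.__name__: every
                # lambda is named "<lambda>", so name-based keys collide across stages
                cache_key = f"{index}:{result!r}"
                cached = self._cache.get(cache_key)
                # Limitation: a stage result of None cannot be told apart from a cache miss
                if cached is not None:
                    result = cached
                    continue
            
            result = stage(result)
            
            if self._cache is not None and cache_key:
                self._cache.set(cache_key, result)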
        
        return result
    
    def get_stats(self) -> PipelineStats:
        with self._stats_lock:
            avg_time = 0.0
            if self._stats.batches_processed > 0:
                avg_time = self._stats.total_time / self._stats.batches_processed
            return PipelineStats(
                items_processed=self._stats.items_processed,
                batches_processed=self._stats.batches_processed,
                errors=self._stats.errors,
                total_time=self._stats.total_time,
                avg_batch_time=avg_time
            )

class CachedDataLoader:
    def __init__(self, loader: Callable, cache_size: int = 1000):
        self._loader = loader
        self._cache = ThreadSafeCache(max_size=cache_size)
        self._prefetch_queue: Queue = Queue(maxsize=100)
        self._prefetch_running = False
    
    def load(self, key: str) -> Any:
        cached = self._cache.get(key)
        if cached is not None:
            return cached
        
        data = self._loader(key)
        self._cache.set(key, data)
        return data
    
    def preload(self, keys: List[str], num_workers: int = 4):
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [executor.submit(self.load, key) for key in keys]
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Preload failed: {e}")
    
    def start_prefetch(self, key_generator: Callable[[], Iterator[str]]):
        self._prefetch_running = True
        
        def prefetch_worker():
            for key in key_generator():
                if not self._prefetch_running:
                    break
                try:
                    self.load(key)
                except Exception:
                    pass
        
        thread = threading.Thread(target=prefetch_worker, daemon=True)
        thread.start()
    
    def stop_prefetch(self):
        self._prefetch_running = False
    
    def get_cache_stats(self) -> CacheStats:
        return self._cache.get_stats()

config = PipelineConfig(batch_size=50, num_workers=4, enable_cache=True)
pipeline = DataPipeline(config)

pipeline.add_stage(lambda x: x * 2)
pipeline.add_stage(lambda x: x + 10)

data = range(1000)
results = list(pipeline.process(iter(data)))
print(f"Number of results: {len(results)}")
print(f"First 5 results: {results[:5]}")
print(f"Pipeline stats: {pipeline.get_stats()}")

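Anti-Patterns and Pitfalls

Common Performance Anti-Patterns

| Anti-pattern | Problem | Solution |
| --- | --- | --- |
| Premature optimization | Optimizing without measurement data | Measure first, then optimize |
| Cache abuse | Caching everything, which causes memory problems | Cache only hot data and set a TTL |
| Over-pooling | Creating object pools for lightweight objects | Pool only objects that are expensive to create |
| Lock contention | Excessive locking that degrades performance | Use lock-free data structures or finer-grained locks |
| Memory leaks | Leaks caused by caches or closures | Use weak references and clean up periodically |
| Unbatched operations | Processing items one at a time instead of in batches | Use batch APIs |

Anti-Pattern Examples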

python
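class PerformanceAntiPatterns:
    # NOTE: MemoryCache is assumed to be the cache class from the caching
    # section earlier in this chapter.
    
    @staticmethod
    def bad_cache_pattern():
        # Anti-pattern: an unbounded dict with no eviction or TTL grows without limit
        cache = {}
        def get_data(key):
            if key not in cache:
                cache[key] = expensive_load(key)
            return cache[key]
        return get_data
    
    @staticmethod
    def good_cache_pattern():
        # A size bound plus a TTL keeps memory use under control
        cache = MemoryCache(max_size=1000, default_ttl=300.0)
        def get_data(key):
            result = cache.get(key)
            if result is None:
                result = expensive_load(key)
                cache.set(key, result)
            return result
        return get_data

def expensive_load(key):
    # Stand-in for a slow load (database query, network call, and so on)
    return f"data_{key}"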

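class MemoryLeakExample:
    def __init__(self):
        # Strong references: each callback, and everything it captures, stays
        # alive for as long as this object does
        self._callbacks = []
    
    def register_callback(self, callback):
        self._callbacks.append(callback)
    
    def trigger(self, data):
        for callback in self._callbacks:
            callback(data)

from weakref import WeakSet

class FixedMemoryLeak:
    def __init__(self):
        # Weak references let a callback be garbage collected once nothing else
        # refers to it. Caveat: a bound method or lambda passed inline has no
        # other owner, so it disappears from the set almost immediately.
        self._callbacks = WeakSet()
    
    def register_callback(self, callback):
        self._callbacks.add(callback)
    
    def trigger(self, data):
        # Copy to a list so the set can shrink safely during iteration
        for callback in list(self._callbacks):
            if callable(callback):
                callback(data)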
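
A `WeakSet` works for callbacks that some other object keeps alive, but a bound method such as `listener.on_event` is created fresh on every attribute access, so a weak reference to it dies at once. The sketch below shows one possible alternative using `weakref.WeakMethod`, which tracks the method's owner instead. The names `WeakCallbackRegistry` and `Listener` are hypothetical and not part of the original code.

```python
import gc
import weakref
from typing import Any, Callable, List


class WeakCallbackRegistry:
    """Holds bound-method callbacks weakly so subscribers can be collected."""

    def __init__(self) -> None:
        self._refs: List[weakref.WeakMethod] = []

    def register(self, callback: Callable[[Any], None]) -> None:
        # WeakMethod accepts bound methods only; plain functions raise TypeError.
        self._refs.append(weakref.WeakMethod(callback))

    def trigger(self, data: Any) -> int:
        """Call every live callback, prune dead ones, return how many ran."""
        live = []
        called = 0
        for ref in self._refs:
            callback = ref()  # None once the owning object has been collected
            if callback is not None:
                callback(data)
                called += 1
                live.append(ref)
        self._refs = live
        return called


class Listener:
    def __init__(self) -> None:
        self.received: List[Any] = []

    def on_event(self, data: Any) -> None:
        self.received.append(data)


registry = WeakCallbackRegistry()
listener = Listener()
registry.register(listener.on_event)

first = registry.trigger("a")   # the listener is alive, so its callback runs
del listener
gc.collect()                    # make collection explicit for non-refcounting runtimes
second = registry.trigger("b")  # the listener is gone, so nothing runs
print(first, second)
```

Because the registry never holds a strong reference to the listener, dropping the last external reference is enough to unsubscribe it.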

Decision Guide

Choosing a Cache Strategy

┌─────────────────────────────────────────────────────────────────────────┐
│                      Cache Strategy Decision Tree                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Data access pattern?                                                   │
│       │                                                                 │
│       ├── Strong temporal locality ──► LRU Cache                        │
│       │                                                                 │
│       ├── Strong frequency locality ──► LFU Cache                       │
│       │                                                                 │
│       ├── Mixed pattern ──► ARC (Adaptive Replacement Cache)            │
│       │                                                                 │
│       └── Sequential access ──► FIFO Cache                              │
│                                                                         │
│  Cache consistency requirement?                                         │
│       │                                                                 │
│       ├── Strong consistency ──► Write-Through Cache                    │
│       │                                                                 │
│       ├── Eventual consistency ──► Write-Behind Cache                   │
│       │                                                                 │
│       └── Read-heavy workload ──► Cache-Aside Pattern                   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
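
The consistency branch of the tree can be made concrete with a small sketch that contrasts cache-aside with write-through. Everything here is illustrative: `FakeDatabase`, `CacheAside`, and `WriteThrough` are hypothetical names, and an in-memory dict stands in for a real backing store.

```python
from typing import Dict, Optional


class FakeDatabase:
    """Hypothetical backing store that counts reads and writes."""

    def __init__(self) -> None:
        self.rows: Dict[str, str] = {}
        self.reads = 0
        self.writes = 0

    def read(self, key: str) -> Optional[str]:
        self.reads += 1
        return self.rows.get(key)

    def write(self, key: str, value: str) -> None:
        self.writes += 1
        self.rows[key] = value


class _CachedStore:
    """Shared read path: serve from the cache, fill it on a miss."""

    def __init__(self, db: FakeDatabase) -> None:
        self.db = db
        self.cache: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        if key in self.cache:
            return self.cache[key]
        value = self.db.read(key)
        if value is not None:
            self.cache[key] = value
        return value


class CacheAside(_CachedStore):
    def put(self, key: str, value: str) -> None:
        self.db.write(key, value)
        self.cache.pop(key, None)  # invalidate; the next read reloads the value


class WriteThrough(_CachedStore):
    def put(self, key: str, value: str) -> None:
        self.db.write(key, value)
        self.cache[key] = value    # store and cache are updated together


db = FakeDatabase()
aside = CacheAside(db)
aside.put("user:1", "Ada")
aside.get("user:1")  # miss: loads from the store and fills the cache
aside.get("user:1")  # hit: served from the cache
print("cache-aside store reads:", db.reads)

db2 = FakeDatabase()
through = WriteThrough(db2)
through.put("user:1", "Ada")
through.get("user:1")  # hit: the write already populated the cache
print("write-through store reads:", db2.reads)
```

The difference shows up on the first read after a write: cache-aside pays one store read, while write-through has already populated the cache at the cost of doing that work on every write.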

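When to Use an Object Pool

| Condition | Use an object pool | Do not use an object pool |
| --- | --- | --- |
| Object creation cost | High (connections, large objects) | Low (simple objects) |
| Object usage frequency | Used frequently | Used rarely |
| Object lifecycle | Can be reset | Cannot be reset |
| Concurrency needs | Highly concurrent workloads | Single-threaded workloads |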

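Performance Optimization Priorities

Priority 1: Algorithmic optimization (O(n²) → O(n log n))

Priority 2: Data structure choice (list → set/dict for lookup)

Priority 3: Caching strategy (repeated computation → cached results)

Priority 4: Concurrency (serial → parallel)

Priority 5: Memory optimization (__slots__, weak references)

Priority 6: Low-level optimization (Cython, C extensions)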
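
Priority 2 is often the cheapest win, and it is easy to measure. The sketch below times a worst-case membership test on a list against the same test on a set; the collection size and repeat count are arbitrary choices for illustration.

```python
import timeit

n = 100_000
items_list = list(range(n))
items_set = set(items_list)
needle = n - 1  # last element: the worst case for a linear list scan

# Same membership test, 200 times against each container.
list_time = timeit.timeit(lambda: needle in items_list, number=200)
set_time = timeit.timeit(lambda: needle in items_set, number=200)

print(f"list: {list_time:.6f} s, set: {set_time:.6f} s")
```

The list scan is O(n) per lookup while the set lookup is O(1) on average, so the gap widens as `n` grows.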

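Quick Reference Card

Cache Pattern Cheat Sheet

| Pattern | Use case | Key property |
| --- | --- | --- |
| LRU | Strong temporal locality | Evicts the least recently used entry |
| LFU | Strong frequency locality | Evicts the least frequently used entry |
| ARC | Mixed access patterns | Adapts automatically |
| Read-Through | Read-heavy workloads | Transparent loading |
| Write-Through | Strong consistency | Synchronous writes |
| Write-Behind | High throughput | Asynchronous writes |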
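
Whichever pattern is chosen, its benefit can be checked with the hit rate from Definition 32.4. As a minimal sketch, the standard library's `functools.lru_cache` exposes hit and miss counters through `cache_info()`; the `square` function and the access pattern below are illustrative only.

```python
from functools import lru_cache


@lru_cache(maxsize=128)
def square(n: int) -> int:
    return n * n


# Request 10 distinct keys three times each:
# the first pass misses on every key, the next two passes hit.
for _ in range(3):
    for n in range(10):
        square(n)

info = square.cache_info()
hit_rate = info.hits / (info.hits + info.misses)
print(info)
print(f"hit rate: {hit_rate:.2%}")
```

A low hit rate on real traffic suggests that the cache is too small for the working set or that the access pattern does not suit the eviction policy.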

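Performance Optimization Checklist

  • [ ] Use profiling tools to locate bottlenecks
  • [ ] Check algorithmic time complexity
  • [ ] Evaluate data structure choices
  • [ ] Consider a caching strategy
  • [ ] Assess opportunities for concurrency
  • [ ] Check memory usage
  • [ ] Establish benchmarks
  • [ ] Verify the effect of each optimization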
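
For the first item on the checklist, the standard library's `cProfile` and `pstats` modules are a reasonable starting point. The sketch below profiles a deliberately slow workload and captures the report as a string; `slow_sum` and `workload` are hypothetical functions invented for the example.

```python
import cProfile
import io
import pstats


def slow_sum(n: int) -> int:
    """Deliberately naive loop so that it shows up in the profile."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def workload() -> int:
    return sum(slow_sum(2_000) for _ in range(200))


cprof = cProfile.Profile()
cprof.enable()
result = workload()
cprof.disable()

# Write the report into a string buffer, sorted by cumulative time, top 5 rows.
stream = io.StringIO()
stats = pstats.Stats(cprof, stream=stream)
stats.sort_stats(pstats.SortKey.CUMULATIVE).print_stats(5)
report = stream.getvalue()
print(report)
```

Sorting by cumulative time surfaces the call paths that dominate the run, which is usually a better first view than per-function self time.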

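Python Performance Tips

python
from functools import lru_cache

# 1. Prefer built-in functions
sum(range(1000))  # faster than accumulating in a for loop

# 2. Use generators
(x for x in range(1000000))  # lazy: items are produced on demand, which saves memory

# 3. Use __slots__
class Point:
    __slots__ = ('x', 'y')  # no per-instance __dict__, so instances are smaller

# 4. Use caching
@lru_cache(maxsize=128)
def fib(n):
    return n if n < 2 else fib(n-1) + fib(n-2)

# 5. Use local variables
some_func = abs  # placeholder for any module-level function
def process(data):
    local_func = some_func  # local lookups are faster than global lookups
    return [local_func(x) for x in data]

# 6. Use join instead of +
strings = ['a', 'b', 'c']  # placeholder data
''.join(strings)  # faster than repeated s += s2

# 7. Use a set for membership checks
my_set = {1, 2, 3}  # placeholder data
item = 2
if item in my_set:  # O(1) on average, versus O(n) for a list
    pass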

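Summary

Performance optimization is a systems-level effort that spans algorithms, architecture, code, and memory. This chapter covered the caching, lazy loading, object pool, memory optimization, and concurrency optimization patterns, together with formal mathematical definitions and enterprise-level implementation examples.

Key takeaways:

  1. Measure first: let actual measurements guide every optimization
  2. Choose the right strategy: pick a caching strategy that matches the access pattern
  3. Weigh the trade-offs: balance memory, CPU, and consistency sensibly
  4. Monitor continuously: establish performance baselines and keep tracking the effect of optimizations

Remember Donald Knuth's dictum that "premature optimization is the root of all evil", but also recognize that the right optimization pattern can deliver order-of-magnitude performance gains.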

Python Technical Book Series - 江苏省宿城中等专业学校