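Performance Optimization Patterns
Learning Objectives
- Understand the core principles and mathematical foundations of Python performance optimization
- Master the formal definitions of optimization patterns such as caching, lazy loading, and object pooling
- Learn to locate bottlenecks with profiling tools and analyze them quantitatively
- Understand how to choose an optimization strategy for a given scenario and weigh its trade-offs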
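Historical Background and Theoretical Foundations
The Evolution of Performance Optimization
The history of performance optimization reaches back to the early days of computer science: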
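| Period | Milestone | Representative contributions |
|---|---|---|
| 1960s | Algorithm analysis | Knuth, The Art of Computer Programming; standardization of Big-O notation |
| 1970s | Data structure optimization | Aho, Hopcroft and Ullman: theory of algorithms and data structures |
| 1980s | Cache theory | CPU cache hierarchies; formalization of the principle of locality |
| 1990s | Concurrency optimization | Multithreaded programming models; lock optimization theory |
| 2000s | Web performance | CDN caching, database connection pools, ORM optimization |
| 2010s | Big-data performance | MapReduce, in-memory computing, stream processing |
| 2020s | Cloud-native performance | Container optimization, service meshes, edge computing |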
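Python Performance Evolution
1991 ──────────────────────────────────────────────────► 2024
│
├─ Python 1.0: interpreted execution, basic optimizations
├─ Python 2.0: list comprehensions
├─ Python 2.2: iterator protocol, generators
├─ Python 2.4: decorator syntax
├─ Python 3.0: str is Unicode by default
├─ Python 3.2: functools.lru_cache
├─ Python 3.3: compact Unicode string representation (PEP 393)
├─ Python 3.4: asyncio asynchronous I/O
├─ Python 3.6: compact, insertion-ordered dict implementation
├─ Python 3.7: dataclasses; dict insertion order guaranteed by the language
├─ Python 3.10: structural pattern matching (match/case)
└─ Python 3.11: significant speedups (10-60% faster than 3.10)
Formal Definitions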
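Performance System Definition
Definition 32.1 (Performance system) A performance-optimization system is a five-tuple:
$$\mathcal{P} = \langle \mathcal{R}, \mathcal{C}, \mathcal{O}, \mathcal{M}, \mathcal{S} \rangle$$
where:
- $\mathcal{R}$: the set of resources (CPU, memory, I/O, network)
- $\mathcal{C}$: the constraints (time limits, memory limits, throughput requirements)
- $\mathcal{O}$: the set of optimization operations (caching, parallelism, lazy loading, etc.)
- $\mathcal{M}$: the measure function $M: \mathit{Operation} \rightarrow \mathbb{R}^+$
- $\mathcal{S}$: the state space describing the system's current performance state
Definition 32.2 (Time complexity) An algorithm's running time $T(n)$ has the asymptotically tight bound $f(n)$, written $T(n) = \Theta(f(n))$, exactly when: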
$$T(n) = \Theta(f(n)) \iff \exists c_1, c_2, n_0 > 0: \forall n \geq n_0, c_1 \cdot f(n) \leq T(n) \leq c_2 \cdot f(n)$$
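To make the quantifiers concrete, the sketch below checks the inequality for a hypothetical $T(n) = 3n^2 + 5n$ and $f(n) = n^2$. The witnesses $c_1 = 3$, $c_2 = 4$, $n_0 = 5$ were picked by hand. A finite loop can only support an asymptotic claim, not prove it. The proof is the observation that $5n \le n^2$ once $n \ge 5$.

```python
def T(n: int) -> int:
    # hypothetical running time, claimed to be Theta(n^2)
    return 3 * n * n + 5 * n

def f(n: int) -> int:
    return n * n

def holds_theta(c1: float, c2: float, n0: int, upto: int = 10_000) -> bool:
    """Check c1*f(n) <= T(n) <= c2*f(n) for every n in [n0, upto]."""
    return all(c1 * f(n) <= T(n) <= c2 * f(n) for n in range(n0, upto + 1))

print(holds_theta(3, 4, 5))   # True
print(holds_theta(3, 4, 1))   # False: T(1) = 8 exceeds 4 * f(1) = 4
```

The failure at $n_0 = 1$ shows why the definition only requires the bounds from some $n_0$ onward.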
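Definition 32.3 (Space complexity) The space complexity $S(n)$ of an algorithm is the largest amount of memory in use at any point of its execution:
$$S(n) = \max\{\, \mathit{space}(p) : p \text{ is a point in the execution} \,\}$$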
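In Python, the standard-library tracemalloc module reports the peak of traced allocations. That peak is a practical stand-in for $S(n)$, limited to memory allocated through Python's allocators. The sketch below is minimal. The helpers sum_list and sum_gen are illustrative, and absolute byte counts vary by interpreter version. It assumes Python 3.9+ for tracemalloc.reset_peak().

```python
import tracemalloc
from typing import Any, Callable

def peak_bytes(func: Callable[..., Any], *args: Any) -> int:
    """Peak traced allocation (bytes above the starting level) while func(*args) runs."""
    tracemalloc.start()
    try:
        tracemalloc.reset_peak()                     # Python 3.9+
        base, _ = tracemalloc.get_traced_memory()
        func(*args)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak - base

def sum_list(n: int) -> int:
    return sum([i * i for i in range(n)])   # materializes n items: O(n) space

def sum_gen(n: int) -> int:
    return sum(i * i for i in range(n))     # one item at a time: O(1) extra space

n = 100_000
print(peak_bytes(sum_gen, n), peak_bytes(sum_list, n))
```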
Cache Theory
Definition 32.4 (Cache hit rate) The cache hit rate $h$ is defined as:
$$h = \frac{\text{cache hits}}{\text{total accesses}} = \frac{N_{hit}}{N_{hit} + N_{miss}}$$
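Definition 32.5 (Cache efficiency) Cache efficiency $E$ is the expected access time. It weighs the hit time $t_{hit}$ against the miss penalty $t_{miss}$, and a smaller $E$ means a more effective cache: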
$$E = h \cdot t_{hit} + (1-h) \cdot t_{miss}$$
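Here is a worked example of Definitions 32.4 and 32.5 using assumed, not measured, numbers: 950 hits and 50 misses, 0.1 ms per hit and 5 ms per miss. The helper names are illustrative.

```python
def hit_rate(n_hit: int, n_miss: int) -> float:
    total = n_hit + n_miss
    return n_hit / total if total else 0.0        # Definition 32.4

def cache_efficiency(h: float, t_hit: float, t_miss: float) -> float:
    return h * t_hit + (1 - h) * t_miss            # Definition 32.5

# Assumed numbers: 950 hits, 50 misses; 0.1 ms per hit, 5.0 ms per miss
h = hit_rate(950, 50)
E = cache_efficiency(h, t_hit=0.1, t_miss=5.0)
print(h, round(E, 3))   # 0.95 0.345
```

The 5% of requests that miss contribute 0.25 ms of the 0.345 ms average. Raising $h$ to 0.99 with the same timings gives $E = 0.099 + 0.05 = 0.149$ ms, so the miss penalty usually dominates.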
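Theorem 32.1 (Competitiveness of LRU) Consider a cache that holds $k$ items. LRU is $k$-competitive: on every request sequence $\sigma$, its number of misses is at most $k$ times that of the offline optimal policy OPT (Belady's algorithm, which evicts the item needed farthest in the future), up to an additive constant. No deterministic online policy can guarantee a smaller ratio, and this is the sense in which LRU is optimal:
$$C_{LRU}(\sigma) \leq k \cdot C_{OPT}(\sigma) + b$$
where $k$ is the cache size, $C$ counts cache misses, and $b$ is a constant independent of $\sigma$.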
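A small simulation can illustrate Theorem 32.1, though it does not prove it. The sketch below counts misses for LRU and for Belady's offline OPT on a cyclic pattern over $k+1$ keys, which is LRU's classic worst case. The function names are illustrative.

```python
from collections import OrderedDict
from typing import Sequence

def lru_misses(requests: Sequence[str], k: int) -> int:
    cache: "OrderedDict[str, None]" = OrderedDict()
    misses = 0
    for x in requests:
        if x in cache:
            cache.move_to_end(x)               # refresh recency on a hit
            continue
        misses += 1
        if len(cache) >= k:
            cache.popitem(last=False)          # evict the least recently used
        cache[x] = None
    return misses

def next_use(requests: Sequence[str], i: int, x: str) -> float:
    """Index of the next request for x after position i (inf if never)."""
    for j in range(i + 1, len(requests)):
        if requests[j] == x:
            return j
    return float("inf")

def opt_misses(requests: Sequence[str], k: int) -> int:
    """Belady's offline OPT: evict the item whose next use is farthest away."""
    cache: set = set()
    misses = 0
    for i, x in enumerate(requests):
        if x in cache:
            continue
        misses += 1
        if len(cache) >= k:
            cache.remove(max(cache, key=lambda y: next_use(requests, i, y)))
        cache.add(x)
    return misses

k = 3
reqs = list("abcd" * 5)       # cycling over k + 1 keys: 20 requests
print(lru_misses(reqs, k), opt_misses(reqs, k))   # 20 9
```

LRU misses every request while OPT misses 9. The ratio of about 2.2 stays within the bound $k = 3$.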
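Lazy Evaluation Theory
Definition 32.6 (Lazy evaluation) Lazy evaluation is an evaluation strategy
$$\mathit{eval}: \mathit{Expression} \rightarrow (\mathit{Environment} \rightarrow \mathit{Value})$$
in which an expression is evaluated only when its value is actually needed.
Definition 32.7 (Thunk) A thunk is the basic unit of deferred computation:
$$\mathit{Thunk} = \langle f, \sigma, \mathit{forced} \rangle$$
where $f$ is the function to execute, $\sigma$ is its captured environment, and $\mathit{forced}$ records whether it has already been evaluated.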
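The triple $\langle f, \sigma, \mathit{forced} \rangle$ maps directly onto a Python closure. The sketch below is minimal and uses hypothetical names (make_thunk, double_x). The environment is an explicit dict, and memoizing the result is what turns call-by-name into call-by-need.

```python
from typing import Any, Callable, Dict, List

def make_thunk(f: Callable[[Dict[str, Any]], Any], env: Dict[str, Any]) -> Callable[[], Any]:
    """Thunk = <f, sigma, forced>: f is the deferred function, env plays sigma,
    and state["forced"] records whether the value has been computed."""
    state: Dict[str, Any] = {"forced": False, "value": None}

    def force() -> Any:
        if not state["forced"]:
            state["value"] = f(env)   # evaluated on first demand only
            state["forced"] = True
        return state["value"]         # later calls reuse the memoized value

    return force

calls: List[int] = []

def double_x(env: Dict[str, Any]) -> int:   # hypothetical deferred computation
    calls.append(env["x"])                  # record every real evaluation
    return env["x"] * 2

t = make_thunk(double_x, {"x": 21})         # nothing evaluated yet: calls == []
print(t(), t(), calls)                      # 42 42 [21]
```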
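Object Pool Theory
Definition 32.8 (Object pool) An object pool is a resource-management system:
$$\mathit{Pool} = \langle C, A, F, V \rangle$$
where:
- $C$: the creation function $\mathit{create}: () \rightarrow \mathit{Object}$, which takes no arguments
- $A$: the set of available objects $\mathit{Available} \subseteq \mathit{Object}$
- $F$: the operations $\mathit{acquire}$ and $\mathit{release}$
- $V$: the validation function $\mathit{validate}: \mathit{Object} \rightarrow \{\mathit{True}, \mathit{False}\}$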
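The sketch below maps each component of the tuple onto code. It is minimal and single-threaded. MinimalPool and new_buffer are hypothetical names. The thread-safe ObjectPool later in this chapter builds limits, timeouts and idle eviction on top of the same idea.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")

class MinimalPool(Generic[T]):
    """Pool = <C, A, F, V> (Definition 32.8), single-threaded sketch."""

    def __init__(self, create: Callable[[], T], validate: Callable[[T], bool]):
        self._create = create              # C: builds a fresh object
        self._available: List[T] = []      # A: idle objects (used as a stack)
        self._validate = validate          # V: Object -> bool

    def acquire(self) -> T:                # F: acquire
        while self._available:
            obj = self._available.pop()
            if self._validate(obj):
                return obj                 # reuse instead of creating
            # invalid objects are discarded
        return self._create()              # A is empty: fall back to C

    def release(self, obj: T) -> None:     # F: release
        if self._validate(obj):
            self._available.append(obj)

created: List[bytearray] = []

def new_buffer() -> bytearray:             # hypothetical expensive factory
    buf = bytearray(1024)
    created.append(buf)
    return buf

pool = MinimalPool(new_buffer, validate=lambda b: len(b) == 1024)
b1 = pool.acquire()
pool.release(b1)
b2 = pool.acquire()
print(b1 is b2, len(created))              # True 1
```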
Performance Optimization Layer Architecture
| Layer | Techniques |
|---|---|
| Algorithm | Time complexity O(f(n)), space complexity, data structure choice |
| Architecture | Caching, asynchronous execution, parallelism, lazy loading |
| Code | Built-in functions, generators, list comprehensions |
| Memory | Object pools, `__slots__`, weak references |
| Tooling | Cython, PyPy, Numba, C extensions |
Caching Patterns
UML Class Diagram
Class diagram: caching patterns

  ┌────────────────────┐        ┌────────────────────┐
  │   <<interface>>    │        │     CacheEntry     │
  │   CacheStrategy    │        ├────────────────────┤
  ├────────────────────┤        │ - value: V         │
  │ + get(key): V      │        │ - timestamp: float │
  │ + put(key, val)    │        │ - ttl: float       │
  │ + remove(key)      │        │ - hits: int        │
  │ + clear()          │        │ - size: int        │
  │ + size(): int      │        │ + is_expired()     │
  └─────────┬──────────┘        └────────────────────┘
            │
      ┌─────┴─────┬────────────┐
      │           │            │
  ┌───▼───┐   ┌───▼───┐    ┌───▼───┐
  │  LRU  │   │  LFU  │    │  ARC  │
  │ Cache │   │ Cache │    │ Cache │
  └───────┘   └───────┘    └───────┘

  ┌─────────────────────────────────────────────────────────┐
  │                    MultiLevelCache                      │
  ├─────────────────────────────────────────────────────────┤
  │ - levels: List[MemoryCache]                             │
  │ + get(key): Any                                         │
  │ + set(key, value, ttl)                                  │
  │ + get_stats(): List[CacheStats]                         │
  └─────────────────────────────────────────────────────────┘
Function Result Caching
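One design point is worth isolating before the full implementation. The cached decorator below uses None to signal a cache miss, so a function that legitimately returns None is recomputed on every call. A common remedy is to distinguish "absent" from "cached None" with a private sentinel object. The sketch below shows this with a hypothetical memoize helper. It is unbounded and not thread-safe. For comparison, functools.lru_cache also caches None results.

```python
import functools
from typing import Any, Callable, Dict, Hashable, List, Optional

_MISSING = object()   # sentinel: means "no entry", unlike a cached None

def memoize(func: Callable[..., Any]) -> Callable[..., Any]:
    cache: Dict[Hashable, Any] = {}

    @functools.wraps(func)
    def wrapper(*args: Hashable, **kwargs: Hashable) -> Any:
        # sorted kwargs: f(a=1, b=2) and f(b=2, a=1) share one entry
        key = (args, tuple(sorted(kwargs.items())))
        result = cache.get(key, _MISSING)
        if result is _MISSING:
            result = func(*args, **kwargs)
            cache[key] = result            # None is stored like any other value
        return result

    wrapper.cache = cache                  # exposed for inspection
    return wrapper

lookups: List[int] = []

@memoize
def find_user(user_id: int) -> Optional[dict]:   # hypothetical slow lookup
    lookups.append(user_id)
    return None                                   # e.g. "no such user"

find_user(7)
find_user(7)
print(lookups)   # [7]: the None result was cached
```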
from typing import Any, Callable, Dict, Generic, List, Optional, ParamSpec, Tuple, TypeVar
from functools import wraps, lru_cache
from dataclasses import dataclass
from abc import ABC, abstractmethod
from collections import OrderedDict
import threading
import time

P = ParamSpec('P')  # ParamSpec requires Python 3.10+
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')


@dataclass
class CacheEntry(Generic[V]):
    """A cached value plus the metadata needed for TTL expiry and statistics."""
    value: V
    timestamp: float
    ttl: float
    hits: int = 0
    size: int = 0

    def is_expired(self) -> bool:
        return time.time() - self.timestamp >= self.ttl


@dataclass
class CacheStats:
    hits: int = 0
    misses: int = 0
    evictions: int = 0
    size: int = 0
    max_size: int = 0

    @property
    def hit_rate(self) -> float:
        # h = N_hit / (N_hit + N_miss), Definition 32.4
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0.0

    @property
    def miss_rate(self) -> float:
        return 1.0 - self.hit_rate


class CacheStrategy(ABC, Generic[K, V]):
    """Eviction-policy interface shared by the LRU, LFU and ARC caches."""

    @abstractmethod
    def get(self, key: K) -> Optional[V]:
        """Return the cached value, or None on a miss."""

    @abstractmethod
    def put(self, key: K, value: V) -> Optional[K]:
        """Insert or update a value; return the evicted key, if any."""

    @abstractmethod
    def remove(self, key: K) -> bool:
        """Remove a key; return True if it was present."""

    @abstractmethod
    def clear(self) -> None:
        """Drop every entry."""

    @abstractmethod
    def size(self) -> int:
        """Number of cached entries."""


class LRUCache(CacheStrategy[K, V]):
    """Least Recently Used: evicts the entry accessed longest ago.
    OrderedDict keeps keys in recency order, so every operation is O(1)."""

    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be positive")
        self._capacity = capacity
        self._cache: OrderedDict[K, V] = OrderedDict()
        self._lock = threading.RLock()

    def get(self, key: K) -> Optional[V]:
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)  # mark as most recently used
                return self._cache[key]
            return None

    def put(self, key: K, value: V) -> Optional[K]:
        evicted_key = None
        with self._lock:
            if key in self._cache:
                self._cache.move_to_end(key)
            elif len(self._cache) >= self._capacity:
                # the first item of the OrderedDict is the least recently used
                evicted_key, _ = self._cache.popitem(last=False)
            self._cache[key] = value
        return evicted_key

    def remove(self, key: K) -> bool:
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False

    def clear(self) -> None:
        with self._lock:
            self._cache.clear()

    def size(self) -> int:
        with self._lock:
            return len(self._cache)


class LFUCache(CacheStrategy[K, V]):
    """Least Frequently Used: evicts the entry with the smallest access count.
    Eviction scans every key (O(n)); ties go to the oldest-inserted key."""

    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be positive")
        self._capacity = capacity
        self._cache: Dict[K, V] = {}
        self._freq: Dict[K, int] = {}
        self._lock = threading.RLock()

    def get(self, key: K) -> Optional[V]:
        with self._lock:
            if key in self._cache:
                self._freq[key] = self._freq.get(key, 0) + 1
                return self._cache[key]
            return None

    def put(self, key: K, value: V) -> Optional[K]:
        evicted_key = None
        with self._lock:
            if key not in self._cache and len(self._cache) >= self._capacity:
                evicted_key = min(self._freq, key=self._freq.get)
                del self._cache[evicted_key]
                del self._freq[evicted_key]
            self._cache[key] = value
            self._freq[key] = self._freq.get(key, 0) + 1
        return evicted_key

    def remove(self, key: K) -> bool:
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                del self._freq[key]
                return True
            return False

    def clear(self) -> None:
        with self._lock:
            self._cache.clear()
            self._freq.clear()

    def size(self) -> int:
        with self._lock:
            return len(self._cache)
class ARCache(CacheStrategy[K, V]):
    """Adaptive Replacement Cache: balances recency (LRU-like) and frequency (LFU-like).

    T1 holds keys seen once recently; T2 holds keys seen at least twice.
    B1/B2 are "ghost" lists that keep only the keys recently evicted from T1/T2.
    p is the adaptive target size of T1, tuned by hits in the ghost lists.
    """

    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be positive")
        self._capacity = capacity
        self._p = 0
        self._t1: OrderedDict[K, V] = OrderedDict()
        self._t2: OrderedDict[K, V] = OrderedDict()
        self._b1: OrderedDict[K, None] = OrderedDict()
        self._b2: OrderedDict[K, None] = OrderedDict()
        self._lock = threading.RLock()

    def get(self, key: K) -> Optional[V]:
        with self._lock:
            if key in self._t1:
                # second access: promote from the recency list to the frequency list
                self._t2[key] = self._t1.pop(key)
                return self._t2[key]
            if key in self._t2:
                self._t2.move_to_end(key)
                return self._t2[key]
            return None

    def put(self, key: K, value: V) -> Optional[K]:
        with self._lock:
            # Case 1: the key is cached; update it and count the write as an access
            if key in self._t1:
                del self._t1[key]
                self._t2[key] = value
                return None
            if key in self._t2:
                self._t2.move_to_end(key)
                self._t2[key] = value
                return None
            # Case 2: ghost hit in B1, so recency deserves more room: grow p
            if key in self._b1:
                delta = max(1, len(self._b2) // len(self._b1))
                self._p = min(self._capacity, self._p + delta)
                del self._b1[key]
                evicted = self._replace(key_in_b2=False)
                self._t2[key] = value
                return evicted
            # Case 3: ghost hit in B2, so frequency deserves more room: shrink p
            if key in self._b2:
                delta = max(1, len(self._b1) // len(self._b2))
                self._p = max(0, self._p - delta)
                del self._b2[key]
                evicted = self._replace(key_in_b2=True)
                self._t2[key] = value
                return evicted
            # Case 4: a completely new key
            evicted = None
            if len(self._t1) + len(self._b1) >= self._capacity:
                if len(self._t1) < self._capacity:
                    self._b1.popitem(last=False)         # forget the oldest B1 ghost
                    evicted = self._replace(key_in_b2=False)
                else:
                    # B1 is empty and T1 fills the cache: drop T1's LRU outright
                    evicted, _ = self._t1.popitem(last=False)
            else:
                total = len(self._t1) + len(self._t2) + len(self._b1) + len(self._b2)
                if total >= self._capacity:
                    if total >= 2 * self._capacity:
                        self._b2.popitem(last=False)     # keep the ghost lists bounded
                    evicted = self._replace(key_in_b2=False)
            self._t1[key] = value
            return evicted

    def _replace(self, key_in_b2: bool) -> Optional[K]:
        """Move one entry into its ghost list, but only when the cache is full."""
        if len(self._t1) + len(self._t2) < self._capacity:
            return None
        if self._t1 and (len(self._t1) > self._p
                         or (key_in_b2 and len(self._t1) == self._p)
                         or not self._t2):
            evicted_key, _ = self._t1.popitem(last=False)
            self._b1[evicted_key] = None
        else:
            evicted_key, _ = self._t2.popitem(last=False)
            self._b2[evicted_key] = None
        return evicted_key

    def remove(self, key: K) -> bool:
        with self._lock:
            removed = False
            for cache in (self._t1, self._t2):
                if key in cache:
                    del cache[key]
                    removed = True
            for ghost in (self._b1, self._b2):
                ghost.pop(key, None)
            return removed

    def clear(self) -> None:
        with self._lock:
            self._t1.clear()
            self._t2.clear()
            self._b1.clear()
            self._b2.clear()
            self._p = 0

    def size(self) -> int:
        with self._lock:
            return len(self._t1) + len(self._t2)


class MemoryCache(Generic[K, V]):
    """In-memory cache with a per-entry TTL and a pluggable eviction policy
    ('lru', 'lfu' or 'arc'). Expired entries are dropped lazily, on read."""

    def __init__(self, default_ttl: float = 300.0, max_size: int = 1000,
                 strategy: str = 'lru'):
        self._strategy: CacheStrategy[K, CacheEntry[V]]
        if strategy == 'lru':
            self._strategy = LRUCache(max_size)
        elif strategy == 'lfu':
            self._strategy = LFUCache(max_size)
        elif strategy == 'arc':
            self._strategy = ARCache(max_size)
        else:
            raise ValueError(f"Unknown strategy: {strategy}")
        self._default_ttl = default_ttl
        self._stats = CacheStats(max_size=max_size)
        self._lock = threading.RLock()

    def get(self, key: K) -> Optional[V]:
        with self._lock:
            entry = self._strategy.get(key)
            if entry is not None:
                if not entry.is_expired():
                    entry.hits += 1
                    self._stats.hits += 1
                    return entry.value
                self._strategy.remove(key)          # expired: treat as a miss
                self._stats.size = self._strategy.size()
            self._stats.misses += 1
            return None

    def set(self, key: K, value: V, ttl: Optional[float] = None) -> None:
        with self._lock:
            entry = CacheEntry(
                value=value,
                timestamp=time.time(),
                ttl=ttl if ttl is not None else self._default_ttl,
            )
            evicted = self._strategy.put(key, entry)
            if evicted is not None:
                self._stats.evictions += 1
            self._stats.size = self._strategy.size()

    def delete(self, key: K) -> bool:
        with self._lock:
            result = self._strategy.remove(key)
            self._stats.size = self._strategy.size()
            return result

    def clear(self) -> None:
        with self._lock:
            self._strategy.clear()
            self._stats.size = 0

    def get_stats(self) -> CacheStats:
        """Return a snapshot so callers cannot mutate the live counters."""
        with self._lock:
            return CacheStats(
                hits=self._stats.hits,
                misses=self._stats.misses,
                evictions=self._stats.evictions,
                size=self._stats.size,
                max_size=self._stats.max_size,
            )
def cached(cache: Optional[MemoryCache] = None,
           key_func: Optional[Callable[..., Any]] = None,
           ttl: Optional[float] = None):
    """Memoize a function's results in a MemoryCache.
    MemoryCache.get() returns None on a miss, so a function that returns
    None is re-executed on every call."""
    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        # each decorated function gets its own cache unless one is passed in
        func_cache = cache if cache is not None else MemoryCache()

        @wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            if key_func:
                key = key_func(*args, **kwargs)
            else:
                # sorted kwargs: f(a=1, b=2) and f(b=2, a=1) share one entry
                key = f"{func.__qualname__}:{args!r}:{sorted(kwargs.items())!r}"
            result = func_cache.get(key)
            if result is not None:
                return result
            result = func(*args, **kwargs)
            func_cache.set(key, result, ttl)
            return result

        wrapper.cache = func_cache
        wrapper.cache_clear = func_cache.clear
        wrapper.cache_stats = func_cache.get_stats
        return wrapper
    return decorator


class MultiLevelCache(Generic[K, V]):
    """Tiered cache: small, short-TTL levels first; larger, longer-TTL levels behind.
    A hit in a lower level is copied (promoted) into every faster level."""

    def __init__(self, levels: Optional[List[Tuple[int, float]]] = None):
        if levels is None:
            levels = [(100, 60.0), (1000, 300.0), (10000, 3600.0)]  # (max_size, ttl in seconds)
        self._levels: List[MemoryCache[K, V]] = [
            MemoryCache(max_size=size, default_ttl=ttl)
            for size, ttl in levels
        ]
        self._lock = threading.RLock()

    def get(self, key: K) -> Optional[V]:
        with self._lock:
            for i, cache in enumerate(self._levels):
                value = cache.get(key)
                if value is not None:
                    for j in range(i):
                        self._levels[j].set(key, value)  # promote
                    return value
            return None

    def set(self, key: K, value: V, ttl: Optional[float] = None) -> None:
        with self._lock:
            for cache in self._levels:
                cache.set(key, value, ttl)

    def get_stats(self) -> List[CacheStats]:
        return [cache.get_stats() for cache in self._levels]


@lru_cache(maxsize=128)
def fibonacci_lru(n: int) -> int:
    if n < 2:
        return n
    return fibonacci_lru(n - 1) + fibonacci_lru(n - 2)


@cached(ttl=60.0)
def expensive_computation(n: int) -> int:
    print(f"Computing... n={n}")
    time.sleep(0.1)
    return n * n


print(f"fibonacci(30) = {fibonacci_lru(30)}")
print(f"Cache info: {fibonacci_lru.cache_info()}")
print(expensive_computation(10))   # computed, then cached
print(expensive_computation(10))   # served from the cache
print(f"Cache stats: {expensive_computation.cache_stats()}")
Data Caching Patterns
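Write-behind, implemented as WriteBehindCache below, keys its pending writes by cache key. As a result, several updates to the same key between two flushes reach the backing store as a single write. The sketch below isolates just that coalescing behavior. It uses a hypothetical CoalescingWriteBuffer with no background thread.

```python
from typing import Callable, Dict, Hashable, List, Tuple

class CoalescingWriteBuffer:
    """Hypothetical write-behind buffer without a background thread: pending
    writes are keyed, so a later put for the same key replaces the earlier one."""

    def __init__(self, writer: Callable[[Hashable, object], None]):
        self._writer = writer
        self._pending: Dict[Hashable, object] = {}

    def put(self, key: Hashable, value: object) -> None:
        self._pending[key] = value             # last write wins

    def flush(self) -> int:
        batch, self._pending = self._pending, {}
        for key, value in batch.items():       # one store write per key
            self._writer(key, value)
        return len(batch)

store_writes: List[Tuple[Hashable, object]] = []
buf = CoalescingWriteBuffer(lambda k, v: store_writes.append((k, v)))
for i in range(100):
    buf.put("counter", i)                      # 100 cache updates to one key
buf.put("other", "x")
print(buf.flush(), store_writes)               # 2 [('counter', 99), ('other', 'x')]
```

The trade-off is durability. Anything still buffered when the process dies is lost, so write-behind suits data that can tolerate loss, such as counters or metrics.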
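class ReadThroughCache(Generic[K, V]):
    """Read-through: callers talk only to the cache. On a miss or an expired entry,
    the cache itself calls `loader` and stores the result. Entries live inside the
    eviction strategy, so its capacity bounds memory use."""

    def __init__(self, loader: Callable[[K], V],
                 strategy: CacheStrategy[K, CacheEntry[V]],
                 ttl: float = 300.0):
        self._loader = loader
        self._strategy = strategy
        self._ttl = ttl
        self._lock = threading.RLock()

    def get(self, key: K) -> V:
        with self._lock:
            entry = self._strategy.get(key)
            if entry is not None and not entry.is_expired():
                entry.hits += 1
                return entry.value
        # load outside the lock so a slow loader does not block other keys;
        # concurrent misses on the same key may each call the loader
        value = self._loader(key)
        with self._lock:
            self._strategy.put(key, CacheEntry(value=value, timestamp=time.time(),
                                               ttl=self._ttl))
        return value

    def invalidate(self, key: K) -> None:
        with self._lock:
            self._strategy.remove(key)

    def refresh(self, key: K) -> V:
        self.invalidate(key)
        return self.get(key)


class WriteThroughCache(Generic[K, V]):
    """Write-through: every write goes to the backing store and the cache
    synchronously, so the cache never holds data the store lacks."""

    def __init__(self, reader: Callable[[K], V],
                 writer: Callable[[K, V], None],
                 strategy: CacheStrategy[K, V]):
        self._reader = reader
        self._writer = writer
        self._strategy = strategy
        self._lock = threading.RLock()

    def get(self, key: K) -> Optional[V]:
        value = self._strategy.get(key)
        if value is None:
            # miss: fall back to the backing store and populate the cache
            value = self._reader(key)
            if value is not None:
                with self._lock:
                    self._strategy.put(key, value)
        return value

    def put(self, key: K, value: V) -> None:
        with self._lock:
            self._writer(key, value)        # store first: if it raises, the cache is untouched
            self._strategy.put(key, value)

    def invalidate(self, key: K) -> None:
        self._strategy.remove(key)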
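class WriteBehindCache(Generic[K, V]):
    """Write-behind (write-back): writes update the cache immediately and are
    flushed to the backing store in batches by a background thread. Pending writes
    are keyed, so repeated writes to one key coalesce. Writes still pending when
    the process crashes are lost."""

    def __init__(self, writer: Callable[[K, V], None],
                 strategy: CacheStrategy[K, V],
                 flush_interval: float = 5.0,
                 max_batch_size: int = 100):
        self._writer = writer
        self._strategy = strategy
        self._flush_interval = flush_interval
        self._max_batch_size = max_batch_size
        self._write_queue: Dict[K, V] = {}
        self._lock = threading.RLock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._flusher, daemon=True)
        self._thread.start()

    def get(self, key: K) -> Optional[V]:
        return self._strategy.get(key)

    def put(self, key: K, value: V) -> None:
        with self._lock:
            self._strategy.put(key, value)
            self._write_queue[key] = value

    def _flusher(self) -> None:
        # Event.wait() returns True (ending the loop) as soon as shutdown() sets it
        while not self._stop.wait(self._flush_interval):
            self._flush()

    def _flush(self) -> int:
        """Write one batch and return the number of successful writes."""
        with self._lock:
            keys = list(self._write_queue)[:self._max_batch_size]
            batch = [(key, self._write_queue.pop(key)) for key in keys]
        written = 0
        for key, value in batch:            # slow I/O happens outside the lock
            try:
                self._writer(key, value)
                written += 1
            except Exception as e:
                print(f"Write failed: {key}, error: {e}")
                with self._lock:
                    # re-queue for retry unless a newer value arrived meanwhile
                    self._write_queue.setdefault(key, value)
        return written

    def shutdown(self) -> None:
        self._stop.set()
        self._thread.join()
        # drain the queue; stop if a whole batch fails so we never spin forever
        while self._write_queue and self._flush() > 0:
            pass


class CacheAsidePattern(Generic[K, V]):
    """Cache-aside: the application manages both the cache and the store. Reads
    check the cache first and load from the store on a miss; writes and deletes
    go to the store first, then update or drop the cached copy."""

    def __init__(self, loader: Callable[[K], V],
                 writer: Callable[[K, V], None],
                 deleter: Callable[[K], None],
                 cache: CacheStrategy[K, V]):
        self._loader = loader
        self._writer = writer
        self._deleter = deleter
        self._cache = cache
        self._lock = threading.RLock()

    def get(self, key: K) -> V:
        value = self._cache.get(key)
        if value is not None:
            return value
        value = self._loader(key)
        with self._lock:
            self._cache.put(key, value)
        return value

    def put(self, key: K, value: V) -> None:
        with self._lock:
            self._writer(key, value)
            self._cache.put(key, value)

    def delete(self, key: K) -> None:
        with self._lock:
            self._deleter(key)
            self._cache.remove(key)


def load_user(user_id: int) -> dict:
    print(f"Loading user {user_id} from the database")
    return {"id": user_id, "name": f"User {user_id}"}


lru_strategy = LRUCache(capacity=100)
user_cache = ReadThroughCache(loader=load_user, strategy=lru_strategy)
print(user_cache.get(1))   # miss: calls load_user
print(user_cache.get(1))   # hit: served from the cache
Lazy Loading Patterns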
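The standard library already ships a lazy attribute. functools.cached_property (Python 3.8+) computes a value on first access and stores it in the instance __dict__, much like the LazyAttribute descriptor in this section. The sketch below is minimal and uses a hypothetical Report class.

```python
from functools import cached_property
from typing import List

class Report:                          # hypothetical example class
    def __init__(self, rows: List[int]):
        self.rows = rows
        self.loads = 0                 # how often the expensive step ran

    @cached_property
    def summary(self) -> dict:
        # runs on first access only; the result is then stored in
        # self.__dict__["summary"], which shadows the descriptor afterwards
        self.loads += 1
        return {"count": len(self.rows), "total": sum(self.rows)}

r = Report([1, 2, 3])
print("summary" in r.__dict__)   # False: nothing computed yet
print(r.summary)                 # {'count': 3, 'total': 6}
print(r.summary, r.loads)        # same dict; loads is still 1
del r.summary                    # invalidate: the next access recomputes
```

Since Python 3.12, cached_property no longer holds a lock, so concurrent first accesses may run the getter more than once. When that matters, the Lazy class below uses double-checked locking instead.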
UML Class Diagram
Class diagram: lazy loading patterns

  ┌────────────────────┐        ┌────────────────────┐
  │      Lazy<T>       │        │       Thunk        │
  ├────────────────────┤        ├────────────────────┤
  │ - _factory: Func   │        │ - _func: Func      │
  │ - _value: T        │        │ - _forced: bool    │
  │ - _initialized     │        │ - _value: Any      │
  ├────────────────────┤        ├────────────────────┤
  │ + get(): T         │        │ + force(): Any     │
  │ + is_initialized() │        │ + is_forced(): bool│
  │ + reset()          │        └────────────────────┘
  └────────────────────┘

  ┌────────────────────┐        ┌────────────────────┐
  │   LazyAttribute    │        │    VirtualProxy    │
  ├────────────────────┤        ├────────────────────┤
  │ - factory: Func    │        │ - _factory: Func   │
  │ - _name: str       │        │ - _subject: Any    │
  ├────────────────────┤        ├────────────────────┤
  │ + __get__()        │        │ + __getattr__()    │
  └────────────────────┘        │ + _get_subject()   │
                                └────────────────────┘

  ┌─────────────────────────────────────────────────────────┐
  │                     LazySequence<T>                     │
  ├─────────────────────────────────────────────────────────┤
  │ - _iterable: Iterator<T>                                │
  │ - _cache: List<T>                                       │
  │ - _exhausted: bool                                      │
  ├─────────────────────────────────────────────────────────┤
  │ + __iter__(): Generator<T>                              │
  │ + __getitem__(index): T                                 │
  │ + __len__(): int                                        │
  └─────────────────────────────────────────────────────────┘
Lazy Initialization
from typing import Any, Callable, Dict, Generator, Generic, Iterable, List, Optional, TypeVar
from functools import wraps
import threading

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')


class Lazy(Generic[T]):
    """Computes its value on the first get() and caches it. Double-checked locking
    makes initialization thread-safe; if the factory raises, the exception is
    cached and re-raised on every later get()."""

    def __init__(self, factory: Callable[[], T]):
        self._factory = factory
        self._value: Optional[T] = None
        self._initialized = False
        self._lock = threading.RLock()
        self._exception: Optional[Exception] = None

    def get(self) -> T:
        if not self._initialized:            # fast path: no lock once initialized
            with self._lock:
                if not self._initialized:    # re-check: another thread may have won
                    try:
                        self._value = self._factory()
                    except Exception as e:
                        self._exception = e
                        raise
                    finally:
                        self._initialized = True
        if self._exception is not None:
            raise self._exception
        return self._value

    def is_initialized(self) -> bool:
        return self._initialized

    def reset(self) -> None:
        with self._lock:
            self._value = None
            self._initialized = False
            self._exception = None

    def map(self, func: Callable[[T], Any]) -> 'Lazy':
        # composition stays lazy: nothing runs until the new Lazy is read
        return Lazy(lambda: func(self.get()))

    def flat_map(self, func: Callable[[T], 'Lazy']) -> 'Lazy':
        return Lazy(lambda: func(self.get()).get())

    def __repr__(self) -> str:
        if self._initialized:
            return f"Lazy(value={self._value!r})"
        return "Lazy(<not computed>)"


class LazyAttribute:
    """Descriptor that computes a per-instance value on first access and stores it
    in the instance __dict__ (similar to functools.cached_property). Defining
    __delete__ makes it a data descriptor, so __get__ runs on every access and
    serves the stored value. Not thread-safe."""

    def __init__(self, factory: Callable[[Any], Any]):
        self.factory = factory
        self._name: Optional[str] = None

    def __set_name__(self, owner, name):
        self._name = name

    def __get__(self, obj, owner=None):
        if obj is None:
            return self                      # accessed on the class itself
        if self._name not in obj.__dict__:
            obj.__dict__[self._name] = self.factory(obj)
        return obj.__dict__[self._name]

    def __delete__(self, obj):
        # `del instance.attr` discards the cached value; the next access recomputes it
        obj.__dict__.pop(self._name, None)


class LazyDict(Generic[K, V]):
    """Dict whose values are produced by factories on first lookup."""

    def __init__(self):
        self._data: Dict[K, Lazy[V]] = {}
        self._lock = threading.RLock()

    def __setitem__(self, key: K, factory: Callable[[], V]) -> None:
        with self._lock:
            self._data[key] = factory if isinstance(factory, Lazy) else Lazy(factory)

    def __getitem__(self, key: K) -> V:
        with self._lock:
            if key not in self._data:
                raise KeyError(key)
            lazy_value = self._data[key]
        return lazy_value.get()   # Lazy has its own lock; don't hold the dict lock while computing

    def __contains__(self, key: K) -> bool:
        return key in self._data

    def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
        with self._lock:
            lazy_value = self._data.get(key)
        return lazy_value.get() if lazy_value is not None else default

    def is_loaded(self, key: K) -> bool:
        with self._lock:
            return key in self._data and self._data[key].is_initialized()


class VirtualProxy:
    """Stands in for an expensive object and creates it on first attribute use."""

    def __init__(self, subject_factory: Callable[[], Any]):
        self._factory = subject_factory
        self._subject = None
        self._lock = threading.RLock()

    def _get_subject(self):
        if self._subject is None:
            with self._lock:
                if self._subject is None:
                    self._subject = self._factory()
        return self._subject

    def __getattr__(self, name):
        # only called for attributes not found normally; private names are
        # not forwarded, which also prevents recursion during __init__
        if name.startswith('_'):
            raise AttributeError(name)
        return getattr(self._get_subject(), name)

    def is_loaded(self) -> bool:
        return self._subject is not None


class HeavyResource:
    def __init__(self, name: str):
        print(f"Initializing heavyweight resource: {name}")
        self.name = name
        self.data = list(range(1_000_000))

    def process(self) -> str:
        return f"Processing {self.name}"

    def close(self) -> None:
        print(f"Closing resource: {self.name}")


class DataPipeline:
    source = LazyAttribute(lambda self: self._load_source())
    transformed = LazyAttribute(lambda self: self._transform())

    def __init__(self, source_path: str):
        self.source_path = source_path

    def _load_source(self):
        print(f"Loading source data: {self.source_path}")
        return list(range(100))

    def _transform(self):
        print("Transforming data...")
        return [x * 2 for x in self.source]   # first use of self.source triggers loading

    def get_result(self):
        return self.transformed


pipeline = DataPipeline("data.csv")
print("Pipeline created; no data loaded yet")
print(f"Result: {pipeline.get_result()[:5]}...")

lazy_resource = VirtualProxy(lambda: HeavyResource("database connection"))
print("Proxy created")
print(lazy_resource.process())   # first attribute access creates the HeavyResource
Deferred Computation Patterns
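Deferred computation is built into Python's iterator protocol. The hand-written lazy_map, lazy_filter, lazy_zip, lazy_take and lazy_drop later in this section correspond to the built-ins map, filter and zip, to itertools.islice(it, n), and to itertools.islice(it, n, None). The sketch below uses a hypothetical squares_logged source. It shows that assembling a pipeline runs nothing and that consuming it pulls only as many source items as needed.

```python
from itertools import count, islice
from typing import Iterator, List

def squares_logged(log: List[int]) -> Iterator[int]:   # hypothetical source
    for n in count(1):        # infinite: values exist only when pulled
        log.append(n)         # record which inputs were actually computed
        yield n * n

log: List[int] = []
pipeline = islice(filter(lambda x: x % 2 == 0, squares_logged(log)), 3)
print(log)              # []: building the pipeline computed nothing
print(list(pipeline))   # [4, 16, 36]
print(log)              # [1, 2, 3, 4, 5, 6]
```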
from typing import Any, Callable, Generator, Generic, Iterable, List, Optional, TypeVar
from functools import wraps
import threading

T = TypeVar('T')


class Thunk:
    """A unit of deferred computation (Definition 32.7): the callable plays f, its
    closure plays the environment sigma, and _forced is the forced flag. The
    function runs at most once; later force() calls return the cached value."""

    def __init__(self, func: Callable[[], Any]):
        self._func = func
        self._forced = False
        self._value: Any = None
        self._lock = threading.RLock()

    def force(self) -> Any:
        if not self._forced:
            with self._lock:
                if not self._forced:
                    self._value = self._func()
                    self._forced = True
        return self._value

    def is_forced(self) -> bool:
        return self._forced

    def __repr__(self) -> str:
        if self._forced:
            return f"Thunk({self._value!r})"
        return "Thunk(<unevaluated>)"


def lazy(func: Callable) -> Callable:
    """Decorator: calling the function returns an unevaluated Thunk."""
    @wraps(func)
    def wrapper(*args, **kwargs) -> Thunk:
        return Thunk(lambda: func(*args, **kwargs))
    return wrapper


class LazySequence(Generic[T]):
    """Wraps an iterator and memoizes the items pulled so far, so the sequence
    can be indexed and re-iterated without re-running the source."""

    def __init__(self, iterable: Iterable[T]):
        self._iterable = iter(iterable)
        self._cache: List[T] = []
        self._exhausted = False
        self._lock = threading.RLock()

    def __iter__(self) -> Generator[T, None, None]:
        idx = 0
        while True:
            # fetch under the lock but yield outside it, so a paused
            # consumer never keeps the lock held
            with self._lock:
                if idx < len(self._cache):
                    value = self._cache[idx]
                elif self._exhausted:
                    return
                else:
                    try:
                        value = next(self._iterable)
                    except StopIteration:
                        self._exhausted = True
                        return
                    self._cache.append(value)
            yield value
            idx += 1

    def __getitem__(self, index: int) -> T:
        if index < 0:
            raise IndexError("negative indices are not supported")
        with self._lock:
            if index < len(self._cache):
                return self._cache[index]
        for i, value in enumerate(self):   # pulls (and caches) items up to index
            if i == index:
                return value
        raise IndexError("index out of range")

    def __len__(self) -> int:
        # forces the whole source: never call this on an infinite iterator
        if not self._exhausted:
            for _ in self:
                pass
        return len(self._cache)

    def is_cached(self, index: int) -> bool:
        with self._lock:
            return index < len(self._cache)

    def cache_size(self) -> int:
        with self._lock:
            return len(self._cache)


class LazyRange:
    """range-like object with O(1) len, indexing and membership; iteration is lazy.
    Only positive steps are supported."""

    def __init__(self, start: int, stop: Optional[int] = None, step: int = 1):
        if step <= 0:
            raise ValueError("step must be positive")
        if stop is None:
            self.start, self.stop = 0, start
        else:
            self.start, self.stop = start, stop
        self.step = step

    def __iter__(self) -> Generator[int, None, None]:
        current = self.start
        while current < self.stop:
            yield current
            current += self.step

    def __len__(self) -> int:
        # ceil((stop - start) / step), clamped at 0
        return max(0, (self.stop - self.start + self.step - 1) // self.step)

    def __getitem__(self, index: int) -> int:
        if index < 0:
            index += len(self)
        if index < 0 or index >= len(self):
            raise IndexError("index out of range")
        return self.start + index * self.step

    def __contains__(self, value: int) -> bool:
        if value < self.start or value >= self.stop:
            return False
        return (value - self.start) % self.step == 0


def lazy_map(func: Callable, iterable: Iterable) -> Generator:
    for item in iterable:
        yield func(item)


def lazy_filter(predicate: Callable, iterable: Iterable) -> Generator:
    for item in iterable:
        if predicate(item):
            yield item


def lazy_zip(*iterables: Iterable) -> Generator:
    if not iterables:
        return                 # zip() of nothing is empty, not infinite
    iterators = [iter(it) for it in iterables]
    while True:
        try:
            # a list comprehension, not a generator expression: inside a generator
            # expression StopIteration would be turned into RuntimeError (PEP 479)
            items = [next(it) for it in iterators]
        except StopIteration:
            return
        yield tuple(items)


def lazy_take(n: int, iterable: Iterable) -> Generator:
    if n <= 0:
        return
    for i, item in enumerate(iterable, start=1):
        yield item
        if i >= n:
            return             # stop without pulling an extra item from the source


def lazy_drop(n: int, iterable: Iterable) -> Generator:
    for i, item in enumerate(iterable):
        if i >= n:
            yield item


@lazy
def expensive_computation(x: int) -> int:
    print(f"Computing {x}...")
    return x ** 2


thunk = expensive_computation(10)
print("Thunk created; nothing computed yet")
print(f"Forced: {thunk.force()}")
print(f"Forced again (cached): {thunk.force()}")

lazy_seq = LazySequence(range(1000000))
print(f"Element at index 0: {lazy_seq[0]}")
print(f"Element at index 10: {lazy_seq[10]}")
print(f"Cached elements: {lazy_seq.cache_size()}")   # 11: only indices 0..10 were pulled
Object Pool Pattern
UML Class Diagram
Class diagram: object pool pattern

  ┌────────────────────┐        ┌────────────────────┐
  │   <<interface>>    │        │     PoolConfig     │
  │   PoolableObject   │        ├────────────────────┤
  ├────────────────────┤        │ min_size: int      │
  │ + reset()          │        │ max_size: int      │
  │ + is_valid(): bool │        │ max_idle_time      │
  └─────────┬──────────┘        │ validation_interval│
            │                   │ acquire_timeout    │
            │                   └────────────────────┘
  ┌─────────▼──────────┐        ┌────────────────────┐
  │   ObjectPool<T>    │◄───────│  ResourceManager   │
  ├────────────────────┤        ├────────────────────┤
  │ - _pool: Queue     │        │ - _pool: ObjectPool│
  │ - _active: Dict    │        │ - _stats: PoolStats│
  │ - _factory: Func   │        ├────────────────────┤
  ├────────────────────┤        │ + use(): T         │
  │ + acquire(): T     │        │ + get_stats()      │
  │ + release(T)       │        └────────────────────┘
  │ + get_stats()      │
  └─────────┬──────────┘
            │
      ┌─────┴──────┐
      │            │
┌─────▼────────┐ ┌─▼────────────┐
│ConnectionPool│ │  ThreadPool  │
└──────────────┘ └──────────────┘
Generic Object Pool
from typing import Callable, Dict, Generic, List, Optional, TypeVar
from dataclasses import dataclass
from abc import ABC, abstractmethod
from contextlib import contextmanager
import threading
import time
from queue import Queue, Empty

T = TypeVar('T')


class PoolableObject(ABC):
    """Optional interface for pooled objects: reset on release, validate on acquire."""

    @abstractmethod
    def reset(self) -> None:
        pass

    @abstractmethod
    def is_valid(self) -> bool:
        pass


@dataclass
class PoolConfig:
    min_size: int = 5
    max_size: int = 20
    max_idle_time: float = 300.0        # seconds an idle object may live
    validation_interval: float = 60.0   # seconds between maintenance passes
    acquire_timeout: float = 30.0       # seconds acquire() waits before giving up


@dataclass
class PoolStats:
    total_created: int = 0
    total_destroyed: int = 0
    active_count: int = 0
    idle_count: int = 0
    acquire_count: int = 0
    release_count: int = 0
    wait_time_total: float = 0.0

    @property
    def utilization(self) -> float:
        total = self.active_count + self.idle_count
        return self.active_count / total if total > 0 else 0.0


@dataclass
class PooledObject(Generic[T]):
    obj: T
    created_at: float
    last_used: float
    use_count: int = 0


class ObjectPool(Generic[T]):
    """Thread-safe pool that keeps at most max_size live objects.

    _created counts every live object (idle + active). Idle objects wait in _pool;
    active ones are tracked in _active by id(). A background thread destroys
    objects that stay idle longer than max_idle_time, never going below min_size.
    """

    def __init__(self, factory: Callable[[], T],
                 config: Optional[PoolConfig] = None,
                 validator: Optional[Callable[[T], bool]] = None,
                 destroyer: Optional[Callable[[T], None]] = None):
        self._factory = factory
        self._config = config or PoolConfig()
        self._validator = validator
        self._destroyer = destroyer
        self._pool: "Queue[PooledObject[T]]" = Queue()
        self._active: Dict[int, PooledObject[T]] = {}
        self._lock = threading.RLock()
        self._stats = PoolStats()
        self._created = 0
        self._stop = threading.Event()
        with self._lock:
            for _ in range(self._config.min_size):
                self._add_object()
        self._start_maintenance()

    def acquire(self, timeout: Optional[float] = None) -> T:
        if timeout is None:
            timeout = self._config.acquire_timeout
        start_time = time.time()
        while True:
            pooled_obj = None
            with self._lock:
                try:
                    pooled_obj = self._pool.get_nowait()
                except Empty:
                    if self._created < self._config.max_size:
                        pooled_obj = self._create_object()
            if pooled_obj is None:
                # pool exhausted: wait briefly for a release
                try:
                    pooled_obj = self._pool.get(timeout=0.1)
                except Empty:
                    pass
            if pooled_obj is not None:
                if not self._validator or self._validator(pooled_obj.obj):
                    break
                self._destroy(pooled_obj)   # broken object: discard it and retry
            if time.time() - start_time >= timeout:
                raise TimeoutError("Timed out waiting for a pooled object")
        pooled_obj.last_used = time.time()
        pooled_obj.use_count += 1
        with self._lock:
            self._active[id(pooled_obj.obj)] = pooled_obj
            self._stats.active_count = len(self._active)
            self._stats.idle_count = self._pool.qsize()
            self._stats.acquire_count += 1
            self._stats.wait_time_total += time.time() - start_time
        return pooled_obj.obj

    def release(self, obj: T) -> None:
        with self._lock:
            pooled_obj = self._active.pop(id(obj), None)
            if pooled_obj is None:
                return                      # not from this pool, or already released
            self._stats.active_count = len(self._active)
            self._stats.release_count += 1
        if hasattr(obj, 'reset'):
            try:
                obj.reset()
            except Exception as e:
                print(f"Failed to reset object: {e}")
                self._destroy(pooled_obj)   # don't return a half-reset object to the pool
                return
        pooled_obj.last_used = time.time()  # idle time is measured from release
        with self._lock:
            self._pool.put(pooled_obj)
            self._stats.idle_count = self._pool.qsize()

    def _create_object(self) -> PooledObject[T]:
        # the caller holds self._lock, which keeps _created consistent
        obj = self._factory()
        self._created += 1
        self._stats.total_created += 1
        now = time.time()
        return PooledObject(obj=obj, created_at=now, last_used=now)

    def _add_object(self) -> Optional[PooledObject[T]]:
        if self._created >= self._config.max_size:
            return None
        pooled_obj = self._create_object()
        self._pool.put(pooled_obj)
        self._stats.idle_count = self._pool.qsize()
        return pooled_obj

    def _destroy(self, pooled_obj: PooledObject[T]) -> None:
        if self._destroyer:
            try:
                self._destroyer(pooled_obj.obj)
            except Exception as e:
                print(f"Failed to destroy object: {e}")
        with self._lock:
            self._created -= 1
            self._stats.total_destroyed += 1

    def _start_maintenance(self) -> None:
        def maintenance():
            # Event.wait() returns True (ending the loop) once shutdown() sets it
            while not self._stop.wait(self._config.validation_interval):
                self._evict_idle()
        threading.Thread(target=maintenance, daemon=True).start()

    def _evict_idle(self) -> None:
        now = time.time()
        with self._lock:
            keep: List[PooledObject[T]] = []
            while True:
                try:
                    pooled_obj = self._pool.get_nowait()
                except Empty:
                    break
                idle_time = now - pooled_obj.last_used
                if (idle_time >= self._config.max_idle_time
                        and self._created > self._config.min_size):
                    self._destroy(pooled_obj)
                else:
                    keep.append(pooled_obj)
            for pooled_obj in keep:
                self._pool.put(pooled_obj)
            self._stats.idle_count = self._pool.qsize()

    def get_stats(self) -> PoolStats:
        with self._lock:
            return PoolStats(
                total_created=self._stats.total_created,
                total_destroyed=self._stats.total_destroyed,
                active_count=self._stats.active_count,
                idle_count=self._stats.idle_count,
                acquire_count=self._stats.acquire_count,
                release_count=self._stats.release_count,
                wait_time_total=self._stats.wait_time_total,
            )

    def shutdown(self) -> None:
        """Stop maintenance and destroy idle objects; active objects stay with their holders."""
        self._stop.set()
        with self._lock:
            while True:
                try:
                    pooled_obj = self._pool.get_nowait()
                except Empty:
                    break
                self._destroy(pooled_obj)
            self._stats.idle_count = 0
class ConnectionPool(ObjectPool):
def __init__(self, connection_factory: Callable,
max_connections: int = 10,
connection_timeout: float = 30.0):
config = PoolConfig(
min_size=2,
max_size=max_connections,
acquire_timeout=connection_timeout
)
super().__init__(
factory=connection_factory,
config=config,
validator=self._validate_connection,
destroyer=self._close_connection
)
def _validate_connection(self, conn) -> bool:
try:
if hasattr(conn, 'ping'):
return conn.ping()
if hasattr(conn, 'is_connected'):
return conn.is_connected()
return True
except Exception:
return False
def _close_connection(self, conn):
try:
if hasattr(conn, 'close'):
conn.close()
except Exception as e:
print(f"关闭连接失败: {e}")
class ResourceManager(Generic[T]):
def __init__(self, pool: ObjectPool[T]):
self._pool = pool
self._stats = PoolStats()
self._lock = threading.Lock()
@contextmanager
def use(self, timeout: float = None) -> T:
resource = None
try:
resource = self._pool.acquire(timeout)
with self._lock:
self._stats.acquire_count += 1
self._stats.active_count += 1
self._stats.idle_count -= 1
yield resource
finally:
if resource:
self._pool.release(resource)
with self._lock:
self._stats.release_count += 1
self._stats.active_count -= 1
self._stats.idle_count += 1
def get_stats(self) -> PoolStats:
with self._lock:
return PoolStats(
total_created=self._stats.total_created,
active_count=self._stats.active_count,
idle_count=self._stats.idle_count,
acquire_count=self._stats.acquire_count,
release_count=self._stats.release_count
)
class DatabaseConnection:
def __init__(self, connection_string: str):
self.connection_string = connection_string
self._connected = True
print(f"创建连接: {connection_string}")
def execute(self, query: str):
if not self._connected:
raise Exception("连接已关闭")
return f"执行: {query}"
def reset(self):
self._connected = True
def is_valid(self) -> bool:
return self._connected
def ping(self) -> bool:
return self._connected
def close(self):
self._connected = False
print("关闭连接")
def create_db_connection():
return DatabaseConnection("mysql://localhost:3306/db")
pool = ConnectionPool(create_db_connection, max_connections=5)
conn1 = pool.acquire()
conn2 = pool.acquire()
print(f"连接池状态: {pool.get_stats()}")
conn1.execute("SELECT * FROM users")
pool.release(conn1)
print(f"释放后状态: {pool.get_stats()}")
resource_manager = ResourceManager(pool)
with resource_manager.use() as conn:
conn.execute("SELECT 1")
print(f"资源统计: {resource_manager.get_stats()}")内存优化模式
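Memory optimizations deserve the same measure-first treatment as CPU work. Below is a minimal sketch that uses the standard library's `tracemalloc` to compare the peak memory of summing a materialized list with summing a generator expression. `peak_bytes` is an illustrative helper, not a library function, and the absolute figures vary by Python version and platform.

```python
import tracemalloc
from typing import Callable


def peak_bytes(fn: Callable[[], object]) -> int:
    """Return the peak memory traced by tracemalloc while fn() runs."""
    tracemalloc.start()
    try:
        fn()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


n = 200_000
list_peak = peak_bytes(lambda: sum([i * i for i in range(n)]))  # builds the whole list first
gen_peak = peak_bytes(lambda: sum(i * i for i in range(n)))      # holds one value at a time
print(f"list: {list_peak / 1024:.0f} KiB, generator: {gen_peak / 1024:.0f} KiB")
```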
__slots__ optimization
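First, a quick check of what `__slots__` actually changes. This is a minimal sketch; the class names `Slotted` and `Regular` are illustrative. Slotted instances have no per-instance `__dict__`, so assigning an attribute that is not listed in `__slots__` raises `AttributeError`. The absent dictionary is where the memory saving comes from.

```python
class Slotted:
    __slots__ = ('x', 'y')  # fixed attribute layout, no per-instance __dict__

    def __init__(self, x, y):
        self.x, self.y = x, y


class Regular:
    def __init__(self, x, y):
        self.x, self.y = x, y  # stored in the instance's __dict__


s, r = Slotted(1, 2), Regular(1, 2)
has_dict = (hasattr(s, '__dict__'), hasattr(r, '__dict__'))
print(has_dict)  # (False, True)

r.z = 3  # a regular instance accepts new attributes
try:
    s.z = 3  # 'z' is not declared in __slots__
    rejected = False
except AttributeError:
    rejected = True
print(rejected)  # True
```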
```python
from typing import ClassVar, Dict, Any, Set
from dataclasses import dataclass
from weakref import WeakValueDictionary, WeakKeyDictionary, ref, WeakSet
import sys
import gc


class Point:
    __slots__ = ('x', 'y')

    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y

    def distance(self, other: 'Point') -> float:
        return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5

    def __repr__(self) -> str:
        return f"Point({self.x}, {self.y})"


class PointNoSlots:
    def __init__(self, x: float, y: float):
        self.x = x
        self.y = y

    def distance(self, other: 'PointNoSlots') -> float:
        return ((self.x - other.x) ** 2 + (self.y - other.y) ** 2) ** 0.5


class OptimizedClass:
    __slots__ = ('_data', '_cache')
    # Class-level attributes are unaffected by __slots__ and shared by all instances.
    _class_data: ClassVar[Dict[str, Any]] = {}

    def __init__(self):
        self._data = {}
        self._cache = {}

    def get(self, key: str) -> Any:
        if key in self._cache:
            return self._cache[key]
        return self._data.get(key)

    def set(self, key: str, value: Any):
        self._data[key] = value
        self._cache[key] = value


class SlottedDataClass:
    __slots__ = ('id', 'name', 'value', '_hash')

    def __init__(self, id: int, name: str, value: float):
        self.id = id
        self.name = name
        self.value = value
        # The hash is computed once, so treat instances as immutable after construction.
        self._hash = hash((id, name, value))

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        if not isinstance(other, SlottedDataClass):
            return False
        return (self.id, self.name, self.value) == (other.id, other.name, other.value)

    def __repr__(self) -> str:
        return f"SlottedDataClass({self.id}, {self.name}, {self.value})"


class SlottedContainer:
    __slots__ = ('_items', '_count')

    def __init__(self):
        self._items = []
        self._count = 0

    def add(self, item):
        self._items.append(item)
        self._count += 1

    def __len__(self):
        return self._count


def instance_size(obj: Any) -> int:
    """sys.getsizeof is shallow: add the per-instance __dict__ when one exists."""
    size = sys.getsizeof(obj)
    if hasattr(obj, '__dict__'):
        size += sys.getsizeof(obj.__dict__)
    return size


points_slots = [Point(i, i) for i in range(10000)]
points_normal = [PointNoSlots(i, i) for i in range(10000)]
print(f"With __slots__: {instance_size(points_slots[0])} bytes")
print(f"Without __slots__: {instance_size(points_normal[0])} bytes")
slots_size = sum(instance_size(p) for p in points_slots)
normal_size = sum(instance_size(p) for p in points_normal)
print(f"Total memory saved: {(normal_size - slots_size) / 1024 / 1024:.2f} MB")
```

Weak reference patterns
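The larger examples below rely on two `weakref` behaviours, shown here in a minimal sketch (the `Listener` class is illustrative). First, a `weakref.ref` returns `None` once its target has been collected. Second, a plain weak reference to a bound method dies immediately, because every `obj.method` access creates a new, temporary method object. `weakref.WeakMethod` exists for that case. The immediate collection shown here relies on CPython's reference counting.

```python
import gc
import weakref


class Listener:
    def on_notify(self, message: str) -> str:
        return f"got {message}"


listener = Listener()
listener_ref = weakref.ref(listener)
print(listener_ref() is listener)  # True while a strong reference exists

# Each `listener.on_notify` access builds a new bound-method object, so a plain
# weak reference to it has no other owner and is cleared immediately.
dead_method_ref = weakref.ref(listener.on_notify)
gc.collect()
print(dead_method_ref())  # None

# WeakMethod references the instance and the function separately.
method_ref = weakref.WeakMethod(listener.on_notify)
reply = method_ref()("hello")
print(reply)  # got hello

del listener
gc.collect()
print(listener_ref(), method_ref())  # None None
```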
```python
import gc
import threading
from typing import Any, Callable, Dict, Optional, Set
from weakref import WeakKeyDictionary, WeakSet, WeakValueDictionary


class CachedObject:
    def __init__(self, key: str, value: Any):
        self.key = key
        self.value = value
        print(f"Created object: {key}")

    def __del__(self):
        print(f"Destroyed object: {self.key}")

    def __repr__(self) -> str:
        return f"CachedObject({self.key}, {self.value})"


class WeakCache:
    """Cache that does not keep values alive: an entry vanishes once nothing else references it."""

    def __init__(self):
        self._cache: WeakValueDictionary = WeakValueDictionary()
        self._lock = threading.RLock()

    def get(self, key: str) -> Optional[CachedObject]:
        with self._lock:
            return self._cache.get(key)

    def set(self, key: str, obj: CachedObject):
        with self._lock:
            self._cache[key] = obj

    def __contains__(self, key: str) -> bool:
        with self._lock:
            return key in self._cache

    def keys(self) -> Set[str]:
        with self._lock:
            return set(self._cache.keys())


class ObserverPattern:
    """Subject that holds its observers weakly, so it never keeps them alive."""

    def __init__(self):
        self._observers: WeakSet = WeakSet()
        self._lock = threading.Lock()

    def add_observer(self, observer: Any):
        with self._lock:
            self._observers.add(observer)

    def remove_observer(self, observer: Any):
        with self._lock:
            self._observers.discard(observer)

    def notify(self, message: str):
        # Snapshot under the lock, then call the observers without holding it.
        with self._lock:
            observers = list(self._observers)
        for observer in observers:
            if hasattr(observer, 'on_notify'):
                try:
                    observer.on_notify(message)
                except Exception as e:
                    print(f"Notification failed: {e}")


class PropertyTracker:
    """Attaches extra data to objects without extending their lifetime (keys held weakly)."""

    def __init__(self):
        self._data: WeakKeyDictionary = WeakKeyDictionary()
        self._lock = threading.RLock()

    def track(self, obj: Any, property_name: str, value: Any):
        with self._lock:
            if obj not in self._data:
                self._data[obj] = {}
            self._data[obj][property_name] = value

    def get_tracked(self, obj: Any) -> Dict:
        with self._lock:
            return self._data.get(obj, {}).copy()

    def get_all_tracked(self) -> Dict:
        with self._lock:
            return {obj: data.copy() for obj, data in self._data.items()}


class FlyweightFactory:
    _instances: Dict[str, 'Flyweight'] = {}
    _weak_instances: WeakValueDictionary = WeakValueDictionary()

    @classmethod
    def get_flyweight(cls, key: str, use_weak: bool = False) -> 'Flyweight':
        cache = cls._weak_instances if use_weak else cls._instances
        # Keep a local strong reference: in a WeakValueDictionary a freshly stored
        # value with no other owner can be collected at once, so reading
        # `cache[key]` right after the assignment could raise KeyError.
        flyweight = cache.get(key)
        if flyweight is None:
            flyweight = Flyweight(key)
            cache[key] = flyweight
        return flyweight

    @classmethod
    def get_stats(cls) -> Dict:
        return {
            'strong_refs': len(cls._instances),
            'weak_refs': len(cls._weak_instances)
        }


class Flyweight:
    def __init__(self, shared_state: str):
        self.shared_state = shared_state

    def operation(self, unique_state: str) -> str:
        return f"shared: {self.shared_state}, unique: {unique_state}"


class CachedProperty:
    """Descriptor that computes a value once per instance (requires an instance __dict__)."""

    def __init__(self, func: Callable):
        self.func = func
        self.attr_name = None

    def __set_name__(self, owner, name):
        self.attr_name = name

    def __get__(self, obj, owner=None):
        if obj is None:
            return self
        cache = obj.__dict__.setdefault('_cached_props', {})
        if self.attr_name not in cache:
            cache[self.attr_name] = self.func(obj)
        return cache[self.attr_name]

    def __delete__(self, obj):
        # `del obj.attr` drops the cached value; the next access recomputes it.
        cache = getattr(obj, '_cached_props', {})
        if self.attr_name in cache:
            del cache[self.attr_name]


weak_cache = WeakCache()
obj1 = CachedObject("key1", "value1")
weak_cache.set("key1", obj1)
print(f"Fetched: {weak_cache.get('key1')}")
del obj1      # drop the only strong reference
gc.collect()  # CPython frees the object at `del`; collect() also covers reference cycles
print(f"After deletion: {weak_cache.get('key1')}")
```

Concurrency optimization patterns
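In the default CPython build, threads pay off mainly for I/O-bound work. Blocking calls such as sleeps and socket reads release the GIL while they wait, and the caches and executors below are built on that. Here is a minimal sketch in which `time.sleep` stands in for network or disk I/O; `fake_io` is illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(i: int) -> int:
    time.sleep(0.2)  # stands in for a blocking network/disk call; releases the GIL
    return i


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fake_io, range(5)))  # map preserves input order
elapsed = time.perf_counter() - start
# Five overlapping 0.2 s waits take close to 0.2 s, not the ~1.0 s of a serial loop.
print(results, f"{elapsed:.2f}s")
```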
Thread-safe caching
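The `ThreadSafeCache` below evicts the oldest *inserted* key, which is FIFO order because `dict` preserves insertion order. If LRU behaviour is wanted under threads, a minimal sketch is to guard an `OrderedDict` with a lock and call `move_to_end` on every hit. The class name `LockedLRUCache` is illustrative and not part of any library.

```python
import threading
from collections import OrderedDict
from typing import Generic, Hashable, Optional, TypeVar

K = TypeVar('K', bound=Hashable)
V = TypeVar('V')


class LockedLRUCache(Generic[K, V]):
    """LRU cache guarded by a single lock (hypothetical helper, for illustration)."""

    def __init__(self, max_size: int = 128):
        self._data: "OrderedDict[K, V]" = OrderedDict()
        self._lock = threading.Lock()
        self._max_size = max_size

    def get(self, key: K) -> Optional[V]:
        with self._lock:
            if key not in self._data:
                return None
            self._data.move_to_end(key)  # mark as most recently used
            return self._data[key]

    def set(self, key: K, value: V) -> None:
        with self._lock:
            self._data[key] = value
            self._data.move_to_end(key)
            if len(self._data) > self._max_size:
                self._data.popitem(last=False)  # evict the least recently used


cache: LockedLRUCache[str, int] = LockedLRUCache(max_size=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")      # "a" becomes the most recently used key
cache.set("c", 3)   # evicts "b", not "a"
print(cache.get("a"), cache.get("b"), cache.get("c"))  # 1 None 3
```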
```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from queue import Empty, Full, Queue
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar

K = TypeVar('K')
V = TypeVar('V')

# CacheStats (hits, misses, size, max_size) is the statistics dataclass from the
# caching section earlier in this chapter.


class ThreadSafeCache(Generic[K, V]):
    """Bounded dict guarded by a re-entrant lock; evicts in insertion (FIFO) order."""

    def __init__(self, max_size: int = 1000):
        self._cache: Dict[K, V] = {}
        self._lock = threading.RLock()
        self._max_size = max_size
        self._stats = CacheStats(max_size=max_size)

    def _evict_if_full(self, key: K):
        # Caller must hold the lock. dicts keep insertion order, so the first
        # key is the oldest entry.
        if len(self._cache) >= self._max_size and key not in self._cache:
            del self._cache[next(iter(self._cache))]

    def get(self, key: K) -> Optional[V]:
        # A cached value of None is indistinguishable from a miss.
        with self._lock:
            value = self._cache.get(key)
            if value is not None:
                self._stats.hits += 1
            else:
                self._stats.misses += 1
            return value

    def set(self, key: K, value: V):
        with self._lock:
            self._evict_if_full(key)
            self._cache[key] = value
            self._stats.size = len(self._cache)

    def get_or_set(self, key: K, factory: Callable[[], V]) -> V:
        # factory() runs under the lock: no duplicate computation, but a slow
        # factory blocks every other caller (AsyncCache below avoids that).
        with self._lock:
            if key in self._cache:
                self._stats.hits += 1
                return self._cache[key]
            self._stats.misses += 1
            value = factory()
            self._evict_if_full(key)
            self._cache[key] = value
            self._stats.size = len(self._cache)
            return value

    def compute_if_absent(self, key: K, loader: Callable[[], V]) -> V:
        return self.get_or_set(key, loader)

    def invalidate(self, key: K):
        with self._lock:
            self._cache.pop(key, None)
            self._stats.size = len(self._cache)

    def clear(self):
        with self._lock:
            self._cache.clear()
            self._stats.size = 0

    def get_stats(self) -> CacheStats:
        with self._lock:
            return CacheStats(
                hits=self._stats.hits,
                misses=self._stats.misses,
                size=self._stats.size,
                max_size=self._stats.max_size
            )


class AsyncCache(Generic[K, V]):
    """Computes values in a thread pool; concurrent requests for one key share a Future."""

    def __init__(self, max_workers: int = 10):
        self._cache: Dict[K, V] = {}
        self._futures: Dict[K, Future] = {}
        self._lock = threading.Lock()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._stats = CacheStats()

    def get_or_compute(self, key: K, compute: Callable[[], V]) -> Future:
        with self._lock:
            if key in self._cache:
                future: Future = Future()
                future.set_result(self._cache[key])
                self._stats.hits += 1
                return future
            if key in self._futures:
                # Computation already in flight: share it instead of starting another.
                return self._futures[key]
            future = self._executor.submit(self._compute, key, compute)
            self._futures[key] = future
            self._stats.misses += 1
            return future

    def _compute(self, key: K, compute: Callable[[], V]) -> V:
        try:
            result = compute()
        except BaseException:
            # Forget the failed Future so the next request retries.
            with self._lock:
                self._futures.pop(key, None)
            raise
        with self._lock:
            self._cache[key] = result
            self._futures.pop(key, None)
        return result

    def invalidate(self, key: K):
        with self._lock:
            self._cache.pop(key, None)
            future = self._futures.pop(key, None)
            if future is not None:
                future.cancel()  # only succeeds if the task has not started yet

    def shutdown(self):
        self._executor.shutdown(wait=True)


class BoundedExecutor:
    """Thread pool whose submit() blocks once max_queue_size tasks are outstanding."""

    def __init__(self, max_workers: int, max_queue_size: int):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = threading.Semaphore(max_queue_size)
        self._submitted = 0
        self._completed = 0
        self._lock = threading.Lock()

    def submit(self, fn: Callable, *args, **kwargs) -> Future:
        # Wait up to 30 s for a free slot (running + queued tasks).
        acquired = self._semaphore.acquire(timeout=30.0)
        if not acquired:
            raise RuntimeError("Queue full: submit timed out")

        def wrapped():
            try:
                return fn(*args, **kwargs)
            finally:
                with self._lock:
                    self._completed += 1
                self._semaphore.release()

        with self._lock:
            self._submitted += 1
        return self._executor.submit(wrapped)

    def get_stats(self) -> Dict:
        with self._lock:
            return {
                'submitted': self._submitted,
                'completed': self._completed,
                'pending': self._submitted - self._completed
            }

    def shutdown(self):
        self._executor.shutdown(wait=True)


class ProducerConsumer:
    """Fixed set of consumer threads draining a bounded task queue."""

    def __init__(self, num_consumers: int = 3, queue_size: int = 100):
        self._queue: Queue = Queue(maxsize=queue_size)
        self._consumers: List[threading.Thread] = []
        self._running = False
        self._results: Dict[int, Any] = {}
        self._result_lock = threading.Lock()
        self._task_id = 0
        self._num_consumers = num_consumers

    def start(self):
        self._running = True
        for i in range(self._num_consumers):
            consumer = threading.Thread(
                target=self._consume,
                args=(i,),
                daemon=True
            )
            consumer.start()
            self._consumers.append(consumer)

    def stop(self):
        # One None sentinel per consumer; tasks still queued when _running
        # turns False may be left unprocessed.
        self._running = False
        for _ in self._consumers:
            try:
                self._queue.put(None, timeout=1.0)
            except Full:
                pass

    def produce(self, task: Callable) -> int:
        with self._result_lock:
            task_id = self._task_id
            self._task_id += 1
        self._queue.put((task_id, task))  # blocks while the queue is full
        return task_id

    def _consume(self, consumer_id: int):
        while self._running:
            try:
                item = self._queue.get(timeout=0.5)
            except Empty:
                continue
            if item is None:
                break
            task_id, task = item
            try:
                result = task()
                with self._result_lock:
                    self._results[task_id] = result
            except Exception as e:
                # The exception object is stored and returned as the result.
                with self._result_lock:
                    self._results[task_id] = e

    def get_result(self, task_id: int, timeout: Optional[float] = None) -> Any:
        # Polls every 10 ms; timeout=None waits indefinitely.
        start = time.monotonic()
        while True:
            with self._result_lock:
                if task_id in self._results:
                    return self._results.pop(task_id)
            if timeout is not None and time.monotonic() - start > timeout:
                raise TimeoutError("Timed out waiting for result")
            time.sleep(0.01)


async_cache = AsyncCache[str, str]()


def slow_computation():
    time.sleep(1)
    return "computed result"


future = async_cache.get_or_compute("key1", slow_computation)
print("Submitted computation...")
result = future.result()
print(f"Result: {result}")
async_cache.shutdown()
```

Performance analysis tools
Profilers
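The `PerformanceProfiler` below records wall-clock time per decorated function. For a call-level breakdown, the standard library's `cProfile` and `pstats`, which the block below also imports, can be driven programmatically. Here is a minimal sketch; `slow_path`, `fast_path` and `workload` are illustrative functions.

```python
import cProfile
import io
import pstats


def slow_path(n: int) -> int:
    return sum(i * i for i in range(n))


def fast_path(n: int) -> int:
    # Closed form for 0^2 + 1^2 + ... + (n-1)^2.
    return (n - 1) * n * (2 * n - 1) // 6


def workload() -> int:
    return slow_path(200_000) + fast_path(200_000)


cprof = cProfile.Profile()
cprof.enable()
total = workload()
cprof.disable()

stream = io.StringIO()
stats = pstats.Stats(cprof, stream=stream).sort_stats("cumulative")
stats.print_stats(5)  # top 5 entries by cumulative time
report = stream.getvalue()
print(report)
```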
```python
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass, field
from functools import wraps
import time
import sys
import threading
from collections import defaultdict
import cProfile
import pstats
import io
import tracemalloc
from contextlib import contextmanager


@dataclass
class TimingResult:
    name: str
    total_time: float
    call_count: int
    avg_time: float
    min_time: float
    max_time: float
    std_dev: float = 0.0

    @property
    def ops_per_second(self) -> float:
        return self.call_count / self.total_time if self.total_time > 0 else 0.0


@dataclass
class MemorySnapshot:
    label: str
    timestamp: float
    current_memory: int
    peak_memory: int


class PerformanceProfiler:
    """Collects wall-clock durations per name and summarizes them."""

    def __init__(self):
        self._timings: Dict[str, List[float]] = defaultdict(list)
        self._enabled = True
        self._lock = threading.Lock()

    def enable(self):
        self._enabled = True

    def disable(self):
        self._enabled = False

    def record(self, name: str, duration: float):
        if self._enabled:
            with self._lock:
                self._timings[name].append(duration)

    def get_results(self) -> List[TimingResult]:
        results = []
        with self._lock:
            for name, times in self._timings.items():
                if not times:
                    continue
                avg = sum(times) / len(times)
                # Population standard deviation of the recorded durations.
                variance = sum((t - avg) ** 2 for t in times) / len(times)
                results.append(TimingResult(
                    name=name,
                    total_time=sum(times),
                    call_count=len(times),
                    avg_time=avg,
                    min_time=min(times),
                    max_time=max(times),
                    std_dev=variance ** 0.5
                ))
        return sorted(results, key=lambda r: r.total_time, reverse=True)

    def clear(self):
        with self._lock:
            self._timings.clear()

    def print_report(self):
        print("\n=== Performance report ===")
        print(f"{'Function':<30} {'Calls':>10} {'Total (s)':>12} "
              f"{'Mean (s)':>12} {'Std dev (s)':>12}")
        print("-" * 80)
        for result in self.get_results():
            print(f"{result.name:<30} {result.call_count:>10} "
                  f"{result.total_time:>12.4f} {result.avg_time:>12.6f} {result.std_dev:>12.6f}")


profiler = PerformanceProfiler()


def profile(func: Callable) -> Callable:
    """Decorator: record each call's duration in the global profiler."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            duration = time.perf_counter() - start
            profiler.record(func.__name__, duration)
    return wrapper


class MemoryTracker:
    """Takes labelled tracemalloc readings (current and peak traced memory)."""

    def __init__(self):
        self._snapshots: List[MemorySnapshot] = []
        self._tracking = False

    def start(self):
        tracemalloc.start()
        self._tracking = True

    def stop(self):
        if self._tracking:
            tracemalloc.stop()
            self._tracking = False

    def snapshot(self, label: str = ""):
        if not self._tracking:
            self.start()
        current, peak = tracemalloc.get_traced_memory()
        self._snapshots.append(MemorySnapshot(
            label=label,
            timestamp=time.time(),
            current_memory=current,
            peak_memory=peak
        ))

    def get_report(self) -> str:
        if len(self._snapshots) < 2:
            return "At least two snapshots are required"
        lines = ["Memory usage report:"]
        prev = None
        for snap in self._snapshots:
            if prev is not None:
                diff = snap.current_memory - prev.current_memory
                lines.append(
                    f"  {snap.label}: {snap.current_memory / 1024 / 1024:.2f}MB "
                    f"(change: {diff / 1024 / 1024:+.2f}MB, peak: {snap.peak_memory / 1024 / 1024:.2f}MB)"
                )
            else:
                lines.append(
                    f"  {snap.label}: {snap.current_memory / 1024 / 1024:.2f}MB"
                )
            prev = snap
        return "\n".join(lines)

    def get_diff(self, start_idx: int = 0, end_idx: int = -1) -> Dict:
        if not self._snapshots:
            return {}
        start = self._snapshots[start_idx]
        end = self._snapshots[end_idx]
        return {
            'memory_diff': end.current_memory - start.current_memory,
            'peak_diff': end.peak_memory - start.peak_memory,
            'time_diff': end.timestamp - start.timestamp
        }


class Benchmark:
    """Times `func` with perf_counter after a warm-up phase."""

    def __init__(self, name: str):
        self.name = name
        self._results: List[float] = []

    def run(self, func: Callable, iterations: int = 1000,
            warmup: int = 100) -> Dict:
        for _ in range(warmup):
            func()
        self._results = []
        for _ in range(iterations):
            start = time.perf_counter()
            func()
            self._results.append(time.perf_counter() - start)
        total = sum(self._results)
        avg = total / len(self._results)
        variance = sum((t - avg) ** 2 for t in self._results) / len(self._results)
        return {
            'name': self.name,
            'iterations': iterations,
            'total_time': total,
            'avg_time': avg,
            'min_time': min(self._results),
            'max_time': max(self._results),
            'std_dev': variance ** 0.5,
            'ops_per_second': iterations / total if total > 0 else 0.0
        }

    def compare(self, other: 'Benchmark') -> Dict:
        # speedup > 1 means this benchmark is faster than `other`.
        if not self._results or not other._results:
            return {}
        self_avg = sum(self._results) / len(self._results)
        other_avg = sum(other._results) / len(other._results)
        return {
            'name1': self.name,
            'name2': other.name,
            'speedup': other_avg / self_avg if self_avg > 0 else 0,
            'time_diff': self_avg - other_avg
        }


@contextmanager
def timeit(name: str = "block"):
    """Print the wall-clock time of the enclosed block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        print(f"{name} took {duration:.6f} s")


@profile
def test_function():
    return sum(range(1000))


for _ in range(100):
    test_function()
profiler.print_report()

benchmark = Benchmark("sum_range")
result = benchmark.run(lambda: sum(range(10000)), iterations=1000)
print(f"\nBenchmark result: {result}")
```

Enterprise application example
High-performance data processing pipeline
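The pipeline's producer groups items into fixed-size batches before handing them to the workers, which spreads the per-item queue and locking overhead across each batch. Here is a minimal sketch of the grouping step on its own, using `itertools.islice`. `batched` is a local helper written for this sketch; Python 3.12 also ships `itertools.batched`, which yields tuples instead of lists.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of up to `size` items; the last batch may be shorter."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))  # take at most `size` items
        if not batch:
            return
        yield batch


batches = list(batched(range(7), 3))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```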
```python
from typing import Iterator, Callable, Any, List, Dict, Protocol
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
import threading
from queue import Queue, Empty
import time

# ThreadSafeCache and CacheStats come from the earlier sections of this chapter.


class Stage(Protocol):
    def process(self, data: Any) -> Any:
        ...


@dataclass
class PipelineConfig:
    batch_size: int = 100
    num_workers: int = 4
    queue_size: int = 1000
    timeout: float = 30.0
    enable_cache: bool = True
    cache_size: int = 1000


@dataclass
class PipelineStats:
    items_processed: int = 0
    batches_processed: int = 0
    errors: int = 0
    total_time: float = 0.0      # worker time summed over all batches
    avg_batch_time: float = 0.0


class DataPipeline:
    """Producer thread -> batch queue -> N worker threads -> output queue -> caller."""

    def __init__(self, config: PipelineConfig = None):
        self.config = config or PipelineConfig()
        self._stages: List[Callable] = []
        self._queue: Queue = Queue(maxsize=self.config.queue_size)
        self._output_queue: Queue = Queue(maxsize=self.config.queue_size)
        self._running = False
        self._stats = PipelineStats()
        self._stats_lock = threading.Lock()
        if self.config.enable_cache:
            self._cache = ThreadSafeCache(max_size=self.config.cache_size)
        else:
            self._cache = None

    def add_stage(self, stage: Callable) -> 'DataPipeline':
        self._stages.append(stage)
        return self  # allows chaining

    def process(self, data: Iterator) -> Iterator:
        """Yield processed items; order across batches is not preserved."""
        self._running = True
        self._stats = PipelineStats()
        producer = threading.Thread(
            target=self._produce,
            args=(data,),
            daemon=True
        )
        producer.start()
        workers = []
        for i in range(self.config.num_workers):
            worker = threading.Thread(
                target=self._worker,
                args=(i,),
                daemon=True
            )
            worker.start()
            workers.append(worker)
        active_workers = self.config.num_workers
        while self._running:
            try:
                result = self._output_queue.get(timeout=0.1)
            except Empty:
                # Stop only when every worker has exited: an empty input queue
                # alone does not mean the last batches have been emitted.
                if not any(w.is_alive() for w in workers) and self._output_queue.empty():
                    break
                continue
            if result is None:
                # Each worker sends one None when it finishes.
                active_workers -= 1
                if active_workers == 0:
                    break
                continue
            yield from result  # each result is one processed batch
        self._running = False

    def _produce(self, data: Iterator):
        batch = []
        for item in data:
            batch.append(item)
            if len(batch) >= self.config.batch_size:
                self._queue.put(batch)
                batch = []
        if batch:
            self._queue.put(batch)
        # One sentinel per worker signals the end of input.
        for _ in range(self.config.num_workers):
            self._queue.put(None)

    def _worker(self, worker_id: int):
        while self._running:
            try:
                batch = self._queue.get(timeout=0.5)
            except Empty:
                continue
            if batch is None:
                self._output_queue.put(None)
                break
            start_time = time.perf_counter()
            results = []
            for item in batch:
                try:
                    results.append(self._process_item(item))
                    with self._stats_lock:
                        self._stats.items_processed += 1
                except Exception:
                    # Failed items are counted and dropped from the output.
                    with self._stats_lock:
                        self._stats.errors += 1
            self._output_queue.put(results)
            with self._stats_lock:
                self._stats.batches_processed += 1
                self._stats.total_time += time.perf_counter() - start_time

    def _process_item(self, item: Any) -> Any:
        result = item
        for index, stage in enumerate(self._stages):
            cache_key = None
            if self._cache is not None:
                # Key on the stage's position plus the input's repr: every lambda
                # has __name__ == "<lambda>", and hash(str(...)) can collide.
                # Assumes repr() identifies the value (true for ints and strings).
                cache_key = f"{index}:{result!r}"
                cached = self._cache.get(cache_key)
                if cached is not None:
                    result = cached
                    continue
            result = stage(result)
            if cache_key is not None:
                self._cache.set(cache_key, result)
        return result

    def get_stats(self) -> PipelineStats:
        with self._stats_lock:
            avg_time = 0.0
            if self._stats.batches_processed > 0:
                avg_time = self._stats.total_time / self._stats.batches_processed
            return PipelineStats(
                items_processed=self._stats.items_processed,
                batches_processed=self._stats.batches_processed,
                errors=self._stats.errors,
                total_time=self._stats.total_time,
                avg_batch_time=avg_time
            )


class CachedDataLoader:
    """Read-through loader: serves from ThreadSafeCache, falls back to `loader`."""

    def __init__(self, loader: Callable, cache_size: int = 1000):
        self._loader = loader
        self._cache = ThreadSafeCache(max_size=cache_size)
        self._prefetch_queue: Queue = Queue(maxsize=100)
        self._prefetch_running = False

    def load(self, key: str) -> Any:
        cached = self._cache.get(key)
        if cached is not None:
            return cached
        data = self._loader(key)
        self._cache.set(key, data)
        return data

    def preload(self, keys: List[str], num_workers: int = 4):
        # Warm the cache in parallel; individual failures are reported, not raised.
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [executor.submit(self.load, key) for key in keys]
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Preload failed: {e}")

    def start_prefetch(self, key_generator: Callable[[], Iterator[str]]):
        self._prefetch_running = True

        def prefetch_worker():
            for key in key_generator():
                if not self._prefetch_running:
                    break
                try:
                    self.load(key)
                except Exception:
                    pass  # prefetching is best-effort

        thread = threading.Thread(target=prefetch_worker, daemon=True)
        thread.start()

    def stop_prefetch(self):
        self._prefetch_running = False

    def get_cache_stats(self) -> CacheStats:
        return self._cache.get_stats()


config = PipelineConfig(batch_size=50, num_workers=4, enable_cache=True)
pipeline = DataPipeline(config)
pipeline.add_stage(lambda x: x * 2)
pipeline.add_stage(lambda x: x + 10)
data = range(1000)
results = list(pipeline.process(iter(data)))
print(f"Number of results: {len(results)}")
# Workers finish batches in any order, so sort before inspecting.
print(f"First 5 results: {sorted(results)[:5]}")
print(f"Pipeline stats: {pipeline.get_stats()}")
```

Anti-patterns and pitfalls
Common performance anti-patterns
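| Anti-pattern | Problem | Remedy |
|---|---|---|
| Premature optimization | Optimizing without measurement data | Measure first, then optimize |
| Cache abuse | Caching everything until memory becomes the problem | Cache only hot data; bound the size and set a TTL |
| Over-pooling | Creating object pools for lightweight objects | Pool only objects that are expensive to create |
| Lock contention | Excessive or coarse locking serializes threads | Shorten critical sections, use finer-grained locks or lock-free structures |
| Memory leaks | Caches or closures keep objects alive indefinitely | Use weak references; clean up periodically |
| Item-by-item processing | Handling records one at a time instead of in batches | Use batch APIs |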
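The last row is easy to demonstrate with the standard library's `sqlite3`. `executemany` passes a whole sequence of parameter tuples in one call, whereas the anti-pattern issues one `execute` per row. Here is a minimal sketch against an in-memory database; the table and function names are illustrative. Real gains depend on the driver and are largest when each call is a network round trip, which an in-memory SQLite database does not have.

```python
import sqlite3
import time
from typing import Dict, List, Tuple

ROWS: List[Tuple[int, str]] = [(i, f"user_{i}") for i in range(10_000)]
SQL = "INSERT INTO users (id, name) VALUES (?, ?)"


def insert_one_by_one(conn: sqlite3.Connection) -> None:
    for row in ROWS:  # anti-pattern: one statement call per row
        conn.execute(SQL, row)


def insert_batched(conn: sqlite3.Connection) -> None:
    conn.executemany(SQL, ROWS)  # batch API: one call for all rows


counts: Dict[str, int] = {}
for insert in (insert_one_by_one, insert_batched):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    start = time.perf_counter()
    with conn:  # one transaction; commits on success
        insert(conn)
    elapsed = time.perf_counter() - start
    counts[insert.__name__] = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    print(f"{insert.__name__}: {elapsed * 1e3:.1f} ms")
    conn.close()
```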
Anti-pattern examples
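```python
import inspect
import weakref
from typing import Any, Callable, List

# MemoryCache(max_size, default_ttl) is the TTL cache from the caching section.

class PerformanceAntiPatterns:
    @staticmethod
    def bad_cache_pattern() -> Callable[[Any], Any]:
        cache = {}  # unbounded: every distinct key stays in memory forever

        def get_data(key):
            if key not in cache:
                cache[key] = expensive_load(key)
            return cache[key]
        return get_data

    @staticmethod
    def good_cache_pattern() -> Callable[[Any], Any]:
        # Size limit plus TTL bound both memory use and staleness.
        cache = MemoryCache(max_size=1000, default_ttl=300.0)

        def get_data(key):
            result = cache.get(key)
            if result is None:
                result = expensive_load(key)
                cache.set(key, result)
            return result
        return get_data

def expensive_load(key):
    return f"data_{key}"

class MemoryLeakExample:
    # Leaky: strong references keep every callback (and its bound object) alive.
    def __init__(self):
        self._callbacks = []

    def register_callback(self, callback):
        self._callbacks.append(callback)

    def trigger(self, data):
        for callback in self._callbacks:
            callback(data)

class FixedMemoryLeak:
    # A plain WeakSet fails here: `obj.method` creates a new bound-method object on
    # each access, so it would be collected at once. WeakMethod handles bound methods
    # and weakref.ref handles functions; a callback with no other owner is still dropped.
    def __init__(self):
        self._callbacks: List[weakref.ref] = []

    def register_callback(self, callback: Callable[[Any], None]):
        if inspect.ismethod(callback):
            self._callbacks.append(weakref.WeakMethod(callback))
        else:
            self._callbacks.append(weakref.ref(callback))

    def trigger(self, data):
        alive = []
        for callback_ref in self._callbacks:
            callback = callback_ref()
            if callback is not None:
                callback(data)
                alive.append(callback_ref)
        self._callbacks = alive  # prune references to collected callbacks
```

Decision guide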
Choosing a cache strategy
```
┌────────────────────────────────────────────────────────────────────────┐
│                      Cache strategy decision tree                      │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  Access pattern?                                                       │
│    │                                                                   │
│    ├── Strong temporal locality  ──► LRU cache                         │
│    │                                                                   │
│    ├── Strong frequency locality ──► LFU cache                         │
│    │                                                                   │
│    ├── Mixed pattern             ──► ARC (Adaptive Replacement Cache)  │
│    │                                                                   │
│    └── Sequential access         ──► FIFO cache                        │
│                                                                        │
│  Consistency requirement?                                              │
│    │                                                                   │
│    ├── Strong consistency        ──► Write-through cache               │
│    │                                                                   │
│    ├── Eventual consistency      ──► Write-behind cache                │
│    │                                                                   │
│    └── Read-heavy, write-light   ──► Cache-aside pattern               │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
```

When to use an object pool
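| Criterion | Use an object pool | Don't use an object pool |
|---|---|---|
| Creation cost | High (connections, large objects) | Low (simple objects) |
| Usage frequency | Frequent | Infrequent |
| Object state | Can be reset and reused | Cannot be reset |
| Concurrency | Highly concurrent workloads | Single-threaded code |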
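The creation-cost row is best settled by measurement rather than intuition. The sketch below uses `timeit` to compare constructing a cheap object with a get/put round trip through a `queue.Queue`, the thread-safe structure a simple pool might be built on; `Cheap` is an illustrative class. For objects this small, the queue's locking typically costs more than construction does, which is exactly the over-pooling anti-pattern. The numbers depend on the machine and interpreter.

```python
import timeit
from queue import Queue


class Cheap:
    __slots__ = ("value",)

    def __init__(self):
        self.value = 0


pool: "Queue[Cheap]" = Queue()
for _ in range(10):
    pool.put(Cheap())


def construct() -> Cheap:
    return Cheap()


def pooled_round_trip() -> Cheap:
    obj = pool.get()   # acquire
    obj.value = 0      # reset state before reuse
    pool.put(obj)      # release
    return obj


n = 100_000
t_construct = timeit.timeit(construct, number=n)
t_pooled = timeit.timeit(pooled_round_trip, number=n)
print(f"construct: {t_construct / n * 1e9:.0f} ns/op")
print(f"pool get/put: {t_pooled / n * 1e9:.0f} ns/op")
```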
Optimization priorities
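```
Priority 1: Algorithms (O(n²) → O(n log n))
    │
Priority 2: Data structure choice (list → set/dict for lookups)
    │
Priority 3: Caching (repeated computation → cached results)
    │
Priority 4: Concurrency (serial → parallel)
    │
Priority 5: Memory (__slots__, weak references)
    │
Priority 6: Low-level optimization (Cython, C extensions)
```

Quick reference cards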
Cache pattern cheat sheet
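| Pattern | Best suited to | Key property |
|---|---|---|
| LRU | Strong temporal locality | Evicts the least recently used entry |
| LFU | Strong frequency locality | Evicts the least frequently used entry |
| ARC | Mixed access patterns | Adapts the balance between recency and frequency |
| Read-Through | Read-heavy workloads | Cache loads missing data transparently |
| Write-Through | Strong consistency | Writes go to cache and backing store synchronously |
| Write-Behind | High write throughput | Writes reach the backing store asynchronously |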
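LFU is the least obvious row to implement. Below is a minimal, single-threaded sketch; the `LFUCache` name is illustrative. It keeps a use count per key and, when the cache is full, evicts the key with the smallest count, breaking ties in favour of the oldest insertion. Eviction uses a linear scan to keep the code short; constant-time LFU designs keep per-frequency linked lists instead.

```python
from typing import Dict, Generic, Hashable, Optional, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class LFUCache(Generic[K, V]):
    """Least-frequently-used cache with O(n) eviction (illustrative sketch)."""

    def __init__(self, capacity: int):
        if capacity < 1:
            raise ValueError("capacity must be >= 1")
        self._capacity = capacity
        self._values: Dict[K, V] = {}
        self._counts: Dict[K, int] = {}

    def get(self, key: K) -> Optional[V]:
        if key not in self._values:
            return None
        self._counts[key] += 1
        return self._values[key]

    def put(self, key: K, value: V) -> None:
        if key in self._values:
            self._values[key] = value
            self._counts[key] += 1
            return
        if len(self._values) >= self._capacity:
            # min() returns the first minimum in dict order, i.e. the oldest
            # inserted key among the least frequently used ones.
            victim = min(self._counts, key=self._counts.__getitem__)
            del self._values[victim]
            del self._counts[victim]
        self._values[key] = value
        self._counts[key] = 1


lfu: LFUCache[str, int] = LFUCache(capacity=2)
lfu.put("a", 1)
lfu.put("b", 2)
lfu.get("a")        # count("a") = 2, count("b") = 1
lfu.put("c", 3)     # evicts "b", the least frequently used key
print(lfu.get("a"), lfu.get("b"), lfu.get("c"))  # 1 None 3
```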
Performance optimization checklist
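- [ ] Locate bottlenecks with a profiler
- [ ] Check the algorithm's time complexity
- [ ] Review the choice of data structures
- [ ] Consider a caching strategy
- [ ] Evaluate opportunities for concurrency
- [ ] Check memory usage
- [ ] Establish a benchmark
- [ ] Verify the effect of each optimization against that benchmark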
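The last two items can be as small as a `timeit` comparison run before and after a change. Here is a minimal sketch comparing membership tests on a `list` (linear scan) and a `set` (hash lookup, O(1) on average). Absolute timings vary by machine, so only the relative difference is meaningful.

```python
import timeit

data_list = list(range(100_000))
data_set = set(data_list)
probe = 99_999  # worst case for the list: the last element

# Both containers must agree before the timings mean anything.
assert (probe in data_list) == (probe in data_set)

t_list = timeit.timeit(lambda: probe in data_list, number=200)
t_set = timeit.timeit(lambda: probe in data_set, number=200)
print(f"list: {t_list * 1e3:.2f} ms, set: {t_set * 1e3:.2f} ms, "
      f"ratio ≈ {t_list / t_set:.0f}x")
```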
Python performance tips
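```python
from functools import lru_cache
import math

# 1. Prefer built-ins: sum() runs its loop in C
total = sum(range(1000))  # faster than accumulating in a Python for loop

# 2. Use generator expressions for large sequences
squares = (x * x for x in range(1_000_000))  # values are produced lazily, not stored

# 3. Use __slots__ for classes with many instances
class Point:
    __slots__ = ('x', 'y')

# 4. Cache results of pure functions
@lru_cache(maxsize=128)
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)

# 5. Bind frequently used globals/attributes to local names
def process(data):
    sqrt = math.sqrt  # one attribute lookup instead of one per element
    return [sqrt(x) for x in data]

# 6. Build strings with join rather than repeated +=
strings = ["a", "b", "c"]
joined = ''.join(strings)  # single pass; repeated += in a loop may copy the string each time

# 7. Use a set for membership tests
my_set = {1, 2, 3}
item = 2
if item in my_set:  # O(1) on average vs O(n) for a list
    pass
```

Summary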
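Performance optimization is a systems effort that spans algorithms, architecture, code, and memory. This chapter covered caching, lazy loading, object pooling, memory optimization, and concurrency optimization patterns, together with formal mathematical definitions and enterprise-style implementation examples.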
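Key takeaways:
- Measure first: always let real measurement data guide optimization
- Choose the right strategy: pick the caching policy that matches the access pattern
- Weigh the trade-offs: balance memory, CPU, and consistency deliberately
- Monitor continuously: establish performance baselines and keep tracking the effect of each optimization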
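Remember Donald Knuth's dictum: "premature optimization is the root of all evil." At the same time, the right pattern applied to a measured bottleneck can yield performance gains of an order of magnitude or more.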