第55章 高性能计算
学习目标
完成本章学习后,你将能够:
- 理解并行计算:多进程、多线程、并发模型
- 使用NumPy优化:向量化运算、内存布局、广播机制
- 实现多进程计算:进程池、共享内存、进程间通信
- 使用GPU加速:CUDA编程、Numba JIT、CuPy库
- 进行分布式计算:Dask框架、任务调度、数据分片
- 优化内存管理:内存映射、零拷贝、内存池
- 实现算法优化:缓存优化、分支预测、SIMD指令
- 进行性能分析:性能剖析、热点分析、优化策略
55.1 NumPy高性能计算
55.1.1 向量化运算
python
import numpy as np
from typing import Tuple, List, Optional
from dataclasses import dataclass
import time
class VectorizedOperations:
@staticmethod
def element_wise_operations():
size = 10_000_000
a = np.random.rand(size)
b = np.random.rand(size)
start = time.time()
result = a + b
add_time = time.time() - start
start = time.time()
result = a * b
mul_time = time.time() - start
start = time.time()
result = np.sqrt(a)
sqrt_time = time.time() - start
return {
"add_time": add_time,
"mul_time": mul_time,
"sqrt_time": sqrt_time
}
@staticmethod
def reduction_operations():
size = 10_000_000
a = np.random.rand(size)
start = time.time()
total = np.sum(a)
sum_time = time.time() - start
start = time.time()
mean = np.mean(a)
mean_time = time.time() - start
start = time.time()
std = np.std(a)
std_time = time.time() - start
start = time.time()
sorted_a = np.sort(a)
sort_time = time.time() - start
return {
"sum_time": sum_time,
"mean_time": mean_time,
"std_time": std_time,
"sort_time": sort_time
}
@staticmethod
def matrix_operations():
n = 1000
a = np.random.rand(n, n)
b = np.random.rand(n, n)
start = time.time()
c = np.dot(a, b)
dot_time = time.time() - start
start = time.time()
d = a @ b
matmul_time = time.time() - start
start = time.time()
eigenvalues = np.linalg.eigvals(a)
eig_time = time.time() - start
start = time.time()
inv = np.linalg.inv(a)
inv_time = time.time() - start
return {
"dot_time": dot_time,
"matmul_time": matmul_time,
"eig_time": eig_time,
"inv_time": inv_time
}
class BroadcastingDemo:
@staticmethod
def broadcast_examples():
a = np.array([[1, 2, 3], [4, 5, 6]])
b = np.array([10, 20, 30])
result1 = a + b
c = np.array([[1], [2], [3]])
d = np.array([10, 20])
result2 = c + d
return {
"shape_a": a.shape,
"shape_b": b.shape,
"result1": result1,
"shape_c": c.shape,
"shape_d": d.shape,
"result2": result2
}
class StrideOptimization:
@staticmethod
def demonstrate_stride():
arr = np.arange(1000000).reshape(1000, 1000)
start = time.time()
for i in range(1000):
_ = arr[i, :]
row_time = time.time() - start
start = time.time()
for j in range(1000):
_ = arr[:, j]
col_time = time.time() - start
arr_c = np.ascontiguousarray(arr)
arr_f = np.asfortranarray(arr)
return {
"row_access_time": row_time,
"col_access_time": col_time,
"c_order_strides": arr_c.strides,
"f_order_strides": arr_f.strides
}
@dataclass
class MemoryLayout:
shape: Tuple[int, ...]
dtype: np.dtype
strides: Tuple[int, ...]
flags: dict
class ArrayMemoryAnalyzer:
@staticmethod
def analyze(arr: np.ndarray) -> MemoryLayout:
return MemoryLayout(
shape=arr.shape,
dtype=arr.dtype,
strides=arr.strides,
flags={
"C_CONTIGUOUS": arr.flags['C_CONTIGUOUS'],
"F_CONTIGUOUS": arr.flags['F_CONTIGUOUS'],
"OWNDATA": arr.flags['OWNDATA'],
"WRITEABLE": arr.flags['WRITEABLE']
}
)
@staticmethod
def optimize_for_operation(arr: np.ndarray, operation: str) -> np.ndarray:
if operation == "row_access":
return np.ascontiguousarray(arr)
elif operation == "col_access":
return np.asfortranarray(arr)
elif operation == "transpose":
return np.ascontiguousarray(arr.T)
return arr55.1.2 内存优化
python
from typing import Iterator, Optional
import mmap
import os
class MemoryMappedArray:
def __init__(self, filepath: str, shape: Tuple[int, ...], dtype: np.dtype = np.float64):
self.filepath = filepath
self.shape = shape
self.dtype = dtype
self.size = int(np.prod(shape)) * np.dtype(dtype).itemsize
if not os.path.exists(filepath):
with open(filepath, 'wb') as f:
f.seek(self.size - 1)
f.write(b'\0')
self.file = open(filepath, 'r+b')
self.mmap = mmap.mmap(self.file.fileno(), 0)
self.array = np.frombuffer(self.mmap, dtype=dtype).reshape(shape)
def __getitem__(self, index):
return self.array[index]
def __setitem__(self, index, value):
self.array[index] = value
def close(self) -> None:
self.mmap.close()
self.file.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class ChunkedArray:
def __init__(
self,
shape: Tuple[int, ...],
dtype: np.dtype = np.float64,
chunk_size: int = 1000
):
self.shape = shape
self.dtype = dtype
self.chunk_size = chunk_size
self.chunks: Dict[int, np.ndarray] = {}
def _get_chunk_index(self, index: int) -> int:
return index // self.chunk_size
def _get_local_index(self, index: int) -> int:
return index % self.chunk_size
def _load_chunk(self, chunk_idx: int) -> np.ndarray:
if chunk_idx not in self.chunks:
chunk_shape = (self.chunk_size,) + self.shape[1:]
self.chunks[chunk_idx] = np.zeros(chunk_shape, dtype=self.dtype)
return self.chunks[chunk_idx]
def __getitem__(self, index):
if isinstance(index, int):
chunk_idx = self._get_chunk_index(index)
local_idx = self._get_local_index(index)
return self._load_chunk(chunk_idx)[local_idx]
elif isinstance(index, slice):
return self._get_slice(index)
def __setitem__(self, index, value):
if isinstance(index, int):
chunk_idx = self._get_chunk_index(index)
local_idx = self._get_local_index(index)
self._load_chunk(chunk_idx)[local_idx] = value
def _get_slice(self, s: slice) -> np.ndarray:
start = s.start or 0
stop = s.stop or self.shape[0]
step = s.step or 1
result = []
for i in range(start, stop, step):
result.append(self[i])
return np.array(result)
class ZeroCopyOperations:
@staticmethod
def view_operations():
arr = np.arange(12).reshape(3, 4)
view1 = arr[:2, :]
view2 = arr[::2, ::2]
view3 = arr.T
return {
"original": arr,
"view1_is_view": view1.base is not None,
"view2_is_view": view2.base is not None,
"view3_is_view": view3.base is not None
}
@staticmethod
def avoid_copy():
arr = np.arange(1000000)
view = arr[100:200]
copy = arr[100:200].copy()
return {
"view_memory": view.nbytes,
"copy_memory": copy.nbytes,
"view_base": view.base is not None,
"copy_base": copy.base is not None
}55.2 多进程计算
55.2.1 进程池
python
from multiprocessing import Pool, cpu_count, shared_memory
from typing import Callable, List, Any, Tuple
import numpy as np
class ParallelProcessor:
def __init__(self, num_workers: int = None):
self.num_workers = num_workers or cpu_count()
def map(
self,
func: Callable,
data: List[Any],
chunksize: int = None
) -> List[Any]:
with Pool(self.num_workers) as pool:
if chunksize:
return pool.map(func, data, chunksize=chunksize)
return pool.map(func, data)
def starmap(
self,
func: Callable,
data: List[Tuple],
chunksize: int = None
) -> List[Any]:
with Pool(self.num_workers) as pool:
if chunksize:
return pool.starmap(func, data, chunksize=chunksize)
return pool.starmap(func, data)
def imap(
self,
func: Callable,
data: List[Any],
chunksize: int = 1
) -> Iterator[Any]:
with Pool(self.num_workers) as pool:
yield from pool.imap(func, data, chunksize=chunksize)
def apply_async(
self,
func: Callable,
args: Tuple = (),
callback: Callable = None
):
with Pool(self.num_workers) as pool:
result = pool.apply_async(func, args, callback=callback)
return result.get()
class SharedMemoryArray:
def __init__(self, shape: Tuple[int, ...], dtype: np.dtype = np.float64):
self.shape = shape
self.dtype = dtype
self.size = int(np.prod(shape)) * np.dtype(dtype).itemsize
self.shm = shared_memory.SharedMemory(create=True, size=self.size)
self.array = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)
def __getitem__(self, index):
return self.array[index]
def __setitem__(self, index, value):
self.array[index] = value
def close(self) -> None:
self.shm.close()
def unlink(self) -> None:
self.shm.unlink()
@property
def name(self) -> str:
return self.shm.name
@classmethod
def connect(cls, name: str, shape: Tuple[int, ...], dtype: np.dtype = np.float64) -> "SharedMemoryArray":
instance = cls.__new__(cls)
instance.shape = shape
instance.dtype = dtype
instance.size = int(np.prod(shape)) * np.dtype(dtype).itemsize
instance.shm = shared_memory.SharedMemory(name=name)
instance.array = np.ndarray(shape, dtype=dtype, buffer=instance.shm.buf)
return instance
def parallel_matrix_multiply(
a: np.ndarray,
b: np.ndarray,
num_workers: int = None
) -> np.ndarray:
num_workers = num_workers or cpu_count()
n = a.shape[0]
m = b.shape[1]
k = a.shape[1]
result = np.zeros((n, m))
def compute_row(i):
return np.dot(a[i, :], b)
with Pool(num_workers) as pool:
rows = pool.map(compute_row, range(n))
for i, row in enumerate(rows):
result[i, :] = row
return result
def parallel_reduction(
arr: np.ndarray,
func: Callable,
num_workers: int = None
) -> Any:
num_workers = num_workers or cpu_count()
chunk_size = len(arr) // num_workers
chunks = [
arr[i * chunk_size:(i + 1) * chunk_size]
for i in range(num_workers)
]
if len(arr) % num_workers != 0:
chunks[-1] = np.concatenate([chunks[-1], arr[num_workers * chunk_size:]])
with Pool(num_workers) as pool:
partial_results = pool.map(func, chunks)
return func(np.array(partial_results))55.3 GPU计算
55.3.1 Numba CUDA
python
from typing import Tuple
import numpy as np
try:
from numba import cuda, jit
NUMBA_AVAILABLE = True
except ImportError:
NUMBA_AVAILABLE = False
if NUMBA_AVAILABLE:
@cuda.jit
def vector_add_kernel(a, b, c):
idx = cuda.grid(1)
if idx < a.size:
c[idx] = a[idx] + b[idx]
@cuda.jit
def matrix_multiply_kernel(a, b, c, m, n, k):
row, col = cuda.grid(2)
if row < m and col < k:
tmp = 0.0
for i in range(n):
tmp += a[row, i] * b[i, col]
c[row, col] = tmp
@cuda.jit
def reduce_kernel(arr, partial_sums):
tid = cuda.threadIdx.x
bid = cuda.blockIdx.x
block_size = cuda.blockDim.x
shared = cuda.shared.array(256, dtype=np.float64)
idx = bid * block_size * 2 + tid
shared[tid] = 0.0
if idx < arr.size:
shared[tid] = arr[idx]
if idx + block_size < arr.size:
shared[tid] += arr[idx + block_size]
cuda.syncthreads()
s = block_size // 2
while s > 0:
if tid < s:
shared[tid] += shared[tid + s]
cuda.syncthreads()
s //= 2
if tid == 0:
partial_sums[bid] = shared[0]
class GPUOperations:
@staticmethod
def vector_add(a: np.ndarray, b: np.ndarray) -> np.ndarray:
if not NUMBA_AVAILABLE:
return a + b
a_device = cuda.to_device(a)
b_device = cuda.to_device(b)
c_device = cuda.device_array_like(a)
threads_per_block = 256
blocks_per_grid = (a.size + threads_per_block - 1) // threads_per_block
vector_add_kernel[blocks_per_grid, threads_per_block](a_device, b_device, c_device)
return c_device.copy_to_host()
@staticmethod
def matrix_multiply(a: np.ndarray, b: np.ndarray) -> np.ndarray:
if not NUMBA_AVAILABLE:
return a @ b
m, n = a.shape
n2, k = b.shape
a_device = cuda.to_device(a)
b_device = cuda.to_device(b)
c_device = cuda.device_array((m, k))
threads_per_block = (16, 16)
blocks_per_grid = (
(m + threads_per_block[0] - 1) // threads_per_block[0],
(k + threads_per_block[1] - 1) // threads_per_block[1]
)
matrix_multiply_kernel[blocks_per_grid, threads_per_block](
a_device, b_device, c_device, m, n, k
)
return c_device.copy_to_host()
@staticmethod
def sum(arr: np.ndarray) -> float:
if not NUMBA_AVAILABLE:
return np.sum(arr)
arr_device = cuda.to_device(arr)
threads_per_block = 256
blocks_per_grid = (arr.size + threads_per_block * 2 - 1) // (threads_per_block * 2)
partial_sums = cuda.device_array(blocks_per_grid, dtype=np.float64)
reduce_kernel[blocks_per_grid, threads_per_block](arr_device, partial_sums)
result = partial_sums.copy_to_host()
return np.sum(result)
class JITCompiler:
@staticmethod
@jit(nopython=True)
def mandelbrot(c: complex, max_iter: int) -> int:
z = 0 + 0j
for i in range(max_iter):
if abs(z) > 2:
return i
z = z * z + c
return max_iter
@staticmethod
@jit(nopython=True, parallel=True)
def compute_mandelbrot(xmin: float, xmax: float, ymin: float, ymax: float,
width: int, height: int, max_iter: int) -> np.ndarray:
result = np.zeros((height, width), dtype=np.int32)
for i in range(height):
for j in range(width):
x = xmin + (xmax - xmin) * j / width
y = ymin + (ymax - ymin) * i / height
c = complex(x, y)
result[i, j] = JITCompiler.mandelbrot(c, max_iter)
return result
@staticmethod
@jit(nopython=True)
def fast_sort(arr: np.ndarray) -> np.ndarray:
result = arr.copy()
n = len(result)
for i in range(n):
for j in range(0, n - i - 1):
if result[j] > result[j + 1]:
result[j], result[j + 1] = result[j + 1], result[j]
return result55.4 性能分析
55.4.1 性能剖析
python
import cProfile
import pstats
import io
from functools import wraps
from typing import Callable, Dict, Any
import time
class Profiler:
def __init__(self):
self.stats: Dict[str, Dict] = {}
def profile(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
start_memory = self._get_memory_usage()
result = func(*args, **kwargs)
end_time = time.perf_counter()
end_memory = self._get_memory_usage()
func_name = func.__name__
if func_name not in self.stats:
self.stats[func_name] = {
"calls": 0,
"total_time": 0.0,
"total_memory": 0.0
}
self.stats[func_name]["calls"] += 1
self.stats[func_name]["total_time"] += end_time - start_time
self.stats[func_name]["total_memory"] += end_memory - start_memory
return result
return wrapper
def _get_memory_usage(self) -> float:
try:
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
except ImportError:
return 0.0
def get_report(self) -> Dict[str, Any]:
report = {}
for func_name, stats in self.stats.items():
calls = stats["calls"]
report[func_name] = {
"calls": calls,
"total_time": stats["total_time"],
"avg_time": stats["total_time"] / calls if calls > 0 else 0,
"total_memory_mb": stats["total_memory"],
"avg_memory_mb": stats["total_memory"] / calls if calls > 0 else 0
}
return report
def reset(self) -> None:
self.stats.clear()
def profile_function(func: Callable) -> str:
pr = cProfile.Profile()
pr.enable()
result = func()
pr.disable()
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats()
return s.getvalue()
class Benchmark:
def __init__(self, name: str):
self.name = name
self.times: List[float] = []
def __enter__(self):
self.start_time = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
end_time = time.perf_counter()
self.times.append(end_time - self.start_time)
def run(self, func: Callable, iterations: int = 100) -> Dict[str, float]:
for _ in range(iterations):
with self:
func()
return {
"name": self.name,
"iterations": iterations,
"total_time": sum(self.times),
"avg_time": sum(self.times) / len(self.times),
"min_time": min(self.times),
"max_time": max(self.times)
}
class HotspotAnalyzer:
def __init__(self):
self.line_times: Dict[str, float] = {}
def analyze(self, code: str, globals_dict: dict = None) -> Dict[str, float]:
try:
from line_profiler import LineProfiler
lp = LineProfiler()
exec(code, globals_dict or {})
for name, obj in (globals_dict or {}).items():
if callable(obj) and not name.startswith('_'):
lp.add_function(obj)
lp.enable()
exec(code, globals_dict or {})
lp.disable()
return {"line_profiler": "available"}
except ImportError:
return {"line_profiler": "not available"}55.5 知识图谱
55.5.1 高性能计算技术架构
┌─────────────────────────────────────────────────────────────────────┐
│ 高性能计算技术架构 │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 应用层 (Application) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 科学计算 │ │ 机器学习 │ │ 数据分析 │ │ 图像处理 │ │ │
│ │ │ SciPy │ │ PyTorch │ │ Pandas │ │ OpenCV │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 加速层 (Acceleration) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ NumPy │ │ Numba │ │ CuPy │ │ JAX │ │ │
│ │ │ 向量化 │ │ JIT编译 │ │ GPU加速 │ │ 自动微分 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 并行层 (Parallelism) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │多进程 │ │ 多线程 │ │ 分布式 │ │ GPU并行 │ │ │
│ │ │multiproc │ │threading│ │ Dask │ │ CUDA │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────┐ │
│ │ 硬件层 (Hardware) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ CPU │ │ GPU │ │ TPU │ │ 集群 │ │ │
│ │ │ 多核 │ │ CUDA核心│ │ 张量处理 │ │ 分布式 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘55.5.2 性能优化策略
┌─────────────────────────────────────────────────────────────────────┐
│ 性能优化策略全景 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 算法优化 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 时间复杂度│ │ 空间复杂度│ │ 缓存友好 │ │ │
│ │ │ O(n)→O(logn)│ │ 减少内存 │ │ 数据局部性│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 计算优化 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 向量化 │ │ JIT编译 │ │ 并行计算 │ │ │
│ │ │ NumPy │ │ Numba │ │ 多进程 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 内存优化 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 内存映射 │ │ 分块处理 │ │ 零拷贝 │ │ │
│ │ │ mmap │ │ chunking │ │ view │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ I/O优化 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 异步I/O │ │ 批量读写 │ │ 压缩存储 │ │ │
│ │ │ asyncio │ │ buffer │ │ parquet │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘55.6 技术选型指南
55.6.1 数值计算库选型
| 库 | 性能 | 功能 | GPU支持 | 推荐指数 |
|---|---|---|---|---|
| NumPy | 高 | 基础数值计算 | ❌ | ★★★★★ |
| SciPy | 高 | 科学计算 | ❌ | ★★★★★ |
| Numba | 极高 | JIT编译 | ✅ | ★★★★★ |
| CuPy | 极高 | GPU数组 | ✅ | ★★★★☆ |
| JAX | 极高 | 自动微分 | ✅ | ★★★★★ |
55.6.2 并行计算框架选型
| 框架 | 适用场景 | 分布式 | GPU支持 | 推荐指数 |
|---|---|---|---|---|
| multiprocessing | CPU密集型 | ❌ | ❌ | ★★★★★ |
| concurrent.futures | 通用并行 | ❌ | ❌ | ★★★★★ |
| Dask | 大规模数据 | ✅ | ✅ | ★★★★★ |
| Ray | 分布式计算 | ✅ | ✅ | ★★★★★ |
| Joblib | 科学计算 | ❌ | ❌ | ★★★★☆ |
55.6.3 性能分析工具选型
| 工具 | 功能 | 开销 | 推荐指数 |
|---|---|---|---|
| cProfile | 函数级分析 | 中 | ★★★★★ |
| line_profiler | 行级分析 | 高 | ★★★★☆ |
| memory_profiler | 内存分析 | 高 | ★★★★☆ |
| py-spy | 采样分析 | 低 | ★★★★★ |
| nvprof | GPU分析 | 中 | ★★★★☆ |
55.7 常见问题与解决方案
55.7.1 内存优化策略
python
import numpy as np
from typing import Tuple, Iterator
import mmap
class MemoryOptimizer:
"""内存优化器"""
@staticmethod
def optimize_dtypes(arr: np.ndarray) -> np.ndarray:
"""优化数据类型"""
if arr.dtype == np.float64:
if np.all(np.isfinite(arr)) and np.all(np.abs(arr) < 3.4e38):
return arr.astype(np.float32)
elif arr.dtype == np.int64:
min_val, max_val = arr.min(), arr.max()
if min_val >= 0:
if max_val < 256:
return arr.astype(np.uint8)
elif max_val < 65536:
return arr.astype(np.uint16)
elif max_val < 4294967296:
return arr.astype(np.uint32)
else:
if min_val > -128 and max_val < 127:
return arr.astype(np.int8)
elif min_val > -32768 and max_val < 32767:
return arr.astype(np.int16)
return arr
@staticmethod
def create_memory_mapped_array(
filepath: str,
shape: Tuple[int, ...],
dtype: np.dtype = np.float64
) -> np.memmap:
"""创建内存映射数组"""
return np.memmap(filepath, dtype=dtype, mode='w+', shape=shape)
@staticmethod
def process_in_chunks(
arr: np.ndarray,
chunk_size: int,
process_func: callable
) -> np.ndarray:
"""分块处理"""
results = []
for i in range(0, len(arr), chunk_size):
chunk = arr[i:i + chunk_size]
results.append(process_func(chunk))
return np.concatenate(results)
class ChunkedArray:
"""分块数组"""
def __init__(
self,
shape: Tuple[int, ...],
chunk_size: int,
dtype: np.dtype = np.float64
):
self.shape = shape
self.chunk_size = chunk_size
self.dtype = dtype
self._chunks = []
def append(self, data: np.ndarray):
"""添加数据"""
self._chunks.append(data.astype(self.dtype))
def get_chunk(self, index: int) -> np.ndarray:
"""获取块"""
return self._chunks[index]
def iterate_chunks(self) -> Iterator[np.ndarray]:
"""迭代块"""
for chunk in self._chunks:
yield chunk
def to_array(self) -> np.ndarray:
"""转换为完整数组"""
return np.concatenate(self._chunks)55.7.2 并行计算优化
python
from multiprocessing import Pool, shared_memory
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, List, Any, Tuple
import numpy as np
class ParallelProcessor:
"""并行处理器"""
def __init__(self, n_workers: int = None):
import multiprocessing
self.n_workers = n_workers or multiprocessing.cpu_count()
def map(
self,
func: Callable,
data: List[Any],
chunksize: int = None
) -> List[Any]:
"""并行映射"""
with Pool(self.n_workers) as pool:
return pool.map(func, data, chunksize=chunksize)
def starmap(
self,
func: Callable,
data: List[Tuple],
chunksize: int = None
) -> List[Any]:
"""并行星号映射"""
with Pool(self.n_workers) as pool:
return pool.starmap(func, data, chunksize=chunksize)
def imap_unordered(
self,
func: Callable,
data: List[Any],
chunksize: int = None
) -> Any:
"""无序并行迭代映射"""
with Pool(self.n_workers) as pool:
yield from pool.imap_unordered(func, data, chunksize=chunksize)
class SharedMemoryArray:
"""共享内存数组"""
def __init__(self, arr: np.ndarray):
self.shape = arr.shape
self.dtype = arr.dtype
self.shm = shared_memory.SharedMemory(
create=True,
size=arr.nbytes
)
self.arr = np.ndarray(
self.shape,
dtype=self.dtype,
buffer=self.shm.buf
)
self.arr[:] = arr[:]
def get_array(self) -> np.ndarray:
"""获取数组"""
return self.arr
def close(self):
"""关闭共享内存"""
self.shm.close()
def unlink(self):
"""释放共享内存"""
self.shm.unlink()
def parallel_reduce(
func: Callable[[Any, Any], Any],
data: List[Any],
n_workers: int = None
) -> Any:
"""并行归约"""
import multiprocessing
n_workers = n_workers or multiprocessing.cpu_count()
if len(data) <= n_workers:
from functools import reduce
return reduce(func, data)
chunk_size = len(data) // n_workers
chunks = [
data[i:i + chunk_size]
for i in range(0, len(data), chunk_size)
]
with Pool(n_workers) as pool:
from functools import reduce
partial_results = pool.map(
lambda chunk: reduce(func, chunk),
chunks
)
from functools import reduce
return reduce(func, partial_results)55.7.3 性能分析与调优
python
import time
import cProfile
import pstats
from functools import wraps
from typing import Callable, Dict, Any
from dataclasses import dataclass
@dataclass
class ProfileResult:
"""性能分析结果"""
total_time: float
function_calls: Dict[str, Dict]
hotspots: list
class PerformanceProfiler:
"""性能分析器"""
def __init__(self):
self._results: Dict[str, ProfileResult] = {}
def profile(self, func: Callable) -> Callable:
"""性能分析装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
profiler.disable()
stats = pstats.Stats(profiler)
function_calls = {}
for func_info, (cc, nc, tt, ct, callers) in stats.stats.items():
function_calls[str(func_info)] = {
"call_count": cc,
"total_time": tt,
"cumulative_time": ct
}
hotspots = sorted(
function_calls.items(),
key=lambda x: x[1]["cumulative_time"],
reverse=True
)[:10]
self._results[func.__name__] = ProfileResult(
total_time=end_time - start_time,
function_calls=function_calls,
hotspots=hotspots
)
return result
return wrapper
def get_results(self, func_name: str = None) -> Dict:
"""获取分析结果"""
if func_name:
return self._results.get(func_name)
return self._results
class Benchmark:
"""基准测试"""
def __init__(self, name: str):
self.name = name
self._times: list = []
def __enter__(self):
self._start = time.perf_counter()
return self
def __exit__(self, *args):
self._times.append(time.perf_counter() - self._start)
@property
def mean(self) -> float:
return np.mean(self._times) if self._times else 0
@property
def std(self) -> float:
return np.std(self._times) if self._times else 0
@property
def min(self) -> float:
return np.min(self._times) if self._times else 0
@property
def max(self) -> float:
return np.max(self._times) if self._times else 0
def summary(self) -> Dict:
return {
"name": self.name,
"runs": len(self._times),
"mean": self.mean,
"std": self.std,
"min": self.min,
"max": self.max
}
def timeit(func: Callable, *args, n_runs: int = 10, **kwargs) -> Dict:
"""计时函数"""
times = []
for _ in range(n_runs):
start = time.perf_counter()
func(*args, **kwargs)
times.append(time.perf_counter() - start)
return {
"function": func.__name__,
"runs": n_runs,
"mean": np.mean(times),
"std": np.std(times),
"min": np.min(times),
"max": np.max(times)
}55.8 本章小结
本章详细介绍了Python高性能计算的核心概念和实践:
- NumPy优化:向量化运算、广播机制、内存布局
- 内存管理:内存映射、分块数组、零拷贝操作
- 多进程计算:进程池、共享内存、并行归约
- GPU计算:CUDA编程、Numba JIT、并行内核
- 性能分析:性能剖析、基准测试、热点分析
练习题
- 实现一个高性能矩阵运算库,支持多种优化策略
- 开发一个并行数据处理框架,支持自动分片
- 实现一个GPU加速的图像处理库
- 开发一个性能分析工具,支持实时监控
- 实现一个分布式计算框架,支持任务调度