Skip to content

第55章 高性能计算

学习目标

完成本章学习后,你将能够:

  1. 理解并行计算:多进程、多线程、并发模型
  2. 使用NumPy优化:向量化运算、内存布局、广播机制
  3. 实现多进程计算:进程池、共享内存、进程间通信
  4. 使用GPU加速:CUDA编程、Numba JIT、CuPy库
  5. 进行分布式计算:Dask框架、任务调度、数据分片
  6. 优化内存管理:内存映射、零拷贝、内存池
  7. 实现算法优化:缓存优化、分支预测、SIMD指令
  8. 进行性能分析:性能剖析、热点分析、优化策略

55.1 NumPy高性能计算

55.1.1 向量化运算

python
import numpy as np
from typing import Tuple, List, Optional
from dataclasses import dataclass
import time


class VectorizedOperations:
    @staticmethod
    def element_wise_operations():
        size = 10_000_000

        a = np.random.rand(size)
        b = np.random.rand(size)

        start = time.time()
        result = a + b
        add_time = time.time() - start

        start = time.time()
        result = a * b
        mul_time = time.time() - start

        start = time.time()
        result = np.sqrt(a)
        sqrt_time = time.time() - start

        return {
            "add_time": add_time,
            "mul_time": mul_time,
            "sqrt_time": sqrt_time
        }

    @staticmethod
    def reduction_operations():
        size = 10_000_000
        a = np.random.rand(size)

        start = time.time()
        total = np.sum(a)
        sum_time = time.time() - start

        start = time.time()
        mean = np.mean(a)
        mean_time = time.time() - start

        start = time.time()
        std = np.std(a)
        std_time = time.time() - start

        start = time.time()
        sorted_a = np.sort(a)
        sort_time = time.time() - start

        return {
            "sum_time": sum_time,
            "mean_time": mean_time,
            "std_time": std_time,
            "sort_time": sort_time
        }

    @staticmethod
    def matrix_operations():
        n = 1000
        a = np.random.rand(n, n)
        b = np.random.rand(n, n)

        start = time.time()
        c = np.dot(a, b)
        dot_time = time.time() - start

        start = time.time()
        d = a @ b
        matmul_time = time.time() - start

        start = time.time()
        eigenvalues = np.linalg.eigvals(a)
        eig_time = time.time() - start

        start = time.time()
        inv = np.linalg.inv(a)
        inv_time = time.time() - start

        return {
            "dot_time": dot_time,
            "matmul_time": matmul_time,
            "eig_time": eig_time,
            "inv_time": inv_time
        }


class BroadcastingDemo:
    @staticmethod
    def broadcast_examples():
        a = np.array([[1, 2, 3], [4, 5, 6]])
        b = np.array([10, 20, 30])

        result1 = a + b

        c = np.array([[1], [2], [3]])
        d = np.array([10, 20])

        result2 = c + d

        return {
            "shape_a": a.shape,
            "shape_b": b.shape,
            "result1": result1,
            "shape_c": c.shape,
            "shape_d": d.shape,
            "result2": result2
        }


class StrideOptimization:
    @staticmethod
    def demonstrate_stride():
        arr = np.arange(1000000).reshape(1000, 1000)

        start = time.time()
        for i in range(1000):
            _ = arr[i, :]
        row_time = time.time() - start

        start = time.time()
        for j in range(1000):
            _ = arr[:, j]
        col_time = time.time() - start

        arr_c = np.ascontiguousarray(arr)
        arr_f = np.asfortranarray(arr)

        return {
            "row_access_time": row_time,
            "col_access_time": col_time,
            "c_order_strides": arr_c.strides,
            "f_order_strides": arr_f.strides
        }


@dataclass
class MemoryLayout:
    shape: Tuple[int, ...]
    dtype: np.dtype
    strides: Tuple[int, ...]
    flags: dict


class ArrayMemoryAnalyzer:
    @staticmethod
    def analyze(arr: np.ndarray) -> MemoryLayout:
        return MemoryLayout(
            shape=arr.shape,
            dtype=arr.dtype,
            strides=arr.strides,
            flags={
                "C_CONTIGUOUS": arr.flags['C_CONTIGUOUS'],
                "F_CONTIGUOUS": arr.flags['F_CONTIGUOUS'],
                "OWNDATA": arr.flags['OWNDATA'],
                "WRITEABLE": arr.flags['WRITEABLE']
            }
        )

    @staticmethod
    def optimize_for_operation(arr: np.ndarray, operation: str) -> np.ndarray:
        if operation == "row_access":
            return np.ascontiguousarray(arr)
        elif operation == "col_access":
            return np.asfortranarray(arr)
        elif operation == "transpose":
            return np.ascontiguousarray(arr.T)
        return arr

55.1.2 内存优化

python
from typing import Iterator, Optional
import mmap
import os


class MemoryMappedArray:
    def __init__(self, filepath: str, shape: Tuple[int, ...], dtype: np.dtype = np.float64):
        self.filepath = filepath
        self.shape = shape
        self.dtype = dtype

        self.size = int(np.prod(shape)) * np.dtype(dtype).itemsize

        if not os.path.exists(filepath):
            with open(filepath, 'wb') as f:
                f.seek(self.size - 1)
                f.write(b'\0')

        self.file = open(filepath, 'r+b')
        self.mmap = mmap.mmap(self.file.fileno(), 0)
        self.array = np.frombuffer(self.mmap, dtype=dtype).reshape(shape)

    def __getitem__(self, index):
        return self.array[index]

    def __setitem__(self, index, value):
        self.array[index] = value

    def close(self) -> None:
        self.mmap.close()
        self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class ChunkedArray:
    def __init__(
        self,
        shape: Tuple[int, ...],
        dtype: np.dtype = np.float64,
        chunk_size: int = 1000
    ):
        self.shape = shape
        self.dtype = dtype
        self.chunk_size = chunk_size
        self.chunks: Dict[int, np.ndarray] = {}

    def _get_chunk_index(self, index: int) -> int:
        return index // self.chunk_size

    def _get_local_index(self, index: int) -> int:
        return index % self.chunk_size

    def _load_chunk(self, chunk_idx: int) -> np.ndarray:
        if chunk_idx not in self.chunks:
            chunk_shape = (self.chunk_size,) + self.shape[1:]
            self.chunks[chunk_idx] = np.zeros(chunk_shape, dtype=self.dtype)
        return self.chunks[chunk_idx]

    def __getitem__(self, index):
        if isinstance(index, int):
            chunk_idx = self._get_chunk_index(index)
            local_idx = self._get_local_index(index)
            return self._load_chunk(chunk_idx)[local_idx]
        elif isinstance(index, slice):
            return self._get_slice(index)

    def __setitem__(self, index, value):
        if isinstance(index, int):
            chunk_idx = self._get_chunk_index(index)
            local_idx = self._get_local_index(index)
            self._load_chunk(chunk_idx)[local_idx] = value

    def _get_slice(self, s: slice) -> np.ndarray:
        start = s.start or 0
        stop = s.stop or self.shape[0]
        step = s.step or 1

        result = []
        for i in range(start, stop, step):
            result.append(self[i])
        return np.array(result)


class ZeroCopyOperations:
    @staticmethod
    def view_operations():
        arr = np.arange(12).reshape(3, 4)

        view1 = arr[:2, :]
        view2 = arr[::2, ::2]
        view3 = arr.T

        return {
            "original": arr,
            "view1_is_view": view1.base is not None,
            "view2_is_view": view2.base is not None,
            "view3_is_view": view3.base is not None
        }

    @staticmethod
    def avoid_copy():
        arr = np.arange(1000000)

        view = arr[100:200]

        copy = arr[100:200].copy()

        return {
            "view_memory": view.nbytes,
            "copy_memory": copy.nbytes,
            "view_base": view.base is not None,
            "copy_base": copy.base is not None
        }

55.2 多进程计算

55.2.1 进程池

python
from multiprocessing import Pool, cpu_count, shared_memory
from typing import Callable, List, Any, Tuple
import numpy as np


class ParallelProcessor:
    def __init__(self, num_workers: int = None):
        self.num_workers = num_workers or cpu_count()

    def map(
        self,
        func: Callable,
        data: List[Any],
        chunksize: int = None
    ) -> List[Any]:
        with Pool(self.num_workers) as pool:
            if chunksize:
                return pool.map(func, data, chunksize=chunksize)
            return pool.map(func, data)

    def starmap(
        self,
        func: Callable,
        data: List[Tuple],
        chunksize: int = None
    ) -> List[Any]:
        with Pool(self.num_workers) as pool:
            if chunksize:
                return pool.starmap(func, data, chunksize=chunksize)
            return pool.starmap(func, data)

    def imap(
        self,
        func: Callable,
        data: List[Any],
        chunksize: int = 1
    ) -> Iterator[Any]:
        with Pool(self.num_workers) as pool:
            yield from pool.imap(func, data, chunksize=chunksize)

    def apply_async(
        self,
        func: Callable,
        args: Tuple = (),
        callback: Callable = None
    ):
        with Pool(self.num_workers) as pool:
            result = pool.apply_async(func, args, callback=callback)
            return result.get()


class SharedMemoryArray:
    def __init__(self, shape: Tuple[int, ...], dtype: np.dtype = np.float64):
        self.shape = shape
        self.dtype = dtype
        self.size = int(np.prod(shape)) * np.dtype(dtype).itemsize

        self.shm = shared_memory.SharedMemory(create=True, size=self.size)
        self.array = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)

    def __getitem__(self, index):
        return self.array[index]

    def __setitem__(self, index, value):
        self.array[index] = value

    def close(self) -> None:
        self.shm.close()

    def unlink(self) -> None:
        self.shm.unlink()

    @property
    def name(self) -> str:
        return self.shm.name

    @classmethod
    def connect(cls, name: str, shape: Tuple[int, ...], dtype: np.dtype = np.float64) -> "SharedMemoryArray":
        instance = cls.__new__(cls)
        instance.shape = shape
        instance.dtype = dtype
        instance.size = int(np.prod(shape)) * np.dtype(dtype).itemsize

        instance.shm = shared_memory.SharedMemory(name=name)
        instance.array = np.ndarray(shape, dtype=dtype, buffer=instance.shm.buf)

        return instance


def parallel_matrix_multiply(
    a: np.ndarray,
    b: np.ndarray,
    num_workers: int = None
) -> np.ndarray:
    num_workers = num_workers or cpu_count()

    n = a.shape[0]
    m = b.shape[1]
    k = a.shape[1]

    result = np.zeros((n, m))

    def compute_row(i):
        return np.dot(a[i, :], b)

    with Pool(num_workers) as pool:
        rows = pool.map(compute_row, range(n))

    for i, row in enumerate(rows):
        result[i, :] = row

    return result


def parallel_reduction(
    arr: np.ndarray,
    func: Callable,
    num_workers: int = None
) -> Any:
    num_workers = num_workers or cpu_count()

    chunk_size = len(arr) // num_workers
    chunks = [
        arr[i * chunk_size:(i + 1) * chunk_size]
        for i in range(num_workers)
    ]

    if len(arr) % num_workers != 0:
        chunks[-1] = np.concatenate([chunks[-1], arr[num_workers * chunk_size:]])

    with Pool(num_workers) as pool:
        partial_results = pool.map(func, chunks)

    return func(np.array(partial_results))

55.3 GPU计算

55.3.1 Numba CUDA

python
from typing import Tuple
import numpy as np

try:
    from numba import cuda, jit
    NUMBA_AVAILABLE = True
except ImportError:
    NUMBA_AVAILABLE = False


if NUMBA_AVAILABLE:
    @cuda.jit
    def vector_add_kernel(a, b, c):
        idx = cuda.grid(1)
        if idx < a.size:
            c[idx] = a[idx] + b[idx]

    @cuda.jit
    def matrix_multiply_kernel(a, b, c, m, n, k):
        row, col = cuda.grid(2)

        if row < m and col < k:
            tmp = 0.0
            for i in range(n):
                tmp += a[row, i] * b[i, col]
            c[row, col] = tmp

    @cuda.jit
    def reduce_kernel(arr, partial_sums):
        tid = cuda.threadIdx.x
        bid = cuda.blockIdx.x
        block_size = cuda.blockDim.x

        shared = cuda.shared.array(256, dtype=np.float64)

        idx = bid * block_size * 2 + tid

        shared[tid] = 0.0
        if idx < arr.size:
            shared[tid] = arr[idx]
        if idx + block_size < arr.size:
            shared[tid] += arr[idx + block_size]

        cuda.syncthreads()

        s = block_size // 2
        while s > 0:
            if tid < s:
                shared[tid] += shared[tid + s]
            cuda.syncthreads()
            s //= 2

        if tid == 0:
            partial_sums[bid] = shared[0]


class GPUOperations:
    @staticmethod
    def vector_add(a: np.ndarray, b: np.ndarray) -> np.ndarray:
        if not NUMBA_AVAILABLE:
            return a + b

        a_device = cuda.to_device(a)
        b_device = cuda.to_device(b)
        c_device = cuda.device_array_like(a)

        threads_per_block = 256
        blocks_per_grid = (a.size + threads_per_block - 1) // threads_per_block

        vector_add_kernel[blocks_per_grid, threads_per_block](a_device, b_device, c_device)

        return c_device.copy_to_host()

    @staticmethod
    def matrix_multiply(a: np.ndarray, b: np.ndarray) -> np.ndarray:
        if not NUMBA_AVAILABLE:
            return a @ b

        m, n = a.shape
        n2, k = b.shape

        a_device = cuda.to_device(a)
        b_device = cuda.to_device(b)
        c_device = cuda.device_array((m, k))

        threads_per_block = (16, 16)
        blocks_per_grid = (
            (m + threads_per_block[0] - 1) // threads_per_block[0],
            (k + threads_per_block[1] - 1) // threads_per_block[1]
        )

        matrix_multiply_kernel[blocks_per_grid, threads_per_block](
            a_device, b_device, c_device, m, n, k
        )

        return c_device.copy_to_host()

    @staticmethod
    def sum(arr: np.ndarray) -> float:
        if not NUMBA_AVAILABLE:
            return np.sum(arr)

        arr_device = cuda.to_device(arr)

        threads_per_block = 256
        blocks_per_grid = (arr.size + threads_per_block * 2 - 1) // (threads_per_block * 2)

        partial_sums = cuda.device_array(blocks_per_grid, dtype=np.float64)

        reduce_kernel[blocks_per_grid, threads_per_block](arr_device, partial_sums)

        result = partial_sums.copy_to_host()
        return np.sum(result)


class JITCompiler:
    @staticmethod
    @jit(nopython=True)
    def mandelbrot(c: complex, max_iter: int) -> int:
        z = 0 + 0j
        for i in range(max_iter):
            if abs(z) > 2:
                return i
            z = z * z + c
        return max_iter

    @staticmethod
    @jit(nopython=True, parallel=True)
    def compute_mandelbrot(xmin: float, xmax: float, ymin: float, ymax: float,
                           width: int, height: int, max_iter: int) -> np.ndarray:
        result = np.zeros((height, width), dtype=np.int32)

        for i in range(height):
            for j in range(width):
                x = xmin + (xmax - xmin) * j / width
                y = ymin + (ymax - ymin) * i / height
                c = complex(x, y)
                result[i, j] = JITCompiler.mandelbrot(c, max_iter)

        return result

    @staticmethod
    @jit(nopython=True)
    def fast_sort(arr: np.ndarray) -> np.ndarray:
        result = arr.copy()
        n = len(result)
        for i in range(n):
            for j in range(0, n - i - 1):
                if result[j] > result[j + 1]:
                    result[j], result[j + 1] = result[j + 1], result[j]
        return result

55.4 性能分析

55.4.1 性能剖析

python
import cProfile
import pstats
import io
from functools import wraps
from typing import Callable, Dict, Any
import time


class Profiler:
    def __init__(self):
        self.stats: Dict[str, Dict] = {}

    def profile(self, func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            start_memory = self._get_memory_usage()

            result = func(*args, **kwargs)

            end_time = time.perf_counter()
            end_memory = self._get_memory_usage()

            func_name = func.__name__
            if func_name not in self.stats:
                self.stats[func_name] = {
                    "calls": 0,
                    "total_time": 0.0,
                    "total_memory": 0.0
                }

            self.stats[func_name]["calls"] += 1
            self.stats[func_name]["total_time"] += end_time - start_time
            self.stats[func_name]["total_memory"] += end_memory - start_memory

            return result

        return wrapper

    def _get_memory_usage(self) -> float:
        try:
            import psutil
            process = psutil.Process()
            return process.memory_info().rss / 1024 / 1024
        except ImportError:
            return 0.0

    def get_report(self) -> Dict[str, Any]:
        report = {}
        for func_name, stats in self.stats.items():
            calls = stats["calls"]
            report[func_name] = {
                "calls": calls,
                "total_time": stats["total_time"],
                "avg_time": stats["total_time"] / calls if calls > 0 else 0,
                "total_memory_mb": stats["total_memory"],
                "avg_memory_mb": stats["total_memory"] / calls if calls > 0 else 0
            }
        return report

    def reset(self) -> None:
        self.stats.clear()


def profile_function(func: Callable) -> str:
    pr = cProfile.Profile()
    pr.enable()

    result = func()

    pr.disable()

    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
    ps.print_stats()

    return s.getvalue()


class Benchmark:
    def __init__(self, name: str):
        self.name = name
        self.times: List[float] = []

    def __enter__(self):
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        end_time = time.perf_counter()
        self.times.append(end_time - self.start_time)

    def run(self, func: Callable, iterations: int = 100) -> Dict[str, float]:
        for _ in range(iterations):
            with self:
                func()

        return {
            "name": self.name,
            "iterations": iterations,
            "total_time": sum(self.times),
            "avg_time": sum(self.times) / len(self.times),
            "min_time": min(self.times),
            "max_time": max(self.times)
        }


class HotspotAnalyzer:
    def __init__(self):
        self.line_times: Dict[str, float] = {}

    def analyze(self, code: str, globals_dict: dict = None) -> Dict[str, float]:
        try:
            from line_profiler import LineProfiler

            lp = LineProfiler()

            exec(code, globals_dict or {})

            for name, obj in (globals_dict or {}).items():
                if callable(obj) and not name.startswith('_'):
                    lp.add_function(obj)

            lp.enable()
            exec(code, globals_dict or {})
            lp.disable()

            return {"line_profiler": "available"}

        except ImportError:
            return {"line_profiler": "not available"}

55.5 知识图谱

55.5.1 高性能计算技术架构

┌─────────────────────────────────────────────────────────────────────┐
│                      高性能计算技术架构                               │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      应用层 (Application)                     │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 科学计算  │ │ 机器学习  │ │ 数据分析  │ │ 图像处理  │       │   │
│  │  │ SciPy   │ │ PyTorch │ │ Pandas  │ │ OpenCV  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      加速层 (Acceleration)                    │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ NumPy   │ │ Numba   │ │ CuPy    │ │ JAX     │       │   │
│  │  │ 向量化   │ │ JIT编译  │ │ GPU加速  │ │ 自动微分  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      并行层 (Parallelism)                     │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │多进程    │ │ 多线程   │ │ 分布式   │ │ GPU并行  │       │   │
│  │  │multiproc │ │threading│ │ Dask    │ │ CUDA    │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      硬件层 (Hardware)                        │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ CPU     │ │ GPU     │ │ TPU     │ │ 集群     │       │   │
│  │  │ 多核    │ │ CUDA核心│ │ 张量处理 │ │ 分布式  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

55.5.2 性能优化策略

┌─────────────────────────────────────────────────────────────────────┐
│                      性能优化策略全景                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    算法优化                                  │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐                    │  │
│  │  │ 时间复杂度│ │ 空间复杂度│ │ 缓存友好  │                    │  │
│  │  │ O(n)→O(logn)│ │ 减少内存  │ │ 数据局部性│                    │  │
│  │  └──────────┘ └──────────┘ └──────────┘                    │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    计算优化                                  │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐                    │  │
│  │  │ 向量化   │ │ JIT编译  │ │ 并行计算  │                    │  │
│  │  │ NumPy   │ │ Numba   │ │ 多进程   │                    │  │
│  │  └──────────┘ └──────────┘ └──────────┘                    │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    内存优化                                  │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐                    │  │
│  │  │ 内存映射  │ │ 分块处理  │ │ 零拷贝   │                    │  │
│  │  │ mmap    │ │ chunking │ │ view    │                    │  │
│  │  └──────────┘ └──────────┘ └──────────┘                    │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │                    I/O优化                                   │  │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐                    │  │
│  │  │ 异步I/O  │ │ 批量读写  │ │ 压缩存储  │                    │  │
│  │  │ asyncio │ │ buffer   │ │ parquet │                    │  │
│  │  └──────────┘ └──────────┘ └──────────┘                    │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

55.6 技术选型指南

55.6.1 数值计算库选型

性能功能GPU支持推荐指数
NumPy基础数值计算★★★★★
SciPy科学计算★★★★★
Numba极高JIT编译★★★★★
CuPy极高GPU数组★★★★☆
JAX极高自动微分★★★★★

55.6.2 并行计算框架选型

框架适用场景分布式GPU支持推荐指数
multiprocessingCPU密集型★★★★★
concurrent.futures通用并行★★★★★
Dask大规模数据★★★★★
Ray分布式计算★★★★★
Joblib科学计算★★★★☆

55.6.3 性能分析工具选型

工具功能开销推荐指数
cProfile函数级分析★★★★★
line_profiler行级分析★★★★☆
memory_profiler内存分析★★★★☆
py-spy采样分析★★★★★
nvprofGPU分析★★★★☆

55.7 常见问题与解决方案

55.7.1 内存优化策略

python
import numpy as np
from typing import Tuple, Iterator
import mmap

class MemoryOptimizer:
    """内存优化器"""
    
    @staticmethod
    def optimize_dtypes(arr: np.ndarray) -> np.ndarray:
        """优化数据类型"""
        if arr.dtype == np.float64:
            if np.all(np.isfinite(arr)) and np.all(np.abs(arr) < 3.4e38):
                return arr.astype(np.float32)
        elif arr.dtype == np.int64:
            min_val, max_val = arr.min(), arr.max()
            if min_val >= 0:
                if max_val < 256:
                    return arr.astype(np.uint8)
                elif max_val < 65536:
                    return arr.astype(np.uint16)
                elif max_val < 4294967296:
                    return arr.astype(np.uint32)
            else:
                if min_val > -128 and max_val < 127:
                    return arr.astype(np.int8)
                elif min_val > -32768 and max_val < 32767:
                    return arr.astype(np.int16)
        return arr
    
    @staticmethod
    def create_memory_mapped_array(
        filepath: str,
        shape: Tuple[int, ...],
        dtype: np.dtype = np.float64
    ) -> np.memmap:
        """创建内存映射数组"""
        return np.memmap(filepath, dtype=dtype, mode='w+', shape=shape)
    
    @staticmethod
    def process_in_chunks(
        arr: np.ndarray,
        chunk_size: int,
        process_func: callable
    ) -> np.ndarray:
        """分块处理"""
        results = []
        for i in range(0, len(arr), chunk_size):
            chunk = arr[i:i + chunk_size]
            results.append(process_func(chunk))
        return np.concatenate(results)


class ChunkedArray:
    """分块数组"""
    
    def __init__(
        self,
        shape: Tuple[int, ...],
        chunk_size: int,
        dtype: np.dtype = np.float64
    ):
        self.shape = shape
        self.chunk_size = chunk_size
        self.dtype = dtype
        self._chunks = []
    
    def append(self, data: np.ndarray):
        """添加数据"""
        self._chunks.append(data.astype(self.dtype))
    
    def get_chunk(self, index: int) -> np.ndarray:
        """获取块"""
        return self._chunks[index]
    
    def iterate_chunks(self) -> Iterator[np.ndarray]:
        """迭代块"""
        for chunk in self._chunks:
            yield chunk
    
    def to_array(self) -> np.ndarray:
        """转换为完整数组"""
        return np.concatenate(self._chunks)

55.7.2 并行计算优化

python
from multiprocessing import Pool, shared_memory
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable, List, Any, Tuple
import numpy as np

class ParallelProcessor:
    """并行处理器"""
    
    def __init__(self, n_workers: int = None):
        import multiprocessing
        self.n_workers = n_workers or multiprocessing.cpu_count()
    
    def map(
        self,
        func: Callable,
        data: List[Any],
        chunksize: int = None
    ) -> List[Any]:
        """并行映射"""
        with Pool(self.n_workers) as pool:
            return pool.map(func, data, chunksize=chunksize)
    
    def starmap(
        self,
        func: Callable,
        data: List[Tuple],
        chunksize: int = None
    ) -> List[Any]:
        """并行星号映射"""
        with Pool(self.n_workers) as pool:
            return pool.starmap(func, data, chunksize=chunksize)
    
    def imap_unordered(
        self,
        func: Callable,
        data: List[Any],
        chunksize: int = None
    ) -> Any:
        """无序并行迭代映射"""
        with Pool(self.n_workers) as pool:
            yield from pool.imap_unordered(func, data, chunksize=chunksize)


class SharedMemoryArray:
    """共享内存数组"""
    
    def __init__(self, arr: np.ndarray):
        self.shape = arr.shape
        self.dtype = arr.dtype
        self.shm = shared_memory.SharedMemory(
            create=True,
            size=arr.nbytes
        )
        self.arr = np.ndarray(
            self.shape,
            dtype=self.dtype,
            buffer=self.shm.buf
        )
        self.arr[:] = arr[:]
    
    def get_array(self) -> np.ndarray:
        """获取数组"""
        return self.arr
    
    def close(self):
        """关闭共享内存"""
        self.shm.close()
    
    def unlink(self):
        """释放共享内存"""
        self.shm.unlink()


def parallel_reduce(
    func: Callable[[Any, Any], Any],
    data: List[Any],
    n_workers: int = None
) -> Any:
    """并行归约"""
    import multiprocessing
    n_workers = n_workers or multiprocessing.cpu_count()
    
    if len(data) <= n_workers:
        from functools import reduce
        return reduce(func, data)
    
    chunk_size = len(data) // n_workers
    chunks = [
        data[i:i + chunk_size]
        for i in range(0, len(data), chunk_size)
    ]
    
    with Pool(n_workers) as pool:
        from functools import reduce
        partial_results = pool.map(
            lambda chunk: reduce(func, chunk),
            chunks
        )
    
    from functools import reduce
    return reduce(func, partial_results)

55.7.3 性能分析与调优

python
import time
import cProfile
import pstats
from functools import wraps
from typing import Callable, Dict, Any
from dataclasses import dataclass

@dataclass
class ProfileResult:
    """性能分析结果"""
    total_time: float
    function_calls: Dict[str, Dict]
    hotspots: list


class PerformanceProfiler:
    """性能分析器"""
    
    def __init__(self):
        self._results: Dict[str, ProfileResult] = {}
    
    def profile(self, func: Callable) -> Callable:
        """性能分析装饰器"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            
            profiler.disable()
            
            stats = pstats.Stats(profiler)
            
            function_calls = {}
            for func_info, (cc, nc, tt, ct, callers) in stats.stats.items():
                function_calls[str(func_info)] = {
                    "call_count": cc,
                    "total_time": tt,
                    "cumulative_time": ct
                }
            
            hotspots = sorted(
                function_calls.items(),
                key=lambda x: x[1]["cumulative_time"],
                reverse=True
            )[:10]
            
            self._results[func.__name__] = ProfileResult(
                total_time=end_time - start_time,
                function_calls=function_calls,
                hotspots=hotspots
            )
            
            return result
        return wrapper
    
    def get_results(self, func_name: str = None) -> Dict:
        """获取分析结果"""
        if func_name:
            return self._results.get(func_name)
        return self._results


class Benchmark:
    """基准测试"""
    
    def __init__(self, name: str):
        self.name = name
        self._times: list = []
    
    def __enter__(self):
        self._start = time.perf_counter()
        return self
    
    def __exit__(self, *args):
        self._times.append(time.perf_counter() - self._start)
    
    @property
    def mean(self) -> float:
        return np.mean(self._times) if self._times else 0
    
    @property
    def std(self) -> float:
        return np.std(self._times) if self._times else 0
    
    @property
    def min(self) -> float:
        return np.min(self._times) if self._times else 0
    
    @property
    def max(self) -> float:
        return np.max(self._times) if self._times else 0
    
    def summary(self) -> Dict:
        return {
            "name": self.name,
            "runs": len(self._times),
            "mean": self.mean,
            "std": self.std,
            "min": self.min,
            "max": self.max
        }


def timeit(func: Callable, *args, n_runs: int = 10, **kwargs) -> Dict:
    """计时函数"""
    times = []
    for _ in range(n_runs):
        start = time.perf_counter()
        func(*args, **kwargs)
        times.append(time.perf_counter() - start)
    
    return {
        "function": func.__name__,
        "runs": n_runs,
        "mean": np.mean(times),
        "std": np.std(times),
        "min": np.min(times),
        "max": np.max(times)
    }

55.8 本章小结

本章详细介绍了Python高性能计算的核心概念和实践:

  1. NumPy优化:向量化运算、广播机制、内存布局
  2. 内存管理:内存映射、分块数组、零拷贝操作
  3. 多进程计算:进程池、共享内存、并行归约
  4. GPU计算:CUDA编程、Numba JIT、并行内核
  5. 性能分析:性能剖析、基准测试、热点分析

练习题

  1. 实现一个高性能矩阵运算库,支持多种优化策略
  2. 开发一个并行数据处理框架,支持自动分片
  3. 实现一个GPU加速的图像处理库
  4. 开发一个性能分析工具,支持实时监控
  5. 实现一个分布式计算框架,支持任务调度

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校