Skip to content

第16章 并发编程

学习目标

完成本章学习后,读者应能够:

  1. 理论层面:深入理解并发与并行的本质区别,掌握Python GIL的实现机制及其对多线程性能的影响,理解操作系统级别的线程、进程调度原理
  2. 技术层面:熟练运用threadingmultiprocessingconcurrent.futuresasyncio四大并发模型,能够根据任务特征选择最优并发策略
  3. 工程层面:设计线程安全的数据结构与并发控制机制,实现高性能异步I/O系统,构建可扩展的并发应用架构
  4. 前沿视野:了解结构化并发(Structured Concurrency)、Python 3.12+ TaskGroup、子解释器等前沿技术动态

16.1 并发编程基础理论

16.1.1 并发与并行

并发(Concurrency)与并行(Parallelism)是两个常被混淆但本质不同的概念:

维度并发(Concurrency)并行(Parallelism)
定义多个任务在重叠的时间段内推进多个任务在同一时刻同时执行
核心思想交替处理(时间片轮转)真正的同时执行
硬件要求单核即可需要多核/多处理器
典型场景I/O密集型任务CPU密集型任务
比喻一个人交替处理多件事多个人同时各处理一件事
python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_bound_task(duration):
    time.sleep(duration)
    return duration

def benchmark_sequential(tasks, task_func):
    start = time.perf_counter()
    results = [task_func(t) for t in tasks]
    return time.perf_counter() - start

def benchmark_threaded(tasks, task_func):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        results = list(executor.map(task_func, tasks))
    return time.perf_counter() - start

def benchmark_process(tasks, task_func):
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=len(tasks)) as executor:
        results = list(executor.map(task_func, tasks))
    return time.perf_counter() - start

if __name__ == "__main__":
    cpu_tasks = [10_000_000] * 4
    io_tasks = [1.0] * 4

    print("=== CPU密集型任务 ===")
    print(f"顺序执行: {benchmark_sequential(cpu_tasks, cpu_bound_task):.2f}s")
    print(f"多线程:   {benchmark_threaded(cpu_tasks, cpu_bound_task):.2f}s")
    print(f"多进程:   {benchmark_process(cpu_tasks, cpu_bound_task):.2f}s")

    print("\n=== I/O密集型任务 ===")
    print(f"顺序执行: {benchmark_sequential(io_tasks, io_bound_task):.2f}s")
    print(f"多线程:   {benchmark_threaded(io_tasks, io_bound_task):.2f}s")
    print(f"多进程:   {benchmark_process(io_tasks, io_bound_task):.2f}s")

选择策略决策树

任务类型?
├── I/O密集型(网络请求、文件读写、数据库查询)
│   ├── 需要与同步代码兼容 → threading + ThreadPoolExecutor
│   └── 高并发、低资源消耗 → asyncio
├── CPU密集型(数值计算、图像处理、加密运算)
│   └── 多进程 → multiprocessing + ProcessPoolExecutor
└── 混合型
    ├── I/O为主 + 少量计算 → asyncio + run_in_executor
    └── 计算为主 + 少量I/O → 多进程 + 线程池

16.1.2 Python GIL深度解析

全局解释器锁(Global Interpreter Lock,GIL)是CPython实现中最具争议的设计之一。理解GIL对于正确选择并发模型至关重要。

GIL的本质

GIL是CPython解释器中的一把互斥锁,确保同一时刻只有一个线程执行Python字节码。其存在原因:

  1. 内存管理安全:CPython使用引用计数进行内存管理,若无GIL,多线程同时修改引用计数将导致内存泄漏或提前释放
  2. C扩展兼容性:大量C扩展库假设GIL存在,移除GIL将破坏整个生态系统
  3. 单线程性能:GIL简化了解释器实现,使单线程Python代码运行更快

GIL的调度机制

python
import sys

print(f"检查间隔(switch interval): {sys.getswitchinterval()}s")
print(f"默认值: 5ms = 0.005s")

sys.setswitchinterval(0.005)

GIL的切换规则:

  • Python 3.2+:采用固定时间片机制,默认每5ms强制释放GIL
  • I/O操作时主动释放GIL(如time.sleep()、文件读写、网络请求)
  • C扩展可在执行计算密集操作时手动释放GIL

GIL对性能的影响

python
import threading
import time

def pure_python_count(n):
    start = time.perf_counter()
    total = 0
    for i in range(n):
        total += i
    return time.perf_counter() - start

def threaded_python_count(n, num_threads):
    def worker(count):
        total = 0
        for i in range(count):
            total += i

    start = time.perf_counter()
    threads = []
    chunk = n // num_threads
    for i in range(num_threads):
        count = chunk if i < num_threads - 1 else n - chunk * (num_threads - 1)
        t = threading.Thread(target=worker, args=(count,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    N = 50_000_000
    single = pure_python_count(N)
    multi = threaded_python_count(N, 4)
    print(f"单线程: {single:.2f}s")
    print(f"4线程:  {multi:.2f}s")
    print(f"加速比: {single/multi:.2f}x")
    print("注意: 多线程可能更慢,因为GIL切换开销")

绕过GIL的策略

python
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def numpy_computation(size):
    arr = np.random.rand(size)
    return np.sum(arr ** 2)

def parallel_numpy(size, workers=4):
    chunk = size // workers
    def compute(s):
        arr = np.random.rand(s)
        return np.sum(arr ** 2)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(compute, [chunk] * workers))
    return sum(results)

前沿动态:Python 3.13引入了实验性的自由线程模式(PEP 703),可通过--disable-gil编译选项构建无GIL的Python解释器。这标志着Python社区在移除GIL方面迈出了关键一步。


16.2 多线程编程

16.2.1 线程基础与生命周期

Python线程是对操作系统原生线程的封装,每个Python线程对应一个OS线程。

python
import threading
import time

def worker(name, duration):
    print(f"[{threading.current_thread().name}] {name} 启动")
    time.sleep(duration)
    print(f"[{threading.current_thread().name}] {name} 完成")
    return f"{name}的结果"

t = threading.Thread(
    target=worker,
    args=("任务A", 2),
    name="WorkerThread-1",
    daemon=False
)

print(f"线程名称: {t.name}")
print(f"是否守护线程: {t.daemon}")
print(f"线程标识: {t.ident}")
print(f"是否存活: {t.is_alive()}")

t.start()
t.join(timeout=5)

if t.is_alive():
    print("线程未在超时时间内完成")
else:
    print(f"线程已完成,标识: {t.ident}")

线程生命周期状态图

New → start() → Runnable → 获取GIL → Running
                      ↑                    ↓
                      ← 释放GIL ←         ← 阻塞操作

                                        Blocked (I/O/Lock)

                                        wait完成 → Runnable
Running → 函数返回/异常 → Terminated

守护线程

python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

def background_monitor():
    while True:
        logging.info("监控心跳...")
        time.sleep(1)

def main_task():
    logging.info("主任务开始")
    time.sleep(3)
    logging.info("主任务完成")

monitor = threading.Thread(target=background_monitor, name="Monitor", daemon=True)
monitor.start()

main_task()
print("主线程结束,守护线程将自动终止")

16.2.2 线程同步原语

多线程共享进程的内存空间,因此必须使用同步原语防止竞态条件(Race Condition)。

竞态条件示例

python
import threading

class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self):
        temp = self.value
        temp += 1
        self.value = temp

counter = UnsafeCounter()
threads = []

for _ in range(10):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(100000)])
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"期望值: 1000000, 实际值: {counter.value}")

Lock(互斥锁)

python
import threading

class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

counter = SafeCounter()
threads = []

for _ in range(10):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(100000)])
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"期望值: 1000000, 实际值: {counter.value}")

Lock与RLock的区别

python
import threading

lock = threading.Lock()
try:
    lock.acquire()
    lock.acquire()
except RuntimeError as e:
    print(f"同一线程重复获取非可重入锁: {e}")

rlock = threading.RLock()
rlock.acquire()
rlock.acquire()
print(f"可重入锁嵌套获取成功,计数: 2")
rlock.release()
rlock.release()
print("可重入锁全部释放")

RLock的应用场景——递归调用

python
import threading

class TreeWalker:
    def __init__(self):
        self._lock = threading.RLock()
        self.visited = []

    def walk(self, node):
        with self._lock:
            self.visited.append(node.value)
            for child in node.children:
                self.walk(child)

class TreeNode:
    def __init__(self, value):
        self.value = value
        self.children = []

root = TreeNode("root")
root.children = [TreeNode("left"), TreeNode("right")]
root.children[0].children = [TreeNode("left-left")]

walker = TreeWalker()
walker.walk(root)
print(f"遍历结果: {walker.visited}")

Semaphore(信号量)

python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class ConnectionPool:
    def __init__(self, max_connections=3):
        self._semaphore = threading.Semaphore(max_connections)
        self._connections = []
        self._lock = threading.Lock()

    def acquire(self):
        self._semaphore.acquire()
        with self._lock:
            conn = f"Connection-{len(self._connections)}"
            self._connections.append(conn)
            logging.info(f"获取连接: {conn}")
            return conn

    def release(self, conn):
        with self._lock:
            self._connections.remove(conn)
            logging.info(f"释放连接: {conn}")
        self._semaphore.release()

    def __enter__(self):
        self._conn = self.acquire()
        return self._conn

    def __exit__(self, *args):
        self.release(self._conn)

pool = ConnectionPool(max_connections=2)

def use_connection(name, duration):
    with pool as conn:
        logging.info(f"{name} 使用 {conn}")
        time.sleep(duration)

threads = [
    threading.Thread(target=use_connection, args=(f"Client-{i}", i * 0.5 + 0.5), name=f"Client-{i}")
    for i in range(5)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Event(事件)

python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class ServiceManager:
    def __init__(self):
        self._ready = threading.Event()
        self._shutdown = threading.Event()

    def wait_ready(self, timeout=None):
        return self._ready.wait(timeout)

    def mark_ready(self):
        self._ready.set()
        logging.info("服务就绪")

    def request_shutdown(self):
        self._shutdown.set()
        logging.info("请求关闭")

    def is_shutdown_requested(self):
        return self._shutdown.is_set()

manager = ServiceManager()

def service_worker():
    logging.info("等待服务就绪...")
    manager.wait_ready()
    logging.info("服务已就绪,开始工作")
    while not manager.is_shutdown_requested():
        time.sleep(0.5)
        logging.info("工作中...")
    logging.info("收到关闭信号,退出")

def service_initializer():
    logging.info("初始化服务...")
    time.sleep(2)
    manager.mark_ready()
    time.sleep(3)
    manager.request_shutdown()

threads = [
    threading.Thread(target=service_worker, name="Worker"),
    threading.Thread(target=service_initializer, name="Initializer"),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

Condition(条件变量)

python
import threading
import time
import random
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class BoundedBuffer:
    def __init__(self, capacity):
        self._buffer = []
        self._capacity = capacity
        self._condition = threading.Condition()

    def put(self, item):
        with self._condition:
            while len(self._buffer) >= self._capacity:
                logging.info(f"缓冲区满,等待消费... (size={len(self._buffer)})")
                self._condition.wait()
            self._buffer.append(item)
            logging.info(f"生产: {item} (size={len(self._buffer)})")
            self._condition.notify_all()

    def get(self):
        with self._condition:
            while not self._buffer:
                logging.info("缓冲区空,等待生产...")
                self._condition.wait()
            item = self._buffer.pop(0)
            logging.info(f"消费: {item} (size={len(self._buffer)})")
            self._condition.notify_all()
            return item

buffer = BoundedBuffer(capacity=3)

def producer(pid):
    for i in range(5):
        item = f"P{pid}-{i}"
        buffer.put(item)
        time.sleep(random.uniform(0.1, 0.3))

def consumer(cid):
    for _ in range(5):
        item = buffer.get()
        time.sleep(random.uniform(0.2, 0.5))

threads = []
for i in range(2):
    threads.append(threading.Thread(target=producer, args=(i,), name=f"Producer-{i}"))
for i in range(2):
    threads.append(threading.Thread(target=consumer, args=(i,), name=f"Consumer-{i}"))

for t in threads:
    t.start()
for t in threads:
    t.join()

Barrier(栅栏)

python
import threading
import time
import random
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

def parallel_computation(worker_id, barrier):
    phase1_time = random.uniform(0.5, 1.5)
    logging.info(f"Worker-{worker_id} 阶段1 (耗时{phase1_time:.1f}s)")
    time.sleep(phase1_time)

    logging.info(f"Worker-{worker_id} 等待其他worker完成阶段1")
    barrier.wait()

    phase2_time = random.uniform(0.5, 1.0)
    logging.info(f"Worker-{worker_id} 阶段2 (耗时{phase2_time:.1f}s)")
    time.sleep(phase2_time)

    barrier.wait()
    logging.info(f"Worker-{worker_id} 全部完成")

num_workers = 4
barrier = threading.Barrier(num_workers)

threads = [
    threading.Thread(target=parallel_computation, args=(i, barrier), name=f"Worker-{i}")
    for i in range(num_workers)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

16.2.3 线程安全的数据结构

基于Queue的生产者-消费者模式

python
import threading
import queue
import time
import random
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class TaskQueue:
    def __init__(self, maxsize=0):
        self._queue = queue.Queue(maxsize=maxsize)
        self._shutdown = threading.Event()

    def submit(self, task):
        if not self._shutdown.is_set():
            self._queue.put(task)
            return True
        return False

    def get(self, timeout=1.0):
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def task_done(self):
        self._queue.task_done()

    def shutdown(self):
        self._shutdown.set()

    def join(self):
        self._queue.join()

    @property
    def size(self):
        return self._queue.qsize()

class WorkerPool:
    def __init__(self, num_workers, task_handler):
        self._task_queue = TaskQueue(maxsize=100)
        self._workers = []
        self._handler = task_handler

        for i in range(num_workers):
            t = threading.Thread(target=self._worker_loop, name=f"Worker-{i}", daemon=True)
            t.start()
            self._workers.append(t)

    def _worker_loop(self):
        while True:
            task = self._task_queue.get()
            if task is None:
                break
            try:
                self._handler(task)
            except Exception as e:
                logging.error(f"任务处理失败: {e}")
            finally:
                self._task_queue.task_done()

    def submit(self, task):
        self._task_queue.submit(task)

    def wait_completion(self):
        self._task_queue.join()

    def shutdown(self):
        self._task_queue.shutdown()
        for _ in self._workers:
            self._task_queue._queue.put(None)
        for t in self._workers:
            t.join()

def process_task(task):
    task_id, data = task
    processing_time = random.uniform(0.1, 0.5)
    time.sleep(processing_time)
    result = data * 2
    logging.info(f"任务 {task_id}: {data}{result}")

pool = WorkerPool(num_workers=3, task_handler=process_task)

for i in range(10):
    pool.submit((i, i * 10))

pool.wait_completion()
logging.info("所有任务完成")
pool.shutdown()

PriorityQueue与LifoQueue

python
import queue
import threading
import time

priority_q = queue.PriorityQueue()

def priority_producer():
    tasks = [
        (3, "低优先级任务"),
        (1, "高优先级任务"),
        (2, "中优先级任务"),
        (1, "另一个高优先级任务"),
        (3, "另一个低优先级任务"),
    ]
    for priority, task in tasks:
        priority_q.put((priority, task))
        time.sleep(0.1)

def priority_consumer():
    while True:
        try:
            priority, task = priority_q.get(timeout=2)
            print(f"处理: [{priority}] {task}")
            priority_q.task_done()
        except queue.Empty:
            break

t1 = threading.Thread(target=priority_producer)
t2 = threading.Thread(target=priority_consumer)

t1.start()
t1.join()
t2.start()
t2.join()

lifo_q = queue.LifoQueue()
for i in range(5):
    lifo_q.put(f"Item-{i}")

print("\nLIFO顺序:")
while not lifo_q.empty():
    print(f"  取出: {lifo_q.get()}")

16.2.4 线程本地存储

python
import threading
import random
import time

class RequestContext:
    _local = threading.local()

    @classmethod
    def set_request_id(cls, request_id):
        cls._local.request_id = request_id

    @classmethod
    def get_request_id(cls):
        return getattr(cls._local, 'request_id', 'unknown')

    @classmethod
    def set_user(cls, user):
        cls._local.user = user

    @classmethod
    def get_user(cls):
        return getattr(cls._local, 'user', None)

def handle_request(request_id, user):
    RequestContext.set_request_id(request_id)
    RequestContext.set_user(user)

    time.sleep(random.uniform(0.1, 0.3))

    print(f"RequestID={RequestContext.get_request_id()}, "
          f"User={RequestContext.get_user()}, "
          f"Thread={threading.current_thread().name}")

threads = []
for i in range(5):
    t = threading.Thread(
        target=handle_request,
        args=(f"REQ-{i:04d}", f"user_{i}"),
        name=f"Handler-{i}"
    )
    threads.append(t)
    t.start()

for t in threads:
    t.join()

16.2.5 线程池深度应用

python
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class ManagedThreadPool:
    def __init__(self, max_workers, thread_name_prefix="Worker"):
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix
        )
        self._futures = []
        self._lock = threading.Lock()

    def submit(self, fn, *args, **kwargs) -> Future:
        future = self._executor.submit(fn, *args, **kwargs)
        with self._lock:
            self._futures.append(future)
        future.add_done_callback(self._on_task_done)
        return future

    def _on_task_done(self, future):
        try:
            result = future.result()
            logging.info(f"任务完成: {result}")
        except Exception as e:
            logging.error(f"任务失败: {e}")

    def map_as_completed(self, fn, iterables):
        futures = {self._executor.submit(fn, item): item for item in iterables}
        results = []
        for future in as_completed(futures):
            item = futures[future]
            try:
                results.append((item, future.result()))
            except Exception as e:
                results.append((item, e))
        return results

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

def fetch_url(url):
    time.sleep(1)
    return f"Response from {url}"

urls = [f"http://api.example.com/data/{i}" for i in range(8)]

pool = ManagedThreadPool(max_workers=4)
futures = [pool.submit(fetch_url, url) for url in urls]

for future in as_completed(futures):
    try:
        print(future.result())
    except Exception as e:
        print(f"Error: {e}")

pool.shutdown()

Future回调链

python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_data(source):
    time.sleep(1)
    return f"raw_data_from_{source}"

def parse_data(raw_data):
    time.sleep(0.5)
    return raw_data.replace("raw_data_from_", "parsed: ")

def store_data(parsed_data):
    time.sleep(0.3)
    return f"stored[{parsed_data}]"

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(fetch_data, "database")

    def on_fetch_done(f):
        raw = f.result()
        parse_future = executor.submit(parse_data, raw)
        parse_future.add_done_callback(on_parse_done)

    def on_parse_done(f):
        parsed = f.result()
        store_future = executor.submit(store_data, parsed)
        store_future.add_done_callback(on_store_done)

    def on_store_done(f):
        print(f"Pipeline完成: {f.result()}")

    future.add_done_callback(on_fetch_done)

    time.sleep(3)

16.3 多进程编程

16.3.1 进程模型与内存模型

每个进程拥有独立的内存空间,进程间不共享变量,因此天然避免了竞态条件,但也带来了进程间通信的复杂性。

python
import multiprocessing
import os

def show_process_info():
    print(f"PID: {os.getpid()}, PPID: {os.getppid()}")
    print(f"进程名: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    show_process_info()

    p = multiprocessing.Process(target=show_process_info, name="ChildProcess")
    p.start()
    p.join()

进程启动方法

python
import multiprocessing as mp

available = mp.get_all_start_methods()
print(f"可用启动方法: {available}")
print(f"当前方法: {mp.get_start_method()}")

if 'spawn' in available:
    ctx = mp.get_context('spawn')
    print(f"spawn上下文: {ctx}")
启动方法平台特点
spawn全平台创建新进程,导入主模块,安全但慢
forkUnix复制父进程内存,快但可能不安全
forkserverUnix先创建服务器进程,再fork,折中方案

进程间数据隔离

python
import multiprocessing

global_var = 0

def modify_global():
    global global_var
    global_var = 100
    print(f"子进程 global_var = {global_var}")

if __name__ == "__main__":
    p = multiprocessing.Process(target=modify_global)
    p.start()
    p.join()
    print(f"主进程 global_var = {global_var}")

16.3.2 进程间通信(IPC)

Queue

python
import multiprocessing
import time

def producer(q):
    for i in range(5):
        item = f"Item-{i}"
        q.put(item)
        print(f"生产: {item}")
        time.sleep(0.1)
    q.put(None)

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消费: {item}")
        time.sleep(0.2)

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=10)

    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Pipe

python
import multiprocessing

def sender(conn):
    messages = ["Hello", "World", "Python", "Concurrency"]
    for msg in messages:
        conn.send(msg)
        print(f"发送: {msg}")
    conn.send(None)
    conn.close()

def receiver(conn):
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f"接收: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()

    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

共享内存(Value与Array)

python
import multiprocessing
import ctypes

def worker(shared_value, shared_array, lock):
    with lock:
        shared_value.value += 1
        for i in range(len(shared_array)):
            shared_array[i] += 1

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    value = multiprocessing.Value(ctypes.c_int, 0)
    array = multiprocessing.Array(ctypes.c_int, [0, 0, 0, 0, 0])

    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(value, array, lock))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Value: {value.value}")
    print(f"Array: {list(array)}")

Manager(托管对象)

python
import multiprocessing
import time

def worker(namespace, shared_list, shared_dict, lock):
    with lock:
        namespace.count += 1
        shared_list.append(multiprocessing.current_process().name)
        shared_dict[multiprocessing.current_process().name] = time.time()

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        namespace = manager.Namespace()
        namespace.count = 0

        shared_list = manager.list()
        shared_dict = manager.dict()
        lock = manager.Lock()

        processes = []
        for i in range(3):
            p = multiprocessing.Process(
                target=worker,
                args=(namespace, shared_list, shared_dict, lock),
                name=f"Worker-{i}"
            )
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"计数: {namespace.count}")
        print(f"列表: {list(shared_list)}")
        print(f"字典: {dict(shared_dict)}")

16.3.3 进程池

python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math

def is_prime(n):
    if n < 2:
        return False
    if n < 4:
        return True
    if n % 2 == 0 or n % 3 == 0:
        return False
    for i in range(5, int(math.sqrt(n)) + 1, 6):
        if n % i == 0 or n % (i + 2) == 0:
            return False
    return True

def find_primes_in_range(start, end):
    return [n for n in range(start, end) if is_prime(n)]

if __name__ == "__main__":
    ranges = [(i * 100000, (i + 1) * 100000) for i in range(8)]

    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(find_primes_in_range, s, e): (s, e) for s, e in ranges}
        total_primes = 0
        for future in as_completed(futures):
            r = futures[future]
            primes = future.result()
            total_primes += len(primes)
            print(f"范围 {r}: 找到 {len(primes)} 个素数")
    print(f"总计: {total_primes} 个素数, 耗时: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    all_primes = []
    for s, e in ranges:
        all_primes.extend(find_primes_in_range(s, e))
    print(f"顺序执行: {len(all_primes)} 个素数, 耗时: {time.perf_counter() - start:.2f}s")

进程池初始化与大数据处理

python
from concurrent.futures import ProcessPoolExecutor

def init_worker(shared_data):
    global _shared_data
    _shared_data = shared_data

def process_chunk(chunk):
    return sum(x * _shared_data for x in chunk)

if __name__ == "__main__":
    data_chunks = [list(range(1000)) for _ in range(4)]
    shared_value = 2

    with ProcessPoolExecutor(
        max_workers=4,
        initializer=init_worker,
        initargs=(shared_value,)
    ) as executor:
        results = list(executor.map(process_chunk, data_chunks))
        print(f"各块结果: {results}")
        print(f"总和: {sum(results)}")

16.4 异步编程(asyncio)

16.4.1 事件循环与协程原理

asyncio是Python的异步I/O框架,基于事件循环(Event Loop)和协程(Coroutine)实现单线程并发。

协程的本质

python
import asyncio

async def modern_coroutine():
    await asyncio.sleep(1)
    return "result"

coro = modern_coroutine()
print(f"类型: {type(coro)}")
print(f"是否为协程: {asyncio.iscoroutine(coro)}")

async def demonstrate_await():
    result = await modern_coroutine()
    print(f"结果: {result}")

asyncio.run(demonstrate_await())

事件循环的工作机制

事件循环 (Event Loop)
┌──────────────────────────────────────┐
│  1. 检查就绪的I/O事件               │
│  2. 检查到期的定时器                │
│  3. 执行就绪的回调                  │
│  4. 推进挂起的协程                  │
│  5. 重复步骤1-4                     │
└──────────────────────────────────────┘
python
import asyncio

async def task_lifecycle():
    print("1. 协程开始")
    await asyncio.sleep(0)
    print("2. 让出控制权后恢复")
    await asyncio.sleep(0)
    print("3. 再次恢复")
    return "完成"

async def main():
    result = await task_lifecycle()
    print(f"结果: {result}")

asyncio.run(main())

16.4.2 Task与任务调度

python
import asyncio
import time

async def fetch_data(name, delay):
    print(f"{name} 开始请求")
    await asyncio.sleep(delay)
    print(f"{name} 完成")
    return f"{name}的数据"

async def gather_demo():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_data("API-A", 2),
        fetch_data("API-B", 1),
        fetch_data("API-C", 3),
    )
    elapsed = time.perf_counter() - start
    print(f"总耗时: {elapsed:.1f}s (并行执行,约等于最长任务)")
    print(f"结果: {results}")

asyncio.run(gather_demo())

create_task与gather的区别

python
import asyncio

async def task_demo():
    t1 = asyncio.create_task(fetch_data("Task-1", 2), name="Task-1")
    t2 = asyncio.create_task(fetch_data("Task-2", 1), name="Task-2")

    print(f"Task-1 状态: {t1.done()}")
    print(f"Task-2 名称: {t2.get_name()}")

    r1 = await t1
    r2 = await t2

    print(f"Task-1 状态: {t1.done()}")
    print(f"结果: {r1}, {r2}")

asyncio.run(task_demo())

TaskGroup(Python 3.11+)

python
import asyncio

async def taskgroup_demo():
    results = []

    async with asyncio.TaskGroup() as tg:
        async def tracked_fetch(name, delay):
            result = await fetch_data(name, delay)
            results.append(result)

        tg.create_task(tracked_fetch("TG-A", 2))
        tg.create_task(tracked_fetch("TG-B", 1))
        tg.create_task(tracked_fetch("TG-C", 3))

    print(f"所有任务完成: {results}")

asyncio.run(taskgroup_demo())

异常处理对比

python
import asyncio

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("任务失败")

async def gather_with_error():
    results = await asyncio.gather(
        fetch_data("OK-Task", 1),
        failing_task(),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"异常: {r}")
        else:
            print(f"成功: {r}")

asyncio.run(gather_with_error())

async def taskgroup_with_error():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_data("OK-Task", 1))
            tg.create_task(failing_task())
    except* ValueError as eg:
        print(f"TaskGroup捕获ValueError: {eg.exceptions}")

asyncio.run(taskgroup_with_error())

16.4.3 异步同步原语

asyncio提供了与threading对应的异步同步原语,但它们在事件循环中协作式调度,不会真正阻塞线程。

python
import asyncio

class AsyncCounter:
    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            self._value += 1

    @property
    def value(self):
        return self._value

async def worker(counter, worker_id, increments):
    for _ in range(increments):
        await counter.increment()
    print(f"Worker-{worker_id} 完成")

async def lock_demo():
    counter = AsyncCounter()
    increments = 10000
    workers = 5

    await asyncio.gather(*[
        worker(counter, i, increments) for i in range(workers)
    ])

    expected = increments * workers
    print(f"期望: {expected}, 实际: {counter.value}")

asyncio.run(lock_demo())

asyncio.Event

python
import asyncio

class AsyncService:
    def __init__(self):
        self._ready = asyncio.Event()
        self._data = {}

    async def initialize(self):
        await asyncio.sleep(1)
        self._data = {"status": "ready", "version": "2.0"}
        self._ready.set()

    async def wait_ready(self):
        await self._ready.wait()
        return self._data

async def service_demo():
    service = AsyncService()

    async def client(name):
        print(f"{name}: 等待服务就绪...")
        data = await service.wait_ready()
        print(f"{name}: 服务就绪,数据={data}")

    await asyncio.gather(
        service.initialize(),
        client("Client-A"),
        client("Client-B"),
    )

asyncio.run(service_demo())

asyncio.Queue

python
import asyncio
import random

class AsyncProducerConsumer:
    def __init__(self, buffer_size=5):
        self._queue = asyncio.Queue(maxsize=buffer_size)
        self._produced = 0
        self._consumed = 0

    async def producer(self, name, count):
        for i in range(count):
            item = f"{name}-{i}"
            await self._queue.put(item)
            self._produced += 1
            print(f"生产: {item} (队列大小: {self._queue.qsize()})")
            await asyncio.sleep(random.uniform(0.05, 0.15))

    async def consumer(self, name, count):
        for _ in range(count):
            item = await self._queue.get()
            self._consumed += 1
            print(f"消费: {item} by {name}")
            self._queue.task_done()
            await asyncio.sleep(random.uniform(0.1, 0.2))

    async def run(self, num_producers=2, num_consumers=2, items_per_producer=5):
        producers = [
            self.producer(f"P{i}", items_per_producer)
            for i in range(num_producers)
        ]
        consumers = [
            self.consumer(f"C{i}", num_producers * items_per_producer // num_consumers)
            for i in range(num_consumers)
        ]

        await asyncio.gather(*producers)
        await self._queue.join()
        await asyncio.gather(*consumers)

        print(f"生产: {self._produced}, 消费: {self._consumed}")

asyncio.run(AsyncProducerConsumer().run())

asyncio.Semaphore与限流

python
import asyncio
import time

class RateLimiter:
    def __init__(self, max_concurrent=3):
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self):
        await self._semaphore.acquire()
        return self

    async def __aexit__(self, *args):
        self._semaphore.release()

async def rate_limited_fetch(url, limiter):
    async with limiter:
        print(f"开始请求: {url}")
        await asyncio.sleep(1)
        print(f"完成请求: {url}")
        return f"Response from {url}"

async def rate_limiter_demo():
    limiter = RateLimiter(max_concurrent=2)
    urls = [f"http://api.example.com/{i}" for i in range(6)]

    start = time.perf_counter()
    results = await asyncio.gather(*[
        rate_limited_fetch(url, limiter) for url in urls
    ])
    elapsed = time.perf_counter() - start
    print(f"6个请求,并发限制2,耗时: {elapsed:.1f}s (约3批次×1s)")

asyncio.run(rate_limiter_demo())

16.4.4 异步迭代器与生成器

python
import asyncio

class AsyncPageIterator:
    def __init__(self, total_items, page_size=10):
        self._total = total_items
        self._page_size = page_size
        self._current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._current >= self._total:
            raise StopAsyncIteration

        start = self._current
        end = min(self._current + self._page_size, self._total)
        self._current = end

        await asyncio.sleep(0.1)
        return list(range(start, end))

async def pagination_demo():
    async for page in AsyncPageIterator(total_items=35, page_size=10):
        print(f"获取页面: {page}")

asyncio.run(pagination_demo())

async def async_chunk_generator(iterable, chunk_size):
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            await asyncio.sleep(0)
            yield chunk
            chunk = []
    if chunk:
        yield chunk

async def chunk_demo():
    data = range(20)
    async for chunk in async_chunk_generator(data, 5):
        print(f"处理批次: {chunk}")

asyncio.run(chunk_demo())

异步推导式

python
import asyncio

async def async_transform(x):
    await asyncio.sleep(0.01)
    return x ** 2

async def comprehension_demo():
    result = [await async_transform(i) for i in range(10)]
    print(f"顺序异步推导: {result}")

    result = await asyncio.gather(*[async_transform(i) for i in range(10)])
    print(f"并行异步推导: {result}")

asyncio.run(comprehension_demo())

16.4.5 异步上下文管理器与子进程

python
import asyncio
from contextlib import asynccontextmanager

class AsyncDatabaseConnection:
    def __init__(self, url):
        self._url = url
        self._connected = False

    async def __aenter__(self):
        await asyncio.sleep(0.1)
        self._connected = True
        print(f"连接到: {self._url}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.05)
        self._connected = False
        print(f"断开连接: {self._url}")

    async def query(self, sql):
        if not self._connected:
            raise RuntimeError("未连接")
        await asyncio.sleep(0.05)
        return f"[{sql}] → 结果"

@asynccontextmanager
async def managed_connection(url):
    conn = AsyncDatabaseConnection(url)
    await conn.__aenter__()
    try:
        yield conn
    finally:
        await conn.__aexit__(None, None, None)

async def db_demo():
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        result = await db.query("SELECT * FROM users")
        print(result)

    async with managed_connection("mysql://localhost/test") as db:
        result = await db.query("SELECT COUNT(*) FROM orders")
        print(result)

asyncio.run(db_demo())

异步子进程

python
import asyncio

async def run_command(cmd):
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    return process.returncode, stdout.decode(), stderr.decode()

async def subprocess_demo():
    returncode, stdout, stderr = await run_command(
        ["python", "-c", "print('Hello from subprocess')"]
    )
    print(f"返回码: {returncode}")
    print(f"输出: {stdout.strip()}")

async def parallel_commands():
    commands = [
        ["python", "-c", f"import time; time.sleep({i}); print('Task {i} done')"]
        for i in range(1, 4)
    ]

    results = await asyncio.gather(*[run_command(cmd) for cmd in commands])
    for i, (rc, out, err) in enumerate(results, 1):
        print(f"命令{i}: 返回码={rc}, 输出={out.strip()}")

asyncio.run(parallel_commands())

16.4.6 异步网络编程

python
import asyncio

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"新连接: {addr}")

    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break
            message = data.decode()
            print(f"收到来自 {addr}: {message}")
            response = f"Echo: {message}"
            writer.write(response.encode())
            await writer.drain()
    except ConnectionResetError:
        print(f"连接断开: {addr}")
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"关闭连接: {addr}")

async def start_echo_server():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    addrs = ', '.join(str(s.getsockname()) for s in server.sockets)
    print(f"服务器运行在 {addrs}")

    async with server:
        await server.serve_forever()

async def tcp_client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

    messages = ["Hello", "World", "Python"]
    for msg in messages:
        writer.write(msg.encode())
        await writer.drain()

        data = await reader.read(1024)
        print(f"收到响应: {data.decode()}")

    writer.close()
    await writer.wait_closed()

async def server_client_demo():
    server_task = asyncio.create_task(start_echo_server())
    await asyncio.sleep(0.5)

    await tcp_client()

    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        print("服务器已关闭")

asyncio.run(server_client_demo())

16.4.7 异步与同步代码的桥接

python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_io_task(duration):
    time.sleep(duration)
    return f"阻塞任务完成 ({duration}s)"

def cpu_heavy(n):
    total = sum(i * i for i in range(n))
    return total

async def bridge_demo():
    loop = asyncio.get_event_loop()

    result = await loop.run_in_executor(None, blocking_io_task, 2)
    print(result)

    process_executor = ProcessPoolExecutor(max_workers=2)

    results = await asyncio.gather(
        loop.run_in_executor(process_executor, cpu_heavy, 10_000_000),
        loop.run_in_executor(process_executor, cpu_heavy, 20_000_000),
        fetch_data("async-task", 1),
    )
    print(f"混合执行结果: {results}")

    process_executor.shutdown(wait=True)

asyncio.run(bridge_demo())

16.5 高级并发模式

16.5.1 生产者-消费者模式的完整实现

python
import asyncio
import random
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

class AsyncPipeline:
    def __init__(self, buffer_size=10):
        self._input_queue = asyncio.Queue(maxsize=buffer_size)
        self._output_queue = asyncio.Queue(maxsize=buffer_size)
        self._error_queue = asyncio.Queue()
        self._stats = {"produced": 0, "processed": 0, "errors": 0}

    async def producer(self, count):
        for i in range(count):
            item = {"id": i, "data": random.random(), "timestamp": time.time()}
            await self._input_queue.put(item)
            self._stats["produced"] += 1
            logging.info(f"生产: Item-{i}")
            await asyncio.sleep(random.uniform(0.01, 0.05))
        await self._input_queue.put(None)

    async def processor(self, name):
        while True:
            item = await self._input_queue.get()
            if item is None:
                await self._input_queue.put(None)
                break

            try:
                if random.random() < 0.1:
                    raise ValueError(f"处理 Item-{item['id']} 时出错")

                processed = {
                    "id": item["id"],
                    "result": item["data"] ** 2,
                    "processed_by": name,
                }
                await self._output_queue.put(processed)
                self._stats["processed"] += 1
                logging.info(f"{name} 处理: Item-{item['id']}")
            except Exception as e:
                error_info = {"item_id": item["id"], "error": str(e)}
                await self._error_queue.put(error_info)
                self._stats["errors"] += 1
                logging.error(f"{name} 错误: {e}")
            finally:
                self._input_queue.task_done()

            await asyncio.sleep(random.uniform(0.02, 0.08))

    async def consumer(self, count):
        consumed = 0
        while consumed < count:
            item = await self._output_queue.get()
            logging.info(
                f"消费: Item-{item['id']} = {item['result']:.4f} "
                f"(by {item['processed_by']})"
            )
            self._output_queue.task_done()
            consumed += 1

    async def error_handler(self):
        while True:
            try:
                error = await asyncio.wait_for(self._error_queue.get(), timeout=0.5)
                logging.warning(f"错误记录: Item-{error['item_id']} - {error['error']}")
            except asyncio.TimeoutError:
                break

    async def run(self, num_items=20, num_processors=3):
        start = time.perf_counter()

        producers = [self.producer(num_items)]
        processors = [self.processor(f"Proc-{i}") for i in range(num_processors)]
        consumers = [self.consumer(num_items)]
        error_handlers = [self.error_handler()]

        await asyncio.gather(*producers)
        await asyncio.gather(*processors)
        await asyncio.gather(*consumers)
        await asyncio.gather(*error_handlers)

        elapsed = time.perf_counter() - start
        print(f"\n统计: {self._stats}")
        print(f"耗时: {elapsed:.2f}s")

asyncio.run(AsyncPipeline().run())

16.5.2 限流与背压

python
import asyncio
import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self._rate = rate
        self._capacity = capacity
        self._tokens = capacity
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens=1):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
            self._last_refill = now

            if self._tokens >= tokens:
                self._tokens -= tokens
                return True
            return False

    async def wait_and_acquire(self, tokens=1):
        while True:
            if await self.acquire(tokens):
                return
            wait_time = (tokens - self._tokens) / self._rate
            await asyncio.sleep(wait_time)

class ThrottledClient:
    def __init__(self, requests_per_second=5):
        self._bucket = TokenBucket(
            rate=requests_per_second,
            capacity=requests_per_second * 2
        )
        self._request_count = 0

    async def request(self, url):
        await self._bucket.wait_and_acquire()
        self._request_count += 1
        await asyncio.sleep(0.1)
        return f"Response-{self._request_count} from {url}"

async def throttled_demo():
    client = ThrottledClient(requests_per_second=3)
    urls = [f"http://api.example.com/{i}" for i in range(10)]

    start = time.perf_counter()
    results = await asyncio.gather(*[client.request(url) for url in urls])
    elapsed = time.perf_counter() - start

    for r in results:
        print(r)
    print(f"10个请求,3/s限流,耗时: {elapsed:.1f}s")

asyncio.run(throttled_demo())

16.5.3 超时与重试模式

python
import asyncio
import random
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

class RetryPolicy:
    def __init__(self, max_retries=3, base_delay=1.0, max_delay=30.0, exponential=True):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential = exponential

    def get_delay(self, attempt):
        if not self.exponential:
            return self.base_delay
        delay = self.base_delay * (2 ** attempt)
        jitter = random.uniform(0, delay * 0.1)
        return min(delay + jitter, self.max_delay)

async def retry_async(coro_factory, policy=None, retryable_exceptions=(Exception,)):
    policy = policy or RetryPolicy()
    last_exception = None

    for attempt in range(policy.max_retries + 1):
        try:
            return await asyncio.wait_for(coro_factory(), timeout=5.0)
        except retryable_exceptions as e:
            last_exception = e
            if attempt < policy.max_retries:
                delay = policy.get_delay(attempt)
                logging.warning(f"第{attempt+1}次失败: {e}, {delay:.1f}s后重试...")
                await asyncio.sleep(delay)
            else:
                logging.error(f"达到最大重试次数({policy.max_retries})")

    raise last_exception

async def unreliable_service():
    if random.random() < 0.7:
        raise ConnectionError("服务不可用")
    return "成功响应"

async def retry_demo():
    try:
        result = await retry_async(
            unreliable_service,
            policy=RetryPolicy(max_retries=5, base_delay=0.5),
            retryable_exceptions=(ConnectionError,)
        )
        print(f"最终结果: {result}")
    except ConnectionError as e:
        print(f"彻底失败: {e}")

asyncio.run(retry_demo())

16.5.4 结构化并发

结构化并发(Structured Concurrency)是近年来并发编程领域的重要范式,其核心思想是:所有并发任务的生命周期必须被严格限定在一个语法作用域内。

python
import asyncio

async def structured_concurrency_demo():
    async def task(name, duration):
        print(f"{name} 开始")
        await asyncio.sleep(duration)
        print(f"{name} 完成")
        return f"{name}-结果"

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(task("A", 2))
        t2 = tg.create_task(task("B", 1))
        t3 = tg.create_task(task("C", 3))

    print(f"所有结果: {t1.result()}, {t2.result()}, {t3.result()}")

asyncio.run(structured_concurrency_demo())

结构化并发的优势

  1. 异常安全:子任务异常会传播到父作用域,不会出现"孤儿"任务
  2. 资源管理:任务完成前不会退出作用域,确保资源正确释放
  3. 可推理性:并发逻辑被限定在明确的作用域内,易于理解和调试

超时取消模式

python
import asyncio

async def cancellable_task(name, duration):
    try:
        print(f"{name} 开始")
        await asyncio.sleep(duration)
        print(f"{name} 完成")
        return f"{name}-结果"
    except asyncio.CancelledError:
        print(f"{name} 被取消,执行清理...")
        await asyncio.sleep(0.1)
        print(f"{name} 清理完成")
        raise

async def timeout_pattern():
    try:
        result = await asyncio.wait_for(
            cancellable_task("SlowTask", 10),
            timeout=2.0
        )
    except asyncio.TimeoutError:
        print("任务超时,已取消")

asyncio.run(timeout_pattern())

16.5.5 Actor模式

Actor模式是一种基于消息传递的并发模型,每个Actor独立处理消息,避免共享状态:

python
import asyncio
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

class Actor:
    def __init__(self, name):
        self._name = name
        self._inbox = asyncio.Queue()
        self._running = False

    async def start(self):
        self._running = True
        asyncio.create_task(self._process_messages())
        logging.info(f"Actor {self._name} 启动")

    async def stop(self):
        self._running = False
        await self._inbox.put(None)

    async def send(self, message):
        await self._inbox.put(message)

    async def _process_messages(self):
        while self._running:
            message = await self._inbox.get()
            if message is None:
                break
            await self.handle(message)

    async def handle(self, message):
        raise NotImplementedError

class CounterActor(Actor):
    def __init__(self, name):
        super().__init__(name)
        self._count = 0

    async def handle(self, message):
        match message:
            case {"action": "increment", "value": v}:
                self._count += v
                logging.info(f"{self._name}: count = {self._count}")
            case {"action": "decrement", "value": v}:
                self._count -= v
                logging.info(f"{self._name}: count = {self._count}")
            case {"action": "get", "reply_to": reply_queue}:
                await reply_queue.put(self._count)
            case _:
                logging.warning(f"{self._name}: 未知消息 {message}")

class PrinterActor(Actor):
    async def handle(self, message):
        logging.info(f"{self._name} 打印: {message}")

async def actor_demo():
    counter = CounterActor("Counter")
    printer = PrinterActor("Printer")

    await counter.start()
    await printer.start()

    await asyncio.gather(
        counter.send({"action": "increment", "value": 5}),
        counter.send({"action": "increment", "value": 3}),
        counter.send({"action": "decrement", "value": 2}),
        printer.send("Hello from Actor system"),
    )

    await asyncio.sleep(0.5)

    reply_queue = asyncio.Queue()
    await counter.send({"action": "get", "reply_to": reply_queue})
    result = await reply_queue.get()
    print(f"计数器值: {result}")

    await counter.stop()
    await printer.stop()

asyncio.run(actor_demo())

16.6 并发调试与性能分析

16.6.1 常见并发问题

死锁

python
import threading
import time

def demonstrate_deadlock():
    lock_a = threading.Lock()
    lock_b = threading.Lock()

    def task1():
        with lock_a:
            time.sleep(0.1)
            print("Task1: 等待lock_b...")
            with lock_b:
                print("Task1: 获得lock_b")

    def task2():
        with lock_b:
            time.sleep(0.1)
            print("Task2: 等待lock_a...")
            with lock_a:
                print("Task2: 获得lock_a")

    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)

    t1.start()
    t2.start()

    t1.join(timeout=3)
    t2.join(timeout=3)

    if t1.is_alive() or t2.is_alive():
        print("检测到死锁!")
        return True
    return False

def avoid_deadlock_with_ordered_locks():
    lock_a = threading.Lock()
    lock_b = threading.Lock()

    def task1():
        with lock_a:
            time.sleep(0.1)
            with lock_b:
                print("Task1: 安全获得两把锁")

    def task2():
        with lock_a:
            time.sleep(0.1)
            with lock_b:
                print("Task2: 安全获得两把锁")

    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)

    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("无死锁完成")

避免死锁的策略

  1. 锁排序:始终按固定顺序获取多把锁
  2. 超时机制:使用acquire(timeout=...)避免无限等待
  3. 锁粒度:尽量减小临界区范围,减少持锁时间
  4. 避免嵌套:尽量不要在持有一把锁时获取另一把锁

16.6.2 性能分析与调优

python
import asyncio
import time
import statistics

async def benchmark_async(coro_factory, iterations=100):
    times = []
    for _ in range(iterations):
        start = time.perf_counter()
        await coro_factory()
        times.append(time.perf_counter() - start)

    return {
        "mean": statistics.mean(times),
        "median": statistics.median(times),
        "stdev": statistics.stdev(times) if len(times) > 1 else 0,
        "min": min(times),
        "max": max(times),
    }

async def benchmark_comparison():
    async def async_sleep():
        await asyncio.sleep(0.001)

    async def async_compute():
        total = 0
        for i in range(1000):
            total += i * i
        return total

    sleep_stats = await benchmark_async(async_sleep, 50)
    compute_stats = await benchmark_async(async_compute, 50)

    print("异步sleep基准:")
    for k, v in sleep_stats.items():
        print(f"  {k}: {v*1000:.3f}ms")

    print("\n异步计算基准:")
    for k, v in compute_stats.items():
        print(f"  {k}: {v*1000:.3f}ms")

asyncio.run(benchmark_comparison())

最优工作线程/进程数计算

python
import os

def optimal_pool_size(task_type="mixed", cpu_count=None):
    cpu_count = cpu_count or os.cpu_count() or 1

    if task_type == "cpu_bound":
        return cpu_count
    elif task_type == "io_bound":
        return min(32, cpu_count * 5)
    else:
        return min(16, cpu_count * 2)

print(f"CPU核心数: {os.cpu_count()}")
print(f"CPU密集型池大小: {optimal_pool_size('cpu_bound')}")
print(f"I/O密集型池大小: {optimal_pool_size('io_bound')}")
print(f"混合型池大小: {optimal_pool_size('mixed')}")

16.7 前沿技术动态

16.7.1 Python 3.12+ 并发新特性

asyncio.timeout(Python 3.12+)

python
import asyncio

async def per_task_timeout_demo():
    async def risky_task(name, duration):
        await asyncio.sleep(duration)
        return f"{name} 完成"

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(risky_task("Fast", 0.5))
        t2 = tg.create_task(risky_task("Slow", 5.0))

        try:
            async with asyncio.timeout(2.0):
                await t2
        except TimeoutError:
            print("Slow任务超时")
            t2.cancel()

    if t1.done():
        print(f"Fast任务结果: {t1.result()}")

16.7.2 PEP 703 — 自由线程CPython

Python 3.13引入了实验性的自由线程模式(Free-threaded CPython),这是Python并发领域最具革命性的变化:

  • 移除GIL,允许多线程真正并行执行Python字节码
  • 通过--disable-gil编译选项启用
  • 对引用计数机制进行了重新设计,采用 biased reference counting + per-object lock
  • 预计在Python 3.14+中逐步稳定
python
import sys
if hasattr(sys, '_is_gil_enabled'):
    print(f"GIL启用: {sys._is_gil_enabled()}")
else:
    print("当前Python版本不支持GIL状态查询")

16.7.3 PEP 734 — 子解释器

Python 3.12+引入的子解释器(Sub-interpreters)提供了一种介于线程和进程之间的并发模型:

  • 每个子解释器拥有独立的GIL
  • 比多进程更轻量,启动更快
  • 通过interpreters模块使用
  • 适合需要隔离但又不希望付出进程创建开销的场景
python
import interpreters

interp = interpreters.create()
interp.run("print('Hello from sub-interpreter')")

16.8 本章小结

本章系统学习了Python并发编程的四大模型:

模型适用场景优势劣势
threadingI/O密集型简单易用,共享内存GIL限制,需同步
multiprocessingCPU密集型真正并行,绕过GIL进程开销大,IPC复杂
concurrent.futures通用任务提交统一接口,Future模式不够灵活
asyncio高并发I/O高效,低资源消耗学习曲线陡,需异步生态

核心原则

  1. I/O密集型优先选择asyncio或threading
  2. CPU密集型选择multiprocessing
  3. 混合型使用asyncio + run_in_executor
  4. 始终考虑死锁、竞态条件和资源泄漏
  5. 优先使用高层抽象(TaskGroup、线程池)而非底层原语

16.9 练习题

基础题

  1. 使用threading.Lock实现一个线程安全的字典类ThreadSafeDict,支持getsetdelete操作。

  2. 使用multiprocessing.Queue实现一个简单的分布式任务分发系统,包含一个调度进程和多个工作进程。

  3. 使用asyncio.gather并发请求5个URL(模拟即可),并统计总耗时。

进阶题

  1. 实现一个基于asyncio.Semaphore的连接池管理器,支持最大连接数限制、连接超时和自动回收。

  2. 使用ProcessPoolExecutor实现一个并行MapReduce框架,能够处理大规模数据集的词频统计。

  3. 实现一个异步爬虫框架,支持:

    • 并发控制(最大并发数)
    • 请求限流(每秒请求数)
    • 自动重试(指数退避)
    • 结果持久化

思考题

  1. 为什么Python的GIL在CPU密集型任务中成为瓶颈,而在I/O密集型任务中影响不大?请从GIL的调度机制角度深入分析。

  2. 比较结构化并发(TaskGroup)与非结构化并发(create_task + gather)的优劣,在什么场景下应该优先选择哪种方式?

  3. 随着PEP 703(自由线程CPython)的推进,Python并发编程的生态将如何变化?现有代码需要做哪些适配?

项目练习

  1. 并发Web服务器:使用asyncio实现一个支持路由、中间件和静态文件服务的HTTP服务器,要求能处理至少1000个并发连接。

  2. 并行数据处理管道:设计一个多阶段数据处理系统,使用多进程处理数据加载、清洗、转换和存储四个阶段,支持背压控制和错误恢复。

  3. 分布式任务队列:实现一个类似Celery的轻量级任务队列系统,支持任务优先级、重试、超时和结果存储。

16.10 延伸阅读

16.10.1 并发理论

  • 《The Art of Multiprocessor Programming》 (Herlihy & Shavit) — 多处理器编程理论
  • 《Concurrency in Practice》 (Brian Goetz) — Java并发编程经典,原理通用
  • 《Seven Concurrency Models in Seven Weeks》 — 并发模型全景

16.10.2 Python并发

  • 《Python Concurrency with asyncio》 (Matthew Fowler) — asyncio权威指南
  • 《Fluent Python》第19-21章 — 并发编程深度解析
  • PEP 703 — A Free-Threaded CPython (https://peps.python.org/pep-0703/) — 无GIL Python设计

16.10.3 asyncio生态

16.10.4 并发工具


下一章:第17章 Flask入门

青少年创意编程 - 高中Python组 - 江苏省宿城中等专业学校