Chapter 16: Concurrent Programming
Learning Objectives
After completing this chapter, readers should be able to:
- Theory: understand the essential difference between concurrency and parallelism, the implementation of Python's GIL and its impact on multithreaded performance, and OS-level thread and process scheduling
- Technique: fluently apply the four concurrency models (threading, multiprocessing, concurrent.futures, and asyncio) and choose the best strategy for a given workload
- Engineering: design thread-safe data structures and concurrency-control mechanisms, implement high-performance asynchronous I/O systems, and build scalable concurrent application architectures
- Perspective: follow emerging developments such as structured concurrency, the Python 3.11+ TaskGroup, and subinterpreters
16.1 Foundations of Concurrent Programming
16.1.1 Concurrency vs. Parallelism
Concurrency and parallelism are two concepts that are often conflated but fundamentally different:

| Dimension | Concurrency | Parallelism |
|---|---|---|
| Definition | Multiple tasks make progress within overlapping time periods | Multiple tasks execute at the same instant |
| Core idea | Interleaved execution (time slicing) | True simultaneous execution |
| Hardware | A single core suffices | Requires multiple cores/processors |
| Typical workload | I/O-bound tasks | CPU-bound tasks |
| Analogy | One person alternating between several jobs | Several people each doing one job |
```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task(n):
    total = 0
    for i in range(n):
        total += i * i
    return total

def io_bound_task(duration):
    time.sleep(duration)
    return duration

def benchmark_sequential(tasks, task_func):
    start = time.perf_counter()
    results = [task_func(t) for t in tasks]
    return time.perf_counter() - start

def benchmark_threaded(tasks, task_func):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        results = list(executor.map(task_func, tasks))
    return time.perf_counter() - start

def benchmark_process(tasks, task_func):
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=len(tasks)) as executor:
        results = list(executor.map(task_func, tasks))
    return time.perf_counter() - start

if __name__ == "__main__":
    cpu_tasks = [10_000_000] * 4
    io_tasks = [1.0] * 4
    print("=== CPU-bound tasks ===")
    print(f"Sequential: {benchmark_sequential(cpu_tasks, cpu_bound_task):.2f}s")
    print(f"Multithreaded: {benchmark_threaded(cpu_tasks, cpu_bound_task):.2f}s")
    print(f"Multiprocess: {benchmark_process(cpu_tasks, cpu_bound_task):.2f}s")
    print("\n=== I/O-bound tasks ===")
    print(f"Sequential: {benchmark_sequential(io_tasks, io_bound_task):.2f}s")
    print(f"Multithreaded: {benchmark_threaded(io_tasks, io_bound_task):.2f}s")
    print(f"Multiprocess: {benchmark_process(io_tasks, io_bound_task):.2f}s")
```
Strategy decision tree:
Task type?
├── I/O-bound (network requests, file I/O, database queries)
│   ├── must interoperate with synchronous code → threading + ThreadPoolExecutor
│   └── high concurrency, low resource usage → asyncio
├── CPU-bound (numerical computation, image processing, cryptography)
│   └── multiple processes → multiprocessing + ProcessPoolExecutor
└── Mixed
    ├── mostly I/O with some computation → asyncio + run_in_executor
    └── mostly computation with some I/O → process pool + thread pool
16.1.2 A Deep Look at the Python GIL
The Global Interpreter Lock (GIL) is one of the most controversial design decisions in CPython. Understanding it is essential for choosing the right concurrency model.
The essence of the GIL:
The GIL is a mutex inside the CPython interpreter that ensures only one thread executes Python bytecode at any given moment. It exists for three main reasons:
- Memory-management safety: CPython manages memory with reference counting; without the GIL, concurrent refcount updates could cause memory leaks or premature frees
- C-extension compatibility: a large body of C extensions assumes the GIL exists, and removing it would break that ecosystem
- Single-threaded performance: the GIL simplifies the interpreter and makes single-threaded Python code run faster
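The reference-counting point above can be observed directly with the standard library's `sys.getrefcount`; a minimal sketch (the variable name is illustrative):

```python
import sys

x = []
# getrefcount reports at least 2 here: the variable x itself, plus the
# temporary reference created by passing x as a function argument
print(sys.getrefcount(x))
```

It is exactly these counts that concurrent threads would corrupt without the GIL (or without the finer-grained synchronization the free-threaded builds add).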
The GIL's scheduling mechanism:
```python
import sys

print(f"Switch interval: {sys.getswitchinterval()}s")  # default: 5 ms = 0.005 s
sys.setswitchinterval(0.005)
```
GIL switching rules:
- Python 3.2+: a fixed time-slice scheme; the GIL is forcibly released every 5 ms by default
- The GIL is released voluntarily during I/O operations (e.g. time.sleep(), file reads and writes, network requests)
- C extensions can release the GIL manually while running compute-intensive operations
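Why a switch mid-statement matters can be seen in the bytecode: even a single `total += i` compiles to several instructions, and the GIL may change hands between any two of them. A small sketch using the standard `dis` module (the function name is illustrative):

```python
import dis

def add_one(x):
    x += 1
    return x

# += expands to a load, an in-place add, and a store; a thread switch
# between these steps is what makes unsynchronized shared updates racy
dis.dis(add_one)
```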
The GIL's impact on performance:
```python
import threading
import time

def pure_python_count(n):
    start = time.perf_counter()
    total = 0
    for i in range(n):
        total += i
    return time.perf_counter() - start

def threaded_python_count(n, num_threads):
    def worker(count):
        total = 0
        for i in range(count):
            total += i

    start = time.perf_counter()
    threads = []
    chunk = n // num_threads
    for i in range(num_threads):
        count = chunk if i < num_threads - 1 else n - chunk * (num_threads - 1)
        t = threading.Thread(target=worker, args=(count,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    N = 50_000_000
    single = pure_python_count(N)
    multi = threaded_python_count(N, 4)
    print(f"Single thread: {single:.2f}s")
    print(f"4 threads: {multi:.2f}s")
    print(f"Speedup: {single/multi:.2f}x")
    print("Note: the threaded version may be slower due to GIL switching overhead")
```
Strategies for bypassing the GIL:
```python
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def numpy_computation(size):
    # NumPy releases the GIL inside its C loops; a process pool sidesteps it entirely
    arr = np.random.rand(size)
    return np.sum(arr ** 2)

def parallel_numpy(size, workers=4):
    chunk = size // workers
    # the callable handed to a process pool must be picklable,
    # i.e. defined at module level rather than nested inside a function
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(numpy_computation, [chunk] * workers))
    return sum(results)
```
Frontier note: Python 3.13 introduces an experimental free-threaded mode (PEP 703); a GIL-free interpreter can be built with the --disable-gil configure option. This marks a key step by the Python community toward removing the GIL.
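Whether the running interpreter actually has the GIL enabled can be probed at runtime; `sys._is_gil_enabled()` exists only on Python 3.13+, so this sketch falls back gracefully on older versions:

```python
import sys

# sys._is_gil_enabled() is available on Python 3.13+; on earlier versions
# the GIL is always present, so default to True
gil_enabled = getattr(sys, "_is_gil_enabled", lambda: True)()
print(f"GIL enabled: {gil_enabled}")
```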
16.2 Multithreaded Programming
16.2.1 Thread Basics and Lifecycle
Python threads are thin wrappers around native OS threads; each Python thread corresponds to one OS thread.
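That 1:1 mapping can be observed via `threading.get_native_id()` (Python 3.8+), which returns the kernel-assigned thread id; a minimal sketch:

```python
import threading

ids = {}

def record():
    # ident is the Python-level id; native_id is the OS-level thread id
    ids["child"] = threading.get_native_id()

t = threading.Thread(target=record)
t.start()
t.join()
ids["main"] = threading.get_native_id()
print(ids)  # two distinct OS thread ids
```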
```python
import threading
import time

def worker(name, duration):
    print(f"[{threading.current_thread().name}] {name} started")
    time.sleep(duration)
    print(f"[{threading.current_thread().name}] {name} finished")
    return f"result of {name}"

t = threading.Thread(
    target=worker,
    args=("Task A", 2),
    name="WorkerThread-1",
    daemon=False
)
print(f"Thread name: {t.name}")
print(f"Daemon: {t.daemon}")
print(f"Ident: {t.ident}")        # None until the thread is started
print(f"Alive: {t.is_alive()}")
t.start()
t.join(timeout=5)
if t.is_alive():
    print("Thread did not finish within the timeout")
else:
    print(f"Thread finished, ident: {t.ident}")
```
Thread lifecycle state diagram:
New → start() → Runnable → acquires GIL → Running
Running → blocking call (I/O, lock) → Blocked (GIL released)
Blocked → wait completes → Runnable
Running → function returns / exception → Terminated
Daemon threads:
```python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

def background_monitor():
    while True:
        logging.info("monitor heartbeat...")
        time.sleep(1)

def main_task():
    logging.info("main task started")
    time.sleep(3)
    logging.info("main task finished")

monitor = threading.Thread(target=background_monitor, name="Monitor", daemon=True)
monitor.start()
main_task()
print("Main thread exiting; the daemon thread terminates automatically")
```
16.2.2 Thread Synchronization Primitives
Threads share their process's memory space, so synchronization primitives are required to prevent race conditions.
A race condition example:
```python
import threading

class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self):
        # an unprotected read-modify-write: a thread switch between these
        # steps silently loses updates
        temp = self.value
        temp += 1
        self.value = temp

counter = UnsafeCounter()
threads = []
for _ in range(10):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(100000)])
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Expected: 1000000, actual: {counter.value}")
```
Lock (mutex):
```python
import threading

class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    @property
    def value(self):
        with self._lock:
            return self._value

counter = SafeCounter()
threads = []
for _ in range(10):
    t = threading.Thread(target=lambda: [counter.increment() for _ in range(100000)])
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Expected: 1000000, actual: {counter.value}")
```
Lock vs. RLock:
```python
import threading

lock = threading.Lock()
lock.acquire()
# a second blocking acquire() by the same thread would deadlock, because
# Lock is not reentrant; a non-blocking attempt demonstrates this safely
if not lock.acquire(blocking=False):
    print("The same thread cannot re-acquire a non-reentrant Lock")
lock.release()

rlock = threading.RLock()
rlock.acquire()
rlock.acquire()
print("Reentrant lock acquired twice; recursion count: 2")
rlock.release()
rlock.release()
print("Reentrant lock fully released")
```
RLock use case: recursive calls:
```python
import threading

class TreeWalker:
    def __init__(self):
        self._lock = threading.RLock()
        self.visited = []

    def walk(self, node):
        with self._lock:          # re-entered by the same thread on each recursive call
            self.visited.append(node.value)
            for child in node.children:
                self.walk(child)

class TreeNode:
    def __init__(self, value):
        self.value = value
        self.children = []

root = TreeNode("root")
root.children = [TreeNode("left"), TreeNode("right")]
root.children[0].children = [TreeNode("left-left")]
walker = TreeWalker()
walker.walk(root)
print(f"Traversal order: {walker.visited}")
```
Semaphore:
```python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class ConnectionPool:
    def __init__(self, max_connections=3):
        self._semaphore = threading.Semaphore(max_connections)
        self._connections = []
        self._lock = threading.Lock()
        self._local = threading.local()   # per-thread slot for the borrowed connection

    def acquire(self):
        self._semaphore.acquire()
        with self._lock:
            conn = f"Connection-{len(self._connections)}"
            self._connections.append(conn)
        logging.info(f"acquired {conn}")
        return conn

    def release(self, conn):
        with self._lock:
            self._connections.remove(conn)
        logging.info(f"released {conn}")
        self._semaphore.release()

    def __enter__(self):
        # store the connection thread-locally so that concurrent borrowers
        # do not overwrite each other's reference
        self._local.conn = self.acquire()
        return self._local.conn

    def __exit__(self, *args):
        self.release(self._local.conn)

pool = ConnectionPool(max_connections=2)

def use_connection(name, duration):
    with pool as conn:
        logging.info(f"{name} using {conn}")
        time.sleep(duration)

threads = [
    threading.Thread(target=use_connection, args=(f"Client-{i}", i * 0.5 + 0.5), name=f"Client-{i}")
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
Event:
```python
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class ServiceManager:
    def __init__(self):
        self._ready = threading.Event()
        self._shutdown = threading.Event()

    def wait_ready(self, timeout=None):
        return self._ready.wait(timeout)

    def mark_ready(self):
        self._ready.set()
        logging.info("service ready")

    def request_shutdown(self):
        self._shutdown.set()
        logging.info("shutdown requested")

    def is_shutdown_requested(self):
        return self._shutdown.is_set()

manager = ServiceManager()

def service_worker():
    logging.info("waiting for the service to become ready...")
    manager.wait_ready()
    logging.info("service ready, starting work")
    while not manager.is_shutdown_requested():
        time.sleep(0.5)
        logging.info("working...")
    logging.info("shutdown signal received, exiting")

def service_initializer():
    logging.info("initializing service...")
    time.sleep(2)
    manager.mark_ready()
    time.sleep(3)
    manager.request_shutdown()

threads = [
    threading.Thread(target=service_worker, name="Worker"),
    threading.Thread(target=service_initializer, name="Initializer"),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
Condition:
```python
import threading
import time
import random
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class BoundedBuffer:
    def __init__(self, capacity):
        self._buffer = []
        self._capacity = capacity
        self._condition = threading.Condition()

    def put(self, item):
        with self._condition:
            while len(self._buffer) >= self._capacity:
                logging.info(f"buffer full, waiting for a consumer... (size={len(self._buffer)})")
                self._condition.wait()
            self._buffer.append(item)
            logging.info(f"produced {item} (size={len(self._buffer)})")
            self._condition.notify_all()

    def get(self):
        with self._condition:
            while not self._buffer:
                logging.info("buffer empty, waiting for a producer...")
                self._condition.wait()
            item = self._buffer.pop(0)
            logging.info(f"consumed {item} (size={len(self._buffer)})")
            self._condition.notify_all()
            return item

buffer = BoundedBuffer(capacity=3)

def producer(pid):
    for i in range(5):
        item = f"P{pid}-{i}"
        buffer.put(item)
        time.sleep(random.uniform(0.1, 0.3))

def consumer(cid):
    for _ in range(5):
        item = buffer.get()
        time.sleep(random.uniform(0.2, 0.5))

threads = []
for i in range(2):
    threads.append(threading.Thread(target=producer, args=(i,), name=f"Producer-{i}"))
for i in range(2):
    threads.append(threading.Thread(target=consumer, args=(i,), name=f"Consumer-{i}"))
for t in threads:
    t.start()
for t in threads:
    t.join()
```
Barrier:
```python
import threading
import time
import random
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

def parallel_computation(worker_id, barrier):
    phase1_time = random.uniform(0.5, 1.5)
    logging.info(f"Worker-{worker_id} phase 1 ({phase1_time:.1f}s)")
    time.sleep(phase1_time)
    logging.info(f"Worker-{worker_id} waiting for the others to finish phase 1")
    barrier.wait()
    phase2_time = random.uniform(0.5, 1.0)
    logging.info(f"Worker-{worker_id} phase 2 ({phase2_time:.1f}s)")
    time.sleep(phase2_time)
    barrier.wait()
    logging.info(f"Worker-{worker_id} all done")

num_workers = 4
barrier = threading.Barrier(num_workers)
threads = [
    threading.Thread(target=parallel_computation, args=(i, barrier), name=f"Worker-{i}")
    for i in range(num_workers)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
16.2.3 Thread-Safe Data Structures
A Queue-based producer-consumer pattern:
```python
import threading
import queue
import time
import random
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class TaskQueue:
    def __init__(self, maxsize=0):
        self._queue = queue.Queue(maxsize=maxsize)
        self._shutdown = threading.Event()

    def submit(self, task):
        if not self._shutdown.is_set():
            self._queue.put(task)
            return True
        return False

    def get(self, timeout=1.0):
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def task_done(self):
        self._queue.task_done()

    def shutdown(self):
        self._shutdown.set()

    def join(self):
        self._queue.join()

    @property
    def size(self):
        return self._queue.qsize()

class WorkerPool:
    _SENTINEL = object()   # distinct from the None returned on an idle timeout

    def __init__(self, num_workers, task_handler):
        self._task_queue = TaskQueue(maxsize=100)
        self._workers = []
        self._handler = task_handler
        for i in range(num_workers):
            t = threading.Thread(target=self._worker_loop, name=f"Worker-{i}", daemon=True)
            t.start()
            self._workers.append(t)

    def _worker_loop(self):
        while True:
            task = self._task_queue.get()
            if task is None:
                continue   # idle timeout: keep polling instead of exiting
            if task is self._SENTINEL:
                self._task_queue.task_done()
                break
            try:
                self._handler(task)
            except Exception as e:
                logging.error(f"task failed: {e}")
            finally:
                self._task_queue.task_done()

    def submit(self, task):
        self._task_queue.submit(task)

    def wait_completion(self):
        self._task_queue.join()

    def shutdown(self):
        self._task_queue.shutdown()
        for _ in self._workers:
            # deliberate internal access: submit() rejects new work after shutdown
            self._task_queue._queue.put(self._SENTINEL)
        for t in self._workers:
            t.join()

def process_task(task):
    task_id, data = task
    processing_time = random.uniform(0.1, 0.5)
    time.sleep(processing_time)
    result = data * 2
    logging.info(f"task {task_id}: {data} → {result}")

pool = WorkerPool(num_workers=3, task_handler=process_task)
for i in range(10):
    pool.submit((i, i * 10))
pool.wait_completion()
logging.info("all tasks finished")
pool.shutdown()
```
PriorityQueue and LifoQueue:
```python
import queue
import threading
import time

priority_q = queue.PriorityQueue()

def priority_producer():
    tasks = [
        (3, "low-priority task"),
        (1, "high-priority task"),
        (2, "medium-priority task"),
        (1, "another high-priority task"),
        (3, "another low-priority task"),
    ]
    for priority, task in tasks:
        priority_q.put((priority, task))
        time.sleep(0.1)

def priority_consumer():
    while True:
        try:
            priority, task = priority_q.get(timeout=2)
            print(f"processing: [{priority}] {task}")
            priority_q.task_done()
        except queue.Empty:
            break

# run the producer to completion first so the consumer drains a fully
# ordered queue
t1 = threading.Thread(target=priority_producer)
t2 = threading.Thread(target=priority_consumer)
t1.start()
t1.join()
t2.start()
t2.join()

lifo_q = queue.LifoQueue()
for i in range(5):
    lifo_q.put(f"Item-{i}")
print("\nLIFO order:")
while not lifo_q.empty():
    print(f"  got: {lifo_q.get()}")
```
16.2.4 Thread-Local Storage
```python
import threading
import random
import time

class RequestContext:
    _local = threading.local()

    @classmethod
    def set_request_id(cls, request_id):
        cls._local.request_id = request_id

    @classmethod
    def get_request_id(cls):
        return getattr(cls._local, 'request_id', 'unknown')

    @classmethod
    def set_user(cls, user):
        cls._local.user = user

    @classmethod
    def get_user(cls):
        return getattr(cls._local, 'user', None)

def handle_request(request_id, user):
    RequestContext.set_request_id(request_id)
    RequestContext.set_user(user)
    time.sleep(random.uniform(0.1, 0.3))
    print(f"RequestID={RequestContext.get_request_id()}, "
          f"User={RequestContext.get_user()}, "
          f"Thread={threading.current_thread().name}")

threads = []
for i in range(5):
    t = threading.Thread(
        target=handle_request,
        args=(f"REQ-{i:04d}", f"user_{i}"),
        name=f"Handler-{i}"
    )
    threads.append(t)
    t.start()
for t in threads:
    t.join()
```
16.2.5 Thread Pools in Depth
```python
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
import threading
import time
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(threadName)s] %(message)s")

class ManagedThreadPool:
    def __init__(self, max_workers, thread_name_prefix="Worker"):
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix
        )
        self._futures = []
        self._lock = threading.Lock()

    def submit(self, fn, *args, **kwargs) -> Future:
        future = self._executor.submit(fn, *args, **kwargs)
        with self._lock:
            self._futures.append(future)
        future.add_done_callback(self._on_task_done)
        return future

    def _on_task_done(self, future):
        try:
            result = future.result()
            logging.info(f"task finished: {result}")
        except Exception as e:
            logging.error(f"task failed: {e}")

    def map_as_completed(self, fn, iterables):
        futures = {self._executor.submit(fn, item): item for item in iterables}
        results = []
        for future in as_completed(futures):
            item = futures[future]
            try:
                results.append((item, future.result()))
            except Exception as e:
                results.append((item, e))
        return results

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

def fetch_url(url):
    time.sleep(1)
    return f"Response from {url}"

urls = [f"http://api.example.com/data/{i}" for i in range(8)]
pool = ManagedThreadPool(max_workers=4)
futures = [pool.submit(fetch_url, url) for url in urls]
for future in as_completed(futures):
    try:
        print(future.result())
    except Exception as e:
        print(f"Error: {e}")
pool.shutdown()
```
Future callback chains:
```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_data(source):
    time.sleep(1)
    return f"raw_data_from_{source}"

def parse_data(raw_data):
    time.sleep(0.5)
    return raw_data.replace("raw_data_from_", "parsed: ")

def store_data(parsed_data):
    time.sleep(0.3)
    return f"stored[{parsed_data}]"

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(fetch_data, "database")

    def on_fetch_done(f):
        raw = f.result()
        parse_future = executor.submit(parse_data, raw)
        parse_future.add_done_callback(on_parse_done)

    def on_parse_done(f):
        parsed = f.result()
        store_future = executor.submit(store_data, parsed)
        store_future.add_done_callback(on_store_done)

    def on_store_done(f):
        print(f"Pipeline finished: {f.result()}")

    future.add_done_callback(on_fetch_done)
    time.sleep(3)   # keep the executor alive until the callback chain completes
```
16.3 Multiprocess Programming
16.3.1 Process and Memory Model
Each process owns an independent address space; processes do not share variables. This naturally avoids race conditions, but it also makes inter-process communication more involved.
```python
import multiprocessing
import os

def show_process_info():
    print(f"PID: {os.getpid()}, PPID: {os.getppid()}")
    print(f"Process name: {multiprocessing.current_process().name}")

if __name__ == "__main__":
    show_process_info()
    p = multiprocessing.Process(target=show_process_info, name="ChildProcess")
    p.start()
    p.join()
```
Process start methods:
```python
import multiprocessing as mp

available = mp.get_all_start_methods()
print(f"Available start methods: {available}")
print(f"Current method: {mp.get_start_method()}")
if 'spawn' in available:
    ctx = mp.get_context('spawn')
    print(f"spawn context: {ctx}")
```

| Start method | Platform | Characteristics |
|---|---|---|
| spawn | All platforms | Starts a fresh interpreter and re-imports the main module; safe but slow |
| fork | Unix | Copies the parent process's memory; fast but potentially unsafe |
| forkserver | Unix | Forks from a pre-started server process; a compromise |
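Pinning a start method explicitly makes behavior reproducible across platforms. A minimal sketch that selects `spawn` when available (it only builds the context object; no process is started here):

```python
import multiprocessing as mp

methods = mp.get_all_start_methods()
# prefer spawn for portability; it is the one method available everywhere
method = "spawn" if "spawn" in methods else methods[0]
ctx = mp.get_context(method)
# the context exposes the same API as the top-level module
print(method, hasattr(ctx, "Process"), hasattr(ctx, "Queue"))
```

Using `get_context()` instead of the global `mp.set_start_method()` keeps the choice local, so libraries sharing the interpreter are unaffected.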
Data isolation between processes:
```python
import multiprocessing

global_var = 0

def modify_global():
    global global_var
    global_var = 100
    print(f"child process: global_var = {global_var}")

if __name__ == "__main__":
    p = multiprocessing.Process(target=modify_global)
    p.start()
    p.join()
    print(f"parent process: global_var = {global_var}")   # still 0
```
16.3.2 Inter-Process Communication (IPC)
Queue:
```python
import multiprocessing
import time

def producer(q):
    for i in range(5):
        item = f"Item-{i}"
        q.put(item)
        print(f"produced: {item}")
        time.sleep(0.1)
    q.put(None)   # sentinel: no more items

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"consumed: {item}")
        time.sleep(0.2)

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=10)
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
Pipe:
```python
import multiprocessing

def sender(conn):
    messages = ["Hello", "World", "Python", "Concurrency"]
    for msg in messages:
        conn.send(msg)
        print(f"sent: {msg}")
    conn.send(None)
    conn.close()

def receiver(conn):
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f"received: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
Shared memory (Value and Array):
```python
import multiprocessing
import ctypes

def worker(shared_value, shared_array, lock):
    with lock:
        shared_value.value += 1
        for i in range(len(shared_array)):
            shared_array[i] += 1

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    value = multiprocessing.Value(ctypes.c_int, 0)
    array = multiprocessing.Array(ctypes.c_int, [0, 0, 0, 0, 0])
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(value, array, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"Value: {value.value}")
    print(f"Array: {list(array)}")
```
Manager (managed objects):
```python
import multiprocessing
import time

def worker(namespace, shared_list, shared_dict, lock):
    with lock:
        namespace.count += 1
        shared_list.append(multiprocessing.current_process().name)
        shared_dict[multiprocessing.current_process().name] = time.time()

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        namespace = manager.Namespace()
        namespace.count = 0
        shared_list = manager.list()
        shared_dict = manager.dict()
        lock = manager.Lock()
        processes = []
        for i in range(3):
            p = multiprocessing.Process(
                target=worker,
                args=(namespace, shared_list, shared_dict, lock),
                name=f"Worker-{i}"
            )
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        print(f"count: {namespace.count}")
        print(f"list: {list(shared_list)}")
        print(f"dict: {dict(shared_dict)}")
```
16.3.3 Process Pools
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math

def is_prime(n):
    if n < 2:
        return False
    if n < 4:
        return True
    if n % 2 == 0 or n % 3 == 0:
        return False
    for i in range(5, int(math.sqrt(n)) + 1, 6):
        if n % i == 0 or n % (i + 2) == 0:
            return False
    return True

def find_primes_in_range(start, end):
    return [n for n in range(start, end) if is_prime(n)]

if __name__ == "__main__":
    ranges = [(i * 100000, (i + 1) * 100000) for i in range(8)]
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(find_primes_in_range, s, e): (s, e) for s, e in ranges}
        total_primes = 0
        for future in as_completed(futures):
            r = futures[future]
            primes = future.result()
            total_primes += len(primes)
            print(f"range {r}: {len(primes)} primes")
    print(f"total: {total_primes} primes, elapsed: {time.perf_counter() - start:.2f}s")
    start = time.perf_counter()
    all_primes = []
    for s, e in ranges:
        all_primes.extend(find_primes_in_range(s, e))
    print(f"sequential: {len(all_primes)} primes, elapsed: {time.perf_counter() - start:.2f}s")
```
Pool initializers and large data sets:
```python
from concurrent.futures import ProcessPoolExecutor

def init_worker(shared_data):
    # runs once in each worker process; stores the data as a module-level global
    global _shared_data
    _shared_data = shared_data

def process_chunk(chunk):
    return sum(x * _shared_data for x in chunk)

if __name__ == "__main__":
    data_chunks = [list(range(1000)) for _ in range(4)]
    shared_value = 2
    with ProcessPoolExecutor(
        max_workers=4,
        initializer=init_worker,
        initargs=(shared_value,)
    ) as executor:
        results = list(executor.map(process_chunk, data_chunks))
    print(f"chunk results: {results}")
    print(f"total: {sum(results)}")
```
16.4 Asynchronous Programming (asyncio)
16.4.1 The Event Loop and Coroutines
asyncio is Python's asynchronous I/O framework; it achieves single-threaded concurrency with an event loop and coroutines.
The essence of coroutines:
```python
import asyncio

async def modern_coroutine():
    await asyncio.sleep(1)
    return "result"

coro = modern_coroutine()
print(f"type: {type(coro)}")
print(f"is coroutine: {asyncio.iscoroutine(coro)}")
coro.close()   # close the never-awaited coroutine to avoid a RuntimeWarning

async def demonstrate_await():
    result = await modern_coroutine()
    print(f"result: {result}")

asyncio.run(demonstrate_await())
```
How the event loop works:
The event loop's cycle:
1. Poll for ready I/O events
2. Check expired timers
3. Run ready callbacks
4. Resume suspended coroutines
5. Repeat from step 1

```python
import asyncio

async def task_lifecycle():
    print("1. coroutine starts")
    await asyncio.sleep(0)   # yields control back to the loop
    print("2. resumed after yielding")
    await asyncio.sleep(0)
    print("3. resumed again")
    return "done"

async def main():
    result = await task_lifecycle()
    print(f"result: {result}")

asyncio.run(main())
```
16.4.2 Tasks and Scheduling
```python
import asyncio
import time

async def fetch_data(name, delay):
    print(f"{name} request started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"data from {name}"

async def gather_demo():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch_data("API-A", 2),
        fetch_data("API-B", 1),
        fetch_data("API-C", 3),
    )
    elapsed = time.perf_counter() - start
    print(f"total: {elapsed:.1f}s (concurrent: roughly the longest task)")
    print(f"results: {results}")

asyncio.run(gather_demo())
```
create_task vs. gather:
```python
import asyncio

async def task_demo():
    t1 = asyncio.create_task(fetch_data("Task-1", 2), name="Task-1")
    t2 = asyncio.create_task(fetch_data("Task-2", 1), name="Task-2")
    print(f"Task-1 done: {t1.done()}")
    print(f"Task-2 name: {t2.get_name()}")
    r1 = await t1
    r2 = await t2
    print(f"Task-1 done: {t1.done()}")
    print(f"results: {r1}, {r2}")

asyncio.run(task_demo())
```
TaskGroup (Python 3.11+):
```python
import asyncio

async def taskgroup_demo():
    results = []
    async with asyncio.TaskGroup() as tg:
        async def tracked_fetch(name, delay):
            result = await fetch_data(name, delay)
            results.append(result)
        tg.create_task(tracked_fetch("TG-A", 2))
        tg.create_task(tracked_fetch("TG-B", 1))
        tg.create_task(tracked_fetch("TG-C", 3))
    print(f"all tasks finished: {results}")

asyncio.run(taskgroup_demo())
```
Exception handling compared:
```python
import asyncio

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("task failed")

async def gather_with_error():
    results = await asyncio.gather(
        fetch_data("OK-Task", 1),
        failing_task(),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"exception: {r}")
        else:
            print(f"success: {r}")

asyncio.run(gather_with_error())

async def taskgroup_with_error():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fetch_data("OK-Task", 1))
            tg.create_task(failing_task())
    except* ValueError as eg:
        print(f"TaskGroup caught ValueError: {eg.exceptions}")

asyncio.run(taskgroup_with_error())
```
16.4.3 Asynchronous Synchronization Primitives
asyncio provides asynchronous counterparts to the threading primitives; they are scheduled cooperatively inside the event loop and never block the underlying thread.
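Because acquiring one of these primitives suspends a coroutine rather than blocking a thread, a pending acquisition can be bounded with `asyncio.wait_for`. A small sketch (the function names are illustrative):

```python
import asyncio

async def try_acquire(lock, timeout):
    # wait_for cancels the pending acquire() if the timeout elapses
    try:
        await asyncio.wait_for(lock.acquire(), timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def demo():
    lock = asyncio.Lock()
    await lock.acquire()          # held for the whole demo
    return await try_acquire(lock, 0.05)

print(asyncio.run(demo()))  # False: the lock was never released
```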
```python
import asyncio

class AsyncCounter:
    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        # the lock becomes essential as soon as an await sits between the
        # read and the write of _value; here it illustrates the pattern
        async with self._lock:
            self._value += 1

    @property
    def value(self):
        return self._value

async def worker(counter, worker_id, increments):
    for _ in range(increments):
        await counter.increment()
    print(f"Worker-{worker_id} finished")

async def lock_demo():
    counter = AsyncCounter()
    increments = 10000
    workers = 5
    await asyncio.gather(*[
        worker(counter, i, increments) for i in range(workers)
    ])
    expected = increments * workers
    print(f"expected: {expected}, actual: {counter.value}")

asyncio.run(lock_demo())
```
asyncio.Event:
```python
import asyncio

class AsyncService:
    def __init__(self):
        self._ready = asyncio.Event()
        self._data = {}

    async def initialize(self):
        await asyncio.sleep(1)
        self._data = {"status": "ready", "version": "2.0"}
        self._ready.set()

    async def wait_ready(self):
        await self._ready.wait()
        return self._data

async def service_demo():
    service = AsyncService()

    async def client(name):
        print(f"{name}: waiting for the service...")
        data = await service.wait_ready()
        print(f"{name}: service ready, data={data}")

    await asyncio.gather(
        service.initialize(),
        client("Client-A"),
        client("Client-B"),
    )

asyncio.run(service_demo())
```
asyncio.Queue:
```python
import asyncio
import random

class AsyncProducerConsumer:
    def __init__(self, buffer_size=5):
        self._queue = asyncio.Queue(maxsize=buffer_size)
        self._produced = 0
        self._consumed = 0

    async def producer(self, name, count):
        for i in range(count):
            item = f"{name}-{i}"
            await self._queue.put(item)
            self._produced += 1
            print(f"produced: {item} (queue size: {self._queue.qsize()})")
            await asyncio.sleep(random.uniform(0.05, 0.15))

    async def consumer(self, name, count):
        for _ in range(count):
            item = await self._queue.get()
            self._consumed += 1
            print(f"consumed: {item} by {name}")
            self._queue.task_done()
            await asyncio.sleep(random.uniform(0.1, 0.2))

    async def run(self, num_producers=2, num_consumers=2, items_per_producer=5):
        # the consumers must run concurrently with the producers; otherwise
        # the bounded queue fills up and the producers block forever
        consumer_tasks = [
            asyncio.create_task(
                self.consumer(f"C{i}", num_producers * items_per_producer // num_consumers)
            )
            for i in range(num_consumers)
        ]
        await asyncio.gather(*[
            self.producer(f"P{i}", items_per_producer)
            for i in range(num_producers)
        ])
        await self._queue.join()
        await asyncio.gather(*consumer_tasks)
        print(f"produced: {self._produced}, consumed: {self._consumed}")

asyncio.run(AsyncProducerConsumer().run())
```
asyncio.Semaphore and rate limiting:
```python
import asyncio
import time

class RateLimiter:
    def __init__(self, max_concurrent=3):
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def __aenter__(self):
        await self._semaphore.acquire()
        return self

    async def __aexit__(self, *args):
        self._semaphore.release()

async def rate_limited_fetch(url, limiter):
    async with limiter:
        print(f"request started: {url}")
        await asyncio.sleep(1)
        print(f"request finished: {url}")
        return f"Response from {url}"

async def rate_limiter_demo():
    limiter = RateLimiter(max_concurrent=2)
    urls = [f"http://api.example.com/{i}" for i in range(6)]
    start = time.perf_counter()
    results = await asyncio.gather(*[
        rate_limited_fetch(url, limiter) for url in urls
    ])
    elapsed = time.perf_counter() - start
    print(f"6 requests, concurrency limit 2, elapsed: {elapsed:.1f}s (about 3 batches of 1s)")

asyncio.run(rate_limiter_demo())
```
16.4.4 Asynchronous Iterators and Generators
```python
import asyncio

class AsyncPageIterator:
    def __init__(self, total_items, page_size=10):
        self._total = total_items
        self._page_size = page_size
        self._current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._current >= self._total:
            raise StopAsyncIteration
        start = self._current
        end = min(self._current + self._page_size, self._total)
        self._current = end
        await asyncio.sleep(0.1)   # simulate a network fetch
        return list(range(start, end))

async def pagination_demo():
    async for page in AsyncPageIterator(total_items=35, page_size=10):
        print(f"fetched page: {page}")

asyncio.run(pagination_demo())

async def async_chunk_generator(iterable, chunk_size):
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            await asyncio.sleep(0)   # yield control between chunks
            yield chunk
            chunk = []
    if chunk:
        yield chunk

async def chunk_demo():
    data = range(20)
    async for chunk in async_chunk_generator(data, 5):
        print(f"processing batch: {chunk}")

asyncio.run(chunk_demo())
```
Asynchronous comprehensions:
```python
import asyncio

async def async_transform(x):
    await asyncio.sleep(0.01)
    return x ** 2

async def comprehension_demo():
    result = [await async_transform(i) for i in range(10)]   # sequential awaits
    print(f"sequential: {result}")
    result = await asyncio.gather(*[async_transform(i) for i in range(10)])
    print(f"concurrent: {result}")

asyncio.run(comprehension_demo())
```
16.4.5 Asynchronous Context Managers and Subprocesses
```python
import asyncio
from contextlib import asynccontextmanager

class AsyncDatabaseConnection:
    def __init__(self, url):
        self._url = url
        self._connected = False

    async def __aenter__(self):
        await asyncio.sleep(0.1)
        self._connected = True
        print(f"connected to {self._url}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0.05)
        self._connected = False
        print(f"disconnected from {self._url}")

    async def query(self, sql):
        if not self._connected:
            raise RuntimeError("not connected")
        await asyncio.sleep(0.05)
        return f"[{sql}] → result"

@asynccontextmanager
async def managed_connection(url):
    conn = AsyncDatabaseConnection(url)
    await conn.__aenter__()
    try:
        yield conn
    finally:
        await conn.__aexit__(None, None, None)

async def db_demo():
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        result = await db.query("SELECT * FROM users")
        print(result)
    async with managed_connection("mysql://localhost/test") as db:
        result = await db.query("SELECT COUNT(*) FROM orders")
        print(result)

asyncio.run(db_demo())
```
Asynchronous subprocesses:
```python
import asyncio
import sys

async def run_command(cmd):
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    return process.returncode, stdout.decode(), stderr.decode()

async def subprocess_demo():
    # sys.executable points at the current interpreter, which is more
    # reliable than a bare "python" on PATH
    returncode, stdout, stderr = await run_command(
        [sys.executable, "-c", "print('Hello from subprocess')"]
    )
    print(f"return code: {returncode}")
    print(f"output: {stdout.strip()}")

async def parallel_commands():
    commands = [
        [sys.executable, "-c", f"import time; time.sleep({i}); print('Task {i} done')"]
        for i in range(1, 4)
    ]
    results = await asyncio.gather(*[run_command(cmd) for cmd in commands])
    for i, (rc, out, err) in enumerate(results, 1):
        print(f"command {i}: return code={rc}, output={out.strip()}")

asyncio.run(parallel_commands())
```
16.4.6 Asynchronous Network Programming
```python
import asyncio

async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"new connection: {addr}")
    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break
            message = data.decode()
            print(f"received from {addr}: {message}")
            response = f"Echo: {message}"
            writer.write(response.encode())
            await writer.drain()
    except ConnectionResetError:
        print(f"connection reset: {addr}")
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"connection closed: {addr}")

async def start_echo_server():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    addrs = ', '.join(str(s.getsockname()) for s in server.sockets)
    print(f"serving on {addrs}")
    async with server:
        await server.serve_forever()

async def tcp_client():
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    messages = ["Hello", "World", "Python"]
    for msg in messages:
        writer.write(msg.encode())
        await writer.drain()
        data = await reader.read(1024)
        print(f"response: {data.decode()}")
    writer.close()
    await writer.wait_closed()

async def server_client_demo():
    server_task = asyncio.create_task(start_echo_server())
    await asyncio.sleep(0.5)   # give the server time to start listening
    await tcp_client()
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        print("server shut down")

asyncio.run(server_client_demo())
```
16.4.7 Bridging Asynchronous and Synchronous Code
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def blocking_io_task(duration):
time.sleep(duration)
return f"阻塞任务完成 ({duration}s)"
def cpu_heavy(n):
total = sum(i * i for i in range(n))
return total
async def bridge_demo():
    loop = asyncio.get_running_loop()  # 协程内应使用 get_running_loop(),get_event_loop() 已不推荐
result = await loop.run_in_executor(None, blocking_io_task, 2)
print(result)
process_executor = ProcessPoolExecutor(max_workers=2)
results = await asyncio.gather(
loop.run_in_executor(process_executor, cpu_heavy, 10_000_000),
loop.run_in_executor(process_executor, cpu_heavy, 20_000_000),
fetch_data("async-task", 1),
)
print(f"混合执行结果: {results}")
process_executor.shutdown(wait=True)
asyncio.run(bridge_demo())16.5 高级并发模式
16.5.1 生产者-消费者模式的完整实现
import asyncio
import random
import time
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
class AsyncPipeline:
def __init__(self, buffer_size=10):
self._input_queue = asyncio.Queue(maxsize=buffer_size)
self._output_queue = asyncio.Queue(maxsize=buffer_size)
self._error_queue = asyncio.Queue()
self._stats = {"produced": 0, "processed": 0, "errors": 0}
async def producer(self, count):
for i in range(count):
item = {"id": i, "data": random.random(), "timestamp": time.time()}
await self._input_queue.put(item)
self._stats["produced"] += 1
logging.info(f"生产: Item-{i}")
await asyncio.sleep(random.uniform(0.01, 0.05))
await self._input_queue.put(None)
async def processor(self, name):
while True:
item = await self._input_queue.get()
if item is None:
await self._input_queue.put(None)
break
try:
if random.random() < 0.1:
raise ValueError(f"处理 Item-{item['id']} 时出错")
processed = {
"id": item["id"],
"result": item["data"] ** 2,
"processed_by": name,
}
await self._output_queue.put(processed)
self._stats["processed"] += 1
logging.info(f"{name} 处理: Item-{item['id']}")
except Exception as e:
error_info = {"item_id": item["id"], "error": str(e)}
await self._error_queue.put(error_info)
self._stats["errors"] += 1
logging.error(f"{name} 错误: {e}")
finally:
self._input_queue.task_done()
await asyncio.sleep(random.uniform(0.02, 0.08))
    async def consumer(self):
        # 用 None 哨兵判断结束,而不是按固定数量计数——出错的条目不会进入输出队列
        while True:
            item = await self._output_queue.get()
            if item is None:
                self._output_queue.task_done()
                break
            logging.info(
                f"消费: Item-{item['id']} = {item['result']:.4f} "
                f"(by {item['processed_by']})"
            )
            self._output_queue.task_done()

    async def error_handler(self):
        while True:
            try:
                error = await asyncio.wait_for(self._error_queue.get(), timeout=0.5)
                logging.warning(f"错误记录: Item-{error['item_id']} - {error['error']}")
            except asyncio.TimeoutError:
                break

    async def run(self, num_items=20, num_processors=3):
        start = time.perf_counter()
        # 各角色必须并发运行:队列有界,若等生产者结束后再启动处理者会死锁
        processors = [
            asyncio.create_task(self.processor(f"Proc-{i}"))
            for i in range(num_processors)
        ]
        consumer_task = asyncio.create_task(self.consumer())
        error_task = asyncio.create_task(self.error_handler())
        await self.producer(num_items)
        await asyncio.gather(*processors)
        await self._output_queue.put(None)  # 通知消费者:不会再有新结果
        await asyncio.gather(consumer_task, error_task)
        elapsed = time.perf_counter() - start
        print(f"\n统计: {self._stats}")
        print(f"耗时: {elapsed:.2f}s")
asyncio.run(AsyncPipeline().run())16.5.2 限流与背压
import asyncio
import time
class TokenBucket:
def __init__(self, rate, capacity):
self._rate = rate
self._capacity = capacity
self._tokens = capacity
self._last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens=1):
async with self._lock:
now = time.monotonic()
elapsed = now - self._last_refill
self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
self._last_refill = now
if self._tokens >= tokens:
self._tokens -= tokens
return True
return False
async def wait_and_acquire(self, tokens=1):
while True:
if await self.acquire(tokens):
return
wait_time = (tokens - self._tokens) / self._rate
await asyncio.sleep(wait_time)
class ThrottledClient:
def __init__(self, requests_per_second=5):
self._bucket = TokenBucket(
rate=requests_per_second,
capacity=requests_per_second * 2
)
self._request_count = 0
async def request(self, url):
await self._bucket.wait_and_acquire()
self._request_count += 1
await asyncio.sleep(0.1)
return f"Response-{self._request_count} from {url}"
async def throttled_demo():
client = ThrottledClient(requests_per_second=3)
urls = [f"http://api.example.com/{i}" for i in range(10)]
start = time.perf_counter()
results = await asyncio.gather(*[client.request(url) for url in urls])
elapsed = time.perf_counter() - start
for r in results:
print(r)
print(f"10个请求,3/s限流,耗时: {elapsed:.1f}s")
asyncio.run(throttled_demo())16.5.3 超时与重试模式
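上面的令牌桶解决了"限流";标题中的"背压"最简单的实现方式之一,是用 `asyncio.Semaphore` 限制在途任务数——超出上限的调用方会在 `async with` 处被阻塞,压力自然向上游传导。下面是一个最小示意(任务数与耗时均为假设):

```python
import asyncio

async def limited_fetch(sem, i):
    async with sem:               # 超出并发上限的调用在此处等待,形成背压
        await asyncio.sleep(0.1)  # 模拟一次I/O请求
        return f"result-{i}"

async def main():
    sem = asyncio.Semaphore(3)    # 任意时刻最多3个在途请求
    return await asyncio.gather(*(limited_fetch(sem, i) for i in range(9)))

results = asyncio.run(main())
print(f"共 {len(results)} 个结果,最后一个: {results[-1]}")
```

9个任务、并发上限3,总耗时约为3批 × 0.1s;与令牌桶不同,信号量限制的是"同时在途的数量"而非"单位时间的速率",两者常常组合使用。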
import asyncio
import random
import time
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
class RetryPolicy:
def __init__(self, max_retries=3, base_delay=1.0, max_delay=30.0, exponential=True):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential = exponential
def get_delay(self, attempt):
if not self.exponential:
return self.base_delay
delay = self.base_delay * (2 ** attempt)
jitter = random.uniform(0, delay * 0.1)
return min(delay + jitter, self.max_delay)
async def retry_async(coro_factory, policy=None, retryable_exceptions=(Exception,)):
policy = policy or RetryPolicy()
last_exception = None
for attempt in range(policy.max_retries + 1):
try:
return await asyncio.wait_for(coro_factory(), timeout=5.0)
except retryable_exceptions as e:
last_exception = e
if attempt < policy.max_retries:
delay = policy.get_delay(attempt)
logging.warning(f"第{attempt+1}次失败: {e}, {delay:.1f}s后重试...")
await asyncio.sleep(delay)
else:
logging.error(f"达到最大重试次数({policy.max_retries})")
raise last_exception
async def unreliable_service():
if random.random() < 0.7:
raise ConnectionError("服务不可用")
return "成功响应"
async def retry_demo():
try:
result = await retry_async(
unreliable_service,
policy=RetryPolicy(max_retries=5, base_delay=0.5),
retryable_exceptions=(ConnectionError,)
)
print(f"最终结果: {result}")
except ConnectionError as e:
print(f"彻底失败: {e}")
asyncio.run(retry_demo())16.5.4 结构化并发
结构化并发(Structured Concurrency)是近年来并发编程领域的重要范式,其核心思想是:所有并发任务的生命周期必须被严格限定在一个语法作用域内。
import asyncio
async def structured_concurrency_demo():
async def task(name, duration):
print(f"{name} 开始")
await asyncio.sleep(duration)
print(f"{name} 完成")
return f"{name}-结果"
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task("A", 2))
t2 = tg.create_task(task("B", 1))
t3 = tg.create_task(task("C", 3))
print(f"所有结果: {t1.result()}, {t2.result()}, {t3.result()}")
asyncio.run(structured_concurrency_demo())结构化并发的优势:
- 异常安全:子任务异常会传播到父作用域,不会出现"孤儿"任务
- 资源管理:任务完成前不会退出作用域,确保资源正确释放
- 可推理性:并发逻辑被限定在明确的作用域内,易于理解和调试
超时取消模式:
import asyncio
async def cancellable_task(name, duration):
try:
print(f"{name} 开始")
await asyncio.sleep(duration)
print(f"{name} 完成")
return f"{name}-结果"
except asyncio.CancelledError:
print(f"{name} 被取消,执行清理...")
await asyncio.sleep(0.1)
print(f"{name} 清理完成")
raise
async def timeout_pattern():
try:
result = await asyncio.wait_for(
cancellable_task("SlowTask", 10),
timeout=2.0
)
except asyncio.TimeoutError:
print("任务超时,已取消")
asyncio.run(timeout_pattern())16.5.5 Actor模式
Actor模式是一种基于消息传递的并发模型,每个Actor独立处理消息,避免共享状态:
import asyncio
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
class Actor:
def __init__(self, name):
self._name = name
self._inbox = asyncio.Queue()
self._running = False
    async def start(self):
        self._running = True
        # 保存任务引用:裸 create_task 的返回值若不保存,任务可能被垃圾回收
        self._task = asyncio.create_task(self._process_messages())
        logging.info(f"Actor {self._name} 启动")

    async def stop(self):
        self._running = False
        await self._inbox.put(None)
        await self._task  # 等待收件箱处理循环真正退出
async def send(self, message):
await self._inbox.put(message)
async def _process_messages(self):
while self._running:
message = await self._inbox.get()
if message is None:
break
await self.handle(message)
async def handle(self, message):
raise NotImplementedError
class CounterActor(Actor):
def __init__(self, name):
super().__init__(name)
self._count = 0
async def handle(self, message):
match message:
case {"action": "increment", "value": v}:
self._count += v
logging.info(f"{self._name}: count = {self._count}")
case {"action": "decrement", "value": v}:
self._count -= v
logging.info(f"{self._name}: count = {self._count}")
case {"action": "get", "reply_to": reply_queue}:
await reply_queue.put(self._count)
case _:
logging.warning(f"{self._name}: 未知消息 {message}")
class PrinterActor(Actor):
async def handle(self, message):
logging.info(f"{self._name} 打印: {message}")
async def actor_demo():
counter = CounterActor("Counter")
printer = PrinterActor("Printer")
await counter.start()
await printer.start()
await asyncio.gather(
counter.send({"action": "increment", "value": 5}),
counter.send({"action": "increment", "value": 3}),
counter.send({"action": "decrement", "value": 2}),
printer.send("Hello from Actor system"),
)
await asyncio.sleep(0.5)
reply_queue = asyncio.Queue()
await counter.send({"action": "get", "reply_to": reply_queue})
result = await reply_queue.get()
print(f"计数器值: {result}")
await counter.stop()
await printer.stop()
asyncio.run(actor_demo())16.6 并发调试与性能分析
16.6.1 常见并发问题
死锁:
import threading
import time
def demonstrate_deadlock():
lock_a = threading.Lock()
lock_b = threading.Lock()
def task1():
with lock_a:
time.sleep(0.1)
print("Task1: 等待lock_b...")
with lock_b:
print("Task1: 获得lock_b")
def task2():
with lock_b:
time.sleep(0.1)
print("Task2: 等待lock_a...")
with lock_a:
print("Task2: 获得lock_a")
    # 使用守护线程,否则死锁的两个线程会阻止进程退出
    t1 = threading.Thread(target=task1, daemon=True)
    t2 = threading.Thread(target=task2, daemon=True)
t1.start()
t2.start()
t1.join(timeout=3)
t2.join(timeout=3)
if t1.is_alive() or t2.is_alive():
print("检测到死锁!")
return True
return False
def avoid_deadlock_with_ordered_locks():
lock_a = threading.Lock()
lock_b = threading.Lock()
def task1():
with lock_a:
time.sleep(0.1)
with lock_b:
print("Task1: 安全获得两把锁")
def task2():
with lock_a:
time.sleep(0.1)
with lock_b:
print("Task2: 安全获得两把锁")
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()
print("无死锁完成")避免死锁的策略:
- 锁排序:始终按固定顺序获取多把锁
- 超时机制:使用 `acquire(timeout=...)` 避免无限等待
- 锁粒度:尽量减小临界区范围,减少持锁时间
- 避免嵌套:尽量不要在持有一把锁时获取另一把锁
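其中"超时机制"可以草拟如下:任一把锁获取超时就释放已持有的锁、随机退避后重试,从而打破死锁的"循环等待"条件(函数名与参数均为示意):

```python
import random
import threading
import time

def acquire_both(lock_a, lock_b, timeout=0.5):
    """依次获取两把锁;任一获取超时则释放已持有的锁并退避重试,避免死锁。"""
    while True:
        if lock_a.acquire(timeout=timeout):
            if lock_b.acquire(timeout=timeout):
                return  # 两把锁均已持有
            lock_a.release()  # 拿不到第二把锁,先退回第一把
        time.sleep(random.uniform(0, 0.05))  # 随机退避,降低活锁概率

lock_a, lock_b = threading.Lock(), threading.Lock()
acquire_both(lock_a, lock_b)
print("两把锁均已获取")
lock_b.release()
lock_a.release()
```

注意这种方案用"活锁风险+随机退避"换取了"永不死锁",适合锁竞争不激烈的场景;竞争激烈时仍应优先采用锁排序。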
16.6.2 性能分析与调优
import asyncio
import time
import statistics
async def benchmark_async(coro_factory, iterations=100):
times = []
for _ in range(iterations):
start = time.perf_counter()
await coro_factory()
times.append(time.perf_counter() - start)
return {
"mean": statistics.mean(times),
"median": statistics.median(times),
"stdev": statistics.stdev(times) if len(times) > 1 else 0,
"min": min(times),
"max": max(times),
}
async def benchmark_comparison():
async def async_sleep():
await asyncio.sleep(0.001)
async def async_compute():
total = 0
for i in range(1000):
total += i * i
return total
sleep_stats = await benchmark_async(async_sleep, 50)
compute_stats = await benchmark_async(async_compute, 50)
print("异步sleep基准:")
for k, v in sleep_stats.items():
print(f" {k}: {v*1000:.3f}ms")
print("\n异步计算基准:")
for k, v in compute_stats.items():
print(f" {k}: {v*1000:.3f}ms")
asyncio.run(benchmark_comparison())最优工作线程/进程数计算:
import os
def optimal_pool_size(task_type="mixed", cpu_count=None):
cpu_count = cpu_count or os.cpu_count() or 1
if task_type == "cpu_bound":
return cpu_count
elif task_type == "io_bound":
return min(32, cpu_count * 5)
else:
return min(16, cpu_count * 2)
print(f"CPU核心数: {os.cpu_count()}")
print(f"CPU密集型池大小: {optimal_pool_size('cpu_bound')}")
print(f"I/O密集型池大小: {optimal_pool_size('io_bound')}")
print(f"混合型池大小: {optimal_pool_size('mixed')}")16.7 前沿技术动态
16.7.1 Python 3.11+ 并发新特性
asyncio.timeout(Python 3.11+):
import asyncio
async def per_task_timeout_demo():
async def risky_task(name, duration):
await asyncio.sleep(duration)
return f"{name} 完成"
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(risky_task("Fast", 0.5))
t2 = tg.create_task(risky_task("Slow", 5.0))
try:
async with asyncio.timeout(2.0):
await t2
except TimeoutError:
print("Slow任务超时")
t2.cancel()
if t1.done():
print(f"Fast任务结果: {t1.result()}")16.7.2 PEP 703 — 自由线程CPython
Python 3.13引入了实验性的自由线程模式(Free-threaded CPython),这是Python并发领域最具革命性的变化:
- 移除GIL,允许多线程真正并行执行Python字节码
- 通过 `--disable-gil` 编译选项启用
- 对引用计数机制进行了重新设计,采用 biased reference counting + per-object lock
- 预计在Python 3.14+中逐步稳定
import sys
if hasattr(sys, '_is_gil_enabled'):
print(f"GIL启用: {sys._is_gil_enabled()}")
else:
print("当前Python版本不支持GIL状态查询")16.7.3 PEP 734 — 子解释器
Python 3.12为每个子解释器引入了独立的GIL(PEP 684),PEP 734则计划在Python 3.14+提供标准库级接口,形成一种介于线程和进程之间的并发模型:
- 每个子解释器拥有独立的GIL
- 比多进程更轻量,启动更快
- 通过 `concurrent.interpreters` 模块使用(Python 3.14+;更早版本仅有C API和内部模块)
- 适合需要隔离但又不希望付出进程创建开销的场景
# 需要 Python 3.14+(PEP 734);更早版本没有公开的Python级接口
from concurrent import interpreters

interp = interpreters.create()
interp.exec("print('Hello from sub-interpreter')")
本章系统学习了Python并发编程的四大模型:
| 模型 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|
| threading | I/O密集型 | 简单易用,共享内存 | GIL限制,需同步 |
| multiprocessing | CPU密集型 | 真正并行,绕过GIL | 进程开销大,IPC复杂 |
| concurrent.futures | 通用任务提交 | 统一接口,Future模式 | 不够灵活 |
| asyncio | 高并发I/O | 高效,低资源消耗 | 学习曲线陡,需异步生态 |
核心原则:
- I/O密集型优先选择asyncio或threading
- CPU密集型选择multiprocessing
- 混合型使用asyncio + run_in_executor
- 始终考虑死锁、竞态条件和资源泄漏
- 优先使用高层抽象(TaskGroup、线程池)而非底层原语
16.9 练习题
基础题
1. 使用 `threading.Lock` 实现一个线程安全的字典类 `ThreadSafeDict`,支持 get、set、delete 操作。
2. 使用 `multiprocessing.Queue` 实现一个简单的分布式任务分发系统,包含一个调度进程和多个工作进程。
3. 使用 `asyncio.gather` 并发请求5个URL(模拟即可),并统计总耗时。
进阶题
1. 实现一个基于 `asyncio.Semaphore` 的连接池管理器,支持最大连接数限制、连接超时和自动回收。
2. 使用 `ProcessPoolExecutor` 实现一个并行MapReduce框架,能够处理大规模数据集的词频统计。
3. 实现一个异步爬虫框架,支持:
- 并发控制(最大并发数)
- 请求限流(每秒请求数)
- 自动重试(指数退避)
- 结果持久化
思考题
为什么Python的GIL在CPU密集型任务中成为瓶颈,而在I/O密集型任务中影响不大?请从GIL的调度机制角度深入分析。
比较结构化并发(TaskGroup)与非结构化并发(create_task + gather)的优劣,在什么场景下应该优先选择哪种方式?
随着PEP 703(自由线程CPython)的推进,Python并发编程的生态将如何变化?现有代码需要做哪些适配?
项目练习
并发Web服务器:使用asyncio实现一个支持路由、中间件和静态文件服务的HTTP服务器,要求能处理至少1000个并发连接。
并行数据处理管道:设计一个多阶段数据处理系统,使用多进程处理数据加载、清洗、转换和存储四个阶段,支持背压控制和错误恢复。
分布式任务队列:实现一个类似Celery的轻量级任务队列系统,支持任务优先级、重试、超时和结果存储。
16.10 延伸阅读
16.10.1 并发理论
- 《The Art of Multiprocessor Programming》 (Herlihy & Shavit) — 多处理器编程理论
- 《Java Concurrency in Practice》 (Brian Goetz) — Java并发编程经典,原理通用
- 《Seven Concurrency Models in Seven Weeks》 — 并发模型全景
16.10.2 Python并发
- 《Python Concurrency with asyncio》 (Matthew Fowler) — asyncio权威指南
- 《Fluent Python》第19-21章 — 并发编程深度解析
- PEP 703 — A Free-Threaded CPython (https://peps.python.org/pep-0703/) — 无GIL Python设计
16.10.3 asyncio生态
- asyncio官方文档 (https://docs.python.org/3/library/asyncio.html) — 异步IO框架
- aiohttp (https://docs.aiohttp.org/) — 异步HTTP客户端/服务器
- uvloop (https://github.com/MagicStack/uvloop) — 高性能事件循环
16.10.4 并发工具
- Celery (https://docs.celeryq.dev/) — 分布式任务队列
- Dask (https://docs.dask.org/) — 并行计算框架
- Ray (https://docs.ray.io/) — 分布式计算框架
下一章:第17章 Flask入门