第36章 异步编程进阶
学习目标
完成本章学习后,你将能够:
- 深入理解异步原理:事件循环、协程、任务调度机制
- 掌握asyncio高级特性:TaskGroup、信号量、队列、同步原语
- 实现异步网络编程:aiohttp客户端/服务器、WebSocket
- 使用异步数据库:asyncpg、aiomysql、SQLAlchemy异步
- 处理异步异常:异常传播、取消处理、超时控制
- 实现异步设计模式:生产者-消费者、限流器、连接池
- 调试异步代码:调试工具、性能分析、常见问题
- 构建高性能应用:异步Web服务、高并发处理
36.1 异步编程原理
36.1.1 事件循环机制
python
import asyncio
import heapq
import inspect
import itertools
import time
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Coroutine, List, Optional, TypeVar
# Module-level generic type variable for type hints; the definitions that
# follow in this section do not reference it.
T = TypeVar("T")
class TaskState(Enum):
    """Coarse lifecycle state of an asyncio task."""

    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    CANCELLED = "cancelled"


@dataclass
class TaskInfo:
    """Snapshot of an asyncio task's identity, state and outcome.

    ``duration`` is only available once both ``started_at`` and
    ``completed_at`` have been recorded.
    """

    id: int
    name: str
    state: TaskState
    result: Any = None
    exception: Optional[Exception] = None
    created_at: float = 0.0
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    @property
    def duration(self) -> Optional[float]:
        """Return ``completed_at - started_at``, or None if either is unset.

        Compares against None explicitly so that a legitimate timestamp of
        0.0 (e.g. from a relative clock) still yields a duration.
        """
        if self.started_at is None or self.completed_at is None:
            return None
        return self.completed_at - self.started_at
class EventLoopInspector:
    """Static helpers for introspecting the asyncio event loop and its tasks."""

    @staticmethod
    def get_running_loop() -> asyncio.AbstractEventLoop:
        """Return the running loop; raises RuntimeError when no loop is running."""
        return asyncio.get_running_loop()

    @staticmethod
    def get_event_loop() -> asyncio.AbstractEventLoop:
        """Return the running loop, else the thread's current loop, else a new one.

        A newly created loop is installed with ``asyncio.set_event_loop`` so
        repeated calls return the same loop instead of leaking a fresh,
        unregistered loop on every call.
        """
        try:
            return asyncio.get_running_loop()
        except RuntimeError:
            pass
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop

    @staticmethod
    def get_all_tasks(loop: Optional[asyncio.AbstractEventLoop] = None) -> set[asyncio.Task]:
        """Return the set of not-yet-finished tasks of *loop* (default: running loop)."""
        return asyncio.all_tasks(loop)

    @staticmethod
    def get_current_task(loop: Optional[asyncio.AbstractEventLoop] = None) -> Optional[asyncio.Task]:
        """Return the task currently executing on *loop*, or None."""
        return asyncio.current_task(loop)

    @staticmethod
    def print_task_tree(loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Print the number of live tasks followed by each task's name and coroutine."""
        tasks = asyncio.all_tasks(loop)
        print(f"Total tasks: {len(tasks)}")
        for task in tasks:
            print(f" - {task.get_name()}: {task.get_coro()}")

    @staticmethod
    def get_task_info(task: asyncio.Task) -> TaskInfo:
        """Build a TaskInfo snapshot for *task*.

        State is DONE/CANCELLED for finished tasks, CANCELLED for tasks with a
        pending cancellation request, and RUNNING otherwise.
        """
        if task.done():
            state = TaskState.CANCELLED if task.cancelled() else TaskState.DONE
        elif getattr(task, "_must_cancel", False):
            # Cancellation requested but not yet delivered. `_must_cancel` is a
            # private CPython attribute, so fall back to False if it is absent.
            state = TaskState.CANCELLED
        else:
            state = TaskState.RUNNING

        result = None
        exception = None
        if task.done() and not task.cancelled():
            # task.exception() reports a stored error without re-raising it.
            exception = task.exception()
            if exception is None:
                result = task.result()

        return TaskInfo(
            id=id(task),
            name=task.get_name(),
            state=state,
            result=result,
            exception=exception,
        )
class CustomEventLoop:
    """Toy single-threaded event loop used to illustrate callback scheduling.

    Ready callbacks are kept in a FIFO deque. Timed callbacks live in a
    min-heap keyed by (deadline, sequence number), so ties run in registration
    order and callbacks themselves are never compared.
    """

    def __init__(self):
        self._ready: deque = deque()  # (callback, args) pairs ready to run
        self._scheduled: List[tuple] = []  # heap of (when, seq, callback, args)
        self._sequence = itertools.count()
        self._running = False

    def call_soon(self, callback: Callable, *args) -> None:
        """Queue ``callback(*args)`` to run on a following loop iteration."""
        self._ready.append((callback, args))

    def call_later(self, delay: float, callback: Callable, *args) -> None:
        """Schedule ``callback(*args)`` to run after *delay* seconds.

        Uses the monotonic clock so wall-clock adjustments cannot reorder or
        stall timers.
        """
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, next(self._sequence), callback, args))

    def run_until_complete(self, coro: Coroutine) -> Any:
        """Run *coro* to completion on a fresh asyncio loop and return its result.

        The toy loop cannot drive coroutines itself, so this delegates to
        ``asyncio.run`` instead of the deprecated implicit
        ``asyncio.get_event_loop()``.
        """
        return asyncio.run(coro)

    def run_forever(self) -> None:
        """Process timers and ready callbacks until ``stop()`` is called."""
        self._running = True
        while self._running:
            now = time.monotonic()
            # Promote every timer whose deadline has passed to the ready queue.
            while self._scheduled and self._scheduled[0][0] <= now:
                _, _, callback, args = heapq.heappop(self._scheduled)
                self._ready.append((callback, args))
            if self._ready:
                callback, args = self._ready.popleft()
                callback(*args)
            else:
                # Nothing runnable: poll again shortly.
                time.sleep(0.001)

    def stop(self) -> None:
        """Make ``run_forever`` return after the current iteration."""
        self._running = False
async def demonstrate_event_loop():
    """Print the running loop, whether it runs, the current task and the task count."""
    running_loop = asyncio.get_running_loop()
    for line in (
        f"Event loop: {running_loop}",
        f"Is running: {running_loop.is_running()}",
    ):
        print(line)
    me = asyncio.current_task()
    print(f"Current task: {me.get_name()}")
    live = asyncio.all_tasks()
    print(f"All tasks: {len(live)}")
# 36.1.2 协程深入
python
class CoroutineAnalyzer:
    """Static predicates and a summary helper for coroutine objects."""

    @staticmethod
    def is_coroutine(obj: Any) -> bool:
        """Return True if *obj* is a native coroutine object."""
        return inspect.iscoroutine(obj)

    @staticmethod
    def is_coroutine_function(obj: Any) -> bool:
        """Return True if *obj* is an ``async def`` function."""
        return inspect.iscoroutinefunction(obj)

    @staticmethod
    def is_async_generator(obj: Any) -> bool:
        """Return True if *obj* is an async generator object."""
        return inspect.isasyncgen(obj)

    @staticmethod
    def get_coroutine_state(coro: Coroutine) -> str:
        """Return "running", "closed" or "suspended".

        "suspended" also covers a coroutine that has not started yet, since
        only ``cr_running`` and ``cr_frame`` are consulted.
        """
        if coro.cr_running:
            return "running"
        return "closed" if coro.cr_frame is None else "suspended"

    @staticmethod
    def analyze_coroutine(coro: Coroutine) -> dict:
        """Return name, qualname, state, frame, code object and origin of *coro*."""
        summary = {"name": coro.__name__, "qualname": coro.__qualname__}
        summary["state"] = CoroutineAnalyzer.get_coroutine_state(coro)
        summary.update(frame=coro.cr_frame, code=coro.cr_code, origin=coro.cr_origin)
        return summary
class CoroutineManager:
    """Collect coroutine objects and await them concurrently or one by one.

    A coroutine object can be awaited only once, so each batch should be run a
    single time; call ``clear()`` before adding a new batch.
    """

    def __init__(self):
        self._coroutines: List[Coroutine] = []
        self._results: List[Any] = []

    def add(self, coro: Coroutine) -> None:
        """Queue *coro* for the next run."""
        self._coroutines.append(coro)

    async def run_all(self) -> List[Any]:
        """Await all queued coroutines concurrently; results keep insertion order."""
        self._results = await asyncio.gather(*self._coroutines)
        return self._results

    async def run_sequentially(self) -> List[Any]:
        """Await queued coroutines one after another and return their results."""
        self._results = []
        for coro in self._coroutines:
            result = await coro
            self._results.append(result)
        return self._results

    def clear(self) -> None:
        """Forget queued coroutines and stored results.

        The lists are rebound rather than cleared in place: the run methods
        hand the results list itself back to the caller, and clearing it in
        place would silently empty the caller's copy.
        """
        self._coroutines = []
        self._results = []
async def coroutine_examples():
    """Demonstrate awaiting a plain, a parameterised and a nested coroutine."""

    async def simple_coroutine():
        await asyncio.sleep(0.1)
        return "result"

    async def coroutine_with_params(x: int, y: int):
        await asyncio.sleep(0.1)
        return x + y

    async def nested_coroutine():
        inner = await simple_coroutine()
        return f"nested: {inner}"

    # A tuple display evaluates (and therefore awaits) left to right.
    first, second, third = (
        await simple_coroutine(),
        await coroutine_with_params(1, 2),
        await nested_coroutine(),
    )
    print(f"Result 1: {first}")
    print(f"Result 2: {second}")
    print(f"Result 3: {third}")
# 36.2 asyncio高级特性
36.2.1 TaskGroup与结构化并发
python
from typing import AsyncIterator
import contextlib
class TaskGroupManager:
def __init__(self):
self._tasks: List[asyncio.Task] = []
self._results: List[Any] = []
async def run_with_taskgroup(self, coros: List[Coroutine]) -> List[Any]:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(coro) for coro in coros]
return [task.result() for task in tasks]
async def run_with_error_handling(
self,
coros: List[Coroutine]
) -> tuple[List[Any], List[Exception]]:
results = []
errors = []
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(coro) for coro in coros]
results = [task.result() for task in tasks]
except ExceptionGroup as eg:
for exc in eg.exceptions:
errors.append(exc)
return results, errors
async def run_with_timeout(
self,
coros: List[Coroutine],
timeout: float
) -> List[Any]:
async with asyncio.timeout(timeout):
return await self.run_with_taskgroup(coros)
async def taskgroup_examples():
async def worker(name: str, duration: float):
await asyncio.sleep(duration)
return f"{name} completed after {duration}s"
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(worker("A", 0.1))
task2 = tg.create_task(worker("B", 0.2))
task3 = tg.create_task(worker("C", 0.3))
print(f"Task 1: {task1.result()}")
print(f"Task 2: {task2.result()}")
print(f"Task 3: {task3.result()}")
class StructuredConcurrency:
@staticmethod
async def map_concurrent(
func: Callable,
items: List[Any],
max_concurrency: int = 10
) -> List[Any]:
semaphore = asyncio.Semaphore(max_concurrency)
async def limited(item):
async with semaphore:
if asyncio.iscoroutinefunction(func):
return await func(item)
else:
return func(item)
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(limited(item)) for item in items]
return [task.result() for task in tasks]
@staticmethod
async def pipeline(
stages: List[Callable[[Any], Coroutine]],
initial_data: List[Any]
) -> List[Any]:
data = initial_data
for stage in stages:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(stage(item)) for item in data]
data = [task.result() for task in tasks]
return data
@staticmethod
async def fan_out_fan_in(
producer: Callable[[], Coroutine],
workers: int,
consumer: Callable[[List[Any]], Coroutine]
) -> Any:
results = []
async def worker_task():
return await producer()
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(worker_task()) for _ in range(workers)]
results = [task.result() for task in tasks]
return await consumer(results)36.2.2 同步原语
python
class AsyncLock:
    """Thin wrapper around ``asyncio.Lock`` with an ``async with`` helper."""

    def __init__(self):
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Suspend until the underlying lock is held by the caller."""
        await self._lock.acquire()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()

    @contextlib.asynccontextmanager
    async def locked(self):
        """Hold the lock for the body of an ``async with`` statement."""
        await self.acquire()
        try:
            yield None
        finally:
            self.release()
class AsyncSemaphore:
    """Thin wrapper around ``asyncio.Semaphore`` with an ``async with`` helper."""

    def __init__(self, value: int = 1):
        # *value* is the number of holders allowed at the same time.
        self._semaphore = asyncio.Semaphore(value)

    async def acquire(self) -> None:
        """Take one permit, suspending while none are available."""
        await self._semaphore.acquire()

    def release(self) -> None:
        """Return one permit."""
        self._semaphore.release()

    @contextlib.asynccontextmanager
    async def limited(self):
        """Hold one permit for the body of an ``async with`` statement."""
        await self.acquire()
        try:
            yield None
        finally:
            self.release()
class AsyncEvent:
    """Delegating wrapper around ``asyncio.Event``."""

    def __init__(self):
        self._event = asyncio.Event()

    def set(self) -> None:
        """Set the flag, waking every current waiter."""
        self._event.set()

    def clear(self) -> None:
        """Reset the flag so later ``wait()`` calls block again."""
        self._event.clear()

    def is_set(self) -> bool:
        """Return the current flag value."""
        return self._event.is_set()

    async def wait(self) -> None:
        """Suspend until the flag is set (returns at once if it already is)."""
        await self._event.wait()
class AsyncCondition:
    """Delegating wrapper around ``asyncio.Condition``.

    An existing ``asyncio.Lock`` may be shared; otherwise the condition
    creates its own.
    """

    def __init__(self, lock: Optional[asyncio.Lock] = None):
        self._condition = asyncio.Condition(lock)

    async def acquire(self) -> None:
        await self._condition.acquire()

    def release(self) -> None:
        self._condition.release()

    async def wait(self) -> None:
        """Release the lock, wait for a notification, then re-acquire the lock."""
        await self._condition.wait()

    async def wait_for(self, predicate: Callable[[], bool]) -> None:
        """Wait until *predicate()* is truthy (the lock must be held)."""
        await self._condition.wait_for(predicate)

    def notify(self, n: int = 1) -> None:
        """Wake up to *n* waiters (the lock must be held)."""
        self._condition.notify(n)

    def notify_all(self) -> None:
        """Wake every waiter (the lock must be held)."""
        self._condition.notify_all()

    @contextlib.asynccontextmanager
    async def locked(self):
        """Hold the condition's lock and yield this wrapper for wait/notify calls."""
        await self.acquire()
        try:
            yield self
        finally:
            self.release()
class AsyncBarrier:
    """Reusable barrier: ``wait()`` blocks until *parties* tasks have called it.

    Each trip releases the current generation of waiters and starts a fresh
    one. A waiter cancelled before its generation trips withdraws its arrival,
    so it does not count toward releasing the others.
    """

    def __init__(self, parties: int):
        if parties < 1:
            # With fewer than one party the barrier could never trip.
            raise ValueError("parties must be >= 1")
        self._parties = parties
        self._count = 0
        self._event = asyncio.Event()
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        """Arrive at the barrier and suspend until all parties have arrived."""
        async with self._lock:
            self._count += 1
            if self._count == self._parties:
                # Last arrival: release this generation and start the next one.
                self._event.set()
                self._count = 0
                self._event = asyncio.Event()
                return
            # Capture this generation's event while still holding the lock.
            generation = self._event
        try:
            await generation.wait()
        except asyncio.CancelledError:
            # Withdraw our arrival only if our generation has not tripped yet.
            if not generation.is_set() and generation is self._event:
                self._count -= 1
            raise
class AsyncRWLock:
    """Readers-writer lock: many concurrent readers or a single writer.

    The first reader in takes the write lock on behalf of all readers and the
    last reader out releases it. There is no writer preference, so a steady
    stream of readers can keep writers waiting.
    """

    def __init__(self):
        self._read_lock = asyncio.Lock()  # not referenced by the methods below
        self._write_lock = asyncio.Lock()
        self._readers = 0
        self._read_count_lock = asyncio.Lock()

    async def acquire_read(self) -> None:
        """Enter as a reader, blocking while a writer holds the lock."""
        async with self._read_count_lock:
            self._readers += 1
            if self._readers == 1:
                try:
                    await self._write_lock.acquire()
                except BaseException:
                    # E.g. cancelled while a writer held the lock: undo the
                    # registration, otherwise the next reader would see
                    # _readers > 1 and skip taking the write lock.
                    self._readers -= 1
                    raise

    async def release_read(self) -> None:
        """Leave as a reader; the last reader out releases the write lock."""
        async with self._read_count_lock:
            self._readers -= 1
            if self._readers == 0:
                self._write_lock.release()

    async def acquire_write(self) -> None:
        """Take exclusive access."""
        await self._write_lock.acquire()

    def release_write(self) -> None:
        """Give up exclusive access."""
        self._write_lock.release()

    @contextlib.asynccontextmanager
    async def read(self):
        """Shared access for the body of an ``async with`` statement."""
        await self.acquire_read()
        try:
            yield
        finally:
            await self.release_read()

    @contextlib.asynccontextmanager
    async def write(self):
        """Exclusive access for the body of an ``async with`` statement."""
        await self.acquire_write()
        try:
            yield
        finally:
            self.release_write()
# 36.2.3 异步队列
python
from typing import Generic, TypeVar
from collections import deque
import heapq
T = TypeVar("T")


class AsyncQueue(Generic[T]):
    """Typed FIFO facade over ``asyncio.Queue``; ``maxsize`` 0 means unbounded."""

    def __init__(self, maxsize: int = 0):
        self._queue: asyncio.Queue[T] = asyncio.Queue(maxsize)

    async def put(self, item: T) -> None:
        """Append *item*, suspending while the queue is full."""
        return await self._queue.put(item)

    async def get(self) -> T:
        """Remove and return the oldest item, suspending while empty."""
        item = await self._queue.get()
        return item

    def put_nowait(self, item: T) -> None:
        """Append without waiting (raises asyncio.QueueFull)."""
        return self._queue.put_nowait(item)

    def get_nowait(self) -> T:
        """Remove without waiting (raises asyncio.QueueEmpty)."""
        item = self._queue.get_nowait()
        return item

    def qsize(self) -> int:
        return self._queue.qsize()

    def empty(self) -> bool:
        return self._queue.empty()

    def full(self) -> bool:
        return self._queue.full()

    async def join(self) -> None:
        """Wait until every retrieved item has been marked with task_done()."""
        return await self._queue.join()

    def task_done(self) -> None:
        """Mark one previously retrieved item as processed."""
        return self._queue.task_done()
class AsyncPriorityQueue(Generic[T]):
    """Priority queue of ``(priority, item)`` tuples; lowest priority value first.

    Entries are stored internally as ``(priority, sequence, entry)``. Equal
    priorities therefore come out in insertion (FIFO) order, and payloads are
    never compared with each other, so they need not be orderable.
    """

    def __init__(self, maxsize: int = 0):
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize)
        self._sequence = itertools.count()

    async def put(self, item: tuple[int, T]) -> None:
        """Enqueue *item* ordered by ``item[0]``; suspends while the queue is full."""
        await self._queue.put((item[0], next(self._sequence), item))

    async def get(self) -> tuple[int, T]:
        """Remove and return the tuple originally passed to ``put``."""
        _, _, item = await self._queue.get()
        return item

    def qsize(self) -> int:
        return self._queue.qsize()

    def empty(self) -> bool:
        return self._queue.empty()
class AsyncLifoQueue(Generic[T]):
    """Typed facade over ``asyncio.LifoQueue``: the newest item comes out first."""

    def __init__(self, maxsize: int = 0):
        self._queue: asyncio.LifoQueue[T] = asyncio.LifoQueue(maxsize)

    async def put(self, item: T) -> None:
        """Push *item*, suspending while the stack is full."""
        return await self._queue.put(item)

    async def get(self) -> T:
        """Pop the most recently pushed item, suspending while empty."""
        item = await self._queue.get()
        return item

    def qsize(self) -> int:
        return self._queue.qsize()
class BoundedAsyncQueue(Generic[T]):
    """FIFO queue with a capacity limit and optional per-call timeouts.

    Two conditions share one lock, so the "has room" / "has items" check and
    the deque mutation happen atomically. A woken putter re-checks capacity
    and a woken getter re-checks emptiness before acting, so concurrent
    putters cannot overflow *maxsize* and concurrent getters cannot pop from
    an empty deque. A *maxsize* <= 0 means unbounded, as in asyncio.Queue.
    """

    def __init__(self, maxsize: int = 100):
        self._maxsize = maxsize
        self._queue: deque[T] = deque()
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Condition(self._lock)
        self._not_full = asyncio.Condition(self._lock)

    def _has_room(self) -> bool:
        """True if one more item fits without exceeding maxsize."""
        return self._maxsize <= 0 or len(self._queue) < self._maxsize

    async def put(self, item: T, timeout: Optional[float] = None) -> bool:
        """Append *item*, waiting up to *timeout* seconds (None = forever) for room.

        Returns True on success and False if the timeout expired first.
        """
        try:
            async with asyncio.timeout(timeout):
                async with self._not_full:
                    await self._not_full.wait_for(self._has_room)
                    self._queue.append(item)
                    self._not_empty.notify()
        except asyncio.TimeoutError:
            return False
        return True

    async def get(self, timeout: Optional[float] = None) -> T:
        """Remove and return the oldest item, waiting up to *timeout* seconds.

        Raises asyncio.TimeoutError("Queue get timeout") if nothing arrives in time.
        """
        try:
            async with asyncio.timeout(timeout):
                async with self._not_empty:
                    await self._not_empty.wait_for(lambda: bool(self._queue))
                    item = self._queue.popleft()
                    self._not_full.notify()
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError("Queue get timeout")
        return item

    def qsize(self) -> int:
        """Current number of queued items."""
        return len(self._queue)
class ProducerConsumer:
    """Producers and consumers sharing one bounded ``asyncio.Queue``.

    Producers emit an item about every 0.1 s until stopped. Consumers keep
    draining until the stop flag is set *and* the queue is empty.
    """

    def __init__(self, queue_size: int = 10, num_producers: int = 3, num_consumers: int = 2):
        self.queue: asyncio.Queue = asyncio.Queue(queue_size)
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        self._stop_event = asyncio.Event()

    async def producer(self, producer_id: int) -> None:
        """Emit numbered items until the stop flag is set."""
        sequence = 0
        while not self._stop_event.is_set():
            label = f"Item-{producer_id}-{sequence}"
            await self.queue.put(label)
            print(f"Producer {producer_id} produced: {label}")
            sequence += 1
            await asyncio.sleep(0.1)

    async def consumer(self, consumer_id: int) -> None:
        """Take items until stopped and drained; polls with a 0.5 s get timeout."""
        while True:
            if self._stop_event.is_set() and self.queue.empty():
                break
            try:
                item = await asyncio.wait_for(self.queue.get(), timeout=0.5)
            except asyncio.TimeoutError:
                continue
            print(f"Consumer {consumer_id} consumed: {item}")
            self.queue.task_done()
            await asyncio.sleep(0.2)

    async def run(self, duration: float = 5.0) -> None:
        """Run all workers for *duration* seconds, then stop and wait for them."""
        producer_tasks = [
            asyncio.create_task(self.producer(n)) for n in range(self.num_producers)
        ]
        consumer_tasks = [
            asyncio.create_task(self.consumer(n)) for n in range(self.num_consumers)
        ]
        await asyncio.sleep(duration)
        self._stop_event.set()
        await asyncio.gather(*producer_tasks)
        await asyncio.gather(*consumer_tasks)
# 36.3 异步网络编程
36.3.1 aiohttp客户端
python
import aiohttp
from typing import Dict, Optional, List, Any
import json
class AsyncHTTPClient:
    """aiohttp ``ClientSession`` wrapper with JSON convenience methods.

    Use as ``async with AsyncHTTPClient(...) as client`` or call ``connect()``
    and ``close()`` explicitly. Request methods raise RuntimeError when no
    session is open.
    """

    def __init__(
        self,
        base_url: str = "",
        timeout: float = 30.0,
        max_connections: int = 100,
        headers: Optional[Dict[str, str]] = None
    ):
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_connections = max_connections
        self.headers = headers or {}
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "AsyncHTTPClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

    async def connect(self) -> None:
        """Open the session, first closing any session opened earlier.

        An empty ``base_url`` is passed to aiohttp as None, because aiohttp
        rejects relative (including empty) base URLs.
        """
        await self.close()
        connector = aiohttp.TCPConnector(limit=self.max_connections)
        self._session = aiohttp.ClientSession(
            base_url=self.base_url or None,
            timeout=self.timeout,
            connector=connector,
            headers=self.headers
        )

    async def close(self) -> None:
        """Close the session if open; safe to call repeatedly."""
        if self._session:
            await self._session.close()
            self._session = None

    def _require_session(self) -> aiohttp.ClientSession:
        """Return the open session or raise a clear error."""
        if self._session is None:
            raise RuntimeError("AsyncHTTPClient is not connected; call connect() or use 'async with'")
        return self._session

    @staticmethod
    async def _json_or_none(response: aiohttp.ClientResponse) -> Any:
        """Decode a JSON body, returning None for an empty body (e.g. 204 No Content)."""
        body = await response.read()
        if not body.strip():
            return None
        return await response.json()

    async def get(
        self,
        url: str,
        params: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> Any:
        """GET *url*; raise for HTTP errors and return the decoded JSON body."""
        async with self._require_session().get(url, params=params, headers=headers) as response:
            response.raise_for_status()
            return await self._json_or_none(response)

    async def post(
        self,
        url: str,
        data: Optional[Dict] = None,
        json: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> Any:
        """POST form *data* or a *json* body; return the decoded JSON response."""
        async with self._require_session().post(url, data=data, json=json, headers=headers) as response:
            response.raise_for_status()
            return await self._json_or_none(response)

    async def put(
        self,
        url: str,
        data: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        json: Optional[Dict] = None
    ) -> Any:
        """PUT form *data* or a *json* body; return the decoded JSON response."""
        async with self._require_session().put(url, data=data, json=json, headers=headers) as response:
            response.raise_for_status()
            return await self._json_or_none(response)

    async def delete(
        self,
        url: str,
        headers: Optional[Dict] = None
    ) -> Any:
        """DELETE *url*; return the decoded JSON body, or None if it is empty."""
        async with self._require_session().delete(url, headers=headers) as response:
            response.raise_for_status()
            return await self._json_or_none(response)

    async def request(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> aiohttp.ClientResponse:
        """Send an arbitrary request and return the response with its body preloaded.

        The body is read before the response is released back to the pool, so
        ``await response.text()`` / ``.json()`` still work after this returns.
        """
        async with self._require_session().request(method, url, **kwargs) as response:
            await response.read()
            return response
class AsyncRequestBatch:
    """Collect HTTP requests and run them concurrently under a concurrency cap."""

    def __init__(self, client: AsyncHTTPClient, max_concurrent: int = 10):
        self.client = client
        self.max_concurrent = max_concurrent
        self._requests: List[tuple] = []  # (method, url, kwargs) triples

    def add(self, method: str, url: str, **kwargs) -> None:
        """Queue one request for the next ``execute_all``."""
        self._requests.append((method, url, kwargs))

    async def execute_all(self) -> List[Any]:
        """Run every queued request; failures are returned in place of responses."""
        gate = asyncio.Semaphore(self.max_concurrent)

        async def send(method: str, url: str, **options):
            async with gate:
                return await self.client.request(method, url, **options)

        pending = [send(verb, target, **options) for verb, target, options in self._requests]
        return await asyncio.gather(*pending, return_exceptions=True)

    def clear(self) -> None:
        """Drop all queued requests."""
        self._requests.clear()
class RetryableClient:
    """Retry wrapper around ``AsyncHTTPClient.request`` with linear back-off."""

    def __init__(
        self,
        client: AsyncHTTPClient,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_exceptions: tuple = (aiohttp.ClientError, asyncio.TimeoutError)
    ):
        if max_retries < 1:
            # At least one attempt is required; otherwise there is no error to re-raise.
            raise ValueError("max_retries must be >= 1")
        self.client = client
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.retry_exceptions = retry_exceptions

    async def request_with_retry(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> aiohttp.ClientResponse:
        """Attempt the request up to ``max_retries`` times.

        Sleeps ``retry_delay * attempt_number`` between attempts. The last
        retryable exception is re-raised when attempts are exhausted; other
        exceptions propagate immediately.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                return await self.client.request(method, url, **kwargs)
            except self.retry_exceptions:
                if attempt == self.max_retries:
                    raise
                await asyncio.sleep(self.retry_delay * attempt)
async def http_client_example():
    """List, read and create posts against the JSONPlaceholder demo API."""
    async with AsyncHTTPClient("https://jsonplaceholder.typicode.com") as api:
        all_posts = await api.get("/posts")
        print(f"Fetched {len(all_posts)} posts")

        first_post = await api.get("/posts/1")
        print(f"Post 1: {first_post['title']}")

        payload = {
            "title": "Test Post",
            "body": "This is a test",
            "userId": 1
        }
        created = await api.post("/posts", json=payload)
        print(f"Created post with ID: {created['id']}")
# 36.3.2 WebSocket通信
python
class AsyncWebSocketClient:
    """aiohttp WebSocket client owning its own ClientSession."""

    def __init__(self, url: str):
        self.url = url
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None

    async def connect(self) -> None:
        """Open a session and a WebSocket to ``self.url``.

        Any previous connection is closed first. If the handshake fails, the
        freshly created session is closed before the error propagates.
        """
        await self.close()
        session = aiohttp.ClientSession()
        try:
            self._ws = await session.ws_connect(self.url)
        except BaseException:
            await session.close()
            raise
        self._session = session

    async def close(self) -> None:
        """Close the WebSocket and then the session; safe to call repeatedly."""
        ws, session = self._ws, self._session
        self._ws = None
        self._session = None
        try:
            if ws is not None:
                await ws.close()
        finally:
            # Close the session even if closing the socket failed.
            if session is not None:
                await session.close()

    async def send_str(self, message: str) -> None:
        await self._ws.send_str(message)

    async def send_json(self, data: Dict) -> None:
        await self._ws.send_json(data)

    async def send_bytes(self, data: bytes) -> None:
        await self._ws.send_bytes(data)

    async def receive(self) -> aiohttp.WSMessage:
        """Return the next raw WebSocket message."""
        return await self._ws.receive()

    async def receive_str(self) -> str:
        """Return the next text message; ValueError for any other message type."""
        msg = await self.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            return msg.data
        raise ValueError(f"Expected text message, got {msg.type}")

    async def receive_json(self) -> Dict:
        """Return the next text message parsed as JSON; ValueError for non-text messages."""
        msg = await self.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            return json.loads(msg.data)
        raise ValueError(f"Expected text message, got {msg.type}")

    @property
    def closed(self) -> bool:
        """True when no socket is open or the socket has been closed."""
        return self._ws.closed if self._ws else True

    async def __aenter__(self) -> "AsyncWebSocketClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()
# `import aiohttp` does not load the `aiohttp.web` submodule; import it
# explicitly because the annotations below are evaluated at definition time.
import aiohttp.web


class WebSocketServer:
    """Minimal aiohttp WebSocket server with echo handling and broadcast.

    Serves WebSocket connections on ``/ws``. Every text message is passed to
    ``on_message``, which echoes it back by default.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8080):
        self.host = host
        self.port = port
        self._clients: set = set()
        self._app = aiohttp.web.Application()
        self._runner: Optional[aiohttp.web.AppRunner] = None

    async def handle_websocket(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
        """Upgrade the request, track the client, and dispatch its messages."""
        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(request)
        self._clients.add(ws)
        print(f"Client connected. Total clients: {len(self._clients)}")
        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await self.on_message(ws, msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {ws.exception()}")
        finally:
            self._clients.discard(ws)
            print(f"Client disconnected. Total clients: {len(self._clients)}")
        return ws

    async def on_message(self, ws: aiohttp.web.WebSocketResponse, message: str) -> None:
        """Default handler: echo the message back to its sender."""
        await ws.send_str(f"Echo: {message}")

    async def broadcast(self, message: str) -> None:
        """Send *message* to every connected client, printing per-client failures.

        Iterates over a snapshot because connection handlers may add or remove
        clients while ``send_str`` is suspended.
        """
        for ws in list(self._clients):
            try:
                await ws.send_str(message)
            except Exception as e:
                print(f"Broadcast error: {e}")

    async def start(self) -> None:
        """Register the /ws route and start listening on host:port."""
        self._app.router.add_get("/ws", self.handle_websocket)
        self._runner = aiohttp.web.AppRunner(self._app)
        await self._runner.setup()
        site = aiohttp.web.TCPSite(self._runner, self.host, self.port)
        await site.start()
        print(f"WebSocket server started on {self.host}:{self.port}")

    async def stop(self) -> None:
        """Shut the server down; safe to call repeatedly."""
        if self._runner:
            await self._runner.cleanup()
            self._runner = None
# 36.4 异步数据库
36.4.1 异步数据库连接
python
class AsyncDatabasePool:
    """Skeleton interface for an async connection-pool wrapper.

    Concrete backends must override ``connect`` and the query methods. The
    base versions raise NotImplementedError rather than silently returning
    None.
    """

    def __init__(self, db_url: str, min_size: int = 5, max_size: int = 20):
        self.db_url = db_url
        self.min_size = min_size
        self.max_size = max_size
        self._pool = None

    def _not_implemented(self, method: str) -> NotImplementedError:
        """Build the error raised by the abstract methods below."""
        return NotImplementedError(f"{type(self).__name__}.{method}() must be implemented by a subclass")

    async def connect(self) -> None:
        """Create the pool (backend-specific)."""
        raise self._not_implemented("connect")

    async def close(self) -> None:
        """Close the pool if one was created; safe to call repeatedly."""
        if self._pool:
            await self._pool.close()
            self._pool = None

    async def execute(self, query: str, *args) -> Any:
        raise self._not_implemented("execute")

    async def fetch(self, query: str, *args) -> List[Dict]:
        raise self._not_implemented("fetch")

    async def fetchone(self, query: str, *args) -> Optional[Dict]:
        raise self._not_implemented("fetchone")

    async def fetchval(self, query: str, *args) -> Any:
        raise self._not_implemented("fetchval")
class AsyncSQLite:
    """Small aiosqlite wrapper that returns rows as dicts.

    Outside ``transaction()``, every ``execute``/``executemany`` commits
    immediately. Inside ``transaction()``, those commits are deferred so the
    whole block commits or rolls back as one unit.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._connection = None
        self._in_transaction = False

    async def connect(self) -> None:
        """Open the database; rows are returned as aiosqlite.Row."""
        import aiosqlite
        self._connection = await aiosqlite.connect(self.db_path)
        self._connection.row_factory = aiosqlite.Row

    async def close(self) -> None:
        """Close the connection if open; safe to call repeatedly."""
        if self._connection:
            await self._connection.close()
            self._connection = None

    async def _commit_unless_in_transaction(self) -> None:
        """Commit now unless an enclosing transaction() will commit later."""
        if not self._in_transaction:
            await self._connection.commit()

    async def execute(self, query: str, params: tuple = ()) -> None:
        """Run one statement (auto-committed outside a transaction)."""
        await self._connection.execute(query, params)
        await self._commit_unless_in_transaction()

    async def fetchall(self, query: str, params: tuple = ()) -> List[Dict]:
        """Return every result row as a dict."""
        async with self._connection.execute(query, params) as cursor:
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]

    async def fetchone(self, query: str, params: tuple = ()) -> Optional[Dict]:
        """Return the first result row as a dict, or None."""
        async with self._connection.execute(query, params) as cursor:
            row = await cursor.fetchone()
            return dict(row) if row is not None else None

    async def executemany(self, query: str, params_list: List[tuple]) -> None:
        """Run *query* once per parameter tuple (auto-committed outside a transaction)."""
        await self._connection.executemany(query, params_list)
        await self._commit_unless_in_transaction()

    @contextlib.asynccontextmanager
    async def transaction(self):
        """Run the enclosed statements atomically.

        Commits on normal exit. Rolls back on any exception, including
        cancellation, and then re-raises it.
        """
        await self._connection.execute("BEGIN")
        self._in_transaction = True
        try:
            yield
            await self._connection.commit()
        except BaseException:
            await self._connection.rollback()
            raise
        finally:
            self._in_transaction = False

    async def __aenter__(self) -> "AsyncSQLite":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()
class AsyncPostgreSQL:
    """asyncpg connection-pool wrapper.

    asyncpg is imported lazily in ``connect``, so the ``asyncpg.Record``
    annotations are written as strings; otherwise defining this class would
    raise NameError when asyncpg is not imported at module level.
    """

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool = None

    async def connect(self) -> None:
        """Create the connection pool."""
        import asyncpg
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size
        )

    async def close(self) -> None:
        """Close the pool if open; safe to call repeatedly."""
        if self._pool:
            await self._pool.close()
            self._pool = None

    async def execute(self, query: str, *args) -> str:
        """Run a statement and return asyncpg's status string."""
        async with self._pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args) -> "List[asyncpg.Record]":
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args) -> "Optional[asyncpg.Record]":
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

    async def fetchval(self, query: str, *args) -> Any:
        async with self._pool.acquire() as conn:
            return await conn.fetchval(query, *args)

    @contextlib.asynccontextmanager
    async def transaction(self):
        """Yield one pooled connection inside an asyncpg transaction."""
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                yield conn

    async def __aenter__(self) -> "AsyncPostgreSQL":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()
# 36.4.2 异步ORM
python
from dataclasses import dataclass, field
from typing import TypeVar, Generic, List, Optional, Any
import json
ModelType = TypeVar("ModelType", bound="BaseModel")


@dataclass
class BaseModel:
    """Dataclass base for persisted records, with dict conversion helpers."""

    id: Optional[int] = None

    def to_dict(self) -> Dict:
        """Return the instance attributes whose names do not start with "_"."""
        public = {}
        for key, value in vars(self).items():
            if key.startswith("_"):
                continue
            public[key] = value
        return public

    @classmethod
    def from_dict(cls: type[ModelType], data: Dict) -> ModelType:
        """Build an instance from *data*, ignoring keys that are not dataclass fields."""
        known = cls.__dataclass_fields__
        accepted = {key: data[key] for key in data if key in known}
        return cls(**accepted)
class AsyncRepository(Generic[ModelType]):
    """Generic CRUD repository mapping dataclass models to one SQLite table.

    Table and column names are interpolated into SQL, so they must come from
    trusted code. Values always go through ``?`` placeholders.
    """

    def __init__(self, db: AsyncSQLite, model_class: type[ModelType], table_name: str):
        self.db = db
        self.model_class = model_class
        self.table_name = table_name

    async def create_table(self, columns: Dict[str, str]) -> None:
        """Create the table (if missing) with an autoincrement id plus *columns* (name -> SQL type)."""
        cols = ["id INTEGER PRIMARY KEY AUTOINCREMENT"]
        cols.extend(f"{name} {dtype}" for name, dtype in columns.items())
        query = f"CREATE TABLE IF NOT EXISTS {self.table_name} ({', '.join(cols)})"
        await self.db.execute(query)

    async def insert(self, model: ModelType) -> int:
        """Insert *model* (its id is ignored) and return the new row id."""
        data = model.to_dict()
        data.pop("id", None)
        if data:
            columns = ", ".join(data)
            placeholders = ", ".join("?" for _ in data)
            query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})"
        else:
            # SQLite rejects an empty column list; DEFAULT VALUES inserts a bare row.
            query = f"INSERT INTO {self.table_name} DEFAULT VALUES"
        await self.db.execute(query, tuple(data.values()))
        # NOTE(review): last_insert_rowid() is read in a separate await; another
        # coroutine inserting on the same connection in between could change it.
        result = await self.db.fetchone("SELECT last_insert_rowid() as id")
        return result["id"]

    async def find_by_id(self, id: int) -> Optional[ModelType]:
        """Return the model with this id, or None."""
        query = f"SELECT * FROM {self.table_name} WHERE id = ?"
        row = await self.db.fetchone(query, (id,))
        return self.model_class.from_dict(row) if row else None

    async def find_all(self) -> List[ModelType]:
        """Return every row as a model."""
        query = f"SELECT * FROM {self.table_name}"
        rows = await self.db.fetchall(query)
        return [self.model_class.from_dict(row) for row in rows]

    async def find_where(self, condition: str, *args) -> List[ModelType]:
        """Return models matching the SQL *condition*, bound with positional *args*."""
        query = f"SELECT * FROM {self.table_name} WHERE {condition}"
        rows = await self.db.fetchall(query, args)
        return [self.model_class.from_dict(row) for row in rows]

    async def update(self, model: ModelType) -> None:
        """Write every non-id field of *model* back to its row.

        Raises ValueError if *model* has no id. A model with no fields other
        than id is a no-op, because ``SET`` needs at least one column.
        """
        if model.id is None:
            raise ValueError("Cannot update model without id")
        data = model.to_dict()
        data.pop("id")
        if not data:
            return
        set_clause = ", ".join(f"{k} = ?" for k in data)
        query = f"UPDATE {self.table_name} SET {set_clause} WHERE id = ?"
        await self.db.execute(query, tuple(data.values()) + (model.id,))

    async def delete(self, id: int) -> None:
        """Delete the row with this id (no error if absent)."""
        query = f"DELETE FROM {self.table_name} WHERE id = ?"
        await self.db.execute(query, (id,))

    async def count(self, condition: str = "1=1", *args) -> int:
        """Count rows matching the SQL *condition* (all rows by default)."""
        query = f"SELECT COUNT(*) as count FROM {self.table_name} WHERE {condition}"
        result = await self.db.fetchone(query, args)
        return result["count"]
@dataclass
class User(BaseModel):
    """Application user: username, email and a creation timestamp kept as text."""

    username: str = ""
    email: str = ""
    created_at: str = ""  # stored as a plain string, not a datetime
class UserRepository(AsyncRepository[User]):
    """Repository for User rows stored in the ``users`` table."""

    def __init__(self, db: AsyncSQLite):
        super().__init__(db, User, "users")

    async def find_by_email(self, email: str) -> Optional[User]:
        """Return the first user with this email, or None (one query, not two)."""
        matches = await self.find_where("email = ?", email)
        return matches[0] if matches else None

    async def find_by_username(self, username: str) -> Optional[User]:
        """Return the first user with this username, or None (one query, not two)."""
        matches = await self.find_where("username = ?", username)
        return matches[0] if matches else None
# 36.5 异步异常处理
36.5.1 异常传播
python
class AsyncExceptionHandler:
    """Static helpers for guarding, retrying and time-limiting awaitables."""

    @staticmethod
    async def safe_execute(
        coro: Coroutine,
        default: Any = None,
        exceptions: tuple = (Exception,)
    ) -> Any:
        """Await *coro*; on one of *exceptions*, print it and return *default*."""
        try:
            return await coro
        except exceptions as e:
            print(f"Exception caught: {e}")
            return default

    @staticmethod
    async def with_retry(
        coro_factory: Callable[[], Coroutine],
        max_retries: int = 3,
        delay: float = 1.0,
        exceptions: tuple = (Exception,)
    ) -> Any:
        """Call ``coro_factory()`` and await it, up to *max_retries* attempts.

        Sleeps ``delay * attempt_number`` between attempts and re-raises the
        last matching exception once attempts are exhausted.

        Raises:
            ValueError: if *max_retries* < 1 (there would be nothing to retry).
        """
        if max_retries < 1:
            raise ValueError("max_retries must be >= 1")
        for attempt in range(1, max_retries + 1):
            try:
                return await coro_factory()
            except exceptions:
                if attempt == max_retries:
                    raise
                await asyncio.sleep(delay * attempt)

    @staticmethod
    async def with_timeout(coro: Coroutine, timeout: float) -> Any:
        """Await *coro*, raising TimeoutError after *timeout* seconds."""
        async with asyncio.timeout(timeout):
            return await coro

    @staticmethod
    async def with_deadline(coro: Coroutine, deadline: float) -> Any:
        """Await *coro* until the absolute loop time *deadline* (``loop.time()`` clock).

        If the deadline has already passed, *coro* is closed without running
        (avoiding a "never awaited" warning) and TimeoutError is raised.
        """
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            close = getattr(coro, "close", None)
            if close is not None:
                close()
            raise asyncio.TimeoutError("Deadline already passed")
        async with asyncio.timeout(remaining):
            return await coro
class TaskCancellationHandler:
    """Helpers for running, time-limiting and shutting down cancellable tasks."""

    def __init__(self):
        # Tasks seen being cancelled by run_with_cancellation.
        self._cancelled_tasks: List[asyncio.Task] = []

    async def run_with_cancellation(
        self,
        coro: Coroutine,
        on_cancel: Optional[Callable] = None
    ) -> Any:
        """Run *coro* as a task. If cancelled, record it, await ``on_cancel()`` and re-raise."""
        task = asyncio.create_task(coro)
        try:
            return await task
        except asyncio.CancelledError:
            self._cancelled_tasks.append(task)
            if on_cancel:
                await on_cancel()
            raise

    async def cancel_after(
        self,
        coro: Coroutine,
        delay: float,
        on_cancel: Optional[Callable] = None
    ) -> Any:
        """Run *coro*, cancelling it if it is still running after *delay* seconds.

        On cancellation ``on_cancel()`` is awaited and CancelledError
        propagates. The helper canceller task is always torn down.
        """
        task = asyncio.create_task(coro)

        async def canceller():
            await asyncio.sleep(delay)
            if not task.done():
                task.cancel()

        canceller_task = asyncio.create_task(canceller())
        try:
            return await task
        except asyncio.CancelledError:
            if on_cancel:
                await on_cancel()
            raise
        finally:
            canceller_task.cancel()
            try:
                await canceller_task
            except asyncio.CancelledError:
                pass

    @staticmethod
    async def shield(coro: Coroutine) -> Any:
        """Await *coro* protected from cancellation of the caller."""
        return await asyncio.shield(coro)

    @staticmethod
    async def graceful_shutdown(
        tasks: List[asyncio.Task],
        timeout: float = 5.0
    ) -> None:
        """Cancel *tasks* and wait at most *timeout* seconds for them to finish.

        Tasks still running after the timeout (e.g. slow cleanup) are left
        pending instead of blocking shutdown forever. Exceptions of finished
        tasks are retrieved so asyncio does not log "exception was never
        retrieved".
        """
        if not tasks:
            return
        for task in tasks:
            task.cancel()
        done, _still_running = await asyncio.wait(tasks, timeout=timeout)
        for task in done:
            if not task.cancelled():
                task.exception()
# 36.6 知识图谱
36.6.1 异步编程技术架构
┌─────────────────────────────────────────────────────────────────────┐
│ Python 异步编程技术栈 │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 应用层 │ │ Web框架 │ │ 数据库 │ │ 网络通信 │ │
│ │ FastAPI │ │ Aiohttp │ │ AsyncPG │ │ WebSocket │ │
│ │ Sanic │ │ Starlette │ │ AIOMySQL │ │ TCP/UDP │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ ┌──────┴────────────────┴────────────────┴────────────────┴──────┐ │
│ │ asyncio 核心层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │TaskGroup │ │ Semaphore│ │ Queue │ │ Lock │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Event │ │Condition │ │ Barrier │ │ Future │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────────┐ │
│ │ 事件循环 (Event Loop) │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Selector │ │ Scheduler │ │ Ready Queue │ │ │
│ │ │ (epoll/kq) │ │ (定时任务) │ │ (就绪队列) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────────┐ │
│ │ 协程层 (Coroutine) │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ async/await │ │ Task │ │ Future │ │ │
│ │ │ (语法糖) │ │ (任务封装) │ │ (结果容器) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
36.6.2 异步执行流程
┌─────────────────────────────────────────────────────────────────────┐
│ 异步任务执行流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 创建协程 │───▶│ 封装Task │───▶│ 提交队列 │───▶│ 等待调度 │ │
│ │ async def│ │create_task│ │ ready │ │ pending │ │
│ └──────────┘ └──────────┘ └──────────┘ └────┬─────┘ │
│ │ │
│ ┌──────────────────────────────────┘ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 事件循环调度 │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ 1. 检查就绪队列 → 执行可运行任务 │ │ │
│ │ │ 2. 检查I/O状态 → 唤醒完成的I/O操作 │ │ │
│ │ │ 3. 检查定时器 → 执行到期的定时任务 │ │ │
│ │ │ 4. 处理信号 → 响应外部中断 │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 执行协程 │───▶│ 遇到await│───▶│ 挂起任务 │───▶│ I/O操作 │ │
│ │ running │ │ 挂起点 │ │ suspended│ │ 非阻塞 │ │
│ └──────────┘ └──────────┘ └──────────┘ └────┬─────┘ │
│ │ │
│ ┌──────────────────────────────────┘ │
│ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ I/O完成 │───▶│ 重新入队 │───▶│ 继续执行 │───▶ 返回结果 │
│ │ callback │ │ ready │ │ resume │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
36.7 技术选型指南
36.7.1 异步框架选型
| 框架 | 适用场景 | 性能特点 | 学习曲线 | 生态系统 |
|---|---|---|---|---|
| asyncio | 标准库、基础异步 | 原生支持、稳定 | 中等 | Python内置 |
| aiohttp | HTTP客户端/服务端 | 高性能、功能丰富 | 中等 | 成熟完善 |
| FastAPI | Web API开发 | 极高性能、自动文档 | 低 | 快速增长 |
| Sanic | 高性能Web服务 | 接近Go性能 | 低 | 中等规模 |
| Starlette | 轻量级ASGI框架 | 高性能、灵活 | 中等 | 适中 |
| Tornado | 长连接、WebSocket | 成熟稳定 | 中等 | 完善生态 |
36.7.2 异步数据库驱动选型
| 数据库 | 驱动 | 连接池 | 事务支持 | ORM兼容 | 推荐指数 |
|---|---|---|---|---|---|
| PostgreSQL | asyncpg | ✅ 原生 | ✅ 完整 | SQLAlchemy 2.0 | ★★★★★ |
| MySQL | aiomysql | ✅ 原生 | ✅ 完整 | SQLAlchemy 2.0 | ★★★★☆ |
| SQLite | aiosqlite | ❌ 单连接 | ✅ 完整 | SQLAlchemy 2.0 | ★★★★☆ |
| MongoDB | motor | ✅ 原生 | ✅ 会话 | ODM (Beanie) | ★★★★★ |
| Redis | redis-py(redis.asyncio,aioredis 已并入其中) | ✅ 原生 | ✅ 事务 | - | ★★★★★ |
36.7.3 同步原语选型
| 同步原语 | 适用场景 | 特点 | 使用建议 |
|---|---|---|---|
| Lock | 互斥访问共享资源 | 简单可靠 | 保护临界区代码 |
| Semaphore | 限制并发数量 | 可配置许可数 | 连接池、限流 |
| Event | 事件通知机制 | 一对多通知 | 状态同步、启动信号 |
| Condition | 复杂条件等待 | 支持谓词等待 | 生产者-消费者 |
| Barrier | 多任务同步点 | 等待所有参与者 | 并行计算分阶段 |
| Queue | 任务队列 | 协程间安全传递(非线程安全) | 解耦生产消费 |
36.8 常见问题与解决方案
36.8.1 事件循环阻塞问题
python
import asyncio
import time
def _cpu_intensive_task(n: int) -> int:
    """Return the sum of squares of 0..n-1 (CPU-bound workload for the process-pool demo).

    Defined at module level on purpose: ProcessPoolExecutor pickles the callable
    by its qualified name, so a function nested inside a method cannot be sent to
    a worker process ("Can't pickle local object").
    """
    return sum(i * i for i in range(n))


class EventLoopBlockSolution:
    """Examples contrasting code that blocks the event loop with non-blocking alternatives."""

    @staticmethod
    async def bad_blocking(seconds: float = 5):
        """Anti-pattern: call the synchronous ``time.sleep`` inside a coroutine.

        ``time.sleep`` holds the thread, so no other task on this event loop can
        run until it returns.
        """
        time.sleep(seconds)  # blocks the entire event loop
        return "done"

    @staticmethod
    async def good_non_blocking(seconds: float = 5):
        """Correct version: ``await asyncio.sleep`` suspends only this coroutine."""
        await asyncio.sleep(seconds)  # yields control so other tasks can run meanwhile
        return "done"

    @staticmethod
    async def run_blocking_in_executor(seconds: float = 5):
        """Offload a blocking call to the event loop's default thread pool.

        Returns whatever the blocking callable returns (``None`` for ``time.sleep``).
        """
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,  # None selects the loop's default thread pool executor
            time.sleep,
            seconds,
        )
        return result

    @staticmethod
    async def run_cpu_bound_in_process(n: int = 1000000):
        """Run CPU-bound work in a worker process so the event loop stays responsive.

        Args:
            n: exclusive upper bound of the sum-of-squares workload.

        Returns:
            ``sum(i * i for i in range(n))`` computed in a worker process.
        """
        import concurrent.futures

        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            # The callable must be picklable, hence the module-level helper.
            result = await loop.run_in_executor(pool, _cpu_intensive_task, n)
        return result

# 36.8.2 任务取消与清理
python
class TaskCancellationSolution:
    """Run tasks with per-name cleanup hooks and cancel groups of tasks gracefully."""

    def __init__(self):
        # task name -> zero-argument async callable awaited when that task fails or is cancelled
        self._cleanup_handlers = {}

    async def run_with_cleanup(self, coro, task_name: str):
        """Run *coro* as a task named *task_name*, invoking its cleanup handler on failure.

        The registered handler (if any) runs only when the task is cancelled or
        raises; on success it stays registered. The original exception,
        including ``CancelledError``, is always re-raised.
        """
        task = asyncio.create_task(coro, name=task_name)
        try:
            return await task
        except asyncio.CancelledError:
            print(f"Task {task_name} was cancelled")
            await self._cleanup(task_name)
            raise  # cancellation must keep propagating to the caller
        except Exception as e:
            print(f"Task {task_name} failed: {e}")
            await self._cleanup(task_name)
            raise

    async def _cleanup(self, task_name: str):
        """Unregister and await the cleanup handler for *task_name*, if one exists.

        The handler is removed before it is awaited, so it runs at most once
        and does not stay registered even if it raises.
        """
        handler = self._cleanup_handlers.pop(task_name, None)
        if handler is not None:
            await handler()

    def register_cleanup(self, task_name: str, handler):
        """Register *handler* (a zero-argument async callable) as the cleanup for *task_name*.

        A later registration for the same name replaces the earlier one.
        """
        self._cleanup_handlers[task_name] = handler

    @staticmethod
    async def graceful_cancel_tasks(tasks: list, timeout: float = 5.0):
        """Cancel every task in *tasks* and wait at most *timeout* seconds for them to finish.

        Tasks that absorb the cancellation and are still running after *timeout*
        are reported and left pending rather than blocking the caller forever.
        Tasks that ended with an ordinary exception are reported as errors.
        """
        for task in tasks:
            task.cancel()
        if not tasks:
            return  # asyncio.wait() rejects an empty collection
        _, pending = await asyncio.wait(tasks, timeout=timeout)
        for task in tasks:
            name = task.get_name()
            if task in pending:
                print(f"Task {name} did not finish within {timeout}s")
            elif task.cancelled():
                print(f"Task {name} cancelled gracefully")
            else:
                error = task.exception()
                if isinstance(error, Exception):
                    print(f"Task {name} ended with error: {error}")

# 36.8.3 异步上下文管理器资源泄漏
python
class AsyncResourceManager:
    """Async context manager that tracks acquired resources and releases all of them on exit."""

    def __init__(self):
        # Resources acquired and not yet released, in acquisition order.
        self._resources = []

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.release()
        return False  # never suppress an exception raised inside the ``async with`` body

    async def acquire(self):
        """Create one resource, track it for later release, and return it."""
        resource = await self._create_resource()
        self._resources.append(resource)
        return resource

    async def release(self):
        """Release every tracked resource, most recently acquired first.

        Each resource is removed from tracking before it is closed, so if the
        loop is interrupted (e.g. by cancellation) only the still-open resources
        remain tracked and a later call never closes one twice. An ordinary
        error while closing one resource is printed and does not stop the rest.
        """
        while self._resources:
            # LIFO order: later resources may depend on earlier ones.
            resource = self._resources.pop()
            try:
                await self._close_resource(resource)
            except Exception as e:
                print(f"Error closing resource: {e}")

    async def _create_resource(self):
        """Build a placeholder resource record with a per-resource id and creation timestamp."""
        resource = {"id": None, "created": time.time()}
        # id(resource) is unique among live resources; id(self) would be shared by
        # every resource this manager creates.
        resource["id"] = id(resource)
        return resource

    async def _close_resource(self, resource):
        """Simulate closing *resource* with a short async pause, then log its id."""
        await asyncio.sleep(0.01)
        print(f"Closed resource {resource['id']}")
async def safe_resource_usage():
    """Demonstrate the recommended way to use a resource from AsyncResourceManager.

    The ``async with`` statement makes the manager's ``__aexit__`` run however
    the body exits; a processing error is printed and then re-raised unchanged.
    """
    manager_cm = AsyncResourceManager()
    async with manager_cm as mgr:
        res = await mgr.acquire()
        try:
            # NOTE(review): process_resource is not defined in this section — confirm where it comes from.
            await process_resource(res)
        except Exception as err:
            print(f"Error processing: {err}")
            raise

# 36.8.4 异步超时处理
python
class AsyncTimeoutSolution:
@staticmethod
async def with_timeout(coro, timeout: float, default=None):
"""带超时和默认值的执行"""
try:
async with asyncio.timeout(timeout):
return await coro
except asyncio.TimeoutError:
print(f"Operation timed out after {timeout}s")
return default
@staticmethod
async def with_retry_timeout(coro, timeout: float, max_retries: int = 3):
"""带重试的超时执行"""
last_error = None
for attempt in range(max_retries):
try:
async with asyncio.timeout(timeout):
return await coro
except asyncio.TimeoutError as e:
last_error = e
print(f"Attempt {attempt + 1} timed out")
await asyncio.sleep(2 ** attempt) # 指数退避
raise last_error
@staticmethod
async def deadline_scheduler(tasks: list, deadline: float):
"""截止时间调度器"""
results = []
start_time = time.time()
for task in tasks:
elapsed = time.time() - start_time
remaining = deadline - elapsed
if remaining <= 0:
print("Deadline exceeded, skipping remaining tasks")
break
try:
async with asyncio.timeout(remaining):
result = await task
results.append(result)
except asyncio.TimeoutError:
print("Task exceeded remaining deadline")
break
return results36.9 本章小结
本章详细介绍了Python异步编程进阶的核心概念和实践:
- 事件循环机制:事件循环原理、协程状态管理
- TaskGroup:结构化并发、错误处理、超时控制
- 同步原语:Lock、Semaphore、Event、Condition、Barrier
- 异步队列:Queue、PriorityQueue、生产者-消费者模式
- 异步网络:aiohttp客户端、WebSocket通信
- 异步数据库:连接池、异步ORM、事务处理
- 异常处理:异常传播、取消处理、重试机制
练习题
- 实现一个异步限流器,控制并发请求数量
- 开发一个异步爬虫,支持并发抓取和错误重试
- 实现一个异步消息队列系统,支持发布订阅模式
- 开发一个异步API客户端,支持请求缓存和自动重试
- 实现一个异步任务调度器,支持定时任务和依赖管理