Skip to content

第36章 异步编程进阶

学习目标

完成本章学习后，你将能够：

  1. 深入理解异步原理：事件循环、协程、任务调度机制
  2. 掌握asyncio高级特性：TaskGroup、信号量、队列、同步原语
  3. 实现异步网络编程：aiohttp客户端/服务器、WebSocket
  4. 使用异步数据库：asyncpg、aiomysql、SQLAlchemy异步扩展
  5. 处理异步异常：异常传播、取消处理、超时控制
  6. 实现异步设计模式：生产者-消费者、限流器、连接池
  7. 调试异步代码：调试工具、性能分析、常见问题
  8. 构建高性能应用：异步Web服务、高并发处理

36.1 异步编程原理

36.1.1 事件循环机制

python
import asyncio
import heapq
import inspect
import itertools
import time
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Coroutine, List, Optional, TypeVar


# Module-level generic type variable for use in type hints.
T = TypeVar("T")


class TaskState(Enum):
    """Coarse lifecycle states reported for an asyncio task."""

    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    CANCELLED = "cancelled"


@dataclass
class TaskInfo:
    """Snapshot of a task's identity, state and outcome.

    The timestamp fields are optional and keep their defaults unless the
    caller fills them in.
    """

    id: int  # identifier for the task, e.g. id(task)
    name: str  # task name
    state: TaskState
    result: Any = None  # return value when the task finished successfully
    exception: Optional[Exception] = None  # exception raised by the task, if any
    created_at: float = 0.0
    started_at: Optional[float] = None
    completed_at: Optional[float] = None

    @property
    def duration(self) -> Optional[float]:
        """Return ``completed_at - started_at``, or None if either is unset.

        Explicit ``is None`` checks are used on purpose: a timestamp of ``0.0``
        (e.g. from a clock that starts at zero) is a real value, not a missing
        one, and must not make the duration disappear.
        """
        if self.started_at is None or self.completed_at is None:
            return None
        return self.completed_at - self.started_at


class EventLoopInspector:
    """Static helpers for looking at the asyncio event loop and its tasks."""

    @staticmethod
    def get_running_loop() -> asyncio.AbstractEventLoop:
        """Return the event loop running in the current thread.

        Raises:
            RuntimeError: if no event loop is running.
        """
        return asyncio.get_running_loop()

    @staticmethod
    def get_event_loop() -> asyncio.AbstractEventLoop:
        """Return the running loop, else the current loop, else a brand-new loop.

        A loop produced by the final fallback is not installed as the current
        loop; the caller owns it and is responsible for closing it.
        """
        try:
            # Prefer the running loop so we only fall back to
            # asyncio.get_event_loop() (deprecated outside a running loop)
            # when there is no running loop at all.
            return asyncio.get_running_loop()
        except RuntimeError:
            pass
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            return asyncio.new_event_loop()

    @staticmethod
    def get_all_tasks(loop: Optional[asyncio.AbstractEventLoop] = None) -> set[asyncio.Task]:
        """Return the set of not-yet-finished tasks of *loop* (default: running loop)."""
        return asyncio.all_tasks(loop)

    @staticmethod
    def get_current_task(loop: Optional[asyncio.AbstractEventLoop] = None) -> Optional[asyncio.Task]:
        """Return the task currently running on *loop*, or None outside a task."""
        return asyncio.current_task(loop)

    @staticmethod
    def print_task_tree(loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Print the number of pending tasks followed by each task's name and coroutine."""
        tasks = asyncio.all_tasks(loop)
        print(f"Total tasks: {len(tasks)}")
        for task in tasks:
            print(f"  - {task.get_name()}: {task.get_coro()}")

    @staticmethod
    def get_task_info(task: asyncio.Task) -> TaskInfo:
        """Build a TaskInfo snapshot for *task*.

        State mapping: finished tasks are DONE or CANCELLED; unfinished tasks
        with a pending cancellation request are CANCELLED; all other
        unfinished tasks are RUNNING. For tasks that finished without being
        cancelled, ``result`` or ``exception`` is filled in.
        """
        if task.done():
            state = TaskState.CANCELLED if task.cancelled() else TaskState.DONE
        elif task.cancelling():
            # Public API (3.11+) for "cancel() was requested"; replaces the
            # private `_must_cancel` flag, which misses cancellations of a
            # task that is currently awaiting a future.
            state = TaskState.CANCELLED
        else:
            state = TaskState.RUNNING

        result = None
        exception = None
        if task.done() and not task.cancelled():
            # exception() returns the stored exception without re-raising it,
            # so BaseException subclasses cannot escape from this helper.
            exception = task.exception()
            if exception is None:
                result = task.result()

        return TaskInfo(
            id=id(task),
            name=task.get_name(),
            state=state,
            result=result,
            exception=exception
        )


class CustomEventLoop:
    """Toy single-threaded callback scheduler showing how an event loop works.

    ``call_soon`` callbacks run in FIFO order. ``call_later`` callbacks are moved
    to the ready queue once their deadline passes. ``run_forever`` keeps
    looping until ``stop()`` is called, typically from inside a callback.
    """

    def __init__(self):
        # Callbacks ready to run now, as (callback, args) pairs in FIFO order;
        # a deque makes taking the oldest entry O(1).
        self._ready: deque = deque()
        # Min-heap of (deadline, sequence, callback, args). The sequence number
        # keeps equal deadlines in registration order and ensures the heap never
        # compares two callbacks (which would raise TypeError).
        self._scheduled: List[tuple] = []
        self._sequence = itertools.count()
        self._running = False

    def call_soon(self, callback: Callable, *args) -> None:
        """Queue ``callback(*args)`` to run on a following loop iteration."""
        self._ready.append((callback, args))

    def call_later(self, delay: float, callback: Callable, *args) -> None:
        """Schedule ``callback(*args)`` to run after roughly *delay* seconds."""
        # Monotonic clock: wall-clock adjustments must not reorder or stall timers.
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, next(self._sequence), callback, args))

    def run_until_complete(self, coro: Coroutine) -> Any:
        """Run *coro* to completion and return its result.

        This toy loop cannot drive coroutines itself, so the work is delegated
        to :func:`asyncio.run`, which creates a fresh asyncio loop and closes
        it afterwards.

        Raises:
            RuntimeError: if an asyncio event loop is already running in this
                thread.
        """
        return asyncio.run(coro)

    def run_forever(self) -> None:
        """Run ready and due callbacks, one per iteration, until ``stop()`` is called."""
        self._running = True
        while self._running:
            now = time.monotonic()

            # Promote every timer whose deadline has passed, earliest first.
            while self._scheduled and self._scheduled[0][0] <= now:
                _, _, callback, args = heapq.heappop(self._scheduled)
                self._ready.append((callback, args))

            if self._ready:
                callback, args = self._ready.popleft()
                callback(*args)
            else:
                # Nothing to do yet: back off briefly instead of busy-spinning.
                time.sleep(0.001)

    def stop(self) -> None:
        """Ask ``run_forever`` to return after the current iteration."""
        self._running = False


async def demonstrate_event_loop():
    """Print basic facts about the running loop and the tasks scheduled on it."""
    running_loop = asyncio.get_running_loop()
    this_task = asyncio.current_task()
    task_count = len(asyncio.all_tasks())

    print(f"Event loop: {running_loop}")
    print(f"Is running: {running_loop.is_running()}")
    print(f"Current task: {this_task.get_name()}")
    print(f"All tasks: {task_count}")

36.1.2 协程深入

python
class CoroutineAnalyzer:
    """Static helpers for classifying and inspecting coroutine objects."""

    # inspect.CORO_* constants -> short names returned by get_coroutine_state().
    _STATE_NAMES = {
        inspect.CORO_CREATED: "created",
        inspect.CORO_RUNNING: "running",
        inspect.CORO_SUSPENDED: "suspended",
        inspect.CORO_CLOSED: "closed",
    }

    @staticmethod
    def is_coroutine(obj: Any) -> bool:
        """Return True if *obj* is a coroutine object (the result of calling an async def)."""
        return inspect.iscoroutine(obj)

    @staticmethod
    def is_coroutine_function(obj: Any) -> bool:
        """Return True if *obj* is an ``async def`` function."""
        return inspect.iscoroutinefunction(obj)

    @staticmethod
    def is_async_generator(obj: Any) -> bool:
        """Return True if *obj* is an asynchronous generator object."""
        return inspect.isasyncgen(obj)

    @staticmethod
    def get_coroutine_state(coro: Coroutine) -> str:
        """Return "created", "running", "suspended" or "closed" for *coro*.

        Delegates to :func:`inspect.getcoroutinestate`, which distinguishes a
        coroutine that has never been started ("created") from one paused at
        an ``await`` ("suspended").
        """
        return CoroutineAnalyzer._STATE_NAMES[inspect.getcoroutinestate(coro)]

    @staticmethod
    def analyze_coroutine(coro: Coroutine) -> dict:
        """Collect name, state and frame/code/origin attributes of *coro* into a dict.

        ``origin`` is ``cr_origin``, which is None unless origin tracking is
        enabled for coroutines.
        """
        return {
            "name": coro.__name__,
            "qualname": coro.__qualname__,
            "state": CoroutineAnalyzer.get_coroutine_state(coro),
            "frame": coro.cr_frame,
            "code": coro.cr_code,
            "origin": coro.cr_origin
        }


class CoroutineManager:
    """Collect coroutine objects and await them together or one after another.

    A coroutine object can be awaited only once, so run a batch with either
    ``run_all()`` or ``run_sequentially()``, then ``clear()`` before adding
    the next batch.
    """

    def __init__(self):
        self._coroutines: List[Coroutine] = []
        self._results: List[Any] = []

    def add(self, coro: Coroutine) -> None:
        """Append *coro* to the current batch (it is not started yet)."""
        self._coroutines.append(coro)

    async def run_all(self) -> List[Any]:
        """Await every added coroutine concurrently; results keep insertion order."""
        self._results = await asyncio.gather(*self._coroutines)
        return self._results

    async def run_sequentially(self) -> List[Any]:
        """Await the added coroutines one at a time, in insertion order."""
        self._results = []
        for coro in self._coroutines:
            result = await coro
            self._results.append(result)
        return self._results

    def clear(self) -> None:
        """Forget the current batch and its results."""
        for coro in self._coroutines:
            # Close coroutines that were never started, so Python does not warn
            # "coroutine ... was never awaited"; started or finished ones (and
            # non-coroutine awaitables) are left untouched.
            if inspect.iscoroutine(coro) and inspect.getcoroutinestate(coro) == inspect.CORO_CREATED:
                coro.close()
        self._coroutines.clear()
        # Rebind rather than .clear(): the previous list was returned to the
        # caller by run_all()/run_sequentially() and must not be emptied.
        self._results = []


async def coroutine_examples():
    """Demonstrate awaiting a plain, a parameterised and a nested coroutine."""

    async def simple_coroutine():
        await asyncio.sleep(0.1)
        return "result"

    async def coroutine_with_params(x: int, y: int):
        await asyncio.sleep(0.1)
        return x + y

    async def nested_coroutine():
        inner = await simple_coroutine()
        return f"nested: {inner}"

    # Awaited strictly in order: the list display evaluates left to right.
    outcomes = [
        await simple_coroutine(),
        await coroutine_with_params(1, 2),
        await nested_coroutine(),
    ]
    for index, value in enumerate(outcomes, start=1):
        print(f"Result {index}: {value}")

36.2 asyncio高级特性

36.2.1 TaskGroup与结构化并发

python
from typing import AsyncIterator
import contextlib


class TaskGroupManager:
    def __init__(self):
        self._tasks: List[asyncio.Task] = []
        self._results: List[Any] = []

    async def run_with_taskgroup(self, coros: List[Coroutine]) -> List[Any]:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(coro) for coro in coros]

        return [task.result() for task in tasks]

    async def run_with_error_handling(
        self,
        coros: List[Coroutine]
    ) -> tuple[List[Any], List[Exception]]:
        results = []
        errors = []

        try:
            async with asyncio.TaskGroup() as tg:
                tasks = [tg.create_task(coro) for coro in coros]

            results = [task.result() for task in tasks]
        except ExceptionGroup as eg:
            for exc in eg.exceptions:
                errors.append(exc)

        return results, errors

    async def run_with_timeout(
        self,
        coros: List[Coroutine],
        timeout: float
    ) -> List[Any]:
        async with asyncio.timeout(timeout):
            return await self.run_with_taskgroup(coros)


async def taskgroup_examples():
    async def worker(name: str, duration: float):
        await asyncio.sleep(duration)
        return f"{name} completed after {duration}s"

    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(worker("A", 0.1))
        task2 = tg.create_task(worker("B", 0.2))
        task3 = tg.create_task(worker("C", 0.3))

    print(f"Task 1: {task1.result()}")
    print(f"Task 2: {task2.result()}")
    print(f"Task 3: {task3.result()}")


class StructuredConcurrency:
    @staticmethod
    async def map_concurrent(
        func: Callable,
        items: List[Any],
        max_concurrency: int = 10
    ) -> List[Any]:
        semaphore = asyncio.Semaphore(max_concurrency)

        async def limited(item):
            async with semaphore:
                if asyncio.iscoroutinefunction(func):
                    return await func(item)
                else:
                    return func(item)

        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(limited(item)) for item in items]

        return [task.result() for task in tasks]

    @staticmethod
    async def pipeline(
        stages: List[Callable[[Any], Coroutine]],
        initial_data: List[Any]
    ) -> List[Any]:
        data = initial_data

        for stage in stages:
            async with asyncio.TaskGroup() as tg:
                tasks = [tg.create_task(stage(item)) for item in data]
            data = [task.result() for task in tasks]

        return data

    @staticmethod
    async def fan_out_fan_in(
        producer: Callable[[], Coroutine],
        workers: int,
        consumer: Callable[[List[Any]], Coroutine]
    ) -> Any:
        results = []

        async def worker_task():
            return await producer()

        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(worker_task()) for _ in range(workers)]

        results = [task.result() for task in tasks]
        return await consumer(results)

36.2.2 同步原语

python
class AsyncLock:
    """Thin wrapper around :class:`asyncio.Lock` with an ``async with`` helper."""

    def __init__(self):
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until the lock is free, then take it."""
        inner = self._lock
        await inner.acquire()

    def release(self) -> None:
        """Give the lock back; the caller must currently hold it."""
        inner = self._lock
        inner.release()

    @contextlib.asynccontextmanager
    async def locked(self):
        """Hold the lock for the duration of an ``async with`` body."""
        await self.acquire()
        try:
            yield
        finally:
            self.release()


class AsyncSemaphore:
    """Counting-semaphore wrapper around :class:`asyncio.Semaphore`."""

    def __init__(self, value: int = 1):
        # value = number of holders allowed at the same time.
        self._semaphore = asyncio.Semaphore(value)

    async def acquire(self) -> None:
        """Take one permit, waiting while none are available."""
        permits = self._semaphore
        await permits.acquire()

    def release(self) -> None:
        """Return one permit."""
        permits = self._semaphore
        permits.release()

    @contextlib.asynccontextmanager
    async def limited(self):
        """Hold one permit for the duration of an ``async with`` body."""
        await self.acquire()
        try:
            yield
        finally:
            self.release()


class AsyncEvent:
    """Wrapper around :class:`asyncio.Event` exposing its flag operations."""

    def __init__(self):
        self._event = asyncio.Event()

    def set(self) -> None:
        """Raise the flag, waking every waiter."""
        flag = self._event
        flag.set()

    def clear(self) -> None:
        """Lower the flag so later wait() calls block again."""
        flag = self._event
        flag.clear()

    def is_set(self) -> bool:
        """Report whether the flag is currently raised."""
        flag = self._event
        return flag.is_set()

    async def wait(self) -> None:
        """Block until the flag is raised (returns at once if it already is)."""
        flag = self._event
        await flag.wait()


class AsyncCondition:
    """Wrapper around :class:`asyncio.Condition` with an ``async with`` helper."""

    def __init__(self, lock: Optional[asyncio.Lock] = None):
        # With lock=None, asyncio.Condition creates a lock of its own.
        self._condition = asyncio.Condition(lock)

    async def acquire(self) -> None:
        """Acquire the condition's underlying lock."""
        cond = self._condition
        await cond.acquire()

    def release(self) -> None:
        """Release the condition's underlying lock."""
        cond = self._condition
        cond.release()

    async def wait(self) -> None:
        """Release the lock, wait for a notification, then re-acquire the lock."""
        cond = self._condition
        await cond.wait()

    async def wait_for(self, predicate: Callable[[], bool]) -> None:
        """Wait (with the lock held) until *predicate* returns a truthy value."""
        cond = self._condition
        await cond.wait_for(predicate)

    def notify(self, n: int = 1) -> None:
        """Wake up to *n* waiters; the lock must be held."""
        cond = self._condition
        cond.notify(n)

    def notify_all(self) -> None:
        """Wake every waiter; the lock must be held."""
        cond = self._condition
        cond.notify_all()

    @contextlib.asynccontextmanager
    async def locked(self):
        """Hold the lock for an ``async with`` body; yields this wrapper."""
        await self.acquire()
        try:
            yield self
        finally:
            self.release()


class AsyncBarrier:
    """Reusable barrier: ``wait()`` blocks until *parties* callers have arrived.

    When the last party of a round arrives, every waiter of that round is
    released and the barrier resets itself for the next round.

    Raises:
        ValueError: from the constructor if *parties* is less than 1 (such a
            barrier could never trip, so every waiter would hang).
    """

    def __init__(self, parties: int):
        if parties < 1:
            raise ValueError("parties must be at least 1")
        self._parties = parties
        self._count = 0
        self._event = asyncio.Event()
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        """Arrive at the barrier and wait for the rest of this round."""
        async with self._lock:
            self._count += 1
            # Bind this round's event while holding the lock, so the wait
            # below is tied to the round we joined even after a reset swaps
            # self._event for a new one.
            round_event = self._event
            if self._count == self._parties:
                round_event.set()
                self._count = 0
                self._event = asyncio.Event()
                return

        await round_event.wait()


class AsyncRWLock:
    """Readers-writer lock: many concurrent readers or one exclusive writer.

    The first reader to enter takes ``_write_lock`` on behalf of all readers,
    and the last reader to leave releases it. A writer takes ``_write_lock``
    directly. Continuously overlapping readers can therefore starve writers.
    """

    def __init__(self):
        self._write_lock = asyncio.Lock()
        self._readers = 0
        # Serialises updates of _readers together with the first-reader
        # acquire / last-reader release of _write_lock.
        self._read_count_lock = asyncio.Lock()

    async def acquire_read(self) -> None:
        """Enter as a reader, waiting while a writer holds the lock."""
        async with self._read_count_lock:
            if self._readers == 0:
                await self._write_lock.acquire()
            # Count the reader only after the acquire succeeded: if the wait
            # above is cancelled, the counter must stay untouched, otherwise
            # later readers would skip taking _write_lock and run alongside
            # a writer.
            self._readers += 1

    async def release_read(self) -> None:
        """Leave as a reader; the last reader out lets writers in.

        Raises:
            RuntimeError: if called without a matching ``acquire_read()``.
        """
        async with self._read_count_lock:
            if self._readers == 0:
                raise RuntimeError("release_read() called without a matching acquire_read()")
            self._readers -= 1
            if self._readers == 0:
                self._write_lock.release()

    async def acquire_write(self) -> None:
        """Take the lock exclusively, waiting for readers and other writers."""
        await self._write_lock.acquire()

    def release_write(self) -> None:
        """Release exclusive (writer) ownership."""
        self._write_lock.release()

    @contextlib.asynccontextmanager
    async def read(self):
        """Hold shared (reader) access for the duration of an ``async with`` body."""
        await self.acquire_read()
        try:
            yield
        finally:
            await self.release_read()

    @contextlib.asynccontextmanager
    async def write(self):
        """Hold exclusive (writer) access for the duration of an ``async with`` body."""
        await self.acquire_write()
        try:
            yield
        finally:
            self.release_write()

36.2.3 异步队列

python
from typing import Generic, TypeVar
from collections import deque
import heapq


# Type variable for the element type of the generic queue wrappers.
T = TypeVar("T")


class AsyncQueue(Generic[T]):
    """Typed FIFO facade that forwards every call to an :class:`asyncio.Queue`."""

    def __init__(self, maxsize: int = 0):
        # maxsize <= 0 means unbounded, as in asyncio.Queue.
        self._queue: asyncio.Queue[T] = asyncio.Queue(maxsize)

    async def put(self, item: T) -> None:
        """Enqueue *item*, waiting for a free slot if the queue is full."""
        backing = self._queue
        await backing.put(item)

    async def get(self) -> T:
        """Dequeue the oldest item, waiting while the queue is empty."""
        oldest = await self._queue.get()
        return oldest

    def put_nowait(self, item: T) -> None:
        """Enqueue *item* immediately (raises asyncio.QueueFull if full)."""
        backing = self._queue
        backing.put_nowait(item)

    def get_nowait(self) -> T:
        """Dequeue immediately (raises asyncio.QueueEmpty if empty)."""
        oldest = self._queue.get_nowait()
        return oldest

    def qsize(self) -> int:
        """Number of items currently queued."""
        return self._queue.qsize()

    def empty(self) -> bool:
        """True when no items are queued."""
        return self._queue.empty()

    def full(self) -> bool:
        """True when a bounded queue has no free slots."""
        return self._queue.full()

    async def join(self) -> None:
        """Wait until every item put so far has been marked with task_done()."""
        backing = self._queue
        await backing.join()

    def task_done(self) -> None:
        """Mark one previously dequeued item as fully processed."""
        backing = self._queue
        backing.task_done()


class AsyncPriorityQueue(Generic[T]):
    """Priority queue of ``(priority, value)`` tuples; lowest priority comes out first.

    Entries with equal priority are returned in insertion (FIFO) order. A
    sequence number is stored between the priority and the caller's tuple, so
    the underlying heap never falls back to comparing payloads. Without it,
    equal priorities with unorderable values (e.g. dicts) raise TypeError.
    """

    def __init__(self, maxsize: int = 0):
        # Heap entries are (priority, sequence, original_item).
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue(maxsize)
        self._sequence = itertools.count()

    async def put(self, item: tuple[int, T]) -> None:
        """Insert *item*; its first element is the priority."""
        await self._queue.put((item[0], next(self._sequence), item))

    async def get(self) -> tuple[int, T]:
        """Remove and return the lowest-priority item exactly as it was inserted."""
        _priority, _seq, item = await self._queue.get()
        return item

    def qsize(self) -> int:
        """Number of items currently queued."""
        return self._queue.qsize()

    def empty(self) -> bool:
        """True when no items are queued."""
        return self._queue.empty()


class AsyncLifoQueue(Generic[T]):
    """Stack-ordered (last in, first out) facade over :class:`asyncio.LifoQueue`."""

    def __init__(self, maxsize: int = 0):
        self._queue: asyncio.LifoQueue[T] = asyncio.LifoQueue(maxsize)

    async def put(self, item: T) -> None:
        """Push *item*, waiting for room if the queue is bounded and full."""
        stack = self._queue
        await stack.put(item)

    async def get(self) -> T:
        """Pop the most recently pushed item, waiting while the queue is empty."""
        newest = await self._queue.get()
        return newest

    def qsize(self) -> int:
        """Number of items currently stored."""
        stack = self._queue
        return stack.qsize()


class BoundedAsyncQueue(Generic[T]):
    """FIFO queue with a fixed capacity and optional per-call timeouts.

    ``put`` reports a timeout by returning ``False``, while ``get`` raises
    ``asyncio.TimeoutError``. A *maxsize* of 0 or less means "unbounded",
    matching :class:`asyncio.Queue`.
    """

    def __init__(self, maxsize: int = 100):
        self._maxsize = maxsize
        self._queue: deque[T] = deque()
        # Wake-up hints for blocked getters/putters. One set() can release
        # several waiters at once, so each waiter re-checks the deque before
        # acting on it.
        self._not_empty = asyncio.Event()
        self._not_full = asyncio.Event()
        self._not_full.set()
        self._lock = asyncio.Lock()

    def _is_full(self) -> bool:
        """True when the queue is bounded and at capacity."""
        return self._maxsize > 0 and len(self._queue) >= self._maxsize

    async def put(self, item: T, timeout: Optional[float] = None) -> bool:
        """Append *item*, waiting up to *timeout* seconds (None = forever) for room.

        Returns:
            True if the item was enqueued, False if the timeout expired first.
        """
        try:
            async with asyncio.timeout(timeout):
                while True:
                    await self._not_full.wait()
                    async with self._lock:
                        if self._is_full():
                            # Another producer took the last slot after we
                            # were woken; wait for the next free slot.
                            self._not_full.clear()
                            continue
                        self._queue.append(item)
                        self._not_empty.set()
                        if self._is_full():
                            self._not_full.clear()
                        return True
        except asyncio.TimeoutError:
            return False

    async def get(self, timeout: Optional[float] = None) -> T:
        """Remove and return the oldest item, waiting up to *timeout* seconds.

        Raises:
            asyncio.TimeoutError: if no item became available in time.
        """
        try:
            async with asyncio.timeout(timeout):
                while True:
                    await self._not_empty.wait()
                    async with self._lock:
                        if not self._queue:
                            # Another consumer took the item after we were
                            # woken; popping now would raise IndexError.
                            self._not_empty.clear()
                            continue
                        item = self._queue.popleft()
                        self._not_full.set()
                        if not self._queue:
                            self._not_empty.clear()
                        return item
        except asyncio.TimeoutError as exc:
            raise asyncio.TimeoutError("Queue get timeout") from exc

    def qsize(self) -> int:
        """Number of items currently stored."""
        return len(self._queue)


class ProducerConsumer:
    """Demo of several producers and consumers sharing one bounded asyncio.Queue."""

    def __init__(self, queue_size: int = 10, num_producers: int = 3, num_consumers: int = 2):
        self.queue: asyncio.Queue = asyncio.Queue(queue_size)
        self.num_producers = num_producers
        self.num_consumers = num_consumers
        self._stop_event = asyncio.Event()

    async def producer(self, producer_id: int) -> None:
        """Emit ``Item-<producer_id>-<n>`` strings every 0.1 s until stopped."""
        sequence = 0
        while True:
            if self._stop_event.is_set():
                break
            item = f"Item-{producer_id}-{sequence}"
            await self.queue.put(item)
            print(f"Producer {producer_id} produced: {item}")
            sequence += 1
            await asyncio.sleep(0.1)

    async def consumer(self, consumer_id: int) -> None:
        """Consume items until a stop has been requested and the queue is drained."""

        def should_continue() -> bool:
            return not (self._stop_event.is_set() and self.queue.empty())

        while should_continue():
            try:
                item = await asyncio.wait_for(self.queue.get(), timeout=0.5)
            except asyncio.TimeoutError:
                # Nothing arrived in time; re-check the exit condition.
                continue
            print(f"Consumer {consumer_id} consumed: {item}")
            self.queue.task_done()
            await asyncio.sleep(0.2)

    async def run(self, duration: float = 5.0) -> None:
        """Run all producers and consumers for *duration* seconds, then wind down."""
        producers = [asyncio.create_task(self.producer(n)) for n in range(self.num_producers)]
        consumers = [asyncio.create_task(self.consumer(n)) for n in range(self.num_consumers)]

        await asyncio.sleep(duration)
        self._stop_event.set()

        # Producers finish first, then consumers drain whatever is left.
        for group in (producers, consumers):
            await asyncio.gather(*group)

36.3 异步网络编程

36.3.1 aiohttp客户端

python
import aiohttp
from typing import Dict, Optional, List, Any
import json


class AsyncHTTPClient:
    """JSON-oriented HTTP client wrapping a pooled ``aiohttp.ClientSession``.

    Use it as ``async with AsyncHTTPClient(...) as client:`` or call
    :meth:`connect` / :meth:`close` explicitly. The ``get``/``post``/``put``/
    ``delete`` helpers call ``raise_for_status()`` on the response and return
    the decoded JSON body.
    """

    def __init__(
        self,
        base_url: str = "",
        timeout: float = 30.0,
        max_connections: int = 100,
        headers: Optional[Dict[str, str]] = None
    ):
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_connections = max_connections
        self.headers = headers or {}
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self) -> "AsyncHTTPClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

    def _require_session(self) -> aiohttp.ClientSession:
        """Return the open session, or raise if :meth:`connect` has not been called.

        Raises:
            RuntimeError: when the client is not connected.
        """
        if self._session is None:
            raise RuntimeError(
                "AsyncHTTPClient is not connected; call connect() or use 'async with'"
            )
        return self._session

    async def connect(self) -> None:
        """Open the session and connection pool (closing any previous session first)."""
        # Reconnecting must not leak the previous session and its connector.
        await self.close()
        connector = aiohttp.TCPConnector(limit=self.max_connections)
        self._session = aiohttp.ClientSession(
            # aiohttp rejects "" as a base URL; None means "no base URL".
            base_url=self.base_url or None,
            timeout=self.timeout,
            connector=connector,
            headers=self.headers
        )

    async def close(self) -> None:
        """Close the session if one is open."""
        if self._session:
            await self._session.close()
            self._session = None

    async def get(
        self,
        url: str,
        params: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> Any:
        """GET *url* and return the decoded JSON body."""
        async with self._require_session().get(url, params=params, headers=headers) as response:
            response.raise_for_status()
            return await response.json()

    async def post(
        self,
        url: str,
        data: Optional[Dict] = None,
        json: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> Any:
        """POST form *data* or a *json* body to *url* and return the decoded JSON reply."""
        async with self._require_session().post(url, data=data, json=json, headers=headers) as response:
            response.raise_for_status()
            return await response.json()

    async def put(
        self,
        url: str,
        data: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> Any:
        """PUT *data* to *url* and return the decoded JSON reply."""
        async with self._require_session().put(url, data=data, headers=headers) as response:
            response.raise_for_status()
            return await response.json()

    async def delete(
        self,
        url: str,
        headers: Optional[Dict] = None
    ) -> Any:
        """DELETE *url* and return the decoded JSON reply."""
        async with self._require_session().delete(url, headers=headers) as response:
            response.raise_for_status()
            return await response.json()

    async def request(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> aiohttp.ClientResponse:
        """Send an arbitrary request and return the response with its body already read.

        No status check is performed here.
        """
        async with self._require_session().request(method, url, **kwargs) as response:
            # Read the body while the connection is still held. aiohttp caches
            # it on the response, so read()/text()/json() keep working after
            # the context manager has released the connection.
            await response.read()
            return response


class AsyncRequestBatch:
    """Collect HTTP requests and execute them concurrently under a concurrency cap."""

    def __init__(self, client: AsyncHTTPClient, max_concurrent: int = 10):
        self.client = client
        self.max_concurrent = max_concurrent
        self._requests: List[tuple] = []

    def add(self, method: str, url: str, **kwargs) -> None:
        """Queue one request; *kwargs* are forwarded to ``client.request``."""
        entry = (method, url, kwargs)
        self._requests.append(entry)

    async def execute_all(self) -> List[Any]:
        """Run all queued requests; failures are returned in place of results."""
        gate = asyncio.Semaphore(self.max_concurrent)

        async def run_one(method: str, url: str, **kwargs):
            async with gate:
                return await self.client.request(method, url, **kwargs)

        pending = [run_one(m, u, **kw) for m, u, kw in self._requests]
        return await asyncio.gather(*pending, return_exceptions=True)

    def clear(self) -> None:
        """Drop every queued request."""
        self._requests.clear()


class RetryableClient:
    """Retry ``client.request`` on transient errors with a linearly growing delay.

    The request is attempted up to *max_retries* times in total (at least
    once). Before retry number *k* (1-based) the client sleeps
    ``retry_delay * k`` seconds. Only exceptions in *retry_exceptions*
    trigger a retry; the last one is re-raised when all attempts fail.
    """

    def __init__(
        self,
        client: AsyncHTTPClient,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        retry_exceptions: tuple = (aiohttp.ClientError, asyncio.TimeoutError)
    ):
        self.client = client
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.retry_exceptions = retry_exceptions

    async def request_with_retry(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> aiohttp.ClientResponse:
        """Send the request, retrying as described in the class docstring."""
        # Always make at least one attempt; with max_retries <= 0 the old loop
        # never ran and ended in `raise None` (TypeError).
        attempts = max(1, self.max_retries)

        for attempt in range(1, attempts + 1):
            try:
                return await self.client.request(method, url, **kwargs)
            except self.retry_exceptions:
                if attempt >= attempts:
                    raise
                await asyncio.sleep(self.retry_delay * attempt)


async def http_client_example():
    """Fetch, read and create posts against the public JSONPlaceholder API."""
    payload = {
        "title": "Test Post",
        "body": "This is a test",
        "userId": 1
    }
    async with AsyncHTTPClient("https://jsonplaceholder.typicode.com") as client:
        all_posts = await client.get("/posts")
        print(f"Fetched {len(all_posts)} posts")

        first_post = await client.get("/posts/1")
        print(f"Post 1: {first_post['title']}")

        created = await client.post("/posts", json=payload)
        print(f"Created post with ID: {created['id']}")

36.3.2 WebSocket通信

python
class AsyncWebSocketClient:
    """WebSocket client built on aiohttp; usable as an async context manager."""

    def __init__(self, url: str):
        self.url = url
        self._session: Optional[aiohttp.ClientSession] = None
        self._ws: Optional[aiohttp.ClientWebSocketResponse] = None

    def _require_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Return the open socket, or raise if :meth:`connect` has not succeeded.

        Raises:
            RuntimeError: when not connected.
        """
        if self._ws is None:
            raise RuntimeError("WebSocket is not connected; call connect() first")
        return self._ws

    async def connect(self) -> None:
        """Open a session and perform the WebSocket handshake with ``self.url``."""
        await self.close()  # reconnecting must not leak an earlier session
        session = aiohttp.ClientSession()
        try:
            ws = await session.ws_connect(self.url)
        except BaseException:
            # Don't leak the session if the handshake fails; when used via
            # `async with`, __aexit__ is not called after __aenter__ raises.
            await session.close()
            raise
        self._session = session
        self._ws = ws

    async def close(self) -> None:
        """Close the socket and the session; safe to call more than once."""
        ws, self._ws = self._ws, None
        session, self._session = self._session, None
        try:
            if ws is not None:
                await ws.close()
        finally:
            # Release the session even if closing the socket failed.
            if session is not None:
                await session.close()

    async def send_str(self, message: str) -> None:
        """Send a text frame."""
        await self._require_ws().send_str(message)

    async def send_json(self, data: Dict) -> None:
        """Send *data* serialised as JSON in a text frame."""
        await self._require_ws().send_json(data)

    async def send_bytes(self, data: bytes) -> None:
        """Send a binary frame."""
        await self._require_ws().send_bytes(data)

    async def receive(self) -> aiohttp.WSMessage:
        """Wait for the next message of any type."""
        return await self._require_ws().receive()

    async def receive_str(self) -> str:
        """Wait for the next message and return its text.

        Raises:
            ValueError: if the message is not a TEXT frame.
        """
        msg = await self.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            return msg.data
        raise ValueError(f"Expected text message, got {msg.type}")

    async def receive_json(self) -> Dict:
        """Wait for the next message and decode it as JSON.

        Raises:
            ValueError: if the message is not a TEXT frame.
        """
        msg = await self.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            return json.loads(msg.data)
        raise ValueError(f"Expected text message, got {msg.type}")

    @property
    def closed(self) -> bool:
        """True when there is no socket or the socket has been closed."""
        return self._ws.closed if self._ws else True

    async def __aenter__(self) -> "AsyncWebSocketClient":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()


class WebSocketServer:
    """Minimal aiohttp WebSocket server that echoes text messages on ``/ws``.

    Connected sockets are tracked in ``_clients`` so :meth:`broadcast` can
    reach all of them.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8080):
        # `import aiohttp` does not load the aiohttp.web submodule, so import
        # it explicitly before using the server-side API.
        from aiohttp import web

        self.host = host
        self.port = port
        self._clients: set = set()
        self._app = web.Application()
        # Register the route once, at construction, instead of on every start().
        self._app.router.add_get("/ws", self.handle_websocket)
        self._runner: Optional["aiohttp.web.AppRunner"] = None

    async def handle_websocket(self, request: "aiohttp.web.Request") -> "aiohttp.web.WebSocketResponse":
        """Upgrade *request* to a WebSocket and serve it until the client disconnects."""
        from aiohttp import web

        ws = web.WebSocketResponse()
        await ws.prepare(request)

        self._clients.add(ws)
        print(f"Client connected. Total clients: {len(self._clients)}")

        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await self.on_message(ws, msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {ws.exception()}")
        finally:
            self._clients.discard(ws)
            print(f"Client disconnected. Total clients: {len(self._clients)}")

        return ws

    async def on_message(self, ws: "aiohttp.web.WebSocketResponse", message: str) -> None:
        """Handle one text message; the default implementation echoes it back."""
        await ws.send_str(f"Echo: {message}")

    async def broadcast(self, message: str) -> None:
        """Send *message* to every connected client, printing (not raising) send errors."""
        # Iterate over a snapshot: handle_websocket() may discard sockets from
        # _clients while we are suspended in send_str(), and mutating a set
        # during iteration raises RuntimeError.
        for ws in tuple(self._clients):
            try:
                await ws.send_str(message)
            except Exception as e:
                print(f"Broadcast error: {e}")

    async def start(self) -> None:
        """Set up the app runner and start listening on ``host:port``."""
        from aiohttp import web

        self._runner = web.AppRunner(self._app)
        await self._runner.setup()
        site = web.TCPSite(self._runner, self.host, self.port)
        await site.start()
        print(f"WebSocket server started on {self.host}:{self.port}")

    async def stop(self) -> None:
        """Shut the server down if it was started."""
        if self._runner:
            await self._runner.cleanup()
            self._runner = None

36.4 异步数据库

36.4.1 异步数据库连接

python
class AsyncDatabasePool:
    """Placeholder interface for a pooled async database client.

    Every query method is a stub that returns None; a concrete driver-backed
    implementation is expected to create ``self._pool`` in :meth:`connect`.
    """

    def __init__(self, db_url: str, min_size: int = 5, max_size: int = 20):
        self.db_url, self.min_size, self.max_size = db_url, min_size, max_size
        self._pool = None

    async def connect(self) -> None:
        """Stub: does nothing."""
        return None

    async def close(self) -> None:
        """Close the pool if one has been created."""
        pool = self._pool
        if not pool:
            return
        await pool.close()

    async def execute(self, query: str, *args) -> Any:
        """Stub: returns None."""
        return None

    async def fetch(self, query: str, *args) -> List[Dict]:
        """Stub: returns None."""
        return None

    async def fetchone(self, query: str, *args) -> Optional[Dict]:
        """Stub: returns None."""
        return None

    async def fetchval(self, query: str, *args) -> Any:
        """Stub: returns None."""
        return None


class AsyncSQLite:
    """Small async SQLite helper built on ``aiosqlite`` that returns rows as dicts.

    Outside a :meth:`transaction` block, :meth:`execute` and
    :meth:`executemany` commit after every call. Inside one, they defer to
    the block's single commit or rollback.
    """

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._connection = None
        # True while a transaction() block is active.
        self._in_transaction = False

    async def connect(self) -> None:
        """Open the database file and make rows addressable by column name."""
        import aiosqlite  # imported lazily, only when a connection is opened
        self._connection = await aiosqlite.connect(self.db_path)
        self._connection.row_factory = aiosqlite.Row

    async def close(self) -> None:
        """Close the connection if it is open."""
        if self._connection:
            await self._connection.close()
            self._connection = None

    async def _autocommit(self) -> None:
        """Commit now, unless an enclosing transaction() will commit later."""
        if not self._in_transaction:
            await self._connection.commit()

    async def execute(self, query: str, params: tuple = ()) -> None:
        """Run one statement with *params* bound to its ``?`` placeholders."""
        await self._connection.execute(query, params)
        await self._autocommit()

    async def fetchall(self, query: str, params: tuple = ()) -> List[Dict]:
        """Run a query and return every row as a dict."""
        async with self._connection.execute(query, params) as cursor:
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]

    async def fetchone(self, query: str, params: tuple = ()) -> Optional[Dict]:
        """Run a query and return the first row as a dict, or None if there is none."""
        async with self._connection.execute(query, params) as cursor:
            row = await cursor.fetchone()
            return dict(row) if row else None

    async def executemany(self, query: str, params_list: List[tuple]) -> None:
        """Run *query* once for each parameter tuple in *params_list*."""
        await self._connection.executemany(query, params_list)
        await self._autocommit()

    @contextlib.asynccontextmanager
    async def transaction(self):
        """Run the enclosed statements atomically.

        Commits when the block exits normally. If the block raises an
        Exception, it rolls back and re-raises. Previously every
        ``execute()`` inside the block committed on its own, so a rollback
        could not undo earlier statements.
        """
        await self._connection.execute("BEGIN")
        self._in_transaction = True
        try:
            yield
            await self._connection.commit()
        except Exception:
            await self._connection.rollback()
            raise
        finally:
            self._in_transaction = False

    async def __aenter__(self) -> "AsyncSQLite":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()


class AsyncPostgreSQL:
    """PostgreSQL access through an ``asyncpg`` connection pool.

    Each query method borrows a connection from the pool for the duration of
    one call. :meth:`transaction` yields a single connection inside a
    transaction.
    """

    def __init__(self, dsn: str, min_size: int = 5, max_size: int = 20):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool = None

    async def connect(self) -> None:
        """Create the connection pool."""
        # Imported lazily; the return annotations below are strings for the
        # same reason, so defining this class does not require asyncpg (it
        # previously raised NameError at class definition time).
        import asyncpg
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size
        )

    async def close(self) -> None:
        """Close the pool if it exists."""
        if self._pool:
            await self._pool.close()
            self._pool = None

    async def execute(self, query: str, *args) -> str:
        """Run a statement and return asyncpg's status string."""
        async with self._pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args) -> List["asyncpg.Record"]:
        """Run a query and return all rows."""
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args) -> Optional["asyncpg.Record"]:
        """Run a query and return its first row, or None."""
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

    async def fetchval(self, query: str, *args) -> Any:
        """Run a query and return a single value from its first row."""
        async with self._pool.acquire() as conn:
            return await conn.fetchval(query, *args)

    @contextlib.asynccontextmanager
    async def transaction(self):
        """Yield one pooled connection inside ``conn.transaction()``."""
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                yield conn

    async def __aenter__(self) -> "AsyncPostgreSQL":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

36.4.2 异步ORM

python
from dataclasses import dataclass, field
from typing import TypeVar, Generic, List, Optional, Any
import json


ModelType = TypeVar("ModelType", bound="BaseModel")


@dataclass
class BaseModel:
    id: Optional[int] = None

    def to_dict(self) -> Dict:
        return {
            k: v for k, v in self.__dict__.items()
            if not k.startswith("_")
        }

    @classmethod
    def from_dict(cls: type[ModelType], data: Dict) -> ModelType:
        return cls(**{
            k: v for k, v in data.items()
            if k in cls.__dataclass_fields__
        })


class AsyncRepository(Generic[ModelType]):
    """Generic async CRUD repository that maps one model class to one SQL table.

    Values are always bound through ``?`` placeholders. The table name,
    column names and ``condition`` fragments, however, are interpolated into
    the SQL text, so they must come from trusted code and never from user input.
    """

    def __init__(self, db: AsyncSQLite, model_class: type[ModelType], table_name: str):
        self.db = db
        self.model_class = model_class
        self.table_name = table_name

    async def create_table(self, columns: Dict[str, str]) -> None:
        """Create the table if it does not exist.

        Args:
            columns: column name -> SQL type. An autoincrement ``id`` primary
                key is always added as the first column.
        """
        cols = ["id INTEGER PRIMARY KEY AUTOINCREMENT"]
        cols.extend(f"{name} {dtype}" for name, dtype in columns.items())
        query = f"CREATE TABLE IF NOT EXISTS {self.table_name} ({', '.join(cols)})"
        await self.db.execute(query)

    async def insert(self, model: ModelType) -> int:
        """Insert *model* (its ``id`` is ignored) and return the new row id."""
        data = model.to_dict()
        data.pop("id", None)

        if data:
            columns = ", ".join(data)
            placeholders = ", ".join("?" for _ in data)
            query = f"INSERT INTO {self.table_name} ({columns}) VALUES ({placeholders})"
        else:
            # "() VALUES ()" is invalid SQL; with no columns besides id,
            # let every column take its default.
            query = f"INSERT INTO {self.table_name} DEFAULT VALUES"

        await self.db.execute(query, tuple(data.values()))

        # NOTE(review): SQLite's last_insert_rowid() is per connection. If
        # another task can insert through the same db between the await above
        # and this one, the id returned may be that task's row. Confirm that
        # AsyncSQLite serializes this, or switch to the cursor's lastrowid or
        # "RETURNING id".
        result = await self.db.fetchone("SELECT last_insert_rowid() as id")
        return result["id"]

    async def find_by_id(self, id: int) -> Optional[ModelType]:
        """Return the model whose primary key is *id*, or ``None`` if no row matches."""
        query = f"SELECT * FROM {self.table_name} WHERE id = ?"
        row = await self.db.fetchone(query, (id,))
        return self.model_class.from_dict(row) if row else None

    async def find_all(self) -> List[ModelType]:
        """Return every row of the table as a model instance."""
        query = f"SELECT * FROM {self.table_name}"
        rows = await self.db.fetchall(query)
        return [self.model_class.from_dict(row) for row in rows]

    async def find_where(self, condition: str, *args) -> List[ModelType]:
        """Return the rows matching *condition*.

        Args:
            condition: SQL boolean expression with ``?`` placeholders (trusted code only).
            *args: values bound to the placeholders, in order.
        """
        query = f"SELECT * FROM {self.table_name} WHERE {condition}"
        rows = await self.db.fetchall(query, args)
        return [self.model_class.from_dict(row) for row in rows]

    async def update(self, model: ModelType) -> None:
        """Write all non-id fields of *model* to the row with the same ``id``.

        Raises:
            ValueError: if ``model.id`` is ``None``.
        """
        if model.id is None:
            raise ValueError("Cannot update model without id")

        data = model.to_dict()
        data.pop("id")

        if not data:
            # Nothing besides the primary key to write; "SET" with no columns is invalid SQL.
            return

        set_clause = ", ".join(f"{k} = ?" for k in data)
        query = f"UPDATE {self.table_name} SET {set_clause} WHERE id = ?"

        await self.db.execute(query, tuple(data.values()) + (model.id,))

    async def delete(self, id: int) -> None:
        """Delete the row whose primary key is *id* (no error if it does not exist)."""
        query = f"DELETE FROM {self.table_name} WHERE id = ?"
        await self.db.execute(query, (id,))

    async def count(self, condition: str = "1=1", *args) -> int:
        """Count the rows matching *condition*; the default "1=1" counts every row."""
        query = f"SELECT COUNT(*) as count FROM {self.table_name} WHERE {condition}"
        result = await self.db.fetchone(query, args)
        return result["count"]


@dataclass
class User(BaseModel):
    """Example model with username, email and creation timestamp columns.

    Every field defaults to an empty string.
    """

    username: str = field(default="")
    email: str = field(default="")
    created_at: str = field(default="")


class UserRepository(AsyncRepository[User]):
    """Repository for ``User`` models stored in the "users" table."""

    def __init__(self, db: AsyncSQLite):
        super().__init__(db, User, "users")

    async def find_by_email(self, email: str) -> Optional[User]:
        """Return the first user with this email, or ``None``."""
        # Query once; the old version issued the same SELECT twice.
        matches = await self.find_where("email = ?", email)
        return matches[0] if matches else None

    async def find_by_username(self, username: str) -> Optional[User]:
        """Return the first user with this username, or ``None``."""
        matches = await self.find_where("username = ?", username)
        return matches[0] if matches else None

36.5 异步异常处理

36.5.1 异常传播

python
class AsyncExceptionHandler:
    @staticmethod
    async def safe_execute(
        coro: Coroutine,
        default: Any = None,
        exceptions: tuple = (Exception,)
    ) -> Any:
        try:
            return await coro
        except exceptions as e:
            print(f"Exception caught: {e}")
            return default

    @staticmethod
    async def with_retry(
        coro_factory: Callable[[], Coroutine],
        max_retries: int = 3,
        delay: float = 1.0,
        exceptions: tuple = (Exception,)
    ) -> Any:
        last_exception = None

        for attempt in range(max_retries):
            try:
                return await coro_factory()
            except exceptions as e:
                last_exception = e
                if attempt < max_retries - 1:
                    await asyncio.sleep(delay * (attempt + 1))

        raise last_exception

    @staticmethod
    async def with_timeout(coro: Coroutine, timeout: float) -> Any:
        async with asyncio.timeout(timeout):
            return await coro

    @staticmethod
    async def with_deadline(coro: Coroutine, deadline: float) -> Any:
        loop = asyncio.get_running_loop()
        remaining = deadline - loop.time()

        if remaining <= 0:
            raise asyncio.TimeoutError("Deadline already passed")

        async with asyncio.timeout(remaining):
            return await coro


class TaskCancellationHandler:
    """Helpers for running tasks with cancellation callbacks, delayed cancel, and shutdown."""

    def __init__(self):
        # Tasks observed as cancelled by run_with_cancellation().
        self._cancelled_tasks: List[asyncio.Task] = []

    async def run_with_cancellation(
        self,
        coro: Coroutine,
        on_cancel: Optional[Callable] = None
    ) -> Any:
        """Run *coro* as a task and return its result.

        On cancellation the task is recorded, the async callback *on_cancel*
        (if given) is awaited, and ``CancelledError`` is re-raised.
        """
        task = asyncio.create_task(coro)

        try:
            return await task
        except asyncio.CancelledError:
            self._cancelled_tasks.append(task)
            if on_cancel:
                await on_cancel()
            raise

    async def cancel_after(
        self,
        coro: Coroutine,
        delay: float,
        on_cancel: Optional[Callable] = None
    ) -> Any:
        """Run *coro* as a task and cancel it if it is still running after *delay* seconds.

        On cancellation *on_cancel* (if given) is awaited and ``CancelledError``
        re-raised. The helper canceller task is always cleaned up.
        """
        task = asyncio.create_task(coro)

        async def canceller():
            await asyncio.sleep(delay)
            if not task.done():
                task.cancel()

        canceller_task = asyncio.create_task(canceller())

        try:
            return await task
        except asyncio.CancelledError:
            if on_cancel:
                await on_cancel()
            raise
        finally:
            canceller_task.cancel()
            try:
                await canceller_task
            except asyncio.CancelledError:
                pass

    @staticmethod
    async def shield(coro: Coroutine) -> Any:
        """Await *coro* through ``asyncio.shield`` so cancelling the caller does not cancel it."""
        return await asyncio.shield(coro)

    @staticmethod
    async def graceful_shutdown(
        tasks: List[asyncio.Task],
        timeout: float = 5.0
    ) -> None:
        """Cancel *tasks* and wait at most *timeout* seconds for them to finish.

        Tasks that delay or ignore cancellation longer than *timeout* remain
        pending instead of blocking shutdown forever.
        """
        if not tasks:
            return  # asyncio.wait() raises ValueError on an empty collection

        for task in tasks:
            task.cancel()

        done, _pending = await asyncio.wait(tasks, timeout=timeout)

        for task in done:
            # Retrieve exceptions so asyncio does not log
            # "Task exception was never retrieved" for these tasks.
            if not task.cancelled():
                task.exception()

36.6 知识图谱

36.6.1 异步编程技术架构

┌─────────────────────────────────────────────────────────────────────┐
│                      Python 异步编程技术栈                            │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │   应用层     │  │   Web框架   │  │   数据库    │  │   网络通信   │ │
│  │  FastAPI    │  │  Aiohttp   │  │  AsyncPG   │  │  WebSocket  │ │
│  │  Sanic      │  │  Starlette │  │  AIOMySQL  │  │  TCP/UDP    │ │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘ │
│         │                │                │                │        │
│  ┌──────┴────────────────┴────────────────┴────────────────┴──────┐ │
│  │                      asyncio 核心层                              │ │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐           │ │
│  │  │TaskGroup │ │ Semaphore│ │  Queue   │ │  Lock    │           │ │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘           │ │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐           │ │
│  │  │  Event   │ │Condition │ │  Barrier │ │  Future  │           │ │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘           │ │
│  └─────────────────────────────────────────────────────────────────┘ │
│                                │                                     │
│  ┌─────────────────────────────┴───────────────────────────────────┐ │
│  │                      事件循环 (Event Loop)                        │ │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │ │
│  │  │   Selector   │  │   Scheduler  │  │   Ready Queue │          │ │
│  │  │  (epoll/kq)  │  │  (定时任务)   │  │   (就绪队列)  │          │ │
│  │  └──────────────┘  └──────────────┘  └──────────────┘          │ │
│  └─────────────────────────────────────────────────────────────────┘ │
│                                │                                     │
│  ┌─────────────────────────────┴───────────────────────────────────┐ │
│  │                      协程层 (Coroutine)                           │ │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │ │
│  │  │ async/await  │  │   Task       │  │   Future     │          │ │
│  │  │  (语法糖)     │  │  (任务封装)   │  │  (结果容器)  │          │ │
│  │  └──────────────┘  └──────────────┘  └──────────────┘          │ │
│  └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

36.6.2 异步执行流程

┌─────────────────────────────────────────────────────────────────────┐
│                        异步任务执行流程                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    │
│   │ 创建协程  │───▶│ 封装Task │───▶│ 提交队列  │───▶│ 等待调度  │    │
│   │ async def│    │create_task│   │  ready   │    │  pending │    │
│   └──────────┘    └──────────┘    └──────────┘    └────┬─────┘    │
│                                                        │           │
│                     ┌──────────────────────────────────┘           │
│                     ▼                                              │
│   ┌──────────────────────────────────────────────────────────┐    │
│   │                    事件循环调度                             │    │
│   │  ┌────────────────────────────────────────────────────┐  │    │
│   │  │  1. 检查就绪队列 → 执行可运行任务                      │  │    │
│   │  │  2. 检查I/O状态 → 唤醒完成的I/O操作                    │  │    │
│   │  │  3. 检查定时器 → 执行到期的定时任务                     │  │    │
│   │  │  4. 处理信号 → 响应外部中断                            │  │    │
│   │  └────────────────────────────────────────────────────┘  │    │
│   └──────────────────────────────────────────────────────────┘    │
│                     │                                              │
│                     ▼                                              │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    │
│   │ 执行协程  │───▶│ 遇到await│───▶│ 挂起任务  │───▶│ I/O操作   │    │
│   │ running  │    │ 阻塞点   │    │ suspended│    │ 非阻塞    │    │
│   └──────────┘    └──────────┘    └──────────┘    └────┬─────┘    │
│                                                        │           │
│                     ┌──────────────────────────────────┘           │
│                     ▼                                              │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐                   │
│   │ I/O完成  │───▶│ 重新入队  │───▶│ 继续执行  │───▶ 返回结果      │
│   │ callback │    │  ready   │    │ resume   │                   │
│   └──────────┘    └──────────┘    └──────────┘                   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

36.7 技术选型指南

36.7.1 异步框架选型

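| 框架 | 适用场景 | 性能特点 | 学习曲线 | 生态系统 |
|------|----------|----------|----------|----------|
| asyncio | 标准库、基础异步 | 原生支持、稳定 | 中等 | Python内置 |
| aiohttp | HTTP客户端/服务端 | 高性能、功能丰富 | 中等 | 成熟完善 |
| FastAPI | Web API开发 | 极高性能、自动文档 | — | 快速增长 |
| Sanic | 高性能Web服务 | 接近Go性能 | — | 中等规模 |
| Starlette | 轻量级ASGI框架 | 高性能、灵活 | 中等 | 适中 |
| Tornado | 长连接、WebSocket | 成熟稳定 | 中等 | 完善生态 |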

36.7.2 异步数据库驱动选型

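| 数据库 | 驱动 | 连接池 | 事务支持 | ORM兼容 | 推荐指数 |
|--------|------|--------|----------|---------|----------|
| PostgreSQL | asyncpg | ✅ 原生 | ✅ 完整 | SQLAlchemy 2.0 | ★★★★★ |
| MySQL | aiomysql | ✅ 原生 | ✅ 完整 | SQLAlchemy 2.0 | ★★★★☆ |
| SQLite | aiosqlite | ❌ 单连接 | ✅ 完整 | SQLAlchemy 2.0 | ★★★★☆ |
| MongoDB | motor | ✅ 原生 | ✅ 会话 | ODM (Beanie) | ★★★★★ |
| Redis | aioredis(已并入 redis-py,新项目请使用 redis.asyncio) | ✅ 原生 | ✅ 事务 | - | ★★★★★ |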

36.7.3 同步原语选型

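| 同步原语 | 适用场景 | 特点 | 使用建议 |
|----------|----------|------|----------|
| Lock | 互斥访问共享资源 | 简单可靠 | 保护临界区代码 |
| Semaphore | 限制并发数量 | 可配置许可数 | 连接池、限流 |
| Event | 事件通知机制 | 一对多通知 | 状态同步、启动信号 |
| Condition | 复杂条件等待 | 支持谓词等待 | 生产者-消费者 |
| Barrier | 多任务同步点 | 等待所有参与者 | 并行计算分阶段 |
| Queue | 任务队列 | 协程间安全传递(asyncio.Queue 非线程安全,跨线程请配合 loop.call_soon_threadsafe 使用) | 解耦生产消费 |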

36.8 常见问题与解决方案

36.8.1 事件循环阻塞问题

python
import asyncio
import time

class EventLoopBlockSolution:
    """Contrast blocking and non-blocking ways of waiting or computing inside a coroutine."""

    @staticmethod
    async def bad_blocking():
        """Anti-pattern: block the event loop with a synchronous sleep."""
        time.sleep(5)  # blocks the entire event loop; no other task can run meanwhile
        return "done"

    @staticmethod
    async def good_non_blocking():
        """Correct: use the asynchronous sleep."""
        await asyncio.sleep(5)  # suspends only this coroutine; other tasks keep running
        return "done"

    @staticmethod
    async def run_blocking_in_executor():
        """Workaround: run the blocking call in the loop's default thread pool."""
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,  # None selects the loop's default executor (a thread pool)
            time.sleep,
            5
        )
        return result

    @staticmethod
    def _cpu_intensive_task(n):
        """Return the sum of squares of 0..n-1 (demo CPU-bound workload).

        ProcessPoolExecutor pickles the callable by its qualified name, so it
        must be reachable from module level. A function nested inside a method
        cannot be pickled, whereas this static method can.
        """
        return sum(i * i for i in range(n))

    @staticmethod
    async def run_cpu_bound_in_process(n: int = 1000000):
        """Workaround: run a CPU-bound function in a separate process.

        Args:
            n: size of the demo workload (sum of squares below *n*).
        """
        import concurrent.futures

        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            result = await loop.run_in_executor(
                pool,
                EventLoopBlockSolution._cpu_intensive_task,
                n
            )
        return result

36.8.2 任务取消与清理

python
class TaskCancellationSolution:
    """Run named tasks with per-name cleanup handlers and cancel groups of tasks gracefully."""

    def __init__(self):
        # task name -> zero-argument async callable, run once on cancel/failure
        self._cleanup_handlers = {}

    async def run_with_cleanup(self, coro, task_name: str):
        """Run *coro* as a task named *task_name* and return its result.

        If the task is cancelled or raises, the cleanup handler registered for
        *task_name* runs and the exception is re-raised.
        """
        task = asyncio.create_task(coro, name=task_name)

        try:
            return await task
        except asyncio.CancelledError:
            print(f"Task {task_name} was cancelled")
            await self._cleanup(task_name)
            raise
        except Exception as e:
            print(f"Task {task_name} failed: {e}")
            await self._cleanup(task_name)
            raise

    async def _cleanup(self, task_name: str):
        """Unregister and run the cleanup handler for *task_name*, if one exists."""
        # Pop before awaiting so the handler runs at most once, even if it
        # raises or two failures try to clean up the same name.
        handler = self._cleanup_handlers.pop(task_name, None)
        if handler is not None:
            await handler()

    def register_cleanup(self, task_name: str, handler):
        """Register async *handler* to run when *task_name* is cancelled or fails."""
        self._cleanup_handlers[task_name] = handler

    @staticmethod
    async def graceful_cancel_tasks(tasks: list, timeout: float = 5.0):
        """Cancel *tasks*, wait at most *timeout* seconds, and report how each one ended."""
        if not tasks:
            return  # asyncio.wait() raises ValueError on an empty collection

        for task in tasks:
            task.cancel()

        # Bounded wait: a task that delays or ignores cancellation can no
        # longer block this call forever.
        done, _pending = await asyncio.wait(tasks, timeout=timeout)

        for task in tasks:
            name = task.get_name()
            if task not in done:
                print(f"Task {name} did not finish within {timeout}s after cancel")
            elif task.cancelled():
                print(f"Task {name} cancelled gracefully")
            elif task.exception() is not None:
                print(f"Task {name} ended with error: {task.exception()}")

36.8.3 异步上下文管理器资源泄漏

python
class AsyncResourceManager:
    """Async context manager that tracks acquired resources and releases all of them on exit."""

    def __init__(self):
        self._resources = []

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.release()
        return False  # never suppress an exception raised in the body

    async def acquire(self):
        """Create a resource, track it for release(), and return it."""
        resource = await self._create_resource()
        self._resources.append(resource)
        return resource

    async def release(self):
        """Close every tracked resource, even if closing one of them fails or is cancelled.

        Ordinary errors are printed and skipped. A ``CancelledError`` is held
        back until the remaining resources are closed and is then re-raised,
        so cancellation is still honoured without leaking the rest.
        """
        # Detach the list before awaiting, so a repeated or concurrent
        # release() cannot close the same resource twice.
        resources, self._resources = self._resources, []
        cancelled = None
        for resource in resources:
            try:
                await self._close_resource(resource)
            except asyncio.CancelledError as exc:
                # BaseException, not caught by "except Exception": without
                # this branch the remaining resources would leak.
                cancelled = exc
            except Exception as e:
                print(f"Error closing resource: {e}")
        if cancelled is not None:
            raise cancelled

    async def _create_resource(self):
        """Demo stand-in: a dict tagged with this manager's id() and the creation time."""
        return {"id": id(self), "created": time.time()}

    async def _close_resource(self, resource):
        """Demo stand-in: simulate an asynchronous close and print the resource id."""
        await asyncio.sleep(0.01)
        print(f"Closed resource {resource['id']}")


async def safe_resource_usage():
    """Show the safe pattern: the manager releases its resources even when processing fails.

    NOTE(review): ``process_resource`` is not defined in this example; it is
    assumed to be a coroutine function supplied by the reader.
    """
    manager = AsyncResourceManager()
    async with manager:
        acquired = await manager.acquire()
        try:
            await process_resource(acquired)
        except Exception as err:
            # Report, then re-raise so __aexit__ still sees the failure.
            print(f"Error processing: {err}")
            raise

36.8.4 异步超时处理

python
class AsyncTimeoutSolution:
    @staticmethod
    async def with_timeout(coro, timeout: float, default=None):
        """带超时和默认值的执行"""
        try:
            async with asyncio.timeout(timeout):
                return await coro
        except asyncio.TimeoutError:
            print(f"Operation timed out after {timeout}s")
            return default
    
    @staticmethod
    async def with_retry_timeout(coro, timeout: float, max_retries: int = 3):
        """带重试的超时执行"""
        last_error = None
        for attempt in range(max_retries):
            try:
                async with asyncio.timeout(timeout):
                    return await coro
            except asyncio.TimeoutError as e:
                last_error = e
                print(f"Attempt {attempt + 1} timed out")
                await asyncio.sleep(2 ** attempt)  # 指数退避
        
        raise last_error
    
    @staticmethod
    async def deadline_scheduler(tasks: list, deadline: float):
        """截止时间调度器"""
        results = []
        start_time = time.time()
        
        for task in tasks:
            elapsed = time.time() - start_time
            remaining = deadline - elapsed
            
            if remaining <= 0:
                print("Deadline exceeded, skipping remaining tasks")
                break
            
            try:
                async with asyncio.timeout(remaining):
                    result = await task
                    results.append(result)
            except asyncio.TimeoutError:
                print("Task exceeded remaining deadline")
                break
        
        return results

36.9 本章小结

本章详细介绍了Python异步编程进阶的核心概念和实践:

  1. 事件循环机制:事件循环原理、协程状态管理
  2. TaskGroup:结构化并发、错误处理、超时控制
  3. 同步原语:Lock、Semaphore、Event、Condition、Barrier
  4. 异步队列:Queue、PriorityQueue、生产者-消费者模式
  5. 异步网络:aiohttp客户端、WebSocket通信
  6. 异步数据库:连接池、异步ORM、事务处理
  7. 异常处理:异常传播、取消处理、重试机制

练习题

  1. 实现一个异步限流器,控制并发请求数量
  2. 开发一个异步爬虫,支持并发抓取和错误重试
  3. 实现一个异步消息队列系统,支持发布订阅模式
  4. 开发一个异步API客户端,支持请求缓存和自动重试
  5. 实现一个异步任务调度器,支持定时任务和依赖管理

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校