Skip to content

第44章 微服务架构

学习目标

完成本章学习后,你将能够:

  1. 理解微服务架构:单体架构vs微服务、设计原则、拆分策略
  2. 实现服务发现:Consul、etcd、服务注册与发现
  3. 构建API网关:Kong、Traefik、路由、限流、认证
  4. 使用服务网格:Istio、Envoy、流量管理、安全通信
  5. 实现分布式追踪:Jaeger、Zipkin、OpenTelemetry
  6. 处理服务通信:同步通信、异步通信、事件驱动
  7. 实现服务容错:熔断器、重试、超时、降级
  8. 管理配置中心:配置分发、动态更新、环境隔离

44.1 微服务架构基础

44.1.1 架构设计原则

python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable
from enum import Enum
import asyncio
import json
from abc import ABC, abstractmethod


class ServiceType(Enum):
    """Category of a microservice; each member's value is its string identifier."""
    # ServiceDiscoveryManager.register publishes ``.value`` as the Consul tag.
    API_GATEWAY = "api_gateway"
    BUSINESS_SERVICE = "business_service"
    DATA_SERVICE = "data_service"
    INFRASTRUCTURE = "infrastructure"


@dataclass
class ServiceDefinition:
    """Description of one service instance: identity, network location and metadata."""
    name: str  # logical service name; combined with ``version`` to form the registry key
    version: str
    service_type: ServiceType
    port: int
    # NOTE(review): "0.0.0.0" is a bind-all address, yet ServiceClient builds request
    # URLs from this field — presumably callers override it with a routable host; confirm.
    host: str = "0.0.0.0"
    description: str = ""
    dependencies: List[str] = field(default_factory=list)
    endpoints: List[str] = field(default_factory=list)
    health_check_path: str = "/health"  # appended to http://host:port for the Consul HTTP check
    metadata: Dict[str, str] = field(default_factory=dict)


@dataclass
class MicroserviceConfig:
    """Runtime configuration bundle for a single microservice."""
    service: ServiceDefinition  # the service this configuration belongs to
    # Optional backing-service URLs; None means "not configured".
    database_url: Optional[str] = None
    cache_url: Optional[str] = None
    message_queue_url: Optional[str] = None
    config_server_url: Optional[str] = None
    # Observability switches (both enabled by default).
    tracing_enabled: bool = True
    metrics_enabled: bool = True
    log_level: str = "INFO"


class MicroservicePatterns:
    """Short Chinese-language descriptions of common microservice design patterns.

    Each method returns a fixed text block: a leading newline, content lines
    indented by eight spaces, and a final line holding only eight spaces.
    """

    @staticmethod
    def single_responsibility() -> str:
        """Return the description of the single-responsibility principle."""
        lines = (
            "",
            "        单一职责原则:",
            "        - 每个微服务只负责一个业务领域",
            "        - 服务内聚,服务间松耦合",
            "        - 独立部署、独立扩展",
            "        ",
        )
        return "\n".join(lines)

    @staticmethod
    def database_per_service() -> str:
        """Return the description of the database-per-service principle."""
        lines = (
            "",
            "        数据库每服务原则:",
            "        - 每个服务拥有独立的数据库",
            "        - 避免跨服务的数据库共享",
            "        - 通过API或事件进行数据同步",
            "        ",
        )
        return "\n".join(lines)

    @staticmethod
    def api_composition() -> str:
        """Return the description of the API-composition pattern."""
        lines = (
            "",
            "        API组合模式:",
            "        - API网关聚合多个服务的数据",
            "        - 适用于读多写少的场景",
            "        - 减少客户端的请求次数",
            "        ",
        )
        return "\n".join(lines)

    @staticmethod
    def saga_pattern() -> str:
        """Return the description of the Saga pattern."""
        lines = (
            "",
            "        Saga模式:",
            "        - 将分布式事务分解为一系列本地事务",
            "        - 每个事务发布事件触发下一个事务",
            "        - 失败时执行补偿事务",
            "        ",
        )
        return "\n".join(lines)


class ServiceRegistry:
    """In-memory registry of service instances with per-endpoint health flags.

    Instances are grouped under the key ``"<name>:<version>"``; health is
    tracked per ``"<host>:<port>"`` endpoint. Lookups accept either the bare
    service name (all versions) or a full ``"<name>:<version>"`` key.
    """

    def __init__(self):
        self._services: Dict[str, List[ServiceDefinition]] = {}
        self._health_status: Dict[str, bool] = {}

    @staticmethod
    def _addresses(key: str, services: List[ServiceDefinition], service_name: str) -> bool:
        """Return True when the group ``key`` is addressed by ``service_name``.

        Exact comparison is used on purpose: a prefix test would make "user"
        also match "user-admin".
        """
        if key == service_name:
            return True
        return any(s.name == service_name for s in services)

    def _is_healthy(self, service: ServiceDefinition) -> bool:
        """Return the health flag of the instance's endpoint (unknown counts as unhealthy)."""
        return self._health_status.get(f"{service.host}:{service.port}", False)

    def register(self, service: ServiceDefinition) -> None:
        """Add ``service`` and mark its endpoint healthy.

        Registering the same host/port again under the same name and version
        replaces the previous entry instead of duplicating it.
        """
        key = f"{service.name}:{service.version}"
        instances = self._services.setdefault(key, [])
        instances[:] = [
            s for s in instances
            if not (s.host == service.host and s.port == service.port)
        ]
        instances.append(service)
        self._health_status[f"{service.host}:{service.port}"] = True

    def deregister(self, service_name: str, host: str, port: int) -> None:
        """Remove the instance of ``service_name`` listening on ``host:port``.

        Groups left empty are dropped, and the endpoint's health flag is
        removed once no registered instance uses that endpoint any more.
        """
        # Iterate over a snapshot because empty groups are deleted on the way.
        for key, services in list(self._services.items()):
            if not self._addresses(key, services, service_name):
                continue
            remaining = [
                s for s in services
                if not (s.host == host and s.port == port)
            ]
            if remaining:
                self._services[key] = remaining
            else:
                del self._services[key]

        still_used = any(
            s.host == host and s.port == port
            for services in self._services.values()
            for s in services
        )
        if not still_used:
            self._health_status.pop(f"{host}:{port}", None)

    def discover(self, service_name: str) -> Optional[ServiceDefinition]:
        """Return one random healthy instance of ``service_name``, or None.

        Groups are tried in registration order; the first one that has a
        healthy instance supplies the result.
        """
        import random

        for key, services in self._services.items():
            if not self._addresses(key, services, service_name):
                continue
            healthy = [s for s in services if self._is_healthy(s)]
            if healthy:
                return random.choice(healthy)
        return None

    def discover_all(self, service_name: str) -> List[ServiceDefinition]:
        """Return every healthy instance of ``service_name`` across all versions."""
        return [
            s
            for key, services in self._services.items()
            if self._addresses(key, services, service_name)
            for s in services
            if self._is_healthy(s)
        ]

    def set_health(self, host: str, port: int, healthy: bool) -> None:
        """Record the health flag for the endpoint ``host:port``."""
        self._health_status[f"{host}:{port}"] = healthy

    def list_services(self) -> Dict[str, List[Dict]]:
        """Return a snapshot ``{key: [{"host", "port", "healthy"}, ...]}`` of all instances."""
        return {
            key: [
                {
                    "host": s.host,
                    "port": s.port,
                    "healthy": self._is_healthy(s),
                }
                for s in services
            ]
            for key, services in self._services.items()
        }


class LoadBalancer:
    """Instance-selection strategies: round robin, weighted random, least connections."""

    def __init__(self):
        # Per-service call counter driving round_robin.
        self._counters: Dict[str, int] = {}

    def round_robin(self, services: List[ServiceDefinition], service_name: str) -> Optional[ServiceDefinition]:
        """Return the next instance in rotation for ``service_name``.

        Returns None when ``services`` is empty. The rotation position is kept
        per ``service_name`` and advances on every call.
        """
        if not services:
            return None

        position = self._counters.get(service_name, 0)
        self._counters[service_name] = position + 1
        return services[position % len(services)]

    def weighted_round_robin(
        self,
        services: List[tuple]
    ) -> Optional[ServiceDefinition]:
        """Pick an instance at random with probability proportional to its weight.

        ``services`` is a list of ``(service, weight)`` pairs. Despite the name
        the selection is weighted-random, not a deterministic rotation. Weights
        may be ints or floats; entries whose weight is not positive are never
        chosen. Returns None when nothing is eligible.
        """
        import random

        eligible = [(service, weight) for service, weight in services if weight > 0]
        if not eligible:
            return None

        candidates, weights = zip(*eligible)
        return random.choices(candidates, weights=weights, k=1)[0]

    def least_connections(
        self,
        services: List[tuple]
    ) -> Optional[ServiceDefinition]:
        """Return the instance with the fewest connections.

        ``services`` is a list of ``(service, connection_count)`` pairs; ties
        go to the earliest entry. Returns None for an empty list.
        """
        if not services:
            return None

        return min(services, key=lambda pair: pair[1])[0]

44.1.2 服务通信

python
import aiohttp
from typing import TypeVar, Generic, Optional
from dataclasses import dataclass

T = TypeVar('T')


@dataclass
class ServiceResponse(Generic[T]):
    """Uniform result of a service call: either a payload or an error, plus an HTTP-style status."""
    success: bool  # True when the call succeeded
    data: Optional[T] = None  # payload of a successful call
    error: Optional[str] = None  # error text of a failed call
    status_code: int = 200


class ServiceClient:
    """HTTP client that resolves a service name via the registry and calls one instance.

    The target instance is chosen by the injected load balancer (round robin)
    among the registry's healthy instances. Errors are never raised to the
    caller; they are reported through the returned ``ServiceResponse``.
    """

    def __init__(
        self,
        registry: ServiceRegistry,
        load_balancer: LoadBalancer,
        timeout: float = 30.0
    ):
        self.registry = registry
        self.load_balancer = load_balancer
        self.timeout = timeout  # total per-request timeout in seconds
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it lazily (or again after it was closed)."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout)
            )
        return self._session

    async def call(
        self,
        service_name: str,
        endpoint: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> ServiceResponse:
        """Send one request to an instance of ``service_name``.

        Args:
            service_name: Name to resolve through the registry.
            endpoint: Path (including any query string) appended to ``http://host:port``.
            method: HTTP method.
            data: Optional JSON-serialisable request body.
            headers: Optional request headers.

        Returns:
            A ServiceResponse. Any 2xx status is a success carrying the decoded
            JSON body (None for 204 No Content). Other statuses carry the
            response text as ``error``. 503 means no healthy instance, 504 a
            timeout, 500 any other client-side failure.
        """
        candidates = self.registry.discover_all(service_name)
        service = self.load_balancer.round_robin(candidates, service_name)
        if not service:
            return ServiceResponse(
                success=False,
                error=f"Service {service_name} not found",
                status_code=503
            )

        url = f"http://{service.host}:{service.port}{endpoint}"
        session = await self._get_session()

        try:
            async with session.request(
                method,
                url,
                json=data,
                headers=headers
            ) as response:
                if 200 <= response.status < 300:
                    # 204 has no body by definition, so there is nothing to decode.
                    payload = None if response.status == 204 else await response.json()
                    return ServiceResponse(
                        success=True,
                        data=payload,
                        status_code=response.status
                    )

                error = await response.text()
                return ServiceResponse(
                    success=False,
                    error=error,
                    status_code=response.status
                )
        except asyncio.TimeoutError:
            return ServiceResponse(
                success=False,
                error="Request timeout",
                status_code=504
            )
        except Exception as e:
            # Boundary handler: connection errors, undecodable bodies, etc. are
            # reported to the caller instead of being raised.
            return ServiceResponse(
                success=False,
                error=str(e),
                status_code=500
            )

    async def close(self) -> None:
        """Close the underlying HTTP session if it is open."""
        if self._session and not self._session.closed:
            await self._session.close()


class CircuitBreaker:
    """Three-state circuit breaker (closed -> open -> half-open -> closed).

    The breaker opens after ``failure_threshold`` consecutive failures, moves
    to half-open once ``recovery_timeout`` seconds have passed, and closes
    again after ``half_open_max_calls`` successes in half-open. Any failure in
    half-open reopens it.
    """

    class State(Enum):
        CLOSED = "closed"
        OPEN = "open"
        HALF_OPEN = "half_open"

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self._state = self.State.CLOSED
        self._failure_count = 0
        # Monotonic timestamp of the most recent failure (None until one occurs).
        self._last_failure_time: Optional[float] = None
        # Successes recorded since entering HALF_OPEN.
        self._half_open_calls = 0

    @property
    def state(self) -> State:
        """Current state; reading it promotes OPEN to HALF_OPEN once the timeout elapsed."""
        if self._state == self.State.OPEN and self._last_failure_time is not None:
            import time
            # Monotonic clock: immune to wall-clock adjustments.
            if time.monotonic() - self._last_failure_time >= self.recovery_timeout:
                self._state = self.State.HALF_OPEN
                self._half_open_calls = 0
        return self._state

    def can_execute(self) -> bool:
        """Return True when a call may be attempted in the current state."""
        current = self.state
        if current == self.State.CLOSED:
            return True
        if current == self.State.HALF_OPEN:
            return self._half_open_calls < self.half_open_max_calls
        return False

    def record_success(self) -> None:
        """Register a successful call."""
        if self._state == self.State.HALF_OPEN:
            self._half_open_calls += 1
            if self._half_open_calls >= self.half_open_max_calls:
                self._state = self.State.CLOSED
                self._failure_count = 0
        elif self._state == self.State.CLOSED:
            # Only consecutive failures should trip the breaker, so a success
            # clears the streak.
            self._failure_count = 0

    def record_failure(self) -> None:
        """Register a failed call and open the breaker when warranted."""
        import time
        self._failure_count += 1
        self._last_failure_time = time.monotonic()

        if self._state == self.State.HALF_OPEN:
            self._state = self.State.OPEN
        elif self._failure_count >= self.failure_threshold:
            self._state = self.State.OPEN


class ResilientServiceClient:
    """Wraps a service client with retries (linear back-off) and a circuit breaker."""

    # Client-error statuses that are still worth retrying (request timeout, throttling).
    _RETRYABLE_CLIENT_ERRORS = (408, 429)

    def __init__(
        self,
        client: ServiceClient,
        circuit_breaker: CircuitBreaker,
        max_retries: int = 3,
        retry_delay: float = 1.0
    ):
        self.client = client
        self.circuit_breaker = circuit_breaker
        self.max_retries = max_retries  # total number of attempts
        self.retry_delay = retry_delay  # base delay in seconds, multiplied by the attempt number

    async def call_with_retry(
        self,
        service_name: str,
        endpoint: str,
        method: str = "GET",
        data: Optional[Dict] = None,
        headers: Optional[Dict] = None
    ) -> ServiceResponse:
        """Call the service, retrying transient failures while the breaker allows it.

        Returns:
            The first successful response. A non-retryable client error (4xx
            other than 408/429) is returned as-is without further attempts.
            A 503 response is returned when the circuit breaker is open or when
            all attempts are exhausted.
        """
        if not self.circuit_breaker.can_execute():
            return ServiceResponse(
                success=False,
                error="Circuit breaker is open",
                status_code=503
            )

        last_error = None
        for attempt in range(self.max_retries):
            # Failures recorded by earlier attempts may have opened the breaker.
            if attempt > 0 and not self.circuit_breaker.can_execute():
                return ServiceResponse(
                    success=False,
                    error="Circuit breaker is open",
                    status_code=503
                )

            response = await self.client.call(
                service_name, endpoint, method, data, headers
            )

            if response.success:
                self.circuit_breaker.record_success()
                return response

            last_error = response.error

            if response.status_code >= 500:
                self.circuit_breaker.record_failure()
            elif response.status_code not in self._RETRYABLE_CLIENT_ERRORS:
                # The request itself is wrong; repeating it cannot help.
                return response

            if attempt < self.max_retries - 1:
                await asyncio.sleep(self.retry_delay * (attempt + 1))

        return ServiceResponse(
            success=False,
            error=f"Max retries exceeded: {last_error}",
            status_code=503
        )

44.2 服务发现

44.2.1 Consul集成

python
from typing import Optional
import aiohttp


class ConsulClient:
    """Minimal async client for the Consul HTTP API (agent, health and KV endpoints)."""

    def __init__(self, host: str = "localhost", port: int = 8500):
        self.base_url = f"http://{host}:{port}/v1"
        self._session: Optional[aiohttp.ClientSession] = None

    @staticmethod
    def _quote(segment: str, safe: str = "") -> str:
        """Percent-encode a value before it is placed into a URL path."""
        from urllib.parse import quote
        return quote(segment, safe=safe)

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it lazily (or again after it was closed)."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def register_service(
        self,
        name: str,
        service_id: str,
        address: str,
        port: int,
        tags: Optional[List[str]] = None,
        check: Optional[Dict] = None
    ) -> bool:
        """Register a service with the local agent; True when Consul answers 200."""
        session = await self._get_session()

        payload = {
            "Name": name,
            "ID": service_id,
            "Address": address,
            "Port": port
        }

        if tags:
            payload["Tags"] = tags

        if check:
            payload["Check"] = check

        async with session.put(
            f"{self.base_url}/agent/service/register",
            json=payload
        ) as response:
            return response.status == 200

    async def deregister_service(self, service_id: str) -> bool:
        """Remove a service from the local agent; True when Consul answers 200."""
        session = await self._get_session()

        async with session.put(
            f"{self.base_url}/agent/service/deregister/{self._quote(service_id)}"
        ) as response:
            return response.status == 200

    async def discover_service(self, service_name: str) -> List[Dict]:
        """Return the passing instances of ``service_name``.

        Each entry is ``{"address", "port", "tags"}``. Consul leaves
        ``Service.Address`` empty when the service uses its node's address, so
        the node address is substituted in that case. Returns an empty list on
        any non-200 answer.
        """
        session = await self._get_session()

        async with session.get(
            f"{self.base_url}/health/service/{self._quote(service_name)}?passing"
        ) as response:
            if response.status != 200:
                return []

            data = await response.json()
            return [
                {
                    "address": (
                        item["Service"].get("Address")
                        or item.get("Node", {}).get("Address", "")
                    ),
                    "port": item["Service"]["Port"],
                    "tags": item["Service"].get("Tags", [])
                }
                for item in data
            ]

    async def get_health_status(self, service_name: str) -> Dict:
        """Return ``{"total", "passing"}`` instance counts for ``service_name``.

        An instance counts as passing when every one of its checks has status
        "passing" (an instance without checks therefore counts as passing).
        """
        session = await self._get_session()

        async with session.get(
            f"{self.base_url}/health/service/{self._quote(service_name)}"
        ) as response:
            if response.status != 200:
                return {"total": 0, "passing": 0}

            data = await response.json()
            return {
                "total": len(data),
                "passing": sum(1 for item in data if all(
                    check["Status"] == "passing"
                    for check in item.get("Checks", [])
                ))
            }

    async def set_key_value(self, key: str, value: str) -> bool:
        """Store ``value`` under ``key`` in the KV store; True when Consul answers 200."""
        session = await self._get_session()

        # "/" stays literal: KV keys are hierarchical paths.
        async with session.put(
            f"{self.base_url}/kv/{self._quote(key, safe='/')}",
            data=value
        ) as response:
            return response.status == 200

    async def get_key_value(self, key: str) -> Optional[str]:
        """Return the raw value stored under ``key``, or None on a non-200 answer."""
        session = await self._get_session()

        async with session.get(
            f"{self.base_url}/kv/{self._quote(key, safe='/')}?raw"
        ) as response:
            if response.status == 200:
                return await response.text()
            return None

    async def close(self) -> None:
        """Close the underlying HTTP session if it is open."""
        if self._session and not self._session.closed:
            await self._session.close()


class ServiceDiscoveryManager:
    """Registers service instances in Consul and remembers what it registered.

    Several instances of the same service name may be registered; all of their
    Consul IDs are tracked so that ``deregister`` can remove every one of them.
    """

    def __init__(self, consul_client: ConsulClient):
        self.consul = consul_client
        # service name -> Consul service IDs registered through this manager
        self._registered_services: Dict[str, List[str]] = {}

    async def register(
        self,
        service: ServiceDefinition
    ) -> bool:
        """Register ``service`` with an HTTP health check; True on success.

        The Consul ID is ``"<name>-<host>-<port>"`` and the service type value
        is published as the only tag.
        """
        service_id = f"{service.name}-{service.host}-{service.port}"

        check = {
            "HTTP": f"http://{service.host}:{service.port}{service.health_check_path}",
            "Interval": "10s",
            "Timeout": "5s"
        }

        success = await self.consul.register_service(
            name=service.name,
            service_id=service_id,
            address=service.host,
            port=service.port,
            tags=[service.service_type.value],
            check=check
        )

        if success:
            known_ids = self._registered_services.setdefault(service.name, [])
            if service_id not in known_ids:
                known_ids.append(service_id)

        return success

    async def deregister(self, service_name: str) -> bool:
        """Deregister every instance of ``service_name`` registered through this manager.

        Returns False when nothing is registered under that name or when at
        least one deregistration failed; failed IDs stay tracked so the call
        can be repeated.
        """
        service_ids = self._registered_services.get(service_name)
        if not service_ids:
            return False

        failed = []
        for service_id in list(service_ids):
            if not await self.consul.deregister_service(service_id):
                failed.append(service_id)

        if failed:
            self._registered_services[service_name] = failed
            return False

        del self._registered_services[service_name]
        return True

    async def discover(self, service_name: str) -> List[Dict]:
        """Return the instances Consul reports for ``service_name``."""
        return await self.consul.discover_service(service_name)

    async def get_healthy_instances(self, service_name: str) -> List[Dict]:
        """Alias of ``discover``: returns exactly what ``discover`` returns."""
        instances = await self.discover(service_name)
        return instances

44.3 API网关

44.3.1 网关实现

python
from dataclasses import dataclass
from typing import Callable, Awaitable
import time


@dataclass
class Route:
    """Gateway routing rule that maps a path pattern to a backend service."""
    path: str  # pattern; a "{name}" segment is a placeholder matching any single segment
    service_name: str  # name passed to the service client when forwarding
    methods: List[str]  # HTTP methods this route accepts
    strip_prefix: bool = True  # APIGateway drops the pattern's literal prefix before forwarding
    rate_limit: Optional[int] = None  # max requests per rate-limiter window; falsy disables limiting
    auth_required: bool = False  # when True, APIGateway rejects requests without ctx.user
    cache_ttl: Optional[int] = None  # NOTE(review): not read anywhere in the visible code


@dataclass
class RequestContext:
    """Per-request data handed to the gateway and its middleware."""
    request_id: str
    path: str
    method: str
    headers: Dict[str, str]  # TracingMiddleware writes trace headers into this dict
    query_params: Dict[str, str]
    body: Optional[bytes] = None
    client_ip: str = ""  # part of the rate-limit key in APIGateway
    user: Optional[Dict] = None  # authenticated user; APIGateway reads its "id" entry


@dataclass
class RouteMatch:
    """Result of a successful Router.match: the route plus extracted path parameters."""
    route: Route
    path_params: Dict[str, str]  # placeholder name -> matched path segment


class Router:
    """Ordered list of routes with segment-wise path matching.

    Patterns and paths are compared segment by segment after stripping leading
    and trailing slashes; a ``{name}`` pattern segment matches any segment.
    """

    def __init__(self):
        self._routes: List[Route] = []

    def add_route(self, route: Route) -> None:
        """Append ``route``; earlier routes take precedence when matching."""
        self._routes.append(route)

    def match(self, path: str, method: str) -> Optional[RouteMatch]:
        """Return the first route whose pattern fits ``path`` and allows ``method``, else None."""
        for candidate in self._routes:
            if self._path_matches(candidate.path, path) and method in candidate.methods:
                return RouteMatch(
                    route=candidate,
                    path_params=self._extract_params(candidate.path, path),
                )
        return None

    def _path_matches(self, pattern: str, path: str) -> bool:
        """Return True when ``path`` has the same segments as ``pattern``, placeholders aside."""
        expected = pattern.strip("/").split("/")
        actual = path.strip("/").split("/")

        if len(expected) != len(actual):
            return False

        return all(
            (want.startswith("{") and want.endswith("}")) or want == got
            for want, got in zip(expected, actual)
        )

    def _extract_params(self, pattern: str, path: str) -> Dict[str, str]:
        """Return ``{placeholder_name: segment}`` for every placeholder in ``pattern``."""
        expected = pattern.strip("/").split("/")
        actual = path.strip("/").split("/")

        return {
            want[1:-1]: got
            for want, got in zip(expected, actual)
            if want.startswith("{") and want.endswith("}")
        }


class RateLimiter:
    """Sliding-window request limiter that keeps a list of timestamps per key."""

    def __init__(self):
        self._requests: Dict[str, List[float]] = {}

    def is_allowed(
        self,
        key: str,
        limit: int,
        window: int = 60
    ) -> bool:
        """Record a request for ``key`` and report whether it fits within ``limit``.

        Timestamps older than ``window`` seconds are discarded first. The
        request is recorded only when it is allowed.
        """
        current = time.time()
        cutoff = current - window

        recent = [stamp for stamp in self._requests.get(key, []) if stamp > cutoff]
        self._requests[key] = recent

        if len(recent) < limit:
            recent.append(current)
            return True
        return False

    def get_remaining(self, key: str, limit: int, window: int = 60) -> int:
        """Return how many more requests ``key`` may make in the current window (never negative)."""
        cutoff = time.time() - window

        stamps = self._requests.get(key)
        if stamps is None:
            return limit

        used = len([stamp for stamp in stamps if stamp > cutoff])
        return max(0, limit - used)


class APIGateway:
    """Routes incoming requests to backend services.

    A request passes through route matching, the authentication check, rate
    limiting and the middleware chain before it is forwarded.
    """

    def __init__(
        self,
        router: Router,
        service_client: ServiceClient,
        rate_limiter: RateLimiter
    ):
        self.router = router
        self.service_client = service_client
        self.rate_limiter = rate_limiter
        self._middleware: List[Callable] = []

    def add_middleware(self, middleware: Callable) -> None:
        """Append an async callable ``middleware(ctx)``; a truthy return value short-circuits."""
        self._middleware.append(middleware)

    async def handle_request(self, ctx: RequestContext) -> ServiceResponse:
        """Process one request and return the backend's (or the gateway's own) response.

        Gateway-generated responses: 404 no matching route, 401 authentication
        required, 429 rate limit exceeded, 400 request body is not valid JSON.
        Otherwise the request is forwarded with its query string URL-encoded
        and its JSON body passed through.
        """
        import json
        from urllib.parse import urlencode

        match = self.router.match(ctx.path, ctx.method)

        if not match:
            return ServiceResponse(
                success=False,
                error="Not Found",
                status_code=404
            )

        route = match.route

        if route.auth_required and not ctx.user:
            return ServiceResponse(
                success=False,
                error="Unauthorized",
                status_code=401
            )

        if route.rate_limit:
            rate_key = f"{ctx.client_ip}:{route.path}"
            if not self.rate_limiter.is_allowed(rate_key, route.rate_limit):
                return ServiceResponse(
                    success=False,
                    error="Rate limit exceeded",
                    status_code=429
                )

        for middleware in self._middleware:
            result = await middleware(ctx)
            if result:
                return result

        # The service client can only send JSON, so the raw body is decoded here.
        payload = None
        if ctx.body:
            try:
                payload = json.loads(ctx.body)
            except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
                return ServiceResponse(
                    success=False,
                    error="Invalid JSON body",
                    status_code=400
                )

        target_path = ctx.path
        if route.strip_prefix:
            # Literal part of the pattern in front of the first placeholder.
            prefix = route.path.split("{")[0].rstrip("/")
            target_path = ctx.path[len(prefix):] or "/"

        if ctx.query_params:
            target_path = f"{target_path}?{urlencode(ctx.query_params)}"

        # Copy so the caller's header dict is not modified.
        headers = dict(ctx.headers)
        if ctx.user:
            headers["X-User-ID"] = str(ctx.user.get("id", ""))

        return await self.service_client.call(
            service_name=route.service_name,
            endpoint=target_path,
            method=ctx.method,
            data=payload,
            headers=headers
        )


class GatewayBuilder:
    """Fluent builder that collects routes and assembles an APIGateway."""

    def __init__(self):
        self.router = Router()
        # Every route added through ``route()``, in insertion order.
        self.routes: List[Route] = []
        self._rate_limiter = RateLimiter()

    def route(
        self,
        path: str,
        service_name: str,
        methods: Optional[List[str]] = None,
        **kwargs
    ) -> "GatewayBuilder":
        """Add a route and return the builder for chaining.

        Args:
            path: Path pattern of the route.
            service_name: Backend service the route forwards to.
            methods: Allowed HTTP methods; defaults to ``["GET"]``.
            **kwargs: Further ``Route`` fields (e.g. ``rate_limit``, ``auth_required``).
        """
        route = Route(
            path=path,
            service_name=service_name,
            methods=methods or ["GET"],
            **kwargs
        )
        self.router.add_route(route)
        # Keep the public list in step with the router.
        self.routes.append(route)
        return self

    def build(
        self,
        registry: ServiceRegistry,
        load_balancer: LoadBalancer
    ) -> APIGateway:
        """Create the gateway using the collected routes and a new ServiceClient."""
        service_client = ServiceClient(registry, load_balancer)
        return APIGateway(
            router=self.router,
            service_client=service_client,
            rate_limiter=self._rate_limiter
        )

44.4 分布式追踪

44.4.1 OpenTelemetry集成

python
from dataclasses import dataclass
import uuid
import time


@dataclass
class Span:
    """One timed operation within a trace."""
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]  # None for a root span
    operation_name: str
    start_time: float
    end_time: Optional[float] = None  # None while the span is still open
    tags: Dict[str, str] = field(default_factory=dict)
    logs: List[Dict] = field(default_factory=list)

    def finish(self) -> None:
        """Close the span by stamping the current time as its end time."""
        self.end_time = time.time()

    def set_tag(self, key: str, value: str) -> None:
        """Attach or overwrite a tag."""
        self.tags[key] = value

    def log(self, message: str, **kwargs) -> None:
        """Append a timestamped log entry; extra keyword arguments become entry fields."""
        self.logs.append({
            "timestamp": time.time(),
            "message": message,
            **kwargs
        })

    @property
    def duration_ms(self) -> Optional[float]:
        """Elapsed time in milliseconds, or None while the span is open."""
        # Explicit None test: an end time of 0.0 is a valid value, not "unset".
        if self.end_time is None:
            return None
        return (self.end_time - self.start_time) * 1000


class Tracer:
    """Creates spans for one service and keeps them in memory."""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self._spans: List[Span] = []

    def start_span(
        self,
        operation_name: str,
        parent: Optional[Span] = None
    ) -> Span:
        """Create, record and return a new span.

        With ``parent`` the span joins the parent's trace and references it;
        without it a new trace is started. The span is tagged with the
        tracer's service name.
        """
        if parent:
            trace_id = parent.trace_id
            parent_id = parent.span_id
        else:
            trace_id = self._generate_trace_id()
            parent_id = None

        new_span = Span(
            trace_id=trace_id,
            span_id=self._generate_span_id(),
            parent_span_id=parent_id,
            operation_name=operation_name,
            start_time=time.time(),
        )
        new_span.set_tag("service", self.service_name)
        self._spans.append(new_span)
        return new_span

    def _generate_trace_id(self) -> str:
        """Return a random 32-character hex trace id."""
        return uuid.uuid4().hex

    def _generate_span_id(self) -> str:
        """Return a random 16-character hex span id."""
        return uuid.uuid4().hex[:16]

    def get_trace(self, trace_id: str) -> List[Span]:
        """Return all recorded spans belonging to ``trace_id``, in creation order."""
        return list(filter(lambda recorded: recorded.trace_id == trace_id, self._spans))

    def export_spans(self) -> List[Dict]:
        """Return the finished spans as plain dictionaries; open spans are skipped."""
        exported = []
        for recorded in self._spans:
            if recorded.end_time is None:
                continue
            exported.append({
                "trace_id": recorded.trace_id,
                "span_id": recorded.span_id,
                "parent_span_id": recorded.parent_span_id,
                "operation_name": recorded.operation_name,
                "start_time": recorded.start_time,
                "end_time": recorded.end_time,
                "duration_ms": recorded.duration_ms,
                "tags": recorded.tags,
                "logs": recorded.logs,
            })
        return exported


class TracingMiddleware:
    """Gateway middleware that opens a span per request and propagates trace headers."""

    def __init__(self, tracer: Tracer):
        self.tracer = tracer

    async def __call__(self, ctx: RequestContext) -> Optional[ServiceResponse]:
        """Start a span for the request and write trace headers into ``ctx.headers``.

        When the request carries ``X-Trace-ID``, the new span joins that trace
        and the incoming ``X-Span-ID`` becomes its parent. The span keeps its
        own generated id, which is what gets propagated downstream, so every
        hop is identifiable. Always returns None, i.e. never short-circuits.

        NOTE(review): the span is not finished here and nothing visible
        finishes it later, so it will not appear in ``Tracer.export_spans``.
        """
        incoming_trace_id = ctx.headers.get("X-Trace-ID")
        incoming_span_id = ctx.headers.get("X-Span-ID")

        span = self.tracer.start_span(f"{ctx.method} {ctx.path}")
        if incoming_trace_id:
            span.trace_id = incoming_trace_id
            # The caller's span is this span's parent, not its identity.
            span.parent_span_id = incoming_span_id

        span.set_tag("http.method", ctx.method)
        span.set_tag("http.path", ctx.path)
        span.set_tag("http.client_ip", ctx.client_ip)

        ctx.headers["X-Trace-ID"] = span.trace_id
        ctx.headers["X-Span-ID"] = span.span_id

        return None

44.5 知识图谱

44.5.1 微服务架构体系

┌─────────────────────────────────────────────────────────────────────┐
│                      微服务架构全景图                                 │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      客户端层 (Client)                        │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ Web应用  │ │ 移动应用  │ │ 桌面应用  │ │ IoT设备  │       │   │
│  │  └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘       │   │
│  └───────┼────────────┼────────────┼────────────┼──────────────┘   │
│          └────────────┴────────────┴────────────┘                   │
│                                │                                    │
│                                ▼                                    │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      API网关层 (Gateway)                      │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 路由转发  │ │ 认证授权  │ │ 限流熔断  │ │ 负载均衡  │       │   │
│  │  │ Kong    │ │ OAuth2   │ │ RateLimit│ │ Nginx   │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│          ┌─────────────────────┼─────────────────────┐             │
│          ▼                     ▼                     ▼             │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐     │
│  │  用户服务    │      │  订单服务    │      │  商品服务    │     │
│  │  ┌────────┐  │      │  ┌────────┐  │      │  ┌────────┐  │     │
│  │  │业务逻辑│  │      │  │业务逻辑│  │      │  │业务逻辑│  │     │
│  │  │数据访问│  │      │  │数据访问│  │      │  │数据访问│  │     │
│  │  └────────┘  │      │  └────────┘  │      │  └────────┘  │     │
│  │    User DB   │      │   Order DB   │      │ Product DB   │     │
│  └──────────────┘      └──────────────┘      └──────────────┘     │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      服务网格 (Service Mesh)                  │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 服务发现  │ │ 熔断器   │ │ 链路追踪  │ │ 负载均衡  │       │   │
│  │  │ Consul  │ │ Hystrix │ │ Jaeger  │ │ Envoy   │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      基础设施层 (Infrastructure)              │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 消息队列  │ │ 缓存     │ │ 配置中心  │ │ 日志监控  │       │   │
│  │  │ Kafka   │ │ Redis   │ │ Apollo  │ │ ELK     │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

44.5.2 服务通信模式

┌─────────────────────────────────────────────────────────────────────┐
│                      服务通信模式对比                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │              同步通信 (Synchronous)                          │   │
│  │                                                               │   │
│  │   服务A ──────HTTP/gRPC──────▶ 服务B                         │   │
│  │            等待响应                                            │   │
│  │                                                               │   │
│  │  优点:简单直观、实时响应                                      │   │
│  │  缺点:耦合度高、级联失败                                      │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │              异步通信 (Asynchronous)                          │   │
│  │                                                               │   │
│  │   服务A ───▶ 消息队列 ───▶ 服务B                              │   │
│  │            解耦                                                │   │
│  │                                                               │   │
│  │  优点:解耦、削峰填谷、容错                                    │   │
│  │  缺点:复杂度高、最终一致性                                    │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │              事件驱动 (Event-Driven)                          │   │
│  │                                                               │   │
│  │   服务A ───▶ 事件总线 ◀─── 服务B, 服务C, 服务D               │   │
│  │            发布/订阅                                          │   │
│  │                                                               │   │
│  │  优点:高度解耦、灵活扩展                                      │   │
│  │  缺点:事件追踪困难                                            │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

44.6 技术选型指南

44.6.1 微服务框架选型

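| 框架 | 语言 | 性能 | 特点 | 推荐指数 |
| --- | --- | --- | --- | --- |
| FastAPI | Python | 极高 | 异步、自动文档、类型提示 | ★★★★★ |
| Nameko | Python | — | 微服务框架、RPC支持 | ★★★★☆ |
| gRPC | 多语言 | 极高 | 强类型、双向流、Protocol Buffers | ★★★★★ |
| Spring Cloud | Java | — | 全栈解决方案、生态完善 | ★★★★★ |
| Go Micro | Go | 极高 | 轻量级、插件化 | ★★★★☆ |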

44.6.2 服务发现工具选型

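| 工具 | 特点 | 一致性 | 健康检查 | 推荐指数 |
| --- | --- | --- | --- | --- |
| Consul | 功能全面、支持KV存储 | 强一致 | — | ★★★★★ |
| Nacos | 阿里开源、配置管理 | 最终一致 | — | ★★★★★ |
| Eureka | Netflix开源、AP架构 | 最终一致 | — | ★★★★☆ |
| Zookeeper | CP架构、强一致 | 强一致 | — | ★★★☆☆ |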

44.6.3 消息队列选型

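| 消息队列 | 吞吐量 | 延迟 | 持久化 | 协议 | 推荐场景 |
| --- | --- | --- | --- | --- | --- |
| Kafka | 极高 | — | — | 自定义 | 大数据、日志 |
| RabbitMQ | — | — | — | AMQP | 企业应用 |
| Redis Stream | — | 极低 | 可选 | 自定义 | 实时消息 |
| RocketMQ | 极高 | — | — | 自定义 | 金融交易 |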

44.7 常见问题与解决方案

44.7.1 服务雪崩与熔断

python
import time
import threading
from typing import Callable, Any, Dict, Optional
from dataclasses import dataclass, field
from enum import Enum

class CircuitState(Enum):
    """States of the circuit breaker defined below."""
    CLOSED = "closed"  # initial state; reset() returns to it
    OPEN = "open"  # calls are rejected until the timeout elapses
    HALF_OPEN = "half_open"  # trial state entered from OPEN after the timeout


@dataclass
class CircuitBreaker:
    """熔断器实现"""
    
    name: str
    failure_threshold: int = 5
    success_threshold: int = 3
    timeout: float = 60.0
    
    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    _last_failure_time: float = field(default=0, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)
    
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """执行受保护的调用"""
        with self._lock:
            if self._state == CircuitState.OPEN:
                if time.time() - self._last_failure_time >= self.timeout:
                    self._state = CircuitState.HALF_OPEN
                    self._success_count = 0
                else:
                    raise CircuitBreakerOpenError(
                        f"Circuit breaker '{self.name}' is open"
                    )
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        with self._lock:
            self._failure_count = 0
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = CircuitState.CLOSED
    
    def _on_failure(self):
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()
            
            if self._state == CircuitState.HALF_OPEN:
                self._state = CircuitState.OPEN
            elif self._failure_count >= self.failure_threshold:
                self._state = CircuitState.OPEN
    
    @property
    def state(self) -> CircuitState:
        return self._state
    
    def reset(self):
        with self._lock:
            self._state = CircuitState.CLOSED
            self._failure_count = 0
            self._success_count = 0


class CircuitBreakerOpenError(Exception):
    """熔断器打开异常"""
    pass


class RetryPolicy:
    """Retry a callable with capped exponential backoff.

    The wait before retry number ``k`` (0-based) is
    ``min(base_delay * exponential_base ** k, max_delay)`` seconds.
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0
    ):
        """Configure the policy.

        Args:
            max_retries: Retries allowed after the first attempt (>= 0).
            base_delay: Delay in seconds before the first retry (>= 0).
            max_delay: Upper bound in seconds for any single delay (>= 0).
            exponential_base: Multiplier applied to the delay per retry.

        Raises:
            ValueError: If ``max_retries``, ``base_delay`` or ``max_delay``
                is negative.
        """
        # Reject values that would otherwise fail later in confusing ways:
        # a negative retry count means "never call func", and a negative
        # delay makes time.sleep() raise inside the except handler.
        if max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        if base_delay < 0 or max_delay < 0:
            raise ValueError("base_delay and max_delay must be >= 0")

        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Call ``func(*args, **kwargs)``, retrying on any ``Exception``.

        Returns:
            The first successful return value of ``func``.

        Raises:
            Exception: The exception from the final attempt, once all
                retries are exhausted.
        """
        attempt = 0
        while True:
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt >= self.max_retries:
                    # Out of retries: propagate the real error with its
                    # original traceback.
                    raise
                delay = min(
                    self.base_delay * (self.exponential_base ** attempt),
                    self.max_delay
                )
                time.sleep(delay)
                attempt += 1


class BulkheadFullError(Exception):
    """Raised when a bulkhead has no free slot for another call."""


class Bulkhead:
    """Bulkhead isolation: caps how many calls may run at the same time."""

    def __init__(self, max_concurrent: int = 10):
        """Create a bulkhead with ``max_concurrent`` slots."""
        self._semaphore = threading.Semaphore(max_concurrent)
        self.max_concurrent = max_concurrent

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Run ``func(*args, **kwargs)`` if a slot is free.

        The slot is taken without waiting and is always handed back once
        ``func`` finishes, whether it returns or raises.

        Raises:
            BulkheadFullError: All slots are currently in use.
        """
        # Guard clause: fail fast instead of queueing behind busy callers.
        if not self._semaphore.acquire(blocking=False):
            raise BulkheadFullError("Bulkhead is full")

        try:
            outcome = func(*args, **kwargs)
        finally:
            self._semaphore.release()
        return outcome

44.7.2 分布式事务处理

python
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import uuid

class TransactionState(Enum):
    """Outcome recorded on a transaction participant."""

    PENDING = "pending"      # neither committed nor rolled back yet
    CONFIRMED = "confirmed"  # commit() completed without raising
    CANCELLED = "cancelled"  # rollback() completed without raising


@dataclass
class TransactionParticipant:
    """One resource taking part in a two-phase commit.

    Attributes:
        name: Label used in diagnostic output.
        prepare: Zero-argument callable; a truthy return is a "yes" vote,
            a falsy return or an exception is a "no" vote.
        commit: Zero-argument callable that makes the change durable.
        rollback: Zero-argument callable that undoes the prepared change.
        state: Updated by the coordinator as phases complete.
    """

    name: str
    prepare: Callable
    commit: Callable
    rollback: Callable
    state: TransactionState = TransactionState.PENDING


class TwoPhaseCommit:
    """Two-phase commit coordinator."""

    def __init__(self, name: str):
        self.name = name
        self.participants: List[TransactionParticipant] = []
        # Set to a fresh UUID string on every execute() call.
        self.transaction_id: Optional[str] = None

    def add_participant(self, participant: TransactionParticipant):
        """Register a participant; phases visit participants in this order."""
        self.participants.append(participant)

    def execute(self) -> bool:
        """Run the prepare phase, then commit or roll back.

        Returns:
            True only if every participant prepared AND every commit
            succeeded. False if the prepare phase failed (all participants
            are then asked to roll back) or if any commit raised; in the
            latter case inspect each participant's ``state`` to see which
            ones are still PENDING.
        """
        self.transaction_id = str(uuid.uuid4())

        if not self._prepare_phase():
            self._rollback_phase()
            return False

        return self._commit_phase()

    def _prepare_phase(self) -> bool:
        """Ask each participant to prepare; stop at the first "no" vote."""
        for participant in self.participants:
            try:
                if not participant.prepare():
                    return False
            except Exception as e:
                # Report the failure the same way the other phases do
                # instead of swallowing it silently.
                print(f"Prepare failed for {participant.name}: {e}")
                return False
        return True

    def _commit_phase(self) -> bool:
        """Commit every participant; return False if any commit raised.

        A failing commit does not stop the remaining participants from
        committing; the failed participant keeps its previous state.
        """
        all_committed = True
        for participant in self.participants:
            try:
                participant.commit()
                participant.state = TransactionState.CONFIRMED
            except Exception as e:
                all_committed = False
                print(f"Commit failed for {participant.name}: {e}")
        return all_committed

    def _rollback_phase(self):
        """Ask every participant to roll back; failures are reported and skipped."""
        for participant in self.participants:
            try:
                participant.rollback()
                participant.state = TransactionState.CANCELLED
            except Exception as e:
                print(f"Rollback failed for {participant.name}: {e}")


class SagaCoordinator:
    """Saga coordinator: runs steps in order and undoes finished ones on failure."""

    def __init__(self):
        # Parallel lists: compensations[i] undoes steps[i].
        self.steps: List[Dict] = []
        self.compensations: List[Dict] = []

    def add_step(
        self,
        name: str,
        action: Callable,
        compensation: Callable
    ):
        """Append a step together with the callable that compensates for it."""
        forward = {"name": name, "action": action}
        backward = {"name": name, "compensation": compensation}
        self.steps.append(forward)
        self.compensations.append(backward)

    def execute(self) -> Dict:
        """Run every step in registration order.

        Returns:
            ``{"success": True}`` when all steps finish. If a step raises,
            the already-finished steps are compensated in reverse order and
            a dict with ``success`` False, the failing step's name and the
            error text is returned.
        """
        completed: List[int] = []

        for position, entry in enumerate(self.steps):
            try:
                entry["action"]()
            except Exception as exc:
                self._compensate(completed)
                return {
                    "success": False,
                    "failed_step": entry["name"],
                    "error": str(exc)
                }
            completed.append(position)

        return {"success": True}

    def _compensate(self, executed_steps: List[int]):
        """Run compensations for the given step indexes, last finished first.

        A failing compensation is reported and does not stop the others.
        """
        for position in executed_steps[::-1]:
            try:
                undo = self.compensations[position]["compensation"]
                undo()
            except Exception as exc:
                print(f"Compensation failed: {exc}")


class OutboxPattern:
    """Transactional outbox: events are written to an ``outbox`` table and
    later read back and marked as processed.

    ``db_connection`` must expose ``execute(sql, params)`` and
    ``fetch_all(sql, params)``. The SQL uses ``?`` placeholders and a
    ``JSON`` column type.
    NOTE(review): both are driver/database specific - confirm they match
    the connection actually passed in.
    """

    def __init__(self, db_connection):
        self.db = db_connection
        # Creates the table on construction if it does not exist yet.
        self._create_table()

    def _create_table(self):
        """Create the ``outbox`` table if it is missing."""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS outbox (
                id VARCHAR(36) PRIMARY KEY,
                aggregate_type VARCHAR(100),
                aggregate_id VARCHAR(100),
                event_type VARCHAR(100),
                payload JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                processed_at TIMESTAMP NULL
            )
        """)

    def save_event(
        self,
        aggregate_type: str,
        aggregate_id: str,
        event_type: str,
        payload: Dict
    ) -> str:
        """Insert one event row into the outbox.

        Args:
            aggregate_type: Kind of aggregate the event belongs to.
            aggregate_id: Identifier of that aggregate.
            event_type: Name of the event.
            payload: JSON-serializable event body.

        Returns:
            The generated event id (a UUID4 string).

        Raises:
            TypeError: If ``payload`` is not JSON-serializable.
        """
        # Local import: the import block of this snippet does not bring in json.
        import json

        event_id = str(uuid.uuid4())
        # json.dumps, not str(): str(dict) yields a Python repr (single
        # quotes, True/None) that is not valid JSON for the JSON column.
        serialized = json.dumps(payload)
        self.db.execute(
            """INSERT INTO outbox 
               (id, aggregate_type, aggregate_id, event_type, payload)
               VALUES (?, ?, ?, ?, ?)""",
            (event_id, aggregate_type, aggregate_id, event_type, serialized)
        )
        return event_id

    def get_unprocessed_events(self, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` rows not yet marked processed, oldest first."""
        return self.db.fetch_all(
            """SELECT * FROM outbox 
               WHERE processed_at IS NULL 
               ORDER BY created_at LIMIT ?""",
            (limit,)
        )

    def mark_processed(self, event_id: str):
        """Stamp the row with the given id as processed at the current time."""
        self.db.execute(
            "UPDATE outbox SET processed_at = CURRENT_TIMESTAMP WHERE id = ?",
            (event_id,)
        )

44.7.3 服务监控与可观测性

python
import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

@dataclass
class Metric:
    """A single metric sample: a named value with optional labels and a timestamp."""
    name: str
    value: float
    labels: Dict[str, str] = field(default_factory=dict)  # fresh empty dict per instance
    timestamp: float = field(default_factory=time.time)  # time.time() at construction unless supplied


class MetricsCollector:
    """In-memory collector for counters, gauges and histograms.

    Each series is stored under a key made of the metric name plus its
    rendered label set, e.g. ``requests{code="200",method="GET"}``.
    Recording methods and the export share one lock.
    """

    def __init__(self, service_name: str):
        # service_name is stored on the instance; the methods below do not use it.
        self.service_name = service_name
        self._counters: Dict[str, float] = defaultdict(float)
        self._gauges: Dict[str, float] = {}
        self._histograms: Dict[str, List[float]] = defaultdict(list)
        self._lock = threading.Lock()

    def counter(self, name: str, value: float = 1, labels: Optional[Dict[str, str]] = None):
        """Add ``value`` to the counter series ``name``/``labels``."""
        with self._lock:
            key = self._make_key(name, labels)
            self._counters[key] += value

    def gauge(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Set the gauge series ``name``/``labels`` to ``value``."""
        with self._lock:
            key = self._make_key(name, labels)
            self._gauges[key] = value

    def histogram(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Append one observation to the histogram series ``name``/``labels``."""
        with self._lock:
            key = self._make_key(name, labels)
            self._histograms[key].append(value)

    def timer(self, name: str, labels: Optional[Dict[str, str]] = None):
        """Return a context manager that records the elapsed seconds of its
        ``with`` block as a histogram observation."""
        return TimerContext(self, name, labels)

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, double quote and newline for use in a label value."""
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    def _make_key(self, name: str, labels: Optional[Dict[str, str]] = None) -> str:
        """Build the series key: ``name`` alone, or ``name{k="v",...}`` with
        labels sorted by key so equal label sets map to the same series."""
        if not labels:
            return name
        rendered = ",".join(
            f'{k}="{self._escape_label_value(v)}"'
            for k, v in sorted(labels.items())
        )
        return f"{name}{{{rendered}}}"

    @staticmethod
    def _with_suffix(key: str, suffix: str) -> str:
        """Insert ``suffix`` after the metric name, i.e. before any label set."""
        base, brace, rest = key.partition("{")
        return f"{base}{suffix}{brace}{rest}"

    def export_prometheus(self) -> str:
        """Render all series as text lines of the form ``<series> <value>``.

        Counters and gauges produce one line each. Every non-empty histogram
        produces ``_count`` and ``_sum`` lines plus ``_p50``/``_p90``/
        ``_p95``/``_p99`` lines, with the suffix placed on the metric name
        (before the labels).
        """
        # Copy under the lock so concurrent recording cannot resize the
        # dicts while they are being iterated; format outside the lock.
        with self._lock:
            counters = dict(self._counters)
            gauges = dict(self._gauges)
            histograms = {k: list(v) for k, v in self._histograms.items() if v}

        lines = [f"{key} {value}" for key, value in counters.items()]
        lines.extend(f"{key} {value}" for key, value in gauges.items())

        for key, observations in histograms.items():
            ordered = sorted(observations)
            count = len(ordered)
            total = sum(ordered)

            lines.append(f"{self._with_suffix(key, '_count')} {count}")
            lines.append(f"{self._with_suffix(key, '_sum')} {total}")

            for p in (0.5, 0.9, 0.95, 0.99):
                # Rank-based pick from the sorted observations, clamped to
                # the last element.
                idx = min(int(count * p), count - 1)
                series = self._with_suffix(key, f"_p{int(p * 100)}")
                lines.append(f"{series} {ordered[idx]}")

        return "\n".join(lines)


class TimerContext:
    """Context manager that reports how long its ``with`` block took."""

    def __init__(
        self,
        collector: MetricsCollector,
        name: str,
        labels: Optional[Dict[str, str]] = None
    ):
        self.collector = collector
        self.name = name
        self.labels = labels
        self.start_time = None  # wall-clock time.time() at __enter__
        self._started = None    # perf_counter reading used for the duration

    def __enter__(self):
        self.start_time = time.time()
        self._started = time.perf_counter()
        return self

    def __exit__(self, *args):
        # perf_counter is monotonic, so the duration cannot go negative
        # when the system clock is adjusted. Returning None lets any
        # exception from the block propagate.
        elapsed = time.perf_counter() - self._started
        self.collector.histogram(self.name, elapsed, self.labels)


class HealthChecker:
    """Registry of named health checks that can be run together."""

    def __init__(self):
        self._checks: Dict[str, Callable] = {}

    def register(self, name: str, check_func: Callable):
        """Register ``check_func`` under ``name``, replacing any earlier check
        with the same name."""
        self._checks[name] = check_func

    def check(self) -> Dict:
        """Run every registered check and summarize the results.

        A check is healthy when it returns a truthy value; a dict return
        value is also echoed under ``details``. A check that raises is
        unhealthy and its error text is reported instead.

        Returns:
            ``{"status": ..., "checks": {name: per-check result}}`` where the
            top-level status is "healthy" only if every check was healthy.
        """
        report = {}
        failures = 0

        for check_name, probe in self._checks.items():
            try:
                outcome = probe()
                passed = bool(outcome)
                entry = {
                    "status": "healthy" if passed else "unhealthy",
                    "details": outcome if isinstance(outcome, dict) else None
                }
            except Exception as exc:
                passed = False
                entry = {
                    "status": "unhealthy",
                    "error": str(exc)
                }

            report[check_name] = entry
            if not passed:
                failures += 1

        overall = "unhealthy" if failures else "healthy"
        return {
            "status": overall,
            "checks": report
        }

44.8 本章小结

本章详细介绍了Python微服务架构的核心概念和实践:

  1. 微服务架构基础:设计原则、服务定义、服务注册
  2. 服务通信:同步通信、熔断器、重试机制
  3. 服务发现:Consul集成、服务注册与发现
  4. API网关:路由、限流、认证、中间件
  5. 分布式追踪:OpenTelemetry、Span、Trace

练习题

  1. 实现一个完整的服务注册中心,支持健康检查
  2. 开发一个API网关,支持路由、限流和认证
  3. 实现一个熔断器组件,支持状态转换和监控
  4. 开发一个分布式追踪系统,支持链路追踪
  5. 实现一个服务网格代理,支持流量管理

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校