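Chapter 30 Cloud-Native Design Patterns

Learning Objectives

After completing this chapter, readers will be able to:

  • Understand the core principles and formal definitions of cloud-native architecture
  • Master key patterns such as containerization, service mesh, and observability
  • Implement resilience patterns such as circuit breakers, rate limiters, and retries
  • Apply cloud-native patterns to build highly available distributed systems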

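30.1 Theoretical Foundations and Formal Definitions

30.1.1 Formal Definition of Cloud Native

Definition 30.1 (Cloud-native system): A cloud-native system $\mathcal{CN}$ is a five-tuple:

$$\mathcal{CN} = \langle \mathcal{C}, \mathcal{O}, \mathcal{M}, \mathcal{S}, \mathcal{R} \rangle$$

where:

  • $\mathcal{C}$: the containerization layer, providing application isolation and portability
  • $\mathcal{O}$: the orchestration layer, managing container lifecycle and scheduling
  • $\mathcal{M}$: the service mesh layer, handling service-to-service communication
  • $\mathcal{S}$: the observability layer, providing monitoring, logging, and tracing
  • $\mathcal{R}$: the resilience layer, implementing fault tolerance and self-healing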

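Definition 30.2 (Resilience): The resilience $E$ of a system is its ability to remain available in the presence of failures, quantified as the steady-state fraction of time the system is operational: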

$$E = \frac{MTBF}{MTBF + MTTR}$$

where:

  • $MTBF$: mean time between failures
  • $MTTR$: mean time to recovery
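As a worked example of Definition 30.2, the ratio can be computed directly. The inputs below (a service that fails on average every 720 hours and takes 30 minutes to recover) are illustrative assumptions, not measurements:

```python
def resilience(mtbf_hours: float, mttr_hours: float) -> float:
    """E = MTBF / (MTBF + MTTR), per Definition 30.2."""
    if mtbf_hours < 0 or mttr_hours < 0 or mtbf_hours + mttr_hours == 0:
        raise ValueError("MTBF and MTTR must be non-negative and not both zero")
    return mtbf_hours / (mtbf_hours + mttr_hours)

# Illustrative inputs: one failure every 720 h (about 30 days), 0.5 h to recover
e = resilience(720.0, 0.5)
print(f"E = {e:.6f}")
```

This prints `E = 0.999306`. Because $E = 1 / (1 + MTTR/MTBF)$, it depends only on the ratio of the two times, so halving recovery time improves $E$ exactly as much as doubling the time between failures.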

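Definition 30.3 (Availability): Given $n$ observation periods, where period $i$ has $uptime_i$ and $downtime_i$, the system availability $A$ is: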

$$A = 1 - \frac{\sum_{i=1}^{n} downtime_i}{\sum_{i=1}^{n} (uptime_i + downtime_i)}$$
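Definition 30.3 sums over $n$ observation periods. The sketch below assumes each period is recorded as an `(uptime, downtime)` pair in the same time unit; the sample values are hypothetical:

```python
from typing import Iterable, Tuple

def availability(windows: Iterable[Tuple[float, float]]) -> float:
    """A = 1 - sum(downtime_i) / sum(uptime_i + downtime_i), per Definition 30.3."""
    pairs = list(windows)
    total = sum(up + down for up, down in pairs)
    if total <= 0:
        raise ValueError("observation periods must have a positive total length")
    downtime = sum(down for _, down in pairs)
    return 1 - downtime / total

# Two hypothetical 720-hour periods with 1.0 h and 1.5 h of downtime
a = availability([(719.0, 1.0), (718.5, 1.5)])
print(f"A = {a:.5f}")
```

This prints `A = 0.99826`. The totals are summed before dividing, so long periods carry more weight than short ones. Averaging the per-period availabilities would generally give a different result.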

30.1.2 Cloud-Native Architecture Layers

┌─────────────────────────────────────────────────────────────────────────┐
│                  Cloud-Native Architecture Layer Model                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                        Application Layer                        │    │
│  │     Microservices │ Serverless │ Event-Driven │ API Gateway     │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                    │                                    │
│                                    ▼                                    │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                       Service Mesh Layer                        │    │
│  │            Istio │ Linkerd │ Envoy │ Consul Connect             │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                    │                                    │
│                                    ▼                                    │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                       Orchestration Layer                       │    │
│  │                Kubernetes │ Docker Swarm │ Nomad                │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                    │                                    │
│                                    ▼                                    │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                        Container Runtime                        │    │
│  │          containerd │ CRI-O │ Docker │ Kata Containers          │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                    │                                    │
│                                    ▼                                    │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                      Infrastructure Layer                       │    │
│  │  Public Cloud │ Private Cloud │ Hybrid Cloud │ Edge Computing   │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

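30.1.3 Historical Background and Evolution

| Period | Milestone | Representative projects | Core contribution |
|---|---|---|---|
| 2013 | Rise of containers | Docker | Container standardization |
| 2014 | Container orchestration | Kubernetes | Container orchestration platform |
| 2015 | Cloud native defined | CNCF | Founding of the Cloud Native Computing Foundation |
| 2016 | Service mesh | Linkerd (2016), Istio (2017) | Service-to-service communication management |
| 2017 | Serverless | OpenFaaS, Knative (2018) | Serverless frameworks |
| 2018 | Observability | OpenTelemetry (formed in 2019 from OpenTracing and OpenCensus) | Unified observability standard |
| 2019 | GitOps | ArgoCD, Flux | Declarative continuous deployment |
| 2020 | Multi-cluster | KubeFed, OCM | Multi-cluster management |
| 2021+ | Platform engineering | Backstage | Developer portals |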

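30.2 Containerization Patterns

30.2.1 Formal Definition of Container Configuration

Definition 30.4 (Container configuration): A container configuration $\mathcal{CC}$ is defined as:

$$\mathcal{CC} = \langle I, E, R, P, V \rangle$$

where:

  • $I$: the image definition
  • $E$: the environment variable configuration
  • $R$: resource requests and limits
  • $P$: probe (health check) configuration
  • $V$: volume mounts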
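One way to read Definition 30.4 is as a record whose five fields map onto fields of a Kubernetes container spec. The sketch below is a hypothetical encoding. `ContainerSpec` and `to_container` are illustrative names, not part of any library. The fuller `ContainerConfig` implementation in 30.2.3 follows the same mapping.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple

@dataclass
class ContainerSpec:
    """Hypothetical encoding of CC = <I, E, R, P, V>."""
    image: str                                                          # I
    env: Dict[str, str] = field(default_factory=dict)                   # E
    resources: Dict[str, Dict[str, str]] = field(default_factory=dict)  # R
    probes: Dict[str, Dict[str, Any]] = field(default_factory=dict)     # P
    volumes: List[Tuple[str, str]] = field(default_factory=list)        # V: (name, mountPath)

def to_container(name: str, spec: ContainerSpec) -> Dict[str, Any]:
    """Render the tuple as a Kubernetes container dict (pod-level volumes omitted)."""
    container: Dict[str, Any] = {
        "name": name,
        "image": spec.image,
        "env": [{"name": k, "value": v} for k, v in spec.env.items()],
        "resources": spec.resources,
        "volumeMounts": [{"name": n, "mountPath": p} for n, p in spec.volumes],
    }
    # P: keys such as "livenessProbe" / "readinessProbe" sit at container level
    container.update(spec.probes)
    return container

spec = ContainerSpec(
    image="nginx:1.25",
    env={"LOG_LEVEL": "INFO"},
    resources={"requests": {"cpu": "100m"}, "limits": {"cpu": "500m"}},
    probes={"livenessProbe": {"httpGet": {"path": "/health", "port": 8080}}},
    volumes=[("data", "/data")],
)
print(to_container("web", spec))
```

A volume mount only names a volume. The matching entry in the pod's `volumes` list must still be declared, which is why `ContainerConfig.to_deployment_manifest` emits both.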

30.2.2 UML Class Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                             ContainerConfig                             │
├─────────────────────────────────────────────────────────────────────────┤
│ - name: str                                                             │
│ - image: str                                                            │
│ - port: int                                                             │
│ - replicas: int                                                         │
│ - env_vars: List[EnvVar]                                                │
│ - resources: ResourceLimits                                             │
│ - liveness_probe: Optional[HealthProbe]                                 │
│ - readiness_probe: Optional[HealthProbe]                                │
│ - volumes: List[Volume]                                                 │
├─────────────────────────────────────────────────────────────────────────┤
│ + add_env(name: str, value: str): ContainerConfig                       │
│ + set_resources(cpu_request, memory_request, ...): ContainerConfig      │
│ + to_deployment_manifest(): Dict                                        │
│ + to_service_manifest(service_type: str): Dict                          │
└─────────────────────────────────────────────────────────────────────────┘

                                    │ uses

        ┌───────────────────────────┼───────────────────────────┐
        │                           │                           │
        ▼                           ▼                           ▼
┌──────────────────┐        ┌──────────────────┐        ┌──────────────────┐
│  ResourceLimits  │        │   HealthProbe    │        │      Volume      │
├──────────────────┤        ├──────────────────┤        ├──────────────────┤
│ + cpu_request    │        │ + path: str      │        │ + name: str      │
│ + memory_request │        │ + port: int      │        │ + mount_path     │
│ + cpu_limit      │        │ + initial_delay  │        │ + volume_type    │
│ + memory_limit   │        │ + period         │        └──────────────────┘
└──────────────────┘        └──────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                        <<Builder>>                                       │
│                        ContainerBuilder                                  │
├─────────────────────────────────────────────────────────────────────────┤
│ - config: ContainerConfig                                               │
├─────────────────────────────────────────────────────────────────────────┤
│ + with_env(key, value): ContainerBuilder                                │
│ + with_resources(cpu, memory): ContainerBuilder                         │
│ + with_liveness_probe(path): ContainerBuilder                           │
│ + with_volume(name, path): ContainerBuilder                             │
│ + build(): ContainerConfig                                              │
└─────────────────────────────────────────────────────────────────────────┘

30.2.3 Complete Container Configuration Implementation

python
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum
import json

class RestartPolicy(Enum):
    ALWAYS = "Always"
    ON_FAILURE = "OnFailure"
    NEVER = "Never"

class ProbeType(Enum):
    HTTP_GET = "httpGet"
    TCP_SOCKET = "tcpSocket"
    EXEC = "exec"

@dataclass
class ResourceLimits:
    cpu_request: str = "100m"
    memory_request: str = "128Mi"
    cpu_limit: str = "500m"
    memory_limit: str = "512Mi"
    
    def to_kubernetes(self) -> Dict[str, Dict[str, str]]:
        return {
            "requests": {
                "cpu": self.cpu_request,
                "memory": self.memory_request
            },
            "limits": {
                "cpu": self.cpu_limit,
                "memory": self.memory_limit
            }
        }

@dataclass
class HealthProbe:
    probe_type: ProbeType = ProbeType.HTTP_GET
    path: str = "/health"
    port: int = 8080
    initial_delay_seconds: int = 30
    period_seconds: int = 10
    timeout_seconds: int = 5
    failure_threshold: int = 3
    success_threshold: int = 1
    
    def to_kubernetes(self) -> Dict[str, Any]:
        probe = {
            "initialDelaySeconds": self.initial_delay_seconds,
            "periodSeconds": self.period_seconds,
            "timeoutSeconds": self.timeout_seconds,
            "failureThreshold": self.failure_threshold,
            "successThreshold": self.success_threshold
        }
        
        if self.probe_type == ProbeType.HTTP_GET:
            probe["httpGet"] = {
                "path": self.path,
                "port": self.port
            }
        elif self.probe_type == ProbeType.TCP_SOCKET:
            probe["tcpSocket"] = {"port": self.port}
        
        return probe

@dataclass
class Volume:
    name: str
    mount_path: str
    volume_type: str = "emptyDir"
    read_only: bool = False
    storage_class: Optional[str] = None
    size: Optional[str] = None
    
    def to_volume_mount(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "mountPath": self.mount_path,
            "readOnly": self.read_only
        }
    
    def to_volume(self) -> Dict[str, Any]:
        volume = {"name": self.name}
        
        if self.volume_type == "emptyDir":
            volume["emptyDir"] = {}
        elif self.volume_type == "persistentVolumeClaim":
            volume["persistentVolumeClaim"] = {
                "claimName": self.name,
                "readOnly": self.read_only
            }
        elif self.volume_type == "configMap":
            volume["configMap"] = {"name": self.name}
        elif self.volume_type == "secret":
            volume["secret"] = {"secretName": self.name}
        
        return volume

@dataclass
class EnvVar:
    name: str
    value: Optional[str] = None
    value_from: Optional[Dict[str, str]] = None
    
    def to_kubernetes(self) -> Dict[str, Any]:
        env = {"name": self.name}
        if self.value is not None:
            env["value"] = self.value
        if self.value_from:
            env["valueFrom"] = self.value_from
        return env

class ContainerConfig:
    def __init__(
        self,
        name: str,
        image: str,
        port: int,
        replicas: int = 1
    ):
        self.name = name
        self.image = image
        self.port = port
        self.replicas = replicas
        self.env_vars: List[EnvVar] = []
        self.resources = ResourceLimits()
        self.liveness_probe: Optional[HealthProbe] = None
        self.readiness_probe: Optional[HealthProbe] = None
        self.volumes: List[Volume] = []
        self.restart_policy = RestartPolicy.ALWAYS
        self.labels: Dict[str, str] = {}
        self.annotations: Dict[str, str] = {}
    
    def add_env(self, name: str, value: str) -> 'ContainerConfig':
        self.env_vars.append(EnvVar(name=name, value=value))
        return self
    
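    def add_env_from_configmap(self, name: str, key: Optional[str] = None) -> 'ContainerConfig':
        # configMapKeyRef requires both the ConfigMap name and the key to read;
        # as in add_env_from_secret, the key defaults to the variable name
        self.env_vars.append(EnvVar(
            name=name,
            value_from={
                "configMapKeyRef": {
                    "name": name,
                    "key": key or name
                }
            }
        ))
        return self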
    
    def add_env_from_secret(self, name: str, key: str = None) -> 'ContainerConfig':
        self.env_vars.append(EnvVar(
            name=name,
            value_from={
                "secretKeyRef": {
                    "name": name,
                    "key": key or name
                }
            }
        ))
        return self
    
    def set_resources(
        self,
        cpu_request: str,
        memory_request: str,
        cpu_limit: str = None,
        memory_limit: str = None
    ) -> 'ContainerConfig':
        self.resources = ResourceLimits(
            cpu_request=cpu_request,
            memory_request=memory_request,
            cpu_limit=cpu_limit or cpu_request,
            memory_limit=memory_limit or memory_request
        )
        return self
    
    def set_liveness_probe(
        self,
        path: str = "/health",
        **kwargs
    ) -> 'ContainerConfig':
        self.liveness_probe = HealthProbe(
            path=path,
            port=self.port,
            **kwargs
        )
        return self
    
    def set_readiness_probe(
        self,
        path: str = "/ready",
        **kwargs
    ) -> 'ContainerConfig':
        self.readiness_probe = HealthProbe(
            path=path,
            port=self.port,
            **kwargs
        )
        return self
    
    def add_volume(
        self,
        name: str,
        mount_path: str,
        volume_type: str = "emptyDir",
        **kwargs
    ) -> 'ContainerConfig':
        self.volumes.append(Volume(
            name=name,
            mount_path=mount_path,
            volume_type=volume_type,
            **kwargs
        ))
        return self
    
    def add_label(self, key: str, value: str) -> 'ContainerConfig':
        self.labels[key] = value
        return self
    
    def to_deployment_manifest(self) -> Dict[str, Any]:
        container = {
            "name": self.name,
            "image": self.image,
            "ports": [{"containerPort": self.port}],
            "resources": self.resources.to_kubernetes(),
            "env": [e.to_kubernetes() for e in self.env_vars]
        }
        
        if self.liveness_probe:
            container["livenessProbe"] = self.liveness_probe.to_kubernetes()
        
        if self.readiness_probe:
            container["readinessProbe"] = self.readiness_probe.to_kubernetes()
        
        if self.volumes:
            container["volumeMounts"] = [v.to_volume_mount() for v in self.volumes]
        
        spec = {
            "replicas": self.replicas,
            "selector": {
                "matchLabels": {"app": self.name}
            },
            "template": {
                "metadata": {
                    "labels": {"app": self.name, **self.labels}
                },
                "spec": {
                    "containers": [container]
                }
            }
        }
        
        if self.volumes:
            spec["template"]["spec"]["volumes"] = [v.to_volume() for v in self.volumes]
        
        return {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": self.name,
                "labels": self.labels
            },
            "spec": spec
        }
    
    def to_service_manifest(self, service_type: str = "ClusterIP") -> Dict[str, Any]:
        return {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {
                "name": self.name,
                "labels": self.labels
            },
            "spec": {
                "type": service_type,
                "selector": {"app": self.name},
                "ports": [{
                    "port": 80,
                    "targetPort": self.port,
                    "protocol": "TCP"
                }]
            }
        }

class ContainerBuilder:
    def __init__(self, name: str, image: str, port: int):
        self._config = ContainerConfig(name=name, image=image, port=port)
    
    def with_replicas(self, replicas: int) -> 'ContainerBuilder':
        self._config.replicas = replicas
        return self
    
    def with_env(self, key: str, value: str) -> 'ContainerBuilder':
        self._config.add_env(key, value)
        return self
    
    def with_env_from_configmap(self, name: str) -> 'ContainerBuilder':
        self._config.add_env_from_configmap(name)
        return self
    
    def with_env_from_secret(self, name: str, key: str = None) -> 'ContainerBuilder':
        self._config.add_env_from_secret(name, key)
        return self
    
    def with_resources(
        self,
        cpu_request: str,
        memory_request: str,
        cpu_limit: str = None,
        memory_limit: str = None
    ) -> 'ContainerBuilder':
        self._config.set_resources(cpu_request, memory_request, cpu_limit, memory_limit)
        return self
    
    def with_liveness_probe(
        self,
        path: str = "/health",
        **kwargs
    ) -> 'ContainerBuilder':
        self._config.set_liveness_probe(path, **kwargs)
        return self
    
    def with_readiness_probe(
        self,
        path: str = "/ready",
        **kwargs
    ) -> 'ContainerBuilder':
        self._config.set_readiness_probe(path, **kwargs)
        return self
    
    def with_volume(
        self,
        name: str,
        mount_path: str,
        volume_type: str = "emptyDir"
    ) -> 'ContainerBuilder':
        self._config.add_volume(name, mount_path, volume_type)
        return self
    
    def with_label(self, key: str, value: str) -> 'ContainerBuilder':
        self._config.add_label(key, value)
        return self
    
    def build(self) -> ContainerConfig:
        return self._config

config = (ContainerBuilder("web-app", "python:3.11-slim", 8080)
    .with_replicas(3)
    .with_env("APP_ENV", "production")
    .with_env("LOG_LEVEL", "INFO")
    .with_env_from_configmap("app-config")
    .with_env_from_secret("db-credentials", "password")
    .with_resources("250m", "256Mi", "500m", "512Mi")
    .with_liveness_probe("/health", initial_delay_seconds=30, period_seconds=10)
    .with_readiness_probe("/ready", initial_delay_seconds=5, period_seconds=5)
    .with_volume("data", "/app/data", "persistentVolumeClaim")
    .with_label("version", "v1.0.0")
    .build())

print(json.dumps(config.to_deployment_manifest(), indent=2))
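`set_resources` accepts any strings. Nothing stops a caller from passing a limit smaller than the request, for example `set_resources("1", "1Gi", "500m", "512Mi")`. The Kubernetes API server rejects such a container because requests must not exceed limits. A pre-flight check can catch this before the manifest is applied.

The sketch below is a hypothetical helper, not part of the classes above. It assumes only a subset of quantity suffixes: `m` for CPU, and `Ki`/`Mi`/`Gi`/`Ti` and `k`/`M`/`G`/`T` for memory.

```python
from typing import Dict, List

# Assumed subset of Kubernetes quantity suffixes (no exponent, P/E, or milli-byte forms)
_MEMORY_SUFFIXES: Dict[str, int] = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40,
    "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
}

def parse_cpu(quantity: str) -> float:
    """'250m' -> 0.25 cores; '2' -> 2.0 cores."""
    if quantity.endswith("m"):
        return float(quantity[:-1]) / 1000
    return float(quantity)

def parse_memory(quantity: str) -> float:
    """'256Mi' -> bytes; a bare number is already in bytes."""
    # Try two-letter binary suffixes before one-letter decimal ones
    for suffix in sorted(_MEMORY_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return float(quantity[: -len(suffix)]) * _MEMORY_SUFFIXES[suffix]
    return float(quantity)

def validate_resources(resources: Dict[str, Dict[str, str]]) -> List[str]:
    """Return one message per resource whose request exceeds its limit."""
    parsers = {"cpu": parse_cpu, "memory": parse_memory}
    requests = resources.get("requests", {})
    limits = resources.get("limits", {})
    errors = []
    for name, parse in parsers.items():
        if name in requests and name in limits:
            if parse(requests[name]) > parse(limits[name]):
                errors.append(
                    f"{name}: request {requests[name]} exceeds limit {limits[name]}"
                )
    return errors

# Same dict shape as ResourceLimits.to_kubernetes()
ok = {"requests": {"cpu": "250m", "memory": "256Mi"},
      "limits": {"cpu": "500m", "memory": "512Mi"}}
bad = {"requests": {"cpu": "1", "memory": "1Gi"},
       "limits": {"cpu": "500m", "memory": "512Mi"}}
print(validate_resources(ok))
print(validate_resources(bad))
```

Calling `validate_resources(config.resources.to_kubernetes())` before printing the manifest would surface the problem locally instead of at `kubectl apply` time.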

30.2.4 Externalized Configuration Pattern

python
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
import os
import json
import threading
import time

class ConfigSource(ABC):
    @abstractmethod
    def get(self, key: str, default: Any = None) -> Any:
        pass
    
    @abstractmethod
    def get_all(self) -> Dict[str, Any]:
        pass
    
    @abstractmethod
    def watch(self, callback: Callable[[str, Any], None]) -> None:
        pass

class EnvironmentConfigSource(ConfigSource):
    def __init__(self, prefix: str = ""):
        self.prefix = prefix
        self._watchers: List[Callable] = []
    
    def get(self, key: str, default: Any = None) -> Any:
        env_key = f"{self.prefix}{key}".upper().replace(".", "_")
        return os.environ.get(env_key, default)
    
    def get_all(self) -> Dict[str, Any]:
        result = {}
        for key, value in os.environ.items():
            if self.prefix and key.startswith(self.prefix.upper()):
                config_key = key[len(self.prefix):].lower().replace("_", ".")
                result[config_key] = value
            elif not self.prefix:
                result[key.lower().replace("_", ".")] = value
        return result
    
    def watch(self, callback: Callable[[str, Any], None]) -> None:
        self._watchers.append(callback)

class FileConfigSource(ConfigSource):
    def __init__(self, file_path: str, watch_changes: bool = False):
        self.file_path = file_path
        self._config: Dict[str, Any] = {}
        self._watchers: List[Callable] = []
        self._last_modified: float = 0
        self._load()
        
        if watch_changes:
            self._start_watcher()
    
    def _load(self) -> None:
        try:
            with open(self.file_path, 'r') as f:
                content = f.read().strip()
                if content:
                    self._config = json.loads(content)
                    self._last_modified = os.path.getmtime(self.file_path)
        except FileNotFoundError:
            self._config = {}
    
    def get(self, key: str, default: Any = None) -> Any:
        keys = key.split(".")
        value = self._config
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
            else:
                return default
        return value if value is not None else default
    
    def get_all(self) -> Dict[str, Any]:
        return self._config.copy()
    
    def watch(self, callback: Callable[[str, Any], None]) -> None:
        self._watchers.append(callback)
    
    def _start_watcher(self) -> None:
        def watch_loop():
            while True:
                time.sleep(1)
                try:
                    current_mtime = os.path.getmtime(self.file_path)
                    if current_mtime > self._last_modified:
                        old_config = self._config.copy()
                        self._load()
                        self._notify_changes(old_config, self._config)
                except Exception:
                    pass
        
        thread = threading.Thread(target=watch_loop, daemon=True)
        thread.start()
    
    def _notify_changes(
        self,
        old_config: Dict[str, Any],
        new_config: Dict[str, Any]
    ) -> None:
        for key in set(old_config.keys()) | set(new_config.keys()):
            old_value = old_config.get(key)
            new_value = new_config.get(key)
            if old_value != new_value:
                for watcher in self._watchers:
                    watcher(key, new_value)

class KubernetesConfigMapSource(ConfigSource):
    def __init__(self, name: str, namespace: str = "default"):
        self.name = name
        self.namespace = namespace
        self._config: Dict[str, Any] = {}
        self._watchers: List[Callable] = []
        self._load()
    
    def _load(self) -> None:
        mount_path = f"/etc/config/{self.name}"
        if os.path.exists(mount_path):
            for filename in os.listdir(mount_path):
                file_path = os.path.join(mount_path, filename)
                try:
                    with open(file_path, 'r') as f:
                        self._config[filename] = f.read().strip()
                except Exception:
                    pass
    
    def get(self, key: str, default: Any = None) -> Any:
        return self._config.get(key, default)
    
    def get_all(self) -> Dict[str, Any]:
        return self._config.copy()
    
    def watch(self, callback: Callable[[str, Any], None]) -> None:
        self._watchers.append(callback)

class KubernetesSecretSource(ConfigSource):
    def __init__(self, name: str, namespace: str = "default"):
        self.name = name
        self.namespace = namespace
        self._secrets: Dict[str, str] = {}
        self._watchers: List[Callable] = []
        self._load()
    
    def _load(self) -> None:
        mount_path = f"/etc/secrets/{self.name}"
        if os.path.exists(mount_path):
            for filename in os.listdir(mount_path):
                file_path = os.path.join(mount_path, filename)
                try:
                    with open(file_path, 'r') as f:
                        self._secrets[filename] = f.read().strip()
                except Exception:
                    pass
    
    def get(self, key: str, default: Any = None) -> Any:
        return self._secrets.get(key, default)
    
    def get_all(self) -> Dict[str, Any]:
        return self._secrets.copy()
    
    def watch(self, callback: Callable[[str, Any], None]) -> None:
        self._watchers.append(callback)

@dataclass
class PrioritySource:
    priority: int
    source: ConfigSource

class CloudConfig:
    def __init__(self):
        self._sources: List[PrioritySource] = []
        self._cache: Dict[str, Any] = {}
        self._watchers: List[Callable[[str, Any], None]] = []
        self._lock = threading.RLock()
    
    def add_source(self, source: ConfigSource, priority: int = 0) -> 'CloudConfig':
        with self._lock:
            self._sources.append(PrioritySource(priority=priority, source=source))
            self._sources.sort(key=lambda x: x.priority, reverse=True)
            source.watch(self._on_config_change)
        return self
    
    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            if key in self._cache:
                return self._cache[key]
            
            for ps in self._sources:
                value = ps.source.get(key)
                if value is not None:
                    self._cache[key] = value
                    return value
            
            return default
    
    def get_int(self, key: str, default: int = 0) -> int:
        value = self.get(key)
        if value is None:
            return default
        try:
            return int(value)
        except (ValueError, TypeError):
            return default
    
    def get_float(self, key: str, default: float = 0.0) -> float:
        value = self.get(key)
        if value is None:
            return default
        try:
            return float(value)
        except (ValueError, TypeError):
            return default
    
    def get_bool(self, key: str, default: bool = False) -> bool:
        value = self.get(key)
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in ("true", "1", "yes", "on")
        return bool(value)
    
    def get_list(self, key: str, default: List = None) -> List:
        value = self.get(key)
        if value is None:
            return default or []
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            return [v.strip() for v in value.split(",")]
        return [value]
    
    def get_dict(self, key: str, default: Dict = None) -> Dict:
        value = self.get(key)
        if value is None:
            return default or {}
        if isinstance(value, dict):
            return value
        if isinstance(value, str):
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                return default or {}
        return default or {}
    
    def watch(self, callback: Callable[[str, Any], None]) -> None:
        with self._lock:
            self._watchers.append(callback)
    
    def refresh(self) -> None:
        with self._lock:
            self._cache.clear()
    
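    def _on_config_change(self, key: str, value: Any) -> None:
        with self._lock:
            # FileConfigSource reports top-level keys (e.g. "database") while
            # get() caches dotted keys (e.g. "database.url"); evict both
            stale = [k for k in self._cache if k == key or k.startswith(key + ".")]
            for k in stale:
                del self._cache[k]
            watchers = list(self._watchers)
        # Invoke callbacks outside the lock so a slow watcher cannot block readers
        for watcher in watchers:
            try:
                watcher(key, value)
            except Exception:
                pass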
    
    def get_all(self) -> Dict[str, Any]:
        result = {}
        with self._lock:
            for ps in reversed(self._sources):
                result.update(ps.source.get_all())
        return result

config = CloudConfig()
config.add_source(EnvironmentConfigSource("APP_"), priority=100)
config.add_source(FileConfigSource("/app/config.json"), priority=50)
config.add_source(KubernetesConfigMapSource("app-config"), priority=30)
config.add_source(KubernetesSecretSource("app-secrets"), priority=10)

print(f"Database URL: {config.get('database.url', 'localhost:5432')}")
print(f"Debug mode: {config.get_bool('debug', False)}")
print(f"Max connections: {config.get_int('max.connections', 100)}")
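`CloudConfig.get` applies a precedence rule: it walks sources from highest to lowest priority and returns the first non-`None` value. That rule can be checked in isolation, without environment variables or mounted files. The minimal sketch below uses plain dictionaries as hypothetical sources. `LayeredConfig` is an illustrative name, not part of the code above.

```python
from typing import Any, Dict, List, Tuple

class LayeredConfig:
    """Hypothetical minimal model of priority-ordered configuration lookup."""

    def __init__(self) -> None:
        self._layers: List[Tuple[int, Dict[str, Any]]] = []

    def add(self, values: Dict[str, Any], priority: int) -> "LayeredConfig":
        self._layers.append((priority, values))
        # Highest priority first, mirroring CloudConfig.add_source
        self._layers.sort(key=lambda layer: layer[0], reverse=True)
        return self

    def get(self, key: str, default: Any = None) -> Any:
        for _, values in self._layers:
            value = values.get(key)
            if value is not None:
                return value
        return default

env_layer = {"database.url": "db.prod:5432"}
file_layer = {"database.url": "localhost:5432", "max.connections": "50"}

cfg = LayeredConfig().add(file_layer, priority=50).add(env_layer, priority=100)
print(cfg.get("database.url"))     # env layer (priority 100) wins
print(cfg.get("max.connections"))  # only the file layer defines it
print(cfg.get("debug", False))     # no layer defines it, so the default is returned
```

Only priority matters, not registration order: the file layer is added first here yet is still overridden by the environment layer. One consequence shared with `CloudConfig.get` is that a source cannot set a key to `None` to mask a lower-priority value.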

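30.3 Service Mesh Patterns

30.3.1 Formal Definition of a Service Mesh

Definition 30.5 (Service mesh): A service mesh $\mathcal{SM}$ is a four-tuple:

$$\mathcal{SM} = \langle \mathcal{P}, \mathcal{T}, \mathcal{S}, \mathcal{O} \rangle$$

where:

  • $\mathcal{P}$: the proxy (sidecar) layer, which handles service-to-service communication
  • $\mathcal{T}$: traffic management, including routing, load balancing, and fault injection
  • $\mathcal{S}$: the security layer, providing mTLS, authentication, and authorization
  • $\mathcal{O}$: observability, including metrics, logs, and traces

30.3.2 UML Class Diagram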

┌─────────────────────────────────────────────────────────────────────────┐
│                           ServiceMesh                                    │
├─────────────────────────────────────────────────────────────────────────┤
│ - proxies: Dict[str, SidecarProxy]                                      │
│ - mtls_enabled: bool                                                    │
│ - tracing_enabled: bool                                                 │
├─────────────────────────────────────────────────────────────────────────┤
│ + register_service(name: str): SidecarProxy                             │
│ + get_proxy(name: str): SidecarProxy                                    │
│ + enable_mtls(): void                                                   │
│ + enable_tracing(): void                                                │
└─────────────────────────────────────────────────────────────────────────┘

                                    │ manages


┌─────────────────────────────────────────────────────────────────────────┐
│                          SidecarProxy                                    │
├─────────────────────────────────────────────────────────────────────────┤
│ - service_name: str                                                     │
│ - endpoints: List[ServiceEndpoint]                                      │
│ - traffic_policy: TrafficPolicy                                         │
│ - retry_policy: RetryPolicy                                             │
│ - circuit_breaker: CircuitBreakerPolicy                                 │
│ - stats: Dict[str, int]                                                 │
├─────────────────────────────────────────────────────────────────────────┤
│ + add_endpoint(host: str, port: int): void                              │
│ + select_endpoint(): ServiceEndpoint                                    │
│ + request(method: str, path: str): Dict                                 │
│ + get_stats(): Dict                                                     │
└─────────────────────────────────────────────────────────────────────────┘

30.3.3 Sidecar Proxy Implementation

python
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
import random
from abc import ABC, abstractmethod

class TrafficPolicy(Enum):
    RANDOM = "random"
    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED = "weighted"

@dataclass
class ServiceEndpoint:
    host: str
    port: int
    weight: int = 100
    healthy: bool = True
    connections: int = 0
    zone: str = "default"
    
    def __hash__(self):
        return hash((self.host, self.port))

@dataclass
class RetryPolicy:
    max_retries: int = 3
    retry_on: List[str] = field(default_factory=lambda: ["5xx", "reset"])
    per_try_timeout: float = 1.0
    backoff_base: float = 0.5
    
    def calculate_delay(self, attempt: int) -> float:
        return self.backoff_base * (2 ** attempt)

@dataclass
class TimeoutPolicy:
    connect_timeout: float = 5.0
    request_timeout: float = 30.0
    idle_timeout: float = 300.0

@dataclass
class CircuitBreakerPolicy:
    max_connections: int = 100
    max_pending_requests: int = 100
    max_requests: int = 100
    sleep_window: float = 30.0
    error_threshold_percentage: float = 50.0
    request_volume_threshold: int = 20

class LoadBalancer(ABC):
    @abstractmethod
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        pass

class RoundRobinLoadBalancer(LoadBalancer):
    def __init__(self):
        self._index = 0
        self._lock = threading.Lock()
    
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        healthy = [e for e in endpoints if e.healthy]
        if not healthy:
            return None
        
        with self._lock:
            endpoint = healthy[self._index % len(healthy)]
            self._index += 1
        return endpoint

class RandomLoadBalancer(LoadBalancer):
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        healthy = [e for e in endpoints if e.healthy]
        if not healthy:
            return None
        return random.choice(healthy)

class LeastConnectionsLoadBalancer(LoadBalancer):
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        healthy = [e for e in endpoints if e.healthy]
        if not healthy:
            return None
        return min(healthy, key=lambda e: e.connections)

class WeightedLoadBalancer(LoadBalancer):
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        healthy = [e for e in endpoints if e.healthy]
        if not healthy:
            return None
        
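        total_weight = sum(e.weight for e in healthy)
        if total_weight <= 0:
            # randint(1, 0) would raise ValueError; fall back to a uniform choice
            return random.choice(healthy)
        r = random.randint(1, total_weight)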
        current = 0
        
        for endpoint in healthy:
            current += endpoint.weight
            if r <= current:
                return endpoint
        
        return healthy[0]

class SidecarProxy:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.endpoints: List[ServiceEndpoint] = []
        self.traffic_policy = TrafficPolicy.ROUND_ROBIN
        self.retry_policy = RetryPolicy()
        self.timeout_policy = TimeoutPolicy()
        self.circuit_breaker_policy = CircuitBreakerPolicy()
        
        self._load_balancer: LoadBalancer = RoundRobinLoadBalancer()
        self._stats = {
            "requests": 0,
            "successes": 0,
            "failures": 0,
            "retries": 0,
            "timeouts": 0,
            "circuit_opens": 0
        }
        self._circuit_open = False
        self._circuit_open_time = 0.0
        self._failure_count = 0
        self._success_count = 0
        self._lock = threading.RLock()
    
    def add_endpoint(
        self,
        host: str,
        port: int,
        weight: int = 100,
        zone: str = "default"
    ) -> None:
        with self._lock:
            self.endpoints.append(ServiceEndpoint(
                host=host,
                port=port,
                weight=weight,
                zone=zone
            ))
    
    def remove_endpoint(self, host: str, port: int) -> None:
        with self._lock:
            self.endpoints = [
                e for e in self.endpoints
                if not (e.host == host and e.port == port)
            ]
    
    def set_traffic_policy(self, policy: TrafficPolicy) -> None:
        self.traffic_policy = policy
        if policy == TrafficPolicy.ROUND_ROBIN:
            self._load_balancer = RoundRobinLoadBalancer()
        elif policy == TrafficPolicy.RANDOM:
            self._load_balancer = RandomLoadBalancer()
        elif policy == TrafficPolicy.LEAST_CONNECTIONS:
            self._load_balancer = LeastConnectionsLoadBalancer()
        elif policy == TrafficPolicy.WEIGHTED:
            self._load_balancer = WeightedLoadBalancer()
    
    def select_endpoint(self) -> Optional[ServiceEndpoint]:
        with self._lock:
            return self._load_balancer.select(self.endpoints)
    
    def request(
        self,
        method: str,
        path: str,
        headers: Optional[Dict] = None,
        body: Any = None
    ) -> Dict:
        if self._is_circuit_open():
            with self._lock:
                # Counts requests rejected while the breaker is open
                self._stats["circuit_opens"] += 1
            return {"error": "Circuit breaker open", "status": 503}
        
        endpoint = self.select_endpoint()
        if not endpoint:
            return {"error": "No available endpoints", "status": 503}
        
        # Shared counters are updated under the lock so concurrent callers do not lose updates
        with self._lock:
            endpoint.connections += 1
            self._stats["requests"] += 1
        
        try:
            result = self._execute_with_retry(
                endpoint, method, path, headers, body
            )
            self._record_success()
            return result
        except Exception as e:
            self._record_failure()
            return {"error": str(e), "status": 500}
        finally:
            with self._lock:
                endpoint.connections -= 1
    
    def _execute_with_retry(
        self,
        endpoint: ServiceEndpoint,
        method: str,
        path: str,
        headers: Dict,
        body: Any
    ) -> Dict:
        last_error = None
        
        for attempt in range(self.retry_policy.max_retries + 1):
            try:
                return self._send_request(
                    endpoint, method, path, headers, body
                )
            except Exception as e:
                last_error = e
                if attempt < self.retry_policy.max_retries:
                    self._stats["retries"] += 1
                    delay = self.retry_policy.calculate_delay(attempt)
                    time.sleep(delay)
        
        raise last_error
    
    def _send_request(
        self,
        endpoint: ServiceEndpoint,
        method: str,
        path: str,
        headers: Dict,
        body: Any
    ) -> Dict:
        print(f"[{self.service_name}] {method} http://{endpoint.host}:{endpoint.port}{path}")
        return {"status": 200, "body": {"success": True}}
    
    def _is_circuit_open(self) -> bool:
        with self._lock:
            if not self._circuit_open:
                return False
            
            if time.time() - self._circuit_open_time >= self.circuit_breaker_policy.sleep_window:
                self._circuit_open = False
                self._failure_count = 0
                self._success_count = 0
                return False
            
            return True
    
    def _record_success(self) -> None:
        with self._lock:
            self._stats["successes"] += 1
            self._success_count += 1
    
    def _record_failure(self) -> None:
        with self._lock:
            self._stats["failures"] += 1
            self._failure_count += 1
            
            total = self._failure_count + self._success_count
            if total >= self.circuit_breaker_policy.request_volume_threshold:
                error_rate = (self._failure_count / total) * 100
                if error_rate >= self.circuit_breaker_policy.error_threshold_percentage:
                    self._circuit_open = True
                    self._circuit_open_time = time.time()
    
    def get_stats(self) -> Dict:
        with self._lock:
            return {
                **self._stats,
                "endpoints": len(self.endpoints),
                "healthy_endpoints": sum(1 for e in self.endpoints if e.healthy),
                "circuit_open": self._circuit_open
            }

class ServiceMesh:
    def __init__(self):
        self.proxies: Dict[str, SidecarProxy] = {}
        self.mtls_enabled: bool = False
        self.tracing_enabled: bool = True
        self._lock = threading.RLock()
    
    def register_service(self, service_name: str) -> SidecarProxy:
        with self._lock:
            proxy = SidecarProxy(service_name)
            self.proxies[service_name] = proxy
            return proxy
    
    def get_proxy(self, service_name: str) -> Optional[SidecarProxy]:
        return self.proxies.get(service_name)
    
    def enable_mtls(self) -> None:
        self.mtls_enabled = True
        print("mTLS enabled for all services")
    
    def enable_tracing(self) -> None:
        self.tracing_enabled = True
        print("Distributed tracing enabled")
    
    def get_all_stats(self) -> Dict[str, Dict]:
        return {
            name: proxy.get_stats()
            for name, proxy in self.proxies.items()
        }

mesh = ServiceMesh()

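user_proxy = mesh.register_service("user-service")
# Endpoint weights only take effect under the WEIGHTED policy (the default is ROUND_ROBIN)
user_proxy.set_traffic_policy(TrafficPolicy.WEIGHTED)
user_proxy.add_endpoint("10.0.0.1", 8080, weight=70)
user_proxy.add_endpoint("10.0.0.2", 8080, weight=30)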

order_proxy = mesh.register_service("order-service")
order_proxy.add_endpoint("10.0.1.1", 8080)
order_proxy.add_endpoint("10.0.1.2", 8080)

mesh.enable_mtls()
mesh.enable_tracing()

result = user_proxy.request("GET", "/users/123")
print(f"Result: {result}")
print(f"Stats: {user_proxy.get_stats()}")
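`WeightedLoadBalancer.select` draws `r = random.randint(1, total_weight)` and returns the first endpoint whose running weight sum reaches `r`, so endpoint $i$ is chosen with probability $w_i / \sum_j w_j$. Enumerating every possible `r` for the 70/30 endpoints registered above confirms the split. In this sketch `pick_weighted` is a hypothetical, deterministic copy of that scan with `r` passed in. Note that the weights only matter once `set_traffic_policy(TrafficPolicy.WEIGHTED)` is called, since `SidecarProxy` starts with round robin.

```python
from collections import Counter
from typing import List, Tuple

def pick_weighted(endpoints: List[Tuple[str, int]], r: int) -> str:
    """Cumulative-weight scan as in WeightedLoadBalancer.select, with r supplied by the caller."""
    current = 0
    for name, weight in endpoints:
        current += weight
        if r <= current:
            return name
    return endpoints[0][0]  # unreachable when 1 <= r <= total weight

endpoints = [("10.0.0.1", 70), ("10.0.0.2", 30)]
total = sum(weight for _, weight in endpoints)

# Enumerate every value random.randint(1, total) can return; each is equally likely
distribution = Counter(pick_weighted(endpoints, r) for r in range(1, total + 1))
print(distribution)  # Counter({'10.0.0.1': 70, '10.0.0.2': 30})
```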

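30.4 Observability Patterns

30.4.1 Formal Definition of Observability

Definition 30.6 (Observability): The observability $\mathcal{O}$ of a system is defined as the triple:

$$\mathcal{O} = \langle \mathcal{M}, \mathcal{L}, \mathcal{T} \rangle$$

where:

  • $\mathcal{M}$: metrics, which quantify system state
  • $\mathcal{L}$: logs, which record discrete events
  • $\mathcal{T}$: traces, which follow the path of a request through the system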
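To make the three signals concrete, here is a minimal sketch, using only the Python standard library, of one request that emits a metric, a log line, and a span, all keyed by the same trace id so they can be correlated afterwards. The names `handle_request`, `request_count`, and `spans` are hypothetical and are not part of the chapter's code; the trace id is attached to log records through `logging.LoggerAdapter`'s `extra` mapping.

```python
import io
import logging
import time
import uuid

# One logger whose formatter expects a trace_id field on every record
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("trace_id=%(trace_id)s %(message)s"))
logger = logging.getLogger("observability-demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

request_count: dict = {}  # M: aggregated metric, one counter per path
spans: list = []          # T: finished spans

def handle_request(path: str) -> str:
    trace_id = uuid.uuid4().hex
    log = logging.LoggerAdapter(logger, {"trace_id": trace_id})
    start = time.time()
    log.info("handling %s", path)                          # L: a discrete event
    request_count[path] = request_count.get(path, 0) + 1   # M: no per-request identity
    spans.append({"trace_id": trace_id, "operation": f"GET {path}",
                  "start": start, "end": time.time()})     # T: timed, linked by trace_id
    return trace_id

tid = handle_request("/users/123")
print(request_count)              # {'/users/123': 1}
print(stream.getvalue().strip())  # trace_id=<hex id> handling /users/123
```

The metric deliberately carries no trace id: it aggregates across requests, while the log line and the span can be joined on `trace_id`.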

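Definition 30.7 (Trace): A trace $\mathcal{T}_r$ is defined as:

$$\mathcal{T}_r = \langle \mathit{trace\_id}, \{span_1, span_2, \ldots, span_n\} \rangle$$

where each $span_i$ contains:

  • $\mathit{span\_id}$: a unique identifier
  • $\mathit{parent\_span\_id}$: the identifier of the parent span (empty for the root span)
  • $\mathit{operation\_name}$: the operation name
  • $\mathit{start\_time}, \mathit{end\_time}$: timestamps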
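Because every span except the root carries a $\mathit{parent\_span\_id}$, the flat span set of a trace can be reassembled into a call tree. The sketch below assumes a hypothetical `SpanRecord` type holding just the fields listed above, with made-up timings; `build_tree` groups spans by parent and `render` walks the tree depth first.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class SpanRecord:
    span_id: str
    parent_span_id: Optional[str]  # None marks the root span
    operation_name: str
    start_time: float
    end_time: float

def build_tree(spans: List[SpanRecord]) -> Dict[Optional[str], List[SpanRecord]]:
    """Map each parent id to its children in start order; the None key holds the root(s)."""
    children: Dict[Optional[str], List[SpanRecord]] = {}
    for s in sorted(spans, key=lambda s: s.start_time):
        children.setdefault(s.parent_span_id, []).append(s)
    return children

def render(children: Dict[Optional[str], List[SpanRecord]],
           parent: Optional[str] = None, depth: int = 0) -> List[str]:
    lines = []
    for s in children.get(parent, []):
        ms = (s.end_time - s.start_time) * 1000
        lines.append(f"{'  ' * depth}{s.operation_name} ({ms:.0f} ms)")
        lines.extend(render(children, s.span_id, depth + 1))  # recurse into children
    return lines

# Spans may arrive in any order; parent links, not arrival order, define the tree
trace = [
    SpanRecord("b2", "a1", "validate_payment", 0.00, 0.10),
    SpanRecord("a1", None, "process_order", 0.00, 0.16),
    SpanRecord("c3", "a1", "update_inventory", 0.10, 0.15),
]
print("\n".join(render(build_tree(trace))))
```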

30.4.2 Implementing Distributed Tracing

python
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from contextlib import contextmanager
import threading  # required by Tracer's RLock
import time
import uuid
import json

@dataclass
class SpanContext:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str] = None
    baggage: Dict[str, str] = field(default_factory=dict)
    
    def to_headers(self) -> Dict[str, str]:
        headers = {
            "x-trace-id": self.trace_id,
            "x-span-id": self.span_id,
        }
        if self.parent_span_id:
            headers["x-parent-span-id"] = self.parent_span_id
        for key, value in self.baggage.items():
            headers[f"x-baggage-{key}"] = value
        return headers
    
    @classmethod
    def from_headers(cls, headers: Dict[str, str]) -> 'SpanContext':
        trace_id = headers.get("x-trace-id", str(uuid.uuid4()).replace("-", ""))
        span_id = headers.get("x-span-id", str(uuid.uuid4()).replace("-", "")[:16])
        parent_span_id = headers.get("x-parent-span-id")
        
        baggage = {}
        for key, value in headers.items():
            if key.startswith("x-baggage-"):
                baggage_key = key[len("x-baggage-"):]
                baggage[baggage_key] = value
        
        return cls(
            trace_id=trace_id,
            span_id=span_id,
            parent_span_id=parent_span_id,
            baggage=baggage
        )

@dataclass
class Span:
    context: SpanContext
    operation_name: str
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    tags: Dict[str, Any] = field(default_factory=dict)
    logs: List[Dict[str, Any]] = field(default_factory=list)
    status: str = "OK"
    
    def finish(self) -> None:
        self.end_time = time.time()
    
    def set_tag(self, key: str, value: Any) -> 'Span':
        self.tags[key] = value
        return self
    
    def log(self, message: str, **kwargs) -> 'Span':
        self.logs.append({
            "timestamp": time.time(),
            "message": message,
            **kwargs
        })
        return self
    
    def set_error(self, error: Exception) -> 'Span':
        self.status = "ERROR"
        self.set_tag("error", True)
        self.set_tag("error.type", type(error).__name__)
        self.log(str(error), error_stack=str(error))
        return self
    
    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "traceId": self.context.trace_id,
            "spanId": self.context.span_id,
            "parentSpanId": self.context.parent_span_id,
            "operationName": self.operation_name,
            "startTime": self.start_time,
            "endTime": self.end_time,
            "duration": self.duration_ms,
            "tags": self.tags,
            "logs": self.logs,
            "status": self.status
        }

class Tracer:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self._spans: List[Span] = []
        self._current_span: Optional[Span] = None
        self._lock = threading.RLock()
    
    def start_span(
        self,
        operation_name: str,
        parent: Optional[Span] = None,
        context: Optional[SpanContext] = None
    ) -> Span:
        if context:
            span_context = SpanContext(
                trace_id=context.trace_id,
                span_id=str(uuid.uuid4()).replace("-", "")[:16],
                parent_span_id=context.span_id,
                baggage=context.baggage.copy()
            )
        elif parent:
            span_context = SpanContext(
                trace_id=parent.context.trace_id,
                span_id=str(uuid.uuid4()).replace("-", "")[:16],
                parent_span_id=parent.context.span_id,
                baggage=parent.context.baggage.copy()
            )
        else:
            span_context = SpanContext(
                trace_id=str(uuid.uuid4()).replace("-", ""),
                span_id=str(uuid.uuid4()).replace("-", "")[:16]
            )
        
        span = Span(
            context=span_context,
            operation_name=operation_name
        )
        span.set_tag("service", self.service_name)
        
        with self._lock:
            self._spans.append(span)
        
        return span
    
    @contextmanager
    def trace(
        self,
        operation_name: str,
        parent: Optional[Span] = None,
        context: Optional[SpanContext] = None
    ):
        span = self.start_span(operation_name, parent, context)
        try:
            yield span
        except Exception as e:
            span.set_error(e)
            raise
        finally:
            span.finish()
    
    def get_trace(self, trace_id: str) -> List[Span]:
        with self._lock:
            return [s for s in self._spans if s.context.trace_id == trace_id]
    
    def export_traces(self) -> List[Dict]:
        with self._lock:
            return [s.to_dict() for s in self._spans]
    
    def clear(self) -> None:
        with self._lock:
            self._spans.clear()

tracer = Tracer("order-service")

with tracer.trace("process_order") as parent_span:
    parent_span.set_tag("order_id", "ORD-12345")
    parent_span.log("Starting order processing")
    
    with tracer.trace("validate_payment", parent=parent_span) as payment_span:
        payment_span.set_tag("payment_method", "credit_card")
        time.sleep(0.1)
        payment_span.log("Payment validated")
    
    with tracer.trace("update_inventory", parent=parent_span) as inventory_span:
        inventory_span.set_tag("warehouse", "WH-001")
        time.sleep(0.05)
        inventory_span.log("Inventory updated")
    
    parent_span.log("Order processing completed")

print(f"Trace export: {json.dumps(tracer.export_traces(), indent=2)}")
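The `Tracer` above keeps all spans inside one process. Across a service boundary, the caller serializes its context into request headers and the callee starts a child span from them, which is what `SpanContext.to_headers`, `SpanContext.from_headers`, and `start_span(context=...)` do together. The following stripped-down sketch isolates that hand-off. `inject` and `extract_and_start` are hypothetical helpers, and the `x-trace-id` / `x-span-id` header names are this chapter's convention rather than a standard (the W3C Trace Context specification uses a `traceparent` header).

```python
import uuid
from typing import Dict, Optional

def new_span_id() -> str:
    return uuid.uuid4().hex[:16]  # 16 hex chars, matching the Tracer's span ids

def inject(trace_id: str, span_id: str) -> Dict[str, str]:
    """Caller side: copy the active span's ids into the outgoing request headers."""
    return {"x-trace-id": trace_id, "x-span-id": span_id}

def extract_and_start(headers: Dict[str, str]) -> Dict[str, Optional[str]]:
    """Callee side: keep the trace id, and make the caller's span the parent of a new span."""
    return {
        "trace_id": headers["x-trace-id"],
        "span_id": new_span_id(),
        "parent_span_id": headers["x-span-id"],
    }

caller_trace, caller_span = uuid.uuid4().hex, new_span_id()
server_span = extract_and_start(inject(caller_trace, caller_span))
print(server_span["trace_id"] == caller_trace,
      server_span["parent_span_id"] == caller_span)  # True True
```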

30.4.3 Implementing Metrics Collection

python
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
from collections import defaultdict

class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"

class Counter:
    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._value: float = 0.0
        self._labels: Dict[str, float] = defaultdict(float)  # label key -> value, defaults to 0.0
        self._lock = threading.Lock()
    
    def inc(self, amount: float = 1.0, labels: Dict[str, str] = None) -> None:
        with self._lock:
            if labels:
                label_key = self._labels_to_key(labels)
                self._labels[label_key] += amount
            else:
                self._value += amount
    
    def get(self, labels: Dict[str, str] = None) -> float:
        with self._lock:
            if labels:
                label_key = self._labels_to_key(labels)
                return self._labels.get(label_key, 0.0)
            return self._value
    
    def _labels_to_key(self, labels: Dict[str, str]) -> str:
        return ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    
    def export_prometheus(self) -> List[str]:
        lines = []
        lines.append(f"# TYPE {self.name} counter")
        if self.description:
            lines.append(f"# HELP {self.name} {self.description}")
        
        with self._lock:
            if self._value != 0:
                lines.append(f"{self.name} {self._value}")
            
            for label_key, value in self._labels.items():
                lines.append(f"{self.name}{{{label_key}}} {value}")
        
        return lines

class Gauge:
    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._value: float = 0.0
        self._labels: Dict[str, float] = defaultdict(float)  # label key -> value, defaults to 0.0
        self._lock = threading.Lock()
    
    def set(self, value: float, labels: Dict[str, str] = None) -> None:
        with self._lock:
            if labels:
                label_key = self._labels_to_key(labels)
                self._labels[label_key] = value
            else:
                self._value = value
    
    def inc(self, amount: float = 1.0, labels: Dict[str, str] = None) -> None:
        with self._lock:
            if labels:
                label_key = self._labels_to_key(labels)
                self._labels[label_key] += amount
            else:
                self._value += amount
    
    def dec(self, amount: float = 1.0, labels: Dict[str, str] = None) -> None:
        with self._lock:
            if labels:
                label_key = self._labels_to_key(labels)
                self._labels[label_key] -= amount
            else:
                self._value -= amount
    
    def get(self, labels: Dict[str, str] = None) -> float:
        with self._lock:
            if labels:
                label_key = self._labels_to_key(labels)
                return self._labels.get(label_key, 0.0)
            return self._value
    
    def _labels_to_key(self, labels: Dict[str, str]) -> str:
        return ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    
    def export_prometheus(self) -> List[str]:
        lines = []
        lines.append(f"# TYPE {self.name} gauge")
        if self.description:
            lines.append(f"# HELP {self.name} {self.description}")
        
        with self._lock:
            lines.append(f"{self.name} {self._value}")
            for label_key, value in self._labels.items():
                lines.append(f"{self.name}{{{label_key}}} {value}")
        
        return lines

class Histogram:
    DEFAULT_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    
    def __init__(
        self,
        name: str,
        buckets: List[float] = None,
        description: str = ""
    ):
        self.name = name
        self.description = description
        self.buckets = sorted(buckets or self.DEFAULT_BUCKETS)
        # Keys are the numeric upper bounds plus the string "+Inf"
        self._counts: Dict[Any, int] = {b: 0 for b in self.buckets}
        self._counts["+Inf"] = 0
        self._sum: float = 0.0
        self._count: int = 0
        self._lock = threading.Lock()
    
    def observe(self, value: float) -> None:
        with self._lock:
            self._sum += value
            self._count += 1
            
            for bucket in self.buckets:
                if value <= bucket:
                    self._counts[bucket] += 1
            self._counts["+Inf"] += 1
    
    def get(self) -> Dict[str, Any]:
        with self._lock:
            return {
                "buckets": dict(self._counts),
                "sum": self._sum,
                "count": self._count
            }
    
    def export_prometheus(self) -> List[str]:
        lines = []
        lines.append(f"# TYPE {self.name} histogram")
        if self.description:
            lines.append(f"# HELP {self.name} {self.description}")
        
        with self._lock:
            for bucket, count in self._counts.items():
                bucket_label = f'le="{bucket}"' if bucket != "+Inf" else 'le="+Inf"'
                lines.append(f"{self.name}_bucket{{{bucket_label}}} {count}")
            lines.append(f"{self.name}_sum {self._sum}")
            lines.append(f"{self.name}_count {self._count}")
        
        return lines

class MetricsRegistry:
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._metrics: Dict[str, Any] = {}
            cls._instance._lock = threading.Lock()
        return cls._instance
    
    def register_counter(
        self,
        name: str,
        description: str = ""
    ) -> Counter:
        with self._lock:
            if name not in self._metrics:
                self._metrics[name] = Counter(name, description)
            return self._metrics[name]
    
    def register_gauge(
        self,
        name: str,
        description: str = ""
    ) -> Gauge:
        with self._lock:
            if name not in self._metrics:
                self._metrics[name] = Gauge(name, description)
            return self._metrics[name]
    
    def register_histogram(
        self,
        name: str,
        buckets: List[float] = None,
        description: str = ""
    ) -> Histogram:
        with self._lock:
            if name not in self._metrics:
                self._metrics[name] = Histogram(name, buckets, description)
            return self._metrics[name]
    
    def get_metric(self, name: str) -> Optional[Any]:
        return self._metrics.get(name)
    
    def export_prometheus(self) -> str:
        lines = []
        for metric in self._metrics.values():
            lines.extend(metric.export_prometheus())
        return "\n".join(lines)

registry = MetricsRegistry()

request_counter = registry.register_counter(
    "http_requests_total",
    "Total number of HTTP requests"
)

active_connections = registry.register_gauge(
    "active_connections",
    "Number of active connections"
)

request_duration = registry.register_histogram(
    "http_request_duration_seconds",
    description="HTTP request duration in seconds"
)

request_counter.inc(labels={"method": "GET", "path": "/api/users"})
request_counter.inc(labels={"method": "POST", "path": "/api/orders"})
active_connections.set(10)
request_duration.observe(0.123)
request_duration.observe(0.456)

print(registry.export_prometheus())
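The `Histogram` class above stores cumulative counts: `observe` increments every bucket whose upper bound `le` is at least the observed value, so each bucket counts all observations at or below its bound. The sketch below recomputes those counts for the two observations in the example, using a subset of `DEFAULT_BUCKETS`, and then locates the bucket that contains a given quantile. The helper names are hypothetical; Prometheus's `histogram_quantile()` goes one step further and interpolates linearly inside that bucket.

```python
from typing import Dict, List

def cumulative_buckets(observations: List[float], bounds: List[float]) -> Dict[str, int]:
    """For each upper bound `le`, count the observations that are <= it."""
    counts = {str(b): sum(1 for v in observations if v <= b) for b in sorted(bounds)}
    counts["+Inf"] = len(observations)  # every observation lands in the +Inf bucket
    return counts

def bucket_for_quantile(counts: Dict[str, int], q: float) -> str:
    """Return the smallest bucket bound whose cumulative count reaches rank q * total."""
    target = q * counts["+Inf"]
    for le, count in counts.items():  # dicts keep the ascending insertion order
        if count >= target:
            return le
    return "+Inf"

counts = cumulative_buckets([0.123, 0.456], [0.1, 0.25, 0.5, 1])
print(counts)                            # {'0.1': 0, '0.25': 1, '0.5': 2, '1': 2, '+Inf': 2}
print(bucket_for_quantile(counts, 0.5))  # 0.25
```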

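30.5 Resilience Patterns

30.5.1 The Circuit Breaker Pattern

Definition 30.8 (Circuit breaker states): The circuit breaker state machine is defined as:

$$\mathit{CircuitBreaker} = \langle S, \delta, s_0, F \rangle$$

where:

  • $S = \{\mathit{CLOSED}, \mathit{OPEN}, \mathit{HALF\_OPEN}\}$: the set of states
  • $\delta$: the state transition function
  • $s_0 = \mathit{CLOSED}$: the initial state
  • $F = \{\mathit{OPEN}\}$: the failure state

State transition rules:

  • $\mathit{CLOSED} \xrightarrow{\text{failures} > \text{threshold}} \mathit{OPEN}$
  • $\mathit{OPEN} \xrightarrow{\text{timeout}} \mathit{HALF\_OPEN}$
  • $\mathit{HALF\_OPEN} \xrightarrow{\text{success}} \mathit{CLOSED}$
  • $\mathit{HALF\_OPEN} \xrightarrow{\text{failure}} \mathit{OPEN}$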
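The four rules can be written directly as a lookup table for $\delta$. The sketch below is a minimal, hypothetical encoding in which the `Event` names are assumptions; any (state, event) pair missing from the table leaves the state unchanged. The `CircuitBreaker` class that follows is stricter on one edge: it returns from `HALF_OPEN` to `CLOSED` only after `success_threshold` successful probes, not after a single success.

```python
from enum import Enum
from typing import Dict, Iterable, Tuple

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class Event(Enum):
    FAILURES_OVER_THRESHOLD = "failures_over_threshold"
    TIMEOUT = "timeout"
    SUCCESS = "success"
    FAILURE = "failure"

# delta: (current state, event) -> next state
DELTA: Dict[Tuple[State, Event], State] = {
    (State.CLOSED, Event.FAILURES_OVER_THRESHOLD): State.OPEN,
    (State.OPEN, Event.TIMEOUT): State.HALF_OPEN,
    (State.HALF_OPEN, Event.SUCCESS): State.CLOSED,
    (State.HALF_OPEN, Event.FAILURE): State.OPEN,
}

def step(state: State, event: Event) -> State:
    return DELTA.get((state, event), state)  # undefined pairs are self-loops

def run(events: Iterable[Event], start: State = State.CLOSED) -> State:
    state = start  # s_0 = CLOSED
    for event in events:
        state = step(state, event)
    return state

print(run([Event.FAILURES_OVER_THRESHOLD, Event.TIMEOUT, Event.SUCCESS]).value)  # closed
```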
python
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
from functools import wraps

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitStats:
    successes: int = 0
    failures: int = 0
    last_failure_time: float = 0
    last_success_time: float = 0
    total_requests: int = 0

class CircuitBreaker:
    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        success_threshold: int = 3,
        timeout: float = 30.0,
        error_rate_threshold: float = 50.0,
        request_volume_threshold: int = 10
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.error_rate_threshold = error_rate_threshold
        self.request_volume_threshold = request_volume_threshold
        
        self.state = CircuitState.CLOSED
        self.stats = CircuitStats()
        self._half_open_successes = 0
        self._lock = threading.RLock()
        self._listeners: List[Callable] = []
    
    def add_listener(self, listener: Callable) -> None:
        self._listeners.append(listener)
    
    def allow_request(self) -> bool:
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
            
            if self.state == CircuitState.OPEN:
                if time.time() - self.stats.last_failure_time >= self.timeout:
                    self._transition_to(CircuitState.HALF_OPEN)
                    return True
                return False
            
            return True
    
    def record_success(self) -> None:
        with self._lock:
            self.stats.successes += 1
            self.stats.total_requests += 1
            self.stats.last_success_time = time.time()
            
            if self.state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.success_threshold:
                    self._transition_to(CircuitState.CLOSED)
    
    def record_failure(self) -> None:
        with self._lock:
            self.stats.failures += 1
            self.stats.total_requests += 1
            self.stats.last_failure_time = time.time()
            
            if self.state == CircuitState.HALF_OPEN:
                # A failed probe sends the breaker straight back to OPEN
                self._transition_to(CircuitState.OPEN)
                return
            if self.state != CircuitState.CLOSED:
                return
            
            # Trip on the absolute failure count, independent of request volume
            if self.stats.failures >= self.failure_threshold:
                self._transition_to(CircuitState.OPEN)
            # Or trip on the error rate once enough requests have been seen
            elif self.stats.total_requests >= self.request_volume_threshold:
                error_rate = (self.stats.failures / self.stats.total_requests) * 100
                if error_rate >= self.error_rate_threshold:
                    self._transition_to(CircuitState.OPEN)
    
    def _transition_to(self, new_state: CircuitState) -> None:
        old_state = self.state
        self.state = new_state
        
        if new_state == CircuitState.CLOSED:
            self.stats = CircuitStats()
            self._half_open_successes = 0
        elif new_state == CircuitState.HALF_OPEN:
            self._half_open_successes = 0
        
        for listener in self._listeners:
            try:
                listener(self.name, old_state, new_state)
            except Exception:
                pass
    
    def get_state(self) -> Dict[str, Any]:
        with self._lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "successes": self.stats.successes,
                "failures": self.stats.failures,
                "total_requests": self.stats.total_requests,
                "last_failure_time": self.stats.last_failure_time
            }
    
    def force_open(self) -> None:
        with self._lock:
            self._transition_to(CircuitState.OPEN)
    
    def force_close(self) -> None:
        with self._lock:
            self._transition_to(CircuitState.CLOSED)

class ResilientClient:
    def __init__(self):
        self.circuits: Dict[str, CircuitBreaker] = {}
        self.fallback_handlers: Dict[str, Callable] = {}
        self._lock = threading.Lock()
    
    def register_circuit(
        self,
        name: str,
        **kwargs
    ) -> CircuitBreaker:
        with self._lock:
            circuit = CircuitBreaker(name, **kwargs)
            self.circuits[name] = circuit
            return circuit
    
    def set_fallback(self, name: str, handler: Callable) -> None:
        self.fallback_handlers[name] = handler
    
    def execute(
        self,
        service_name: str,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        circuit = self.circuits.get(service_name)
        
        if circuit and not circuit.allow_request():
            fallback = self.fallback_handlers.get(service_name)
            if fallback:
                return fallback(*args, **kwargs)
            raise Exception(f"Circuit breaker open: {service_name}")
        
        try:
            result = operation(*args, **kwargs)
            if circuit:
                circuit.record_success()
            return result
        except Exception as e:
            if circuit:
                circuit.record_failure()
            raise
    
    def circuit_breaker(
        self,
        service_name: str,
        **kwargs
    ) -> Callable:
        def decorator(func: Callable) -> Callable:
            if service_name not in self.circuits:
                self.register_circuit(service_name, **kwargs)
            
            @wraps(func)
            def wrapper(*args, **kw):
                return self.execute(service_name, func, *args, **kw)
            return wrapper
        return decorator

client = ResilientClient()
circuit = client.register_circuit(
    "payment-service",
    failure_threshold=3,
    timeout=10.0
)

def fallback_payment(*args, **kwargs):
    return {"status": "fallback", "message": "Payment service unavailable"}

client.set_fallback("payment-service", fallback_payment)

@client.circuit_breaker("order-service", failure_threshold=5)
def process_order(order_id: str):
    print(f"Processing order: {order_id}")
    return {"status": "success", "order_id": order_id}

30.5.2 The Retry Pattern

python
from typing import Any, Callable, Dict, Tuple, Type
from dataclasses import dataclass
from enum import Enum
import time
import random
from functools import wraps

class BackoffStrategy(Enum):
    FIXED = "fixed"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
    EXPONENTIAL_JITTER = "exponential_jitter"

@dataclass
class RetryConfig:
    max_retries: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    backoff_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL_JITTER
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
    jitter_factor: float = 0.5
    
    def calculate_delay(self, attempt: int) -> float:
        if self.backoff_strategy == BackoffStrategy.FIXED:
            delay = self.initial_delay
        
        elif self.backoff_strategy == BackoffStrategy.LINEAR:
            delay = self.initial_delay * (attempt + 1)
        
        elif self.backoff_strategy == BackoffStrategy.EXPONENTIAL:
            delay = self.initial_delay * (2 ** attempt)
        
        elif self.backoff_strategy == BackoffStrategy.EXPONENTIAL_JITTER:
            base_delay = self.initial_delay * (2 ** attempt)
            jitter = random.uniform(0, self.jitter_factor * base_delay)
            delay = base_delay + jitter
        
        return min(delay, self.max_delay)

class RetryExecutor:
    def __init__(self, config: RetryConfig = None):
        self.config = config or RetryConfig()
        self._stats = {
            "total_attempts": 0,
            "successful_retries": 0,
            "failed_after_retries": 0
        }
    
    def execute(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        last_exception = None
        
        for attempt in range(self.config.max_retries + 1):
            self._stats["total_attempts"] += 1
            
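            try:
                result = operation(*args, **kwargs)
                if attempt > 0:
                    # Succeeded only after at least one retry
                    self._stats["successful_retries"] += 1
                return result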
            except self.config.retryable_exceptions as e:
                last_exception = e
                
                if attempt < self.config.max_retries:
                    delay = self.config.calculate_delay(attempt)
                    print(f"Retry {attempt + 1}/{self.config.max_retries}, waiting {delay:.2f}s")
                    time.sleep(delay)
        
        self._stats["failed_after_retries"] += 1
        raise last_exception
    
    def get_stats(self) -> Dict[str, int]:
        return self._stats.copy()

def retry(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL_JITTER,
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
) -> Callable:
    config = RetryConfig(
        max_retries=max_retries,
        initial_delay=initial_delay,
        backoff_strategy=backoff_strategy,
        retryable_exceptions=retryable_exceptions
    )
    executor = RetryExecutor(config)
    
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            return executor.execute(func, *args, **kwargs)
        wrapper._retry_executor = executor
        return wrapper
    
    return decorator

@retry(
    max_retries=3,
    initial_delay=1.0,
    backoff_strategy=BackoffStrategy.EXPONENTIAL_JITTER,
    retryable_exceptions=(ConnectionError, TimeoutError)
)
def call_external_api(url: str) -> Dict:
    print(f"Calling API: {url}")
    if random.random() < 0.5:
        raise ConnectionError("Connection failed")
    return {"status": "ok"}
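It helps to know how long a caller can be blocked. `RetryExecutor` sleeps only between attempts, so `max_retries` retries cost `calculate_delay(0)` through `calculate_delay(max_retries - 1)`. With the decorator's settings above (`max_retries=3`, `initial_delay=1.0`) the exponential base delays are 1, 2 and 4 s, i.e. 7 s in total; `EXPONENTIAL_JITTER` adds up to `jitter_factor` (0.5) times each base delay, so the total lies between 7 and 10.5 s. The hypothetical `backoff_schedule` below reproduces the deterministic strategies of `RetryConfig.calculate_delay`, including the `max_delay` cap.

```python
from typing import List

def backoff_schedule(strategy: str, initial_delay: float,
                     max_delay: float, max_retries: int) -> List[float]:
    """Delay before each retry (attempt = 0, 1, ...), capped at max_delay."""
    delays = []
    for attempt in range(max_retries):
        if strategy == "fixed":
            delay = initial_delay
        elif strategy == "linear":
            delay = initial_delay * (attempt + 1)
        elif strategy == "exponential":
            delay = initial_delay * (2 ** attempt)
        else:
            raise ValueError(f"unknown strategy: {strategy}")
        delays.append(min(delay, max_delay))
    return delays

schedule = backoff_schedule("exponential", initial_delay=1.0, max_delay=60.0, max_retries=8)
print(schedule)       # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
print(sum(schedule))  # 183.0
```

The cap matters quickly: from the seventh retry on, doubling would exceed `max_delay`, so the schedule flattens at 60 s.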

30.5.3 The Rate Limiting Pattern
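Before the listing, it helps to see how a token bucket's two parameters interact. Tokens refill continuously as $\mathit{tokens}(t) = \min(\mathit{capacity}, \mathit{tokens}(t_0) + (t - t_0) \cdot \mathit{rate})$, so `capacity` (the `burst_size`) bounds how many requests can pass at once, while `rate` (the `requests_per_second`) bounds sustained throughput. The sketch below is a hypothetical variant of the `TokenBucket` class that follows, driven by explicit timestamps instead of the real clock so the outcome is deterministic, using `RateLimitConfig`'s defaults of 10 requests per second and a burst of 20.

```python
class SimulatedTokenBucket:
    """Token bucket advanced by caller-supplied timestamps (illustrative only)."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # starts full, so an initial burst is admitted
        self.last = 0.0

    def consume(self, now: float, tokens: int = 1) -> bool:
        # Refill for the elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

bucket = SimulatedTokenBucket(rate=10.0, capacity=20)
burst = [bucket.consume(0.0) for _ in range(25)]  # 25 requests at t = 0 s
print(sum(burst))  # 20
later = [bucket.consume(0.5) for _ in range(10)]  # 10 requests at t = 0.5 s
print(sum(later))  # 5
```

Half a second after the burst only 0.5 × 10 = 5 tokens have accumulated, so the bucket admits 5 of the 10 later requests.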

python
from typing import Dict, Optional, Callable
from dataclasses import dataclass
from enum import Enum
import time
import threading
from collections import deque

class RateLimitAlgorithm(Enum):
    TOKEN_BUCKET = "token_bucket"
    LEAKY_BUCKET = "leaky_bucket"
    SLIDING_WINDOW = "sliding_window"
    FIXED_WINDOW = "fixed_window"

@dataclass
class RateLimitConfig:
    requests_per_second: float = 10.0
    burst_size: int = 20
    algorithm: RateLimitAlgorithm = RateLimitAlgorithm.TOKEN_BUCKET

class TokenBucket:
    """Token bucket: refills at `rate` tokens/s up to `capacity` (the burst size)."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full so an initial burst is allowed
        # A monotonic clock is used so that wall-clock adjustments (e.g. NTP)
        # cannot add or remove tokens.
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()
    
    def consume(self, tokens: int = 1) -> bool:
        with self._lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
    
    def _refill(self) -> None:
        # Caller must hold self._lock.
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
    
    def get_tokens(self) -> float:
        with self._lock:
            self._refill()
            return self.tokens

class SlidingWindowCounter:
    """Sliding-window log: keeps the timestamp of every accepted request.

    Memory per key grows up to max_requests timestamps.
    """

    def __init__(self, window_size: float, max_requests: int):
        self.window_size = window_size
        self.max_requests = max_requests
        self.requests: deque = deque()
        self._lock = threading.Lock()
    
    def is_allowed(self) -> bool:
        with self._lock:
            now = time.monotonic()
            window_start = now - self.window_size
            
            # Evict timestamps that have fallen out of the window.
            while self.requests and self.requests[0] <= window_start:
                self.requests.popleft()
            
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            
            return False
    
    def get_count(self) -> int:
        with self._lock:
            window_start = time.monotonic() - self.window_size
            return sum(1 for t in self.requests if t > window_start)

class RateLimiter:
    """Keeps one limiter per key (for example, per user ID or client IP)."""

    def __init__(self, config: Optional[RateLimitConfig] = None):
        self.config = config or RateLimitConfig()
        self._limiters: Dict[str, Any] = {}
        self._lock = threading.Lock()
    
    def is_allowed(self, key: str = "default") -> bool:
        with self._lock:
            # Create the per-key limiter lazily on first use.
            if key not in self._limiters:
                self._limiters[key] = self._create_limiter()
            
            limiter = self._limiters[key]
            if hasattr(limiter, 'is_allowed'):
                return limiter.is_allowed()
            return limiter.consume()
    
    def _create_limiter(self) -> Any:
        if self.config.algorithm == RateLimitAlgorithm.TOKEN_BUCKET:
            return TokenBucket(
                rate=self.config.requests_per_second,
                capacity=self.config.burst_size
            )
        elif self.config.algorithm == RateLimitAlgorithm.SLIDING_WINDOW:
            # 1-second window capped at burst_size requests;
            # requests_per_second is not used by this algorithm.
            return SlidingWindowCounter(
                window_size=1.0,
                max_requests=self.config.burst_size
            )
        # LEAKY_BUCKET and FIXED_WINDOW are not implemented here and
        # fall back to a token bucket.
        return TokenBucket(
            rate=self.config.requests_per_second,
            capacity=self.config.burst_size
        )
    
    def get_stats(self, key: str = "default") -> Dict[str, Any]:
        with self._lock:
            if key not in self._limiters:
                return {"key": key, "status": "not_found"}
            
            limiter = self._limiters[key]
            if isinstance(limiter, TokenBucket):
                return {
                    "key": key,
                    "algorithm": "token_bucket",
                    "tokens": limiter.get_tokens(),
                    "capacity": limiter.capacity,
                    "rate": limiter.rate
                }
            elif isinstance(limiter, SlidingWindowCounter):
                return {
                    "key": key,
                    "algorithm": "sliding_window",
                    "current_count": limiter.get_count(),
                    "max_requests": limiter.max_requests
                }
            
            return {"key": key, "status": "unknown"}

class RateLimitExceeded(Exception):
    """Raised when a call is rejected by the rate limiter."""

def rate_limit(
    requests_per_second: float = 10.0,
    burst_size: int = 20,
    key_func: Optional[Callable[..., str]] = None
) -> Callable:
    config = RateLimitConfig(
        requests_per_second=requests_per_second,
        burst_size=burst_size
    )
    limiter = RateLimiter(config)
    
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # key_func receives the same arguments as the decorated function,
            # e.g. lambda user_id: user_id for per-user limits.
            key = key_func(*args, **kwargs) if key_func else "default"
            
            if not limiter.is_allowed(key):
                raise RateLimitExceeded(f"Rate limit exceeded for key: {key}")
            
            return func(*args, **kwargs)
        
        wrapper._rate_limiter = limiter
        return wrapper
    
    return decorator

@rate_limit(requests_per_second=5.0, burst_size=10)
def api_endpoint(user_id: str) -> Dict:
    return {"status": "ok", "user_id": user_id}
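`RateLimitAlgorithm` also lists `FIXED_WINDOW` and `LEAKY_BUCKET`, but `_create_limiter` falls back to a token bucket for both. Below is a minimal sketch of those two algorithms. The class names are hypothetical. A `clock` callable is injected (defaulting to `time.monotonic`) so the behavior is deterministic when tested. The leaky bucket is written in its "meter" form: the bucket drains at a constant `leak_rate`, and a request is rejected if adding it would overflow `capacity`.

```python
import threading
import time
from typing import Callable, Optional


class FixedWindowCounter:
    """Allow at most max_requests per aligned window of window_size seconds."""

    def __init__(self, window_size: float, max_requests: int,
                 clock: Callable[[], float] = time.monotonic):
        self.window_size = window_size
        self.max_requests = max_requests
        self._clock = clock
        self._window_id: Optional[int] = None  # index of the current window
        self._count = 0
        self._lock = threading.Lock()

    def is_allowed(self) -> bool:
        with self._lock:
            window_id = int(self._clock() // self.window_size)
            if window_id != self._window_id:
                # A new window has started: reset the counter.
                self._window_id = window_id
                self._count = 0
            if self._count < self.max_requests:
                self._count += 1
                return True
            return False


class LeakyBucket:
    """Leaky bucket as a meter: the level drains at leak_rate units per second."""

    def __init__(self, leak_rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.leak_rate = leak_rate
        self.capacity = capacity
        self._clock = clock
        self._level = 0.0
        self._last = clock()
        self._lock = threading.Lock()

    def is_allowed(self) -> bool:
        with self._lock:
            now = self._clock()
            # Drain whatever has leaked out since the previous check.
            self._level = max(0.0, self._level - (now - self._last) * self.leak_rate)
            self._last = now
            if self._level + 1 <= self.capacity:
                self._level += 1
                return True
            return False


# Usage with a hand-controlled clock.
now = [0.0]
fw = FixedWindowCounter(window_size=1.0, max_requests=2, clock=lambda: now[0])
print([fw.is_allowed() for _ in range(3)])  # [True, True, False]
now[0] = 1.0  # a new window starts and the counter resets
print(fw.is_allowed())  # True

t = [0.0]
lb = LeakyBucket(leak_rate=1.0, capacity=2, clock=lambda: t[0])
print([lb.is_allowed() for _ in range(3)])  # [True, True, False]
t[0] = 1.0  # one unit has drained
print(lb.is_allowed())  # True
```

The fixed window is the cheapest option, but it can admit up to twice `max_requests` around a window boundary, for example a full batch just before 1.0 s and another just after. The sliding window and token bucket shown above avoid that spike.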

30.6 Anti-Patterns and Best Practices

30.6.1 Common Anti-Patterns

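| Anti-pattern | Description | Consequence | Solution |
|---|---|---|---|
| Hardcoded configuration | Configuration is written into the code | Hard to change; difficult to move between environments | Use ConfigMap/Secret |
| Missing logs | No structured log output | Troubleshooting is difficult | Use structured logging |
| No health checks | Probes are not configured | No automatic recovery | Configure liveness/readiness probes |
| Unbounded retries | Retries have no upper limit | Resource exhaustion | Set a retry limit and use backoff |
| Single point of failure | Deployed without replicas | Low availability | Multiple replicas + anti-affinity |
| Ignoring resource limits | No resource limits are set | Resource contention, OOM kills | Set requests and limits |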
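To avoid hardcoded configuration, an application typically reads settings that Kubernetes injects from a ConfigMap or Secret, for example as environment variables. Below is a minimal standard-library sketch. The variable names `APP_DB_URL`, `APP_TIMEOUT_SECONDS`, and `APP_DEBUG` are hypothetical. If a required value is missing, the loader fails at startup instead of on the first request.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AppConfig:
    db_url: str
    timeout_seconds: float
    debug: bool


def load_config(env: Optional[Mapping[str, str]] = None) -> AppConfig:
    """Build AppConfig from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    db_url = env.get("APP_DB_URL")
    if not db_url:
        # Fail fast: a missing required setting should stop the container
        # at startup instead of causing errors later.
        raise RuntimeError("APP_DB_URL is required")
    return AppConfig(
        db_url=db_url,
        timeout_seconds=float(env.get("APP_TIMEOUT_SECONDS", "5")),
        debug=env.get("APP_DEBUG", "false").lower() in ("1", "true", "yes"),
    )


# Usage: pass a dict in tests; in a Pod, call load_config() to read os.environ.
cfg = load_config({"APP_DB_URL": "postgres://db:5432/app", "APP_DEBUG": "true"})
print(cfg.timeout_seconds, cfg.debug)  # 5.0 True
```

Accepting an explicit mapping keeps the loader testable. The same image can then run unchanged in every environment, with only the injected values differing.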

30.6.2 Best Practices Checklist

python
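class CloudNativeBestPractices:
    CONTAINER_RULES = [
        "Run a single process per container",
        "Use immutable infrastructure",
        "Minimize image size",
        "Do not run as the root user",
        "Handle signals (SIGTERM) correctly",
        "Keep health-check endpoints lightweight"
    ]
    
    OBSERVABILITY_RULES = [
        "Emit structured logs",
        "Propagate a trace ID through the entire request path",
        "Monitor key business metrics",
        "Make alerting rules clear and actionable",
        "Keep sensitive information out of logs"
    ]
    
    RESILIENCE_RULES = [
        "Assume failures will happen",
        "Fail fast instead of waiting a long time",
        "Degrade gracefully while keeping core functionality available",
        "Isolate failures to prevent cascades",
        "Bound retries and use backoff",
        "Protect downstream services with circuit breakers"
    ]
    
    SECURITY_RULES = [
        "Apply the principle of least privilege",
        "Never commit secrets to the code repository",
        "Enable mTLS",
        "Restrict traffic with network policies",
        "Scan images for vulnerabilities regularly"
    ]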
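The observability rules above call for structured logs and a trace ID that follows each request. A small JSON log formatter can combine both. The sketch below uses only the standard `logging`, `json`, and `contextvars` modules. The field names (`ts`, `level`, `trace_id`, ...) and the `trace_id_var` context variable are assumptions, not a fixed schema. In a real service, the trace ID would normally be taken from the incoming request by the tracing library in use.

```python
import contextvars
import json
import logging
import sys

# Trace ID of the request currently being handled (hypothetical variable).
trace_id_var = contextvars.ContextVar("trace_id", default="-")


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "trace_id": trace_id_var.get(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False)


def make_logger(name: str, stream=sys.stdout) -> logging.Logger:
    """Return a logger that writes JSON lines to `stream`."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


# Usage: set the trace ID when a request starts, then log as usual.
log = make_logger("orders")
token = trace_id_var.set("4bf92f3577b34da6")
log.info("order created")  # one JSON line that includes "trace_id"
trace_id_var.reset(token)
```

Because `contextvars` values are isolated per thread and per asyncio task, concurrent requests do not overwrite each other's trace IDs.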

30.7 Decision Guide

30.7.1 Choosing a Deployment Strategy

                    ┌─────────────────────────────────────┐
                    │ Is zero-downtime deployment needed? │
                    └─────────────────────────────────────┘
                                       │
                        ┌──────────────┴──────────────┐
                        │                             │
                        ▼                             ▼
               ┌─────────────────┐           ┌─────────────────┐
               │       No        │           │       Yes       │
               └─────────────────┘           └─────────────────┘
                        │                             │
                        ▼                             ▼
               ┌─────────────────┐      ┌───────────────────────────┐
               │ Rolling update  │      │ Is fast rollback needed?  │
               └─────────────────┘      └───────────────────────────┘
                                                      │
                                       ┌──────────────┴──────────────┐
                                       │ Yes                         │ No
                                       ▼                             ▼
                              ┌─────────────────┐           ┌─────────────────┐
                              │   Blue-Green    │           │     Canary      │
                              │   deployment    │           │     release     │
                              └─────────────────┘           └─────────────────┘
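In a canary release, a small share of traffic goes to the new version while the rest stays on the stable one. In Kubernetes, this split is usually performed by an ingress controller or a service mesh. The sketch below only illustrates the idea in application code. It assumes a hypothetical `canary_percent` setting and hashes the user ID so that each user consistently sees the same version.

```python
import hashlib


def route_version(user_id: str, canary_percent: int) -> str:
    """Return "canary" for roughly canary_percent% of users, else "stable".

    Hashing the user ID, instead of choosing randomly per request, keeps
    each user on the same version across requests.
    """
    if not 0 <= canary_percent <= 100:
        raise ValueError("canary_percent must be between 0 and 100")
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100  # stable bucket in 0..99
    return "canary" if bucket < canary_percent else "stable"


# Usage: measure the canary share over many users at 10%.
share = sum(route_version(f"user-{i}", 10) == "canary" for i in range(10_000)) / 10_000
print(f"{share:.1%}")  # close to 10%; the exact value depends on the hash
```

Because a user's bucket never changes, raising `canary_percent` from 10 to 20 keeps every user already on the canary there and only adds new users. This is what makes a gradual rollout predictable.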

30.7.2 Choosing a Resilience Pattern

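| Scenario | Recommended pattern | Notes |
|---|---|---|
| External service calls | Circuit breaker + retry | Prevents cascading failures |
| API rate limiting | Token bucket / sliding window | Protects backend services |
| Database connections | Connection pool + timeout | Reuses resources |
| Message processing | Dead-letter queue + retry | Ensures reliability |
| Batch operations | Batch isolation + timeout | Prevents blocking |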
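For message processing, the table pairs retries with a dead-letter queue (DLQ). A message that keeps failing is set aside instead of blocking the queue or being retried forever. Below is a minimal in-memory sketch. It assumes a hypothetical per-message `max_attempts` and uses plain `deque`s as stand-ins for a real broker's queues. Brokers such as RabbitMQ and Amazon SQS provide dead-lettering natively.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, Optional


@dataclass
class Message:
    body: str
    attempts: int = 0
    last_error: Optional[str] = None


def drain(queue: Deque[Message], handler: Callable[[str], None],
          dead_letters: Deque[Message], max_attempts: int = 3) -> int:
    """Process every message and return how many succeeded.

    A failed message is re-queued at the back until it has been tried
    max_attempts times, then moved to dead_letters for later inspection.
    """
    processed = 0
    while queue:
        msg = queue.popleft()
        msg.attempts += 1
        try:
            handler(msg.body)
            processed += 1
        except Exception as exc:  # any handler failure counts as one attempt
            msg.last_error = repr(exc)
            if msg.attempts >= max_attempts:
                dead_letters.append(msg)  # give up: park it in the DLQ
            else:
                queue.append(msg)  # retry later without blocking other messages


    return processed


# Usage: one "poison" message ends up in the DLQ after three attempts.
def handle(body: str) -> None:
    if body == "bad":
        raise ValueError("cannot process")


q = deque([Message("a"), Message("bad"), Message("b")])
dlq: Deque[Message] = deque()
print(drain(q, handle, dlq), len(dlq), dlq[0].attempts)  # 2 1 3
```

This sketch re-queues a failed message immediately. A production consumer would normally also delay each redelivery with backoff, as in the retry pattern above.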

30.8 Quick Reference Cards

30.8.1 Core Concepts at a Glance

┌─────────────────────────────────────────────────────────────────────────┐
│                 Cloud-Native Core Concepts Cheat Sheet                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Containerization Principles                                            │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │ 1. Single-process principle                                      │   │
│  │ 2. Immutable infrastructure                                      │   │
│  │ 3. Set resource limits                                           │   │
│  │ 4. Configure health checks                                       │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  Three Pillars of Observability                                         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │ Metrics: Prometheus, Grafana                                     │   │
│  │ Logs: ELK, Loki                                                  │   │
│  │ Traces: Jaeger, Zipkin                                           │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  Resilience Patterns                                                    │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │ Circuit breaker: CLOSED → OPEN → HALF_OPEN → CLOSED              │   │
│  │ Retry: exponential backoff + jitter                              │   │
│  │ Rate limiting: token bucket / sliding window                     │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

30.8.2 Kubernetes Resource Quick Reference

python
KUBERNETES_RESOURCES = {
    "workloads": ["Deployment", "StatefulSet", "DaemonSet", "Job", "CronJob"],
    # Gateway is not built in: it comes from the Gateway API CRDs
    # (gateway.networking.k8s.io) or from Istio.
    "services": ["Service", "Ingress", "Gateway"],
    "config": ["ConfigMap", "Secret"],
    "storage": ["PersistentVolume", "PersistentVolumeClaim", "StorageClass"],
    # ServiceEntry is an Istio CRD, not a built-in Kubernetes resource.
    "network": ["NetworkPolicy", "ServiceEntry"],
    "rbac": ["Role", "RoleBinding", "ClusterRole", "ClusterRoleBinding"],
}

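PROBE_CONFIGURATION = {
    "liveness": {
        "purpose": "Detect deadlocked or hung processes",
        "failure_action": "Restart the container",
        "recommended_path": "/health",
    },
    "readiness": {
        "purpose": "Check whether the service is ready to accept traffic",
        "failure_action": "Remove the Pod from the Service endpoints",
        "recommended_path": "/ready",
    },
    "startup": {
        "purpose": "Protect slow-starting applications",
        "failure_action": "Restart the container",
        "recommended_for": "Applications that take more than 30s to start",
    }
}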
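The probe paths above (`/health` for liveness, `/ready` for readiness) must be served by the application itself. Below is a minimal sketch that uses the standard library's `http.server`. It assumes a hypothetical module-level `READY` flag, which the application sets once its dependencies (database pools, caches) are initialized. The liveness check stays trivial, following the rule that health-check endpoints must be lightweight.

```python
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# Hypothetical readiness flag: set it once dependencies are initialized;
# clearing it (e.g. on SIGTERM) makes /ready fail before the process exits.
READY = threading.Event()

BODIES = {200: b"ok", 503: b"not ready", 404: b"not found"}


class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        if self.path == "/health":
            status = 200  # liveness: the process is able to answer
        elif self.path == "/ready":
            status = 200 if READY.is_set() else 503
        else:
            status = 404
        body = BODIES[status]
        self.send_response(status)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args) -> None:
        pass  # silence per-request access logs from frequent probes


def start_probe_server(port: int = 8080) -> ThreadingHTTPServer:
    """Serve the probe endpoints on a background daemon thread."""
    server = ThreadingHTTPServer(("0.0.0.0", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


# Usage (hypothetical startup sequence):
# server = start_probe_server(8080)
# ...initialize database pools, caches...
# READY.set()
```

Keeping readiness separate from liveness means a temporarily unavailable dependency only removes the Pod from load balancing. It does not trigger a container restart.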

30.9 Summary

30.9.1 Key Takeaways

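  1. Containerization: makes applications portable and isolated
  2. Service mesh: handles inter-service communication in one uniform layer
  3. Observability: metrics, logs, and traces working together
  4. Resilience design: circuit breaking, retries, and rate limiting protect the system
  5. Externalized configuration: manage configuration with ConfigMap/Secret
  6. Health checks: liveness/readiness probes safeguard availability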

30.9.2 When to Apply

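| Suitable | Not suitable |
|---|---|
| Microservice architectures | Monolithic applications |
| High-availability requirements | Simple applications |
| Elastic scaling needs | Fixed resources |
| DevOps practices | Traditional operations |
| Multi-environment deployment | Single environment |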

30.9.3 Implementation Recommendations

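  1. Adopt incrementally: start with containerization, then introduce a service mesh step by step
  2. Observability first: establish monitoring before optimizing the system
  3. Build in resilience: consider failure scenarios during development
  4. Automate everything: fully automate CI/CD, testing, and deployment
  5. Shift security left: address security requirements during development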

Python Technology Series - 江苏省宿城中等专业学校 (Jiangsu Sucheng Secondary Vocational School)