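Chapter 30 Cloud-Native Design Patterns
Learning Objectives
After completing this chapter, readers will be able to:
- Understand the core principles of cloud-native architecture and their formal definitions
- Master key patterns such as containerization, service mesh, and observability
- Implement resilience patterns such as circuit breakers, rate limiters, and retries
- Apply cloud-native patterns to build highly available distributed systems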
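30.1 Theoretical Foundations and Formal Definitions
30.1.1 Formal Definition of Cloud Native
Definition 30.1 (Cloud-native system): A cloud-native system $\mathcal{CN}$ is a 5-tuple:
$$\mathcal{CN} = \langle \mathcal{C}, \mathcal{O}, \mathcal{M}, \mathcal{S}, \mathcal{R} \rangle$$
where:
- $\mathcal{C}$: the containerization layer, providing application isolation and portability
- $\mathcal{O}$: the orchestration layer, managing container lifecycle and scheduling
- $\mathcal{M}$: the service mesh layer, handling service-to-service communication
- $\mathcal{S}$: the observability layer, providing monitoring, logging, and tracing
- $\mathcal{R}$: the resilience layer, implementing fault tolerance and self-healing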
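Definition 30.2 (Resilience): System resilience $E$ is the system's ability to remain available in the presence of failures, measured as the long-run fraction of time the system is operational:
$$E = \frac{\mathrm{MTBF}}{\mathrm{MTBF} + \mathrm{MTTR}}$$
where:
- $\mathrm{MTBF}$: mean time between failures
- $\mathrm{MTTR}$: mean time to recovery
Definition 30.3 (Availability): Over $n$ observed operating periods, system availability $A$ is:
$$A = 1 - \frac{\sum_{i=1}^{n} \text{downtime}_i}{\sum_{i=1}^{n} (\text{uptime}_i + \text{downtime}_i)}$$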
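As a worked illustration of Definitions 30.2 and 30.3, the sketch below computes $E$ from assumed MTBF/MTTR figures and $A$ from a hypothetical log of uptime/downtime periods. All numbers are invented for illustration.

```python
from typing import List, Tuple


def steady_state_availability(mtbf_hours: float, mttr_hours: float) -> float:
    """E = MTBF / (MTBF + MTTR), as in Definition 30.2."""
    return mtbf_hours / (mtbf_hours + mttr_hours)


def observed_availability(periods: List[Tuple[float, float]]) -> float:
    """A = 1 - sum(downtime_i) / sum(uptime_i + downtime_i), as in Definition 30.3.

    Each element of `periods` is an (uptime, downtime) pair in the same unit.
    """
    total_down = sum(down for _, down in periods)
    total_time = sum(up + down for up, down in periods)
    return 1 - total_down / total_time


# Hypothetical figures: one failure every 1000 h, 1 h to recover.
e = steady_state_availability(1000.0, 1.0)
print(f"E = {e:.5f}")

# Hypothetical log: three operating periods, each followed by an outage (hours).
a = observed_availability([(700.0, 0.5), (650.0, 1.0), (800.0, 0.5)])
print(f"A = {a:.5f}")
```

If $\text{uptime}_i$ is read as time between failures and $\text{downtime}_i$ as recovery time, then $A$ measured over many failure/recovery cycles approaches $E$.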
30.1.2 Cloud-Native Architecture Layers
               Cloud-Native Architecture Layer Model
┌─────────────────────────────────────────────────────────────────┐
│                        Application Layer                        │
│     Microservices │ Serverless │ Event-Driven │ API Gateway     │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Service Mesh Layer                        │
│            Istio │ Linkerd │ Envoy │ Consul Connect             │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Orchestration Layer                       │
│                Kubernetes │ Docker Swarm │ Nomad                │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                        Container Runtime                        │
│          containerd │ CRI-O │ Docker │ Kata Containers          │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Infrastructure Layer                       │
│  Public Cloud │ Private Cloud │ Hybrid Cloud │ Edge Computing   │
└─────────────────────────────────────────────────────────────────┘
30.1.3 Historical Background and Evolution
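| Period | Milestone | Representative projects | Core contribution |
|---|---|---|---|
| 2013 | Rise of containers | Docker | Container standardization |
| 2014 | Container orchestration | Kubernetes | Container orchestration platform |
| 2015 | Cloud native defined | CNCF | Cloud Native Computing Foundation founded |
| 2016 | Service mesh | Linkerd, Istio (2017) | Service-to-service communication management |
| 2017 | Serverless | Knative (2018), OpenFaaS | Serverless frameworks |
| 2018 | Observability | OpenTelemetry (formed 2019) | Unified observability standard |
| 2019 | GitOps | ArgoCD, Flux | Declarative continuous delivery |
| 2020 | Multi-cluster | KubeFed, OCM | Multi-cluster management |
| 2021+ | Platform engineering | Backstage | Developer portals |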
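30.2 Containerization Patterns
30.2.1 Formal Definition of Container Configuration
Definition 30.4 (Container configuration): A container configuration $\mathcal{CC}$ is defined as:
$$\mathcal{CC} = \langle I, E, R, P, V \rangle$$
where:
- $I$: the image definition
- $E$: environment variable configuration
- $R$: resource requests and limits
- $P$: probe (health check) configuration
- $V$: volume mounts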
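The resource component $R$ pairs a request, which the scheduler uses for placement, with a limit, which is the enforced ceiling. Kubernetes rejects a container whose request exceeds its limit. The helper below is a minimal sketch of that check. It assumes only a few common quantity suffixes (`m` for millicores; `k`/`M`/`G` and `Ki`/`Mi`/`Gi` for memory) and is not a full implementation of the Kubernetes quantity grammar. The function names are hypothetical.

```python
from decimal import Decimal

# Hypothetical helper: only a subset of Kubernetes quantity suffixes is handled.
_SUFFIXES = {
    "m": Decimal("0.001"),
    "k": Decimal(10) ** 3, "M": Decimal(10) ** 6, "G": Decimal(10) ** 9,
    "Ki": Decimal(2) ** 10, "Mi": Decimal(2) ** 20, "Gi": Decimal(2) ** 30,
}


def parse_quantity(q: str) -> Decimal:
    """Convert a quantity string such as '250m', '0.5' or '256Mi' to a Decimal."""
    # Try two-letter suffixes before one-letter ones so 'Mi' is not read as 'M'.
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if q.endswith(suffix):
            return Decimal(q[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(q)


def check_request_within_limit(request: str, limit: str) -> bool:
    """True if request <= limit, the per-resource constraint Kubernetes enforces."""
    return parse_quantity(request) <= parse_quantity(limit)


print(parse_quantity("250m"))    # CPU in cores (a quarter core)
print(parse_quantity("256Mi"))   # memory in bytes
print(check_request_within_limit("250m", "500m"))
print(check_request_within_limit("1Gi", "512Mi"))
```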
30.2.2 UML Class Diagram
┌─────────────────────────────────────────────────────────────────┐
│                         ContainerConfig                         │
├─────────────────────────────────────────────────────────────────┤
│ - name: str                                                     │
│ - image: str                                                    │
│ - port: int                                                     │
│ - replicas: int                                                 │
│ - env_vars: List[EnvVar]                                        │
│ - resources: ResourceLimits                                     │
│ - liveness_probe: Optional[HealthProbe]                         │
│ - readiness_probe: Optional[HealthProbe]                        │
│ - volumes: List[Volume]                                         │
│ - labels: Dict[str, str]                                        │
├─────────────────────────────────────────────────────────────────┤
│ + add_env(name, value): ContainerConfig                         │
│ + add_env_from_configmap(name): ContainerConfig                 │
│ + add_env_from_secret(name, key): ContainerConfig               │
│ + set_resources(cpu, memory, ...): ContainerConfig              │
│ + set_liveness_probe(path): ContainerConfig                     │
│ + set_readiness_probe(path): ContainerConfig                    │
│ + add_volume(name, mount_path, type): ContainerConfig           │
│ + to_deployment_manifest(): Dict                                │
│ + to_service_manifest(service_type): Dict                       │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 │ uses
                                 │
             ┌───────────────────┴────────┬────────────────────────────┐
             ▼                            ▼                            ▼
┌─────────────────────────┐  ┌─────────────────────────┐  ┌─────────────────────────┐
│     ResourceLimits      │  │       HealthProbe       │  │         Volume          │
├─────────────────────────┤  ├─────────────────────────┤  ├─────────────────────────┤
│ + cpu_request: str      │  │ + probe_type            │  │ + name: str             │
│ + memory_request: str   │  │ + path: str             │  │ + mount_path: str       │
│ + cpu_limit: str        │  │ + port: int             │  │ + volume_type: str      │
│ + memory_limit: str     │  │ + initial_delay_seconds │  │ + read_only: bool       │
└─────────────────────────┘  │ + period_seconds        │  └─────────────────────────┘
                             │ + failure_threshold     │
                             └─────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│                           <<Builder>>                           │
│                        ContainerBuilder                         │
├─────────────────────────────────────────────────────────────────┤
│ - _config: ContainerConfig                                      │
├─────────────────────────────────────────────────────────────────┤
│ + with_replicas(replicas): ContainerBuilder                     │
│ + with_env(key, value): ContainerBuilder                        │
│ + with_env_from_configmap(name): ContainerBuilder               │
│ + with_env_from_secret(name, key): ContainerBuilder             │
│ + with_resources(cpu, memory, ...): ContainerBuilder            │
│ + with_liveness_probe(path): ContainerBuilder                   │
│ + with_readiness_probe(path): ContainerBuilder                  │
│ + with_volume(name, path, type): ContainerBuilder               │
│ + with_label(key, value): ContainerBuilder                      │
│ + build(): ContainerConfig                                      │
└─────────────────────────────────────────────────────────────────┘
30.2.3 Complete Container Configuration Implementation
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from enum import Enum
import json


class RestartPolicy(Enum):
    ALWAYS = "Always"
    ON_FAILURE = "OnFailure"
    NEVER = "Never"


class ProbeType(Enum):
    HTTP_GET = "httpGet"
    TCP_SOCKET = "tcpSocket"
    EXEC = "exec"


@dataclass
class ResourceLimits:
    # Requests are used for scheduling; limits are the enforced ceiling
    cpu_request: str = "100m"
    memory_request: str = "128Mi"
    cpu_limit: str = "500m"
    memory_limit: str = "512Mi"

    def to_kubernetes(self) -> Dict[str, Dict[str, str]]:
        return {
            "requests": {
                "cpu": self.cpu_request,
                "memory": self.memory_request
            },
            "limits": {
                "cpu": self.cpu_limit,
                "memory": self.memory_limit
            }
        }


@dataclass
class HealthProbe:
    probe_type: ProbeType = ProbeType.HTTP_GET
    path: str = "/health"
    port: int = 8080
    initial_delay_seconds: int = 30
    period_seconds: int = 10
    timeout_seconds: int = 5
    failure_threshold: int = 3
    success_threshold: int = 1
    # Command line for EXEC probes, e.g. ["cat", "/tmp/healthy"]
    command: Optional[List[str]] = None

    def to_kubernetes(self) -> Dict[str, Any]:
        probe: Dict[str, Any] = {
            "initialDelaySeconds": self.initial_delay_seconds,
            "periodSeconds": self.period_seconds,
            "timeoutSeconds": self.timeout_seconds,
            "failureThreshold": self.failure_threshold,
            "successThreshold": self.success_threshold
        }
        # A probe must specify exactly one handler
        if self.probe_type == ProbeType.HTTP_GET:
            probe["httpGet"] = {
                "path": self.path,
                "port": self.port
            }
        elif self.probe_type == ProbeType.TCP_SOCKET:
            probe["tcpSocket"] = {"port": self.port}
        elif self.probe_type == ProbeType.EXEC:
            probe["exec"] = {"command": self.command or []}
        return probe


@dataclass
class Volume:
    name: str
    mount_path: str
    volume_type: str = "emptyDir"
    read_only: bool = False
    storage_class: Optional[str] = None
    size: Optional[str] = None

    def to_volume_mount(self) -> Dict[str, Any]:
        """Entry for the container's volumeMounts list."""
        return {
            "name": self.name,
            "mountPath": self.mount_path,
            "readOnly": self.read_only
        }

    def to_volume(self) -> Dict[str, Any]:
        """Entry for the Pod's volumes list; the backing object shares the volume's name."""
        volume: Dict[str, Any] = {"name": self.name}
        if self.volume_type == "emptyDir":
            volume["emptyDir"] = {}
        elif self.volume_type == "persistentVolumeClaim":
            volume["persistentVolumeClaim"] = {
                "claimName": self.name,
                "readOnly": self.read_only
            }
        elif self.volume_type == "configMap":
            volume["configMap"] = {"name": self.name}
        elif self.volume_type == "secret":
            volume["secret"] = {"secretName": self.name}
        return volume


@dataclass
class EnvVar:
    name: str
    value: Optional[str] = None
    value_from: Optional[Dict[str, Any]] = None

    def to_kubernetes(self) -> Dict[str, Any]:
        env: Dict[str, Any] = {"name": self.name}
        if self.value is not None:
            env["value"] = self.value
        if self.value_from:
            env["valueFrom"] = self.value_from
        return env


class ContainerConfig:
    def __init__(
        self,
        name: str,
        image: str,
        port: int,
        replicas: int = 1
    ):
        self.name = name
        self.image = image
        self.port = port
        self.replicas = replicas
        self.env_vars: List[EnvVar] = []
        self.resources = ResourceLimits()
        self.liveness_probe: Optional[HealthProbe] = None
        self.readiness_probe: Optional[HealthProbe] = None
        self.volumes: List[Volume] = []
        self.restart_policy = RestartPolicy.ALWAYS
        self.labels: Dict[str, str] = {}
        self.annotations: Dict[str, str] = {}

    def add_env(self, name: str, value: str) -> 'ContainerConfig':
        self.env_vars.append(EnvVar(name=name, value=value))
        return self

    def add_env_from_configmap(
        self, name: str, key: Optional[str] = None
    ) -> 'ContainerConfig':
        # configMapKeyRef needs both the ConfigMap name and the key to read
        self.env_vars.append(EnvVar(
            name=name,
            value_from={"configMapKeyRef": {"name": name, "key": key or name}}
        ))
        return self

    def add_env_from_secret(
        self, name: str, key: Optional[str] = None
    ) -> 'ContainerConfig':
        self.env_vars.append(EnvVar(
            name=name,
            value_from={
                "secretKeyRef": {
                    "name": name,
                    "key": key or name
                }
            }
        ))
        return self

    def set_resources(
        self,
        cpu_request: str,
        memory_request: str,
        cpu_limit: Optional[str] = None,
        memory_limit: Optional[str] = None
    ) -> 'ContainerConfig':
        # Limits fall back to the requests when omitted
        self.resources = ResourceLimits(
            cpu_request=cpu_request,
            memory_request=memory_request,
            cpu_limit=cpu_limit or cpu_request,
            memory_limit=memory_limit or memory_request
        )
        return self

    def set_liveness_probe(
        self,
        path: str = "/health",
        **kwargs
    ) -> 'ContainerConfig':
        self.liveness_probe = HealthProbe(
            path=path,
            port=self.port,
            **kwargs
        )
        return self

    def set_readiness_probe(
        self,
        path: str = "/ready",
        **kwargs
    ) -> 'ContainerConfig':
        self.readiness_probe = HealthProbe(
            path=path,
            port=self.port,
            **kwargs
        )
        return self

    def add_volume(
        self,
        name: str,
        mount_path: str,
        volume_type: str = "emptyDir",
        **kwargs
    ) -> 'ContainerConfig':
        self.volumes.append(Volume(
            name=name,
            mount_path=mount_path,
            volume_type=volume_type,
            **kwargs
        ))
        return self

    def add_label(self, key: str, value: str) -> 'ContainerConfig':
        self.labels[key] = value
        return self

    def to_deployment_manifest(self) -> Dict[str, Any]:
        container: Dict[str, Any] = {
            "name": self.name,
            "image": self.image,
            "ports": [{"containerPort": self.port}],
            "resources": self.resources.to_kubernetes(),
            "env": [e.to_kubernetes() for e in self.env_vars]
        }
        if self.liveness_probe:
            container["livenessProbe"] = self.liveness_probe.to_kubernetes()
        if self.readiness_probe:
            container["readinessProbe"] = self.readiness_probe.to_kubernetes()
        if self.volumes:
            container["volumeMounts"] = [v.to_volume_mount() for v in self.volumes]
        # The selector must match the Pod template labels
        spec: Dict[str, Any] = {
            "replicas": self.replicas,
            "selector": {
                "matchLabels": {"app": self.name}
            },
            "template": {
                "metadata": {
                    "labels": {"app": self.name, **self.labels}
                },
                "spec": {
                    "containers": [container]
                }
            }
        }
        if self.volumes:
            spec["template"]["spec"]["volumes"] = [v.to_volume() for v in self.volumes]
        return {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": self.name,
                "labels": self.labels
            },
            "spec": spec
        }

    def to_service_manifest(self, service_type: str = "ClusterIP") -> Dict[str, Any]:
        # Exposes port 80 and forwards to the container port
        return {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {
                "name": self.name,
                "labels": self.labels
            },
            "spec": {
                "type": service_type,
                "selector": {"app": self.name},
                "ports": [{
                    "port": 80,
                    "targetPort": self.port,
                    "protocol": "TCP"
                }]
            }
        }


class ContainerBuilder:
    def __init__(self, name: str, image: str, port: int):
        self._config = ContainerConfig(name=name, image=image, port=port)

    def with_replicas(self, replicas: int) -> 'ContainerBuilder':
        self._config.replicas = replicas
        return self

    def with_env(self, key: str, value: str) -> 'ContainerBuilder':
        self._config.add_env(key, value)
        return self

    def with_env_from_configmap(self, name: str) -> 'ContainerBuilder':
        self._config.add_env_from_configmap(name)
        return self

    def with_env_from_secret(self, name: str, key: Optional[str] = None) -> 'ContainerBuilder':
        self._config.add_env_from_secret(name, key)
        return self

    def with_resources(
        self,
        cpu_request: str,
        memory_request: str,
        cpu_limit: Optional[str] = None,
        memory_limit: Optional[str] = None
    ) -> 'ContainerBuilder':
        self._config.set_resources(cpu_request, memory_request, cpu_limit, memory_limit)
        return self

    def with_liveness_probe(
        self,
        path: str = "/health",
        **kwargs
    ) -> 'ContainerBuilder':
        self._config.set_liveness_probe(path, **kwargs)
        return self

    def with_readiness_probe(
        self,
        path: str = "/ready",
        **kwargs
    ) -> 'ContainerBuilder':
        self._config.set_readiness_probe(path, **kwargs)
        return self

    def with_volume(
        self,
        name: str,
        mount_path: str,
        volume_type: str = "emptyDir"
    ) -> 'ContainerBuilder':
        self._config.add_volume(name, mount_path, volume_type)
        return self

    def with_label(self, key: str, value: str) -> 'ContainerBuilder':
        self._config.add_label(key, value)
        return self

    def build(self) -> ContainerConfig:
        return self._config


config = (
    ContainerBuilder("web-app", "python:3.11-slim", 8080)
    .with_replicas(3)
    .with_env("APP_ENV", "production")
    .with_env("LOG_LEVEL", "INFO")
    .with_env_from_configmap("app-config")
    .with_env_from_secret("db-credentials", "password")
    .with_resources("250m", "256Mi", "500m", "512Mi")
    .with_liveness_probe("/health", initial_delay_seconds=30, period_seconds=10)
    .with_readiness_probe("/ready", initial_delay_seconds=5, period_seconds=5)
    .with_volume("data", "/app/data", "persistentVolumeClaim")
    .with_label("version", "v1.0.0")
    .build()
)
print(json.dumps(config.to_deployment_manifest(), indent=2))
30.2.4 Configuration Externalization Pattern
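Configuration externalization rests on a priority-ordered lookup. The first source that defines a key wins, so an environment variable can override a value from a mounted file without rebuilding the image. A minimal sketch of that rule follows. It uses plain dictionaries as hypothetical stand-ins for real environment variables, files, and ConfigMaps.

```python
from typing import Any, Dict, List, Optional, Tuple

# Each layer is (priority, values); a higher priority wins. Sources are hypothetical.
Layer = Tuple[int, Dict[str, Any]]


def resolve(layers: List[Layer], key: str, default: Optional[Any] = None) -> Any:
    """Return the value from the highest-priority layer that defines `key`."""
    for _, values in sorted(layers, key=lambda layer: layer[0], reverse=True):
        if key in values:
            return values[key]
    return default


def merged_view(layers: List[Layer]) -> Dict[str, Any]:
    """Flatten all layers; applying the lowest priority first lets higher ones overwrite."""
    result: Dict[str, Any] = {}
    for _, values in sorted(layers, key=lambda layer: layer[0]):
        result.update(values)
    return result


layers: List[Layer] = [
    (10, {"database.url": "db.internal:5432", "max.connections": "50"}),  # e.g. a ConfigMap
    (50, {"max.connections": "100"}),                                     # e.g. a JSON file
    (100, {"debug": "true"}),                                             # e.g. env vars
]
print(resolve(layers, "max.connections"))
print(resolve(layers, "timeout", default="30s"))
print(merged_view(layers))
```

This sketch tests key membership rather than `is not None`, so a layer can deliberately set a falsy value. The `CloudConfig` class in this section instead treats `None` as "not defined here".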
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
import os
import json
import threading
import time


class ConfigSource(ABC):
    """A single configuration backend (environment, file, ConfigMap, Secret, ...)."""

    @abstractmethod
    def get(self, key: str, default: Any = None) -> Any:
        pass

    @abstractmethod
    def get_all(self) -> Dict[str, Any]:
        pass

    @abstractmethod
    def watch(self, callback: Callable[[str, Any], None]) -> None:
        pass


class EnvironmentConfigSource(ConfigSource):
    def __init__(self, prefix: str = ""):
        self.prefix = prefix
        self._watchers: List[Callable] = []

    def get(self, key: str, default: Any = None) -> Any:
        # With prefix "APP_", key "database.url" is read from APP_DATABASE_URL
        env_key = f"{self.prefix}{key}".upper().replace(".", "_")
        return os.environ.get(env_key, default)

    def get_all(self) -> Dict[str, Any]:
        result = {}
        for key, value in os.environ.items():
            if self.prefix and key.startswith(self.prefix.upper()):
                config_key = key[len(self.prefix):].lower().replace("_", ".")
                result[config_key] = value
            elif not self.prefix:
                result[key.lower().replace("_", ".")] = value
        return result

    def watch(self, callback: Callable[[str, Any], None]) -> None:
        # Environment variables do not change after start-up, so callbacks never fire
        self._watchers.append(callback)


class FileConfigSource(ConfigSource):
    def __init__(self, file_path: str, watch_changes: bool = False):
        self.file_path = file_path
        self._config: Dict[str, Any] = {}
        self._watchers: List[Callable] = []
        self._last_modified: float = 0
        self._load()
        if watch_changes:
            self._start_watcher()

    def _load(self) -> None:
        try:
            with open(self.file_path, 'r') as f:
                content = f.read().strip()
            if content:
                self._config = json.loads(content)
            self._last_modified = os.path.getmtime(self.file_path)
        except FileNotFoundError:
            self._config = {}

    def get(self, key: str, default: Any = None) -> Any:
        # Dotted keys walk nested JSON objects: "database.url" -> config["database"]["url"]
        keys = key.split(".")
        value = self._config
        for k in keys:
            if isinstance(value, dict):
                value = value.get(k)
            else:
                return default
        return value if value is not None else default

    def get_all(self) -> Dict[str, Any]:
        return self._config.copy()

    def watch(self, callback: Callable[[str, Any], None]) -> None:
        self._watchers.append(callback)

    def _start_watcher(self) -> None:
        def watch_loop():
            # Poll the file's modification time once per second
            while True:
                time.sleep(1)
                try:
                    current_mtime = os.path.getmtime(self.file_path)
                    if current_mtime > self._last_modified:
                        old_config = self._config.copy()
                        self._load()
                        self._notify_changes(old_config, self._config)
                except Exception:
                    # File briefly missing or half-written: retry on the next tick
                    pass

        thread = threading.Thread(target=watch_loop, daemon=True)
        thread.start()

    def _notify_changes(
        self,
        old_config: Dict[str, Any],
        new_config: Dict[str, Any]
    ) -> None:
        # Only top-level keys are compared
        for key in set(old_config.keys()) | set(new_config.keys()):
            old_value = old_config.get(key)
            new_value = new_config.get(key)
            if old_value != new_value:
                for watcher in self._watchers:
                    watcher(key, new_value)


class KubernetesConfigMapSource(ConfigSource):
    def __init__(self, name: str, namespace: str = "default"):
        self.name = name
        self.namespace = namespace
        self._config: Dict[str, Any] = {}
        self._watchers: List[Callable] = []
        self._load()

    def _load(self) -> None:
        # Assumes the ConfigMap is mounted as a volume: one file per key
        mount_path = f"/etc/config/{self.name}"
        if os.path.isdir(mount_path):
            for filename in os.listdir(mount_path):
                file_path = os.path.join(mount_path, filename)
                # Skip the "..data" symlink and timestamped directories the kubelet creates
                if filename.startswith("..") or not os.path.isfile(file_path):
                    continue
                try:
                    with open(file_path, 'r') as f:
                        self._config[filename] = f.read().strip()
                except OSError:
                    pass

    def get(self, key: str, default: Any = None) -> Any:
        return self._config.get(key, default)

    def get_all(self) -> Dict[str, Any]:
        return self._config.copy()

    def watch(self, callback: Callable[[str, Any], None]) -> None:
        self._watchers.append(callback)


class KubernetesSecretSource(ConfigSource):
    def __init__(self, name: str, namespace: str = "default"):
        self.name = name
        self.namespace = namespace
        self._secrets: Dict[str, str] = {}
        self._watchers: List[Callable] = []
        self._load()

    def _load(self) -> None:
        # Assumes the Secret is mounted as a volume: one file per key
        mount_path = f"/etc/secrets/{self.name}"
        if os.path.isdir(mount_path):
            for filename in os.listdir(mount_path):
                file_path = os.path.join(mount_path, filename)
                if filename.startswith("..") or not os.path.isfile(file_path):
                    continue
                try:
                    with open(file_path, 'r') as f:
                        self._secrets[filename] = f.read().strip()
                except OSError:
                    pass

    def get(self, key: str, default: Any = None) -> Any:
        return self._secrets.get(key, default)

    def get_all(self) -> Dict[str, Any]:
        return self._secrets.copy()

    def watch(self, callback: Callable[[str, Any], None]) -> None:
        self._watchers.append(callback)


@dataclass
class PrioritySource:
    priority: int
    source: ConfigSource


class CloudConfig:
    def __init__(self):
        self._sources: List[PrioritySource] = []
        self._cache: Dict[str, Any] = {}
        self._watchers: List[Callable[[str, Any], None]] = []
        self._lock = threading.RLock()

    def add_source(self, source: ConfigSource, priority: int = 0) -> 'CloudConfig':
        with self._lock:
            self._sources.append(PrioritySource(priority=priority, source=source))
            # Keep sources ordered from highest to lowest priority
            self._sources.sort(key=lambda x: x.priority, reverse=True)
            # A new source may shadow values that are already cached
            self._cache.clear()
        source.watch(self._on_config_change)
        return self

    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            if key in self._cache:
                return self._cache[key]
            # First source (highest priority) that defines the key wins
            for ps in self._sources:
                value = ps.source.get(key)
                if value is not None:
                    self._cache[key] = value
                    return value
            return default

    def get_int(self, key: str, default: int = 0) -> int:
        value = self.get(key)
        if value is None:
            return default
        try:
            return int(value)
        except (ValueError, TypeError):
            return default

    def get_float(self, key: str, default: float = 0.0) -> float:
        value = self.get(key)
        if value is None:
            return default
        try:
            return float(value)
        except (ValueError, TypeError):
            return default

    def get_bool(self, key: str, default: bool = False) -> bool:
        value = self.get(key)
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in ("true", "1", "yes", "on")
        return bool(value)

    def get_list(self, key: str, default: Optional[List] = None) -> List:
        value = self.get(key)
        if value is None:
            return default or []
        if isinstance(value, list):
            return value
        if isinstance(value, str):
            return [v.strip() for v in value.split(",")]
        return [value]

    def get_dict(self, key: str, default: Optional[Dict] = None) -> Dict:
        value = self.get(key)
        if value is None:
            return default or {}
        if isinstance(value, dict):
            return value
        if isinstance(value, str):
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                return default or {}
        return default or {}

    def watch(self, callback: Callable[[str, Any], None]) -> None:
        with self._lock:
            self._watchers.append(callback)

    def refresh(self) -> None:
        with self._lock:
            self._cache.clear()

    def _on_config_change(self, key: str, value: Any) -> None:
        with self._lock:
            self._cache.pop(key, None)
            for watcher in self._watchers:
                try:
                    watcher(key, value)
                except Exception:
                    pass

    def get_all(self) -> Dict[str, Any]:
        result = {}
        with self._lock:
            # Apply lowest priority first so higher-priority values overwrite
            for ps in reversed(self._sources):
                result.update(ps.source.get_all())
        return result


config = CloudConfig()
config.add_source(EnvironmentConfigSource("APP_"), priority=100)
config.add_source(FileConfigSource("/app/config.json"), priority=50)
config.add_source(KubernetesConfigMapSource("app-config"), priority=30)
config.add_source(KubernetesSecretSource("app-secrets"), priority=10)
print(f"Database URL: {config.get('database.url', 'localhost:5432')}")
print(f"Debug mode: {config.get_bool('debug', False)}")
print(f"Max connections: {config.get_int('max.connections', 100)}")
30.3 Service Mesh Patterns
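30.3.1 Formal Definition of the Service Mesh
Definition 30.5 (Service mesh): A service mesh $\mathcal{SM}$ is a 4-tuple:
$$\mathcal{SM} = \langle \mathcal{P}, \mathcal{T}, \mathcal{S}, \mathcal{O} \rangle$$
where:
- $\mathcal{P}$: the proxy layer (sidecars), which handles service-to-service communication
- $\mathcal{T}$: traffic management, including routing, load balancing, and fault injection
- $\mathcal{S}$: the security layer, providing mTLS, authentication, and authorization
- $\mathcal{O}$: observability, including metrics, logs, and traces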
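Traffic management ($\mathcal{T}$) is usually expressed as declarative rules that the sidecar evaluates for each request. The sketch below illustrates two such rules: a weighted split between service versions, as used for canary releases, and probabilistic fault injection. The `RouteRule` format is a hypothetical simplification, not the configuration schema of Istio or any other mesh.

```python
import random
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RouteRule:
    """Hypothetical routing rule: weighted destinations plus optional fault injection."""
    weights: Dict[str, int]        # e.g. {"v1": 90, "v2": 10}
    abort_percent: float = 0.0     # share of requests to fail deliberately (0-100)
    abort_status: int = 503


def route(rule: RouteRule, rng: random.Random) -> Dict[str, object]:
    # Fault injection is evaluated first: an aborted request never reaches a backend.
    if rng.uniform(0, 100) < rule.abort_percent:
        return {"status": rule.abort_status, "injected": True}
    versions: List[str] = list(rule.weights)
    chosen = rng.choices(versions, weights=[rule.weights[v] for v in versions], k=1)[0]
    return {"status": 200, "version": chosen}


rng = random.Random(42)  # seeded so the run is reproducible
rule = RouteRule(weights={"v1": 90, "v2": 10}, abort_percent=5.0)
counts: Dict[str, int] = {"v1": 0, "v2": 0, "aborted": 0}
for _ in range(10_000):
    result = route(rule, rng)
    key = "aborted" if result.get("injected") else str(result["version"])
    counts[key] += 1
print(counts)  # roughly 5% aborted, the remainder split about 90/10
```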
30.3.2 UML Class Diagram
┌─────────────────────────────────────────────────────────────────┐
│                           ServiceMesh                           │
├─────────────────────────────────────────────────────────────────┤
│ - proxies: Dict[str, SidecarProxy]                              │
│ - mtls_enabled: bool                                            │
│ - tracing_enabled: bool                                         │
├─────────────────────────────────────────────────────────────────┤
│ + register_service(service_name): SidecarProxy                  │
│ + get_proxy(service_name): Optional[SidecarProxy]               │
│ + enable_mtls(): None                                           │
│ + enable_tracing(): None                                        │
│ + get_all_stats(): Dict[str, Dict]                              │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 │ manages
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          SidecarProxy                           │
├─────────────────────────────────────────────────────────────────┤
│ - service_name: str                                             │
│ - endpoints: List[ServiceEndpoint]                              │
│ - traffic_policy: TrafficPolicy                                 │
│ - retry_policy: RetryPolicy                                     │
│ - timeout_policy: TimeoutPolicy                                 │
│ - circuit_breaker_policy: CircuitBreakerPolicy                  │
│ - _stats: Dict[str, int]                                        │
├─────────────────────────────────────────────────────────────────┤
│ + add_endpoint(host, port, weight, zone): None                  │
│ + remove_endpoint(host, port): None                             │
│ + set_traffic_policy(policy): None                              │
│ + select_endpoint(): Optional[ServiceEndpoint]                  │
│ + request(method, path, headers, body): Dict                    │
│ + get_stats(): Dict                                             │
└─────────────────────────────────────────────────────────────────┘
30.3.3 Sidecar Proxy Implementation
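The `RetryPolicy` in the implementation that follows waits `backoff_base * 2**attempt` seconds between attempts. As a result, clients that failed at the same moment also retry at the same moment. A common refinement, usually called "full jitter", draws each delay uniformly from $[0, \min(\text{cap}, \text{base} \cdot 2^{\text{attempt}})]$. A minimal sketch follows; the 10-second cap is an assumption.

```python
import random
from typing import List


def exponential_delay(attempt: int, base: float = 0.5) -> float:
    """Deterministic schedule: base * 2**attempt (attempt counts from 0)."""
    return base * (2 ** attempt)


def full_jitter_delay(attempt: int, rng: random.Random,
                      base: float = 0.5, cap: float = 10.0) -> float:
    """Uniform in [0, min(cap, base * 2**attempt)]; spreads out synchronized retries."""
    return rng.uniform(0, min(cap, exponential_delay(attempt, base)))


rng = random.Random(7)  # seeded for a reproducible demo
plain: List[float] = [exponential_delay(a) for a in range(6)]
jittered: List[float] = [round(full_jitter_delay(a, rng), 3) for a in range(6)]
print(plain)
print(jittered)
```

Full jitter gives up a predictable per-client delay in exchange for much less correlated load on the service while it recovers.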
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
import random
from abc import ABC, abstractmethod


class TrafficPolicy(Enum):
    RANDOM = "random"
    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED = "weighted"


@dataclass
class ServiceEndpoint:
    host: str
    port: int
    weight: int = 100
    healthy: bool = True
    connections: int = 0
    zone: str = "default"

    def __hash__(self):
        # Identity is the (host, port) address, not the mutable counters
        return hash((self.host, self.port))


@dataclass
class RetryPolicy:
    max_retries: int = 3
    retry_on: List[str] = field(default_factory=lambda: ["5xx", "reset"])
    per_try_timeout: float = 1.0
    backoff_base: float = 0.5

    def calculate_delay(self, attempt: int) -> float:
        # Exponential backoff: 0.5 s, 1 s, 2 s, ... with the defaults
        return self.backoff_base * (2 ** attempt)


@dataclass
class TimeoutPolicy:
    connect_timeout: float = 5.0
    request_timeout: float = 30.0
    idle_timeout: float = 300.0


@dataclass
class CircuitBreakerPolicy:
    max_connections: int = 100
    max_pending_requests: int = 100
    max_requests: int = 100
    sleep_window: float = 30.0
    error_threshold_percentage: float = 50.0
    request_volume_threshold: int = 20


class LoadBalancer(ABC):
    @abstractmethod
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        pass


class RoundRobinLoadBalancer(LoadBalancer):
    def __init__(self):
        self._index = 0
        self._lock = threading.Lock()

    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        healthy = [e for e in endpoints if e.healthy]
        if not healthy:
            return None
        with self._lock:
            endpoint = healthy[self._index % len(healthy)]
            self._index += 1
        return endpoint


class RandomLoadBalancer(LoadBalancer):
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        healthy = [e for e in endpoints if e.healthy]
        if not healthy:
            return None
        return random.choice(healthy)


class LeastConnectionsLoadBalancer(LoadBalancer):
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        healthy = [e for e in endpoints if e.healthy]
        if not healthy:
            return None
        return min(healthy, key=lambda e: e.connections)


class WeightedLoadBalancer(LoadBalancer):
    def select(self, endpoints: List[ServiceEndpoint]) -> Optional[ServiceEndpoint]:
        # Zero-weight endpoints receive no traffic (and would break randint below)
        healthy = [e for e in endpoints if e.healthy and e.weight > 0]
        if not healthy:
            return None
        total_weight = sum(e.weight for e in healthy)
        # Pick a point in [1, total_weight] and walk the cumulative weights
        r = random.randint(1, total_weight)
        current = 0
        for endpoint in healthy:
            current += endpoint.weight
            if r <= current:
                return endpoint
        return healthy[0]


class SidecarProxy:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.endpoints: List[ServiceEndpoint] = []
        self.traffic_policy = TrafficPolicy.ROUND_ROBIN
        self.retry_policy = RetryPolicy()
        self.timeout_policy = TimeoutPolicy()
        self.circuit_breaker_policy = CircuitBreakerPolicy()
        self._load_balancer: LoadBalancer = RoundRobinLoadBalancer()
        self._stats = {
            "requests": 0,
            "successes": 0,
            "failures": 0,
            "retries": 0,
            "timeouts": 0,
            "circuit_opens": 0
        }
        self._circuit_open = False
        self._circuit_open_time = 0.0
        self._failure_count = 0
        self._success_count = 0
        self._lock = threading.RLock()

    def add_endpoint(
        self,
        host: str,
        port: int,
        weight: int = 100,
        zone: str = "default"
    ) -> None:
        with self._lock:
            self.endpoints.append(ServiceEndpoint(
                host=host,
                port=port,
                weight=weight,
                zone=zone
            ))

    def remove_endpoint(self, host: str, port: int) -> None:
        with self._lock:
            self.endpoints = [
                e for e in self.endpoints
                if not (e.host == host and e.port == port)
            ]

    def set_traffic_policy(self, policy: TrafficPolicy) -> None:
        self.traffic_policy = policy
        if policy == TrafficPolicy.ROUND_ROBIN:
            self._load_balancer = RoundRobinLoadBalancer()
        elif policy == TrafficPolicy.RANDOM:
            self._load_balancer = RandomLoadBalancer()
        elif policy == TrafficPolicy.LEAST_CONNECTIONS:
            self._load_balancer = LeastConnectionsLoadBalancer()
        elif policy == TrafficPolicy.WEIGHTED:
            self._load_balancer = WeightedLoadBalancer()

    def select_endpoint(self) -> Optional[ServiceEndpoint]:
        with self._lock:
            return self._load_balancer.select(self.endpoints)

    def request(
        self,
        method: str,
        path: str,
        headers: Optional[Dict] = None,
        body: Any = None
    ) -> Dict:
        if self._is_circuit_open():
            with self._lock:
                # Counts requests short-circuited while the breaker is open
                self._stats["circuit_opens"] += 1
            return {"error": "Circuit breaker open", "status": 503}
        endpoint = self.select_endpoint()
        if not endpoint:
            return {"error": "No available endpoints", "status": 503}
        # Shared counters are updated under the lock
        with self._lock:
            endpoint.connections += 1
            self._stats["requests"] += 1
        try:
            result = self._execute_with_retry(
                endpoint, method, path, headers, body
            )
            self._record_success()
            return result
        except Exception as e:
            self._record_failure()
            return {"error": str(e), "status": 500}
        finally:
            with self._lock:
                endpoint.connections -= 1

    def _execute_with_retry(
        self,
        endpoint: ServiceEndpoint,
        method: str,
        path: str,
        headers: Optional[Dict],
        body: Any
    ) -> Dict:
        last_error: Optional[Exception] = None
        # One initial attempt plus up to max_retries retries
        for attempt in range(self.retry_policy.max_retries + 1):
            try:
                return self._send_request(
                    endpoint, method, path, headers, body
                )
            except Exception as e:
                last_error = e
                if attempt < self.retry_policy.max_retries:
                    with self._lock:
                        self._stats["retries"] += 1
                    delay = self.retry_policy.calculate_delay(attempt)
                    time.sleep(delay)
        raise last_error

    def _send_request(
        self,
        endpoint: ServiceEndpoint,
        method: str,
        path: str,
        headers: Optional[Dict],
        body: Any
    ) -> Dict:
        # Stub: a real sidecar would forward the request over the network
        # (honoring retry_policy.per_try_timeout); here it is only logged
        print(f"[{self.service_name}] {method} http://{endpoint.host}:{endpoint.port}{path}")
        return {"status": 200, "body": {"success": True}}

    def _is_circuit_open(self) -> bool:
        with self._lock:
            if not self._circuit_open:
                return False
            # No half-open state: after sleep_window the breaker closes fully
            if time.time() - self._circuit_open_time >= self.circuit_breaker_policy.sleep_window:
                self._circuit_open = False
                self._failure_count = 0
                self._success_count = 0
                return False
            return True

    def _record_success(self) -> None:
        with self._lock:
            self._stats["successes"] += 1
            self._success_count += 1

    def _record_failure(self) -> None:
        with self._lock:
            self._stats["failures"] += 1
            self._failure_count += 1
            total = self._failure_count + self._success_count
            # Trip only after enough requests to make the error rate meaningful
            if total >= self.circuit_breaker_policy.request_volume_threshold:
                error_rate = (self._failure_count / total) * 100
                if error_rate >= self.circuit_breaker_policy.error_threshold_percentage:
                    self._circuit_open = True
                    self._circuit_open_time = time.time()

    def get_stats(self) -> Dict:
        with self._lock:
            return {
                **self._stats,
                "endpoints": len(self.endpoints),
                "healthy_endpoints": sum(1 for e in self.endpoints if e.healthy),
                "circuit_open": self._circuit_open
            }


class ServiceMesh:
    def __init__(self):
        self.proxies: Dict[str, SidecarProxy] = {}
        self.mtls_enabled: bool = False
        self.tracing_enabled: bool = True
        self._lock = threading.RLock()

    def register_service(self, service_name: str) -> SidecarProxy:
        with self._lock:
            proxy = SidecarProxy(service_name)
            self.proxies[service_name] = proxy
            return proxy

    def get_proxy(self, service_name: str) -> Optional[SidecarProxy]:
        return self.proxies.get(service_name)

    def enable_mtls(self) -> None:
        self.mtls_enabled = True
        print("mTLS enabled for all services")

    def enable_tracing(self) -> None:
        self.tracing_enabled = True
        print("Distributed tracing enabled")

    def get_all_stats(self) -> Dict[str, Dict]:
        return {
            name: proxy.get_stats()
            for name, proxy in self.proxies.items()
        }


mesh = ServiceMesh()
user_proxy = mesh.register_service("user-service")
user_proxy.add_endpoint("10.0.0.1", 8080, weight=70)
user_proxy.add_endpoint("10.0.0.2", 8080, weight=30)
order_proxy = mesh.register_service("order-service")
order_proxy.add_endpoint("10.0.1.1", 8080)
order_proxy.add_endpoint("10.0.1.2", 8080)
mesh.enable_mtls()
mesh.enable_tracing()
result = user_proxy.request("GET", "/users/123")
print(f"Result: {result}")
print(f"Stats: {user_proxy.get_stats()}")
30.4 Observability Patterns
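30.4.1 Formal Definition of Observability
Definition 30.6 (Observability): System observability $\mathcal{O}$ is defined as:
$$\mathcal{O} = \langle \mathcal{M}, \mathcal{L}, \mathcal{T} \rangle$$
where:
- $\mathcal{M}$: metrics, which quantify system state
- $\mathcal{L}$: logs, which record discrete events
- $\mathcal{T}$: traces, which follow the path of a request
Definition 30.7 (Trace): A trace $\mathcal{T}_r$ is defined as:
$$\mathcal{T}_r = \langle \mathit{trace\_id}, \{\mathit{span}_1, \mathit{span}_2, \ldots, \mathit{span}_n\} \rangle$$
where each $\mathit{span}_i$ contains:
- $\mathit{span\_id}$: a unique identifier
- $\mathit{parent\_span\_id}$: the identifier of the parent span (empty for the root span)
- $\mathit{operation\_name}$: the operation name
- $\mathit{start\_time}, \mathit{end\_time}$: timestamps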
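Each span records only its parent's identifier, so a tracing backend has to rebuild the call tree before it can display a trace. A minimal sketch follows. It assumes spans arrive as plain dictionaries with the fields listed above; the sample trace is invented for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def build_tree(spans: List[Dict]) -> Dict[Optional[str], List[Dict]]:
    """Group spans by parent_span_id; the root span has parent None."""
    children: Dict[Optional[str], List[Dict]] = defaultdict(list)
    for span in spans:
        children[span["parent_span_id"]].append(span)
    for siblings in children.values():
        siblings.sort(key=lambda s: s["start_time"])  # list children in start order
    return children


def render(children: Dict[Optional[str], List[Dict]],
           parent: Optional[str] = None, depth: int = 0) -> List[str]:
    """Depth-first walk producing one indented line per span with its duration."""
    lines: List[str] = []
    for span in children.get(parent, []):
        duration_ms = (span["end_time"] - span["start_time"]) * 1000
        lines.append(f"{'  ' * depth}{span['operation_name']} ({duration_ms:.0f} ms)")
        lines.extend(render(children, span["span_id"], depth + 1))
    return lines


spans = [
    {"span_id": "b", "parent_span_id": "a", "operation_name": "validate_payment",
     "start_time": 0.01, "end_time": 0.11},
    {"span_id": "a", "parent_span_id": None, "operation_name": "process_order",
     "start_time": 0.0, "end_time": 0.17},
    {"span_id": "c", "parent_span_id": "a", "operation_name": "update_inventory",
     "start_time": 0.11, "end_time": 0.16},
]
print("\n".join(render(build_tree(spans))))
```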
30.4.2 Distributed Tracing Implementation
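The implementation that follows propagates context through custom `x-trace-id` / `x-span-id` headers. For interoperability, the W3C Trace Context recommendation instead defines a single `traceparent` header of the form `version-traceid-parentid-flags`, made of 2, 32, 16 and 2 lowercase hex digits. In the flags field, bit `0x01` marks the trace as sampled. A minimal encode/parse sketch follows; it covers only version `00` and ignores the companion `tracestate` header.

```python
import re
import secrets
from typing import NamedTuple, Optional

# Version "00" layout: 2-32-16-2 lowercase hex digits separated by dashes
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    trace_id: str
    parent_id: str
    sampled: bool


def new_traceparent(sampled: bool = True) -> str:
    """Start a new trace: random 16-byte trace id and 8-byte span id."""
    return f"00-{secrets.token_hex(16)}-{secrets.token_hex(8)}-{'01' if sampled else '00'}"


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return the parsed header, or None if malformed (the caller then starts a new trace)."""
    match = _TRACEPARENT.match(header.strip())
    if not match:
        return None
    trace_id, parent_id, flags = match.groups()
    if trace_id == "0" * 32 or parent_id == "0" * 16:
        return None  # all-zero ids are invalid
    return TraceParent(trace_id, parent_id, bool(int(flags, 16) & 0x01))


def child_traceparent(parent: TraceParent) -> str:
    """Keep the trace id and sampling decision; mint a new span id for the outgoing call."""
    flags = "01" if parent.sampled else "00"
    return f"00-{parent.trace_id}-{secrets.token_hex(8)}-{flags}"


incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
ctx = parse_traceparent(incoming)
print(ctx)
print(child_traceparent(ctx))
print(parse_traceparent("garbage"))
```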
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from contextlib import contextmanager
from datetime import datetime
import time
import uuid
import json
@dataclass
class SpanContext:
trace_id: str
span_id: str
parent_span_id: Optional[str] = None
baggage: Dict[str, str] = field(default_factory=dict)
def to_headers(self) -> Dict[str, str]:
headers = {
"x-trace-id": self.trace_id,
"x-span-id": self.span_id,
}
if self.parent_span_id:
headers["x-parent-span-id"] = self.parent_span_id
for key, value in self.baggage.items():
headers[f"x-baggage-{key}"] = value
return headers
@classmethod
def from_headers(cls, headers: Dict[str, str]) -> 'SpanContext':
trace_id = headers.get("x-trace-id", str(uuid.uuid4()).replace("-", ""))
span_id = headers.get("x-span-id", str(uuid.uuid4()).replace("-", "")[:16])
parent_span_id = headers.get("x-parent-span-id")
baggage = {}
for key, value in headers.items():
if key.startswith("x-baggage-"):
baggage_key = key[len("x-baggage-"):]
baggage[baggage_key] = value
return cls(
trace_id=trace_id,
span_id=span_id,
parent_span_id=parent_span_id,
baggage=baggage
)
@dataclass
class Span:
context: SpanContext
operation_name: str
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
tags: Dict[str, Any] = field(default_factory=dict)
logs: List[Dict[str, Any]] = field(default_factory=list)
status: str = "OK"
def finish(self) -> None:
self.end_time = time.time()
def set_tag(self, key: str, value: Any) -> 'Span':
self.tags[key] = value
return self
def log(self, message: str, **kwargs) -> 'Span':
self.logs.append({
"timestamp": time.time(),
"message": message,
**kwargs
})
return self
def set_error(self, error: Exception) -> 'Span':
self.status = "ERROR"
self.set_tag("error", True)
self.set_tag("error.type", type(error).__name__)
self.log(str(error), error_stack=str(error))
return self
@property
def duration_ms(self) -> float:
if self.end_time:
return (self.end_time - self.start_time) * 1000
return 0
def to_dict(self) -> Dict[str, Any]:
return {
"traceId": self.context.trace_id,
"spanId": self.context.span_id,
"parentSpanId": self.context.parent_span_id,
"operationName": self.operation_name,
"startTime": self.start_time,
"endTime": self.end_time,
"duration": self.duration_ms,
"tags": self.tags,
"logs": self.logs,
"status": self.status
}
class Tracer:
    """Minimal in-process tracer that keeps all spans in memory."""
    def __init__(self, service_name: str):
        self.service_name = service_name
        self._spans: List[Span] = []
        self._lock = threading.RLock()
    @staticmethod
    def _new_id(length: int = 32) -> str:
        # uuid4().hex is the 32-character hex form without hyphens
        return uuid.uuid4().hex[:length]
    def start_span(
        self,
        operation_name: str,
        parent: Optional[Span] = None,
        context: Optional[SpanContext] = None
    ) -> Span:
        # A propagated (remote) context takes precedence over a local parent span
        if context:
            span_context = SpanContext(
                trace_id=context.trace_id,
                span_id=self._new_id(16),
                parent_span_id=context.span_id,
                baggage=context.baggage.copy()
            )
        elif parent:
            span_context = SpanContext(
                trace_id=parent.context.trace_id,
                span_id=self._new_id(16),
                parent_span_id=parent.context.span_id,
                baggage=parent.context.baggage.copy()
            )
        else:
            # Root span: start a new trace
            span_context = SpanContext(
                trace_id=self._new_id(),
                span_id=self._new_id(16)
            )
        span = Span(
            context=span_context,
            operation_name=operation_name
        )
        span.set_tag("service", self.service_name)
        with self._lock:
            self._spans.append(span)
        return span
    @contextmanager
    def trace(
        self,
        operation_name: str,
        parent: Optional[Span] = None,
        context: Optional[SpanContext] = None
    ):
        """Context manager that always finishes the span and records any exception."""
        span = self.start_span(operation_name, parent, context)
        try:
            yield span
        except Exception as e:
            span.set_error(e)
            raise
        finally:
            span.finish()
    def get_trace(self, trace_id: str) -> List[Span]:
        with self._lock:
            return [s for s in self._spans if s.context.trace_id == trace_id]
    def export_traces(self) -> List[Dict]:
        with self._lock:
            return [s.to_dict() for s in self._spans]
    def clear(self) -> None:
        with self._lock:
            self._spans.clear()
tracer = Tracer("order-service")
with tracer.trace("process_order") as parent_span:
    parent_span.set_tag("order_id", "ORD-12345")
    parent_span.log("Starting order processing")
    with tracer.trace("validate_payment", parent=parent_span) as payment_span:
        payment_span.set_tag("payment_method", "credit_card")
        time.sleep(0.1)  # simulate work
        payment_span.log("Payment validated")
    with tracer.trace("update_inventory", parent=parent_span) as inventory_span:
        inventory_span.set_tag("warehouse", "WH-001")
        time.sleep(0.05)  # simulate work
        inventory_span.log("Inventory updated")
    parent_span.log("Order processing completed")
print(f"Trace export: {json.dumps(tracer.export_traces(), indent=2)}")
30.4.3 Metrics Collection Implementation
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
from collections import defaultdict
class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    SUMMARY = "summary"
class Counter:
    """Monotonically increasing counter, optionally partitioned by labels."""
    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._value: float = 0.0
        # label key string -> value; unseen keys start at 0.0
        self._labels: Dict[str, float] = defaultdict(float)
        self._lock = threading.Lock()
    def inc(self, amount: float = 1.0, labels: Optional[Dict[str, str]] = None) -> None:
        if amount < 0:
            raise ValueError("Counter can only increase; use a Gauge for values that go down")
        with self._lock:
            if labels:
                self._labels[self._labels_to_key(labels)] += amount
            else:
                self._value += amount
    def get(self, labels: Optional[Dict[str, str]] = None) -> float:
        with self._lock:
            if labels:
                # .get() does not insert a default entry into the defaultdict
                return self._labels.get(self._labels_to_key(labels), 0.0)
            return self._value
    def _labels_to_key(self, labels: Dict[str, str]) -> str:
        # Sorted so that {"a": 1, "b": 2} and {"b": 2, "a": 1} map to the same series
        return ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    def export_prometheus(self) -> List[str]:
        lines = []
        if self.description:
            lines.append(f"# HELP {self.name} {self.description}")
        lines.append(f"# TYPE {self.name} counter")
        with self._lock:
            if self._value != 0:
                lines.append(f"{self.name} {self._value}")
            for label_key, value in self._labels.items():
                lines.append(f"{self.name}{{{label_key}}} {value}")
        return lines
class Gauge:
    """Value that can go up and down (e.g. active connections), optionally labelled."""
    def __init__(self, name: str, description: str = ""):
        self.name = name
        self.description = description
        self._value: float = 0.0
        self._labels: Dict[str, float] = defaultdict(float)
        self._lock = threading.Lock()
    def set(self, value: float, labels: Optional[Dict[str, str]] = None) -> None:
        with self._lock:
            if labels:
                self._labels[self._labels_to_key(labels)] = value
            else:
                self._value = value
    def inc(self, amount: float = 1.0, labels: Optional[Dict[str, str]] = None) -> None:
        with self._lock:
            if labels:
                self._labels[self._labels_to_key(labels)] += amount
            else:
                self._value += amount
    def dec(self, amount: float = 1.0, labels: Optional[Dict[str, str]] = None) -> None:
        # Decrementing is incrementing by a negative amount
        self.inc(-amount, labels)
    def get(self, labels: Optional[Dict[str, str]] = None) -> float:
        with self._lock:
            if labels:
                return self._labels.get(self._labels_to_key(labels), 0.0)
            return self._value
    def _labels_to_key(self, labels: Dict[str, str]) -> str:
        return ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    def export_prometheus(self) -> List[str]:
        lines = []
        if self.description:
            lines.append(f"# HELP {self.name} {self.description}")
        lines.append(f"# TYPE {self.name} gauge")
        with self._lock:
            lines.append(f"{self.name} {self._value}")
            for label_key, value in self._labels.items():
                lines.append(f"{self.name}{{{label_key}}} {value}")
        return lines
class Histogram:
    """Prometheus-style histogram with cumulative buckets."""
    DEFAULT_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
    def __init__(
        self,
        name: str,
        buckets: Optional[List[float]] = None,
        description: str = ""
    ):
        self.name = name
        self.description = description
        self.buckets = sorted(buckets or self.DEFAULT_BUCKETS)
        # Keys are the bucket upper bounds plus the "+Inf" catch-all bucket
        self._counts: Dict[Any, int] = {b: 0 for b in self.buckets}
        self._counts["+Inf"] = 0
        self._sum: float = 0.0
        self._count: int = 0
        self._lock = threading.Lock()
    def observe(self, value: float) -> None:
        with self._lock:
            self._sum += value
            self._count += 1
            # Cumulative: a value is counted in every bucket whose bound is >= value
            for bucket in self.buckets:
                if value <= bucket:
                    self._counts[bucket] += 1
            self._counts["+Inf"] += 1
    def get(self) -> Dict[str, Any]:
        with self._lock:
            return {
                "buckets": dict(self._counts),
                "sum": self._sum,
                "count": self._count
            }
    def export_prometheus(self) -> List[str]:
        lines = []
        if self.description:
            lines.append(f"# HELP {self.name} {self.description}")
        lines.append(f"# TYPE {self.name} histogram")
        with self._lock:
            for bucket, count in self._counts.items():
                lines.append(f'{self.name}_bucket{{le="{bucket}"}} {count}')
            lines.append(f"{self.name}_sum {self._sum}")
            lines.append(f"{self.name}_count {self._count}")
        return lines
class MetricsRegistry:
    """Process-wide singleton that owns every registered metric."""
    _instance = None
    _instance_lock = threading.Lock()
    def __new__(cls):
        # Double-checked locking: concurrent first calls still create a single instance
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._metrics = {}
                    instance._lock = threading.Lock()
                    cls._instance = instance
        return cls._instance
    def register_counter(
        self,
        name: str,
        description: str = ""
    ) -> Counter:
        with self._lock:
            if name not in self._metrics:
                self._metrics[name] = Counter(name, description)
            return self._metrics[name]
    def register_gauge(
        self,
        name: str,
        description: str = ""
    ) -> Gauge:
        with self._lock:
            if name not in self._metrics:
                self._metrics[name] = Gauge(name, description)
            return self._metrics[name]
    def register_histogram(
        self,
        name: str,
        buckets: Optional[List[float]] = None,
        description: str = ""
    ) -> Histogram:
        with self._lock:
            if name not in self._metrics:
                self._metrics[name] = Histogram(name, buckets, description)
            return self._metrics[name]
    def get_metric(self, name: str) -> Optional[Any]:
        with self._lock:
            return self._metrics.get(name)
    def export_prometheus(self) -> str:
        # Snapshot under the lock so a concurrent registration cannot change the dict mid-iteration
        with self._lock:
            metrics = list(self._metrics.values())
        lines = []
        for metric in metrics:
            lines.extend(metric.export_prometheus())
        return "\n".join(lines)
registry = MetricsRegistry()
request_counter = registry.register_counter(
    "http_requests_total",
    "Total number of HTTP requests"
)
active_connections = registry.register_gauge(
    "active_connections",
    "Number of active connections"
)
request_duration = registry.register_histogram(
    "http_request_duration_seconds",
    description="HTTP request duration in seconds"
)
request_counter.inc(labels={"method": "GET", "path": "/api/users"})
request_counter.inc(labels={"method": "POST", "path": "/api/orders"})
active_connections.set(10)
request_duration.observe(0.123)
request_duration.observe(0.456)
print(registry.export_prometheus())
30.5 Resilience Patterns
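30.5.1 Circuit Breaker Pattern
Definition 30.8 (Circuit breaker states): the circuit breaker state machine is defined as:
$$CircuitBreaker = \langle S, \delta, s_0, F \rangle$$
where:
- $S = \{\text{CLOSED}, \text{OPEN}, \text{HALF\_OPEN}\}$: the set of states
- $\delta$: the state transition function
- $s_0 = \text{CLOSED}$: the initial state
- $F = \{\text{OPEN}\}$: the failure state, in which requests are rejected
State transition rules:
- $\text{CLOSED} \xrightarrow{failures \geq threshold} \text{OPEN}$
- $\text{OPEN} \xrightarrow{timeout} \text{HALF\_OPEN}$
- $\text{HALF\_OPEN} \xrightarrow{success} \text{CLOSED}$
- $\text{HALF\_OPEN} \xrightarrow{failure} \text{OPEN}$
In the implementation below, the CLOSED → OPEN transition fires on an absolute failure count while traffic is low and on the error rate once enough requests have been seen; HALF_OPEN → CLOSED requires `success_threshold` successful trial requests, and any failure during HALF_OPEN reopens the circuit.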
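The transition rules of Definition 30.8 can be checked in isolation by writing $\delta$ as a lookup table over (state, event) pairs. This is a minimal sketch, separate from the full implementation: the `State` enum and the event names (`threshold_reached`, `timeout`, `success`, `failure`) are hypothetical labels for the edge conditions, and any pair not listed is assumed to leave the state unchanged.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# delta: (current state, event) -> next state, one entry per rule in Definition 30.8.
# The event names are illustrative labels for the edge conditions.
TRANSITIONS = {
    (State.CLOSED, "threshold_reached"): State.OPEN,
    (State.OPEN, "timeout"): State.HALF_OPEN,
    (State.HALF_OPEN, "success"): State.CLOSED,
    (State.HALF_OPEN, "failure"): State.OPEN,
}


def delta(state: State, event: str) -> State:
    """Apply one transition; unlisted (state, event) pairs keep the current state."""
    return TRANSITIONS.get((state, event), state)


def run(events, start: State = State.CLOSED) -> list:
    """Replay events from the initial state s_0 and return every visited state."""
    states = [start]
    for event in events:
        states.append(delta(states[-1], event))
    return states


if __name__ == "__main__":
    visited = run(["threshold_reached", "timeout", "failure", "timeout", "success"])
    print([s.name for s in visited])
    # ['CLOSED', 'OPEN', 'HALF_OPEN', 'OPEN', 'HALF_OPEN', 'CLOSED']
```

Keeping $\delta$ as data makes it easy to unit-test each rule separately before wiring it to clocks and counters.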
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
from functools import wraps
class CircuitState(Enum):
    CLOSED = "closed"        # normal operation: requests pass through
    OPEN = "open"            # tripped: requests are rejected immediately
    HALF_OPEN = "half_open"  # probing whether the downstream service has recovered
@dataclass
class CircuitStats:
    # Counters accumulate until the breaker returns to CLOSED
    successes: int = 0
    failures: int = 0
    last_failure_time: float = 0
    last_success_time: float = 0
    total_requests: int = 0
class CircuitBreaker:
    """Thread-safe circuit breaker implementing the state machine of Definition 30.8."""
    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        success_threshold: int = 3,
        timeout: float = 30.0,
        error_rate_threshold: float = 50.0,
        request_volume_threshold: int = 10
    ):
        self.name = name
        self.failure_threshold = failure_threshold  # failures that trip the breaker at low volume
        self.success_threshold = success_threshold  # trial successes needed to close again
        self.timeout = timeout  # seconds to stay OPEN before probing
        self.error_rate_threshold = error_rate_threshold  # error rate (%) that trips the breaker
        self.request_volume_threshold = request_volume_threshold  # requests needed before the rate is used
        self.state = CircuitState.CLOSED
        self.stats = CircuitStats()
        self._half_open_successes = 0
        self._lock = threading.RLock()
        self._listeners: List[Callable] = []
    def add_listener(self, listener: Callable) -> None:
        """Register a callback invoked as listener(name, old_state, new_state)."""
        self._listeners.append(listener)
    def allow_request(self) -> bool:
        with self._lock:
            if self.state == CircuitState.CLOSED:
                return True
            if self.state == CircuitState.OPEN:
                # After the cool-down period, let trial requests through
                if time.time() - self.stats.last_failure_time >= self.timeout:
                    self._transition_to(CircuitState.HALF_OPEN)
                    return True
                return False
            # HALF_OPEN: this simple version does not cap the number of trial requests
            return True
    def record_success(self) -> None:
        with self._lock:
            self.stats.successes += 1
            self.stats.total_requests += 1
            self.stats.last_success_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.success_threshold:
                    self._transition_to(CircuitState.CLOSED)
    def record_failure(self) -> None:
        with self._lock:
            self.stats.failures += 1
            self.stats.total_requests += 1
            self.stats.last_failure_time = time.time()
            if self.state == CircuitState.HALF_OPEN:
                # Any failure while probing reopens the circuit
                self._transition_to(CircuitState.OPEN)
            elif self.stats.total_requests >= self.request_volume_threshold:
                # Enough traffic: decide on the error rate (percent)
                error_rate = (self.stats.failures / self.stats.total_requests) * 100
                if error_rate >= self.error_rate_threshold:
                    self._transition_to(CircuitState.OPEN)
            elif self.stats.failures >= self.failure_threshold:
                # Low traffic: fall back to an absolute failure count
                self._transition_to(CircuitState.OPEN)
    def _transition_to(self, new_state: CircuitState) -> None:
        # Caller must hold self._lock
        old_state = self.state
        self.state = new_state
        if new_state == CircuitState.CLOSED:
            self.stats = CircuitStats()
            self._half_open_successes = 0
        elif new_state == CircuitState.HALF_OPEN:
            self._half_open_successes = 0
        for listener in self._listeners:
            try:
                listener(self.name, old_state, new_state)
            except Exception:
                pass  # a faulty listener must not break the breaker
    def get_state(self) -> Dict[str, Any]:
        with self._lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "successes": self.stats.successes,
                "failures": self.stats.failures,
                "total_requests": self.stats.total_requests,
                "last_failure_time": self.stats.last_failure_time
            }
    def force_open(self) -> None:
        with self._lock:
            # Stamp the time so the OPEN timeout counts from now; otherwise a breaker
            # that never failed (last_failure_time == 0) would go HALF_OPEN immediately
            self.stats.last_failure_time = time.time()
            self._transition_to(CircuitState.OPEN)
    def force_close(self) -> None:
        with self._lock:
            self._transition_to(CircuitState.CLOSED)
class CircuitOpenError(Exception):
    """Raised when a call is rejected because its circuit is open and no fallback exists."""
class ResilientClient:
    """Routes calls through per-service circuit breakers with optional fallbacks."""
    def __init__(self):
        self.circuits: Dict[str, CircuitBreaker] = {}
        self.fallback_handlers: Dict[str, Callable] = {}
        self._lock = threading.Lock()
    def register_circuit(
        self,
        name: str,
        **kwargs
    ) -> CircuitBreaker:
        with self._lock:
            circuit = CircuitBreaker(name, **kwargs)
            self.circuits[name] = circuit
            return circuit
    def set_fallback(self, name: str, handler: Callable) -> None:
        self.fallback_handlers[name] = handler
    def execute(
        self,
        service_name: str,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        circuit = self.circuits.get(service_name)
        if circuit and not circuit.allow_request():
            fallback = self.fallback_handlers.get(service_name)
            if fallback:
                return fallback(*args, **kwargs)
            raise CircuitOpenError(f"Circuit breaker open: {service_name}")
        try:
            result = operation(*args, **kwargs)
        except Exception:
            if circuit:
                circuit.record_failure()
            raise  # re-raise with the original traceback
        if circuit:
            circuit.record_success()
        return result
    def circuit_breaker(
        self,
        service_name: str,
        **kwargs
    ) -> Callable:
        def decorator(func: Callable) -> Callable:
            if service_name not in self.circuits:
                self.register_circuit(service_name, **kwargs)
            @wraps(func)
            def wrapper(*args, **kw):
                return self.execute(service_name, func, *args, **kw)
            return wrapper
        return decorator
client = ResilientClient()
circuit = client.register_circuit(
    "payment-service",
    failure_threshold=3,
    timeout=10.0
)
def fallback_payment(*args, **kwargs):
    return {"status": "fallback", "message": "Payment service unavailable"}
client.set_fallback("payment-service", fallback_payment)
@client.circuit_breaker("order-service", failure_threshold=5)
def process_order(order_id: str):
    print(f"Processing order: {order_id}")
    return {"status": "success", "order_id": order_id}
30.5.2 Retry Pattern
from typing import Callable, Any, Dict, List, Type, Tuple
from dataclasses import dataclass
from enum import Enum
import time
import random
from functools import wraps
class BackoffStrategy(Enum):
    FIXED = "fixed"
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
    EXPONENTIAL_JITTER = "exponential_jitter"
@dataclass
class RetryConfig:
    max_retries: int = 3         # retries after the first attempt
    initial_delay: float = 1.0   # seconds
    max_delay: float = 60.0      # cap on any single delay, in seconds
    backoff_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL_JITTER
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
    jitter_factor: float = 0.5   # jitter adds up to this fraction of the base delay
    def calculate_delay(self, attempt: int) -> float:
        """Delay in seconds before retry number attempt + 1 (attempt is 0-based)."""
        if self.backoff_strategy == BackoffStrategy.FIXED:
            delay = self.initial_delay
        elif self.backoff_strategy == BackoffStrategy.LINEAR:
            delay = self.initial_delay * (attempt + 1)
        elif self.backoff_strategy == BackoffStrategy.EXPONENTIAL:
            delay = self.initial_delay * (2 ** attempt)
        else:  # EXPONENTIAL_JITTER: random jitter spreads out retries from many clients
            base_delay = self.initial_delay * (2 ** attempt)
            jitter = random.uniform(0, self.jitter_factor * base_delay)
            delay = base_delay + jitter
        return min(delay, self.max_delay)
class RetryExecutor:
    def __init__(self, config: RetryConfig = None):
        self.config = config or RetryConfig()
        self._stats = {
            "total_attempts": 0,
            "successful_retries": 0,
            "failed_after_retries": 0
        }
    def execute(
        self,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        last_exception = None
        for attempt in range(self.config.max_retries + 1):
            self._stats["total_attempts"] += 1
            try:
                result = operation(*args, **kwargs)
            except self.config.retryable_exceptions as e:
                # Non-retryable exceptions propagate immediately
                last_exception = e
                if attempt < self.config.max_retries:
                    delay = self.config.calculate_delay(attempt)
                    print(f"Retry {attempt + 1}/{self.config.max_retries}, waiting {delay:.2f}s")
                    time.sleep(delay)
                continue
            if attempt > 0:
                # Succeeded only after at least one retry
                self._stats["successful_retries"] += 1
            return result
        # Every attempt failed with a retryable exception
        self._stats["failed_after_retries"] += 1
        raise last_exception
    def get_stats(self) -> Dict[str, int]:
        return self._stats.copy()
def retry(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL_JITTER,
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
) -> Callable:
    config = RetryConfig(
        max_retries=max_retries,
        initial_delay=initial_delay,
        backoff_strategy=backoff_strategy,
        retryable_exceptions=retryable_exceptions
    )
    # One executor per retry(...) call, so each decorated function keeps its own stats
    executor = RetryExecutor(config)
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            return executor.execute(func, *args, **kwargs)
        wrapper._retry_executor = executor  # exposed for inspecting stats
        return wrapper
    return decorator
@retry(
    max_retries=3,
    initial_delay=1.0,
    backoff_strategy=BackoffStrategy.EXPONENTIAL_JITTER,
    retryable_exceptions=(ConnectionError, TimeoutError)
)
def call_external_api(url: str) -> Dict:
    print(f"Calling API: {url}")
    # Simulate a flaky dependency that fails about half the time
    if random.random() < 0.5:
        raise ConnectionError("Connection failed")
    return {"status": "ok"}
30.5.3 Rate Limiting Pattern
from typing import Any, Callable, Dict, Optional
from dataclasses import dataclass
from enum import Enum
from functools import wraps
import time
import threading
from collections import deque
class RateLimitAlgorithm(Enum):
    TOKEN_BUCKET = "token_bucket"
    LEAKY_BUCKET = "leaky_bucket"
    SLIDING_WINDOW = "sliding_window"
    FIXED_WINDOW = "fixed_window"
@dataclass
class RateLimitConfig:
    requests_per_second: float = 10.0  # steady-state refill rate
    burst_size: int = 20               # bucket capacity / max requests per window
    algorithm: RateLimitAlgorithm = RateLimitAlgorithm.TOKEN_BUCKET
class TokenBucket:
    """Refills at `rate` tokens per second up to `capacity`; each request spends tokens."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full so an initial burst is allowed
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()
    def consume(self, tokens: int = 1) -> bool:
        with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
    def _refill(self) -> None:
        # Caller must hold self._lock; the monotonic clock ignores wall-clock adjustments
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
    def get_tokens(self) -> float:
        with self._lock:
            self._refill()
            return self.tokens
class SlidingWindowCounter:
    """Sliding-window log: stores the timestamp of every accepted request in the window."""
    def __init__(self, window_size: float, max_requests: int):
        self.window_size = window_size  # seconds
        self.max_requests = max_requests
        self.requests: deque = deque()
        self._lock = threading.Lock()
    def is_allowed(self) -> bool:
        with self._lock:
            now = time.monotonic()
            window_start = now - self.window_size
            # Evict timestamps that have slid out of the window
            while self.requests and self.requests[0] <= window_start:
                self.requests.popleft()
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            return False
    def get_count(self) -> int:
        with self._lock:
            window_start = time.monotonic() - self.window_size
            return sum(1 for t in self.requests if t > window_start)
class RateLimiter:
    """Keeps one independent limiter per key (e.g. per user or per client IP)."""
    def __init__(self, config: RateLimitConfig = None):
        self.config = config or RateLimitConfig()
        self._limiters: Dict[str, Any] = {}
        self._lock = threading.Lock()
    def is_allowed(self, key: str = "default") -> bool:
        with self._lock:
            if key not in self._limiters:
                self._limiters[key] = self._create_limiter()
            limiter = self._limiters[key]
        # Each limiter has its own lock, so the registry lock is released before checking
        if hasattr(limiter, "is_allowed"):
            return limiter.is_allowed()
        return limiter.consume()
    def _create_limiter(self) -> Any:
        if self.config.algorithm == RateLimitAlgorithm.SLIDING_WINDOW:
            return SlidingWindowCounter(
                window_size=1.0,
                max_requests=self.config.burst_size
            )
        # TOKEN_BUCKET; LEAKY_BUCKET and FIXED_WINDOW are not implemented here
        # and fall back to a token bucket as well
        return TokenBucket(
            rate=self.config.requests_per_second,
            capacity=self.config.burst_size
        )
    def get_stats(self, key: str = "default") -> Dict[str, Any]:
        with self._lock:
            if key not in self._limiters:
                return {"key": key, "status": "not_found"}
            limiter = self._limiters[key]
            if isinstance(limiter, TokenBucket):
                return {
                    "key": key,
                    "algorithm": "token_bucket",
                    "tokens": limiter.get_tokens(),
                    "capacity": limiter.capacity,
                    "rate": limiter.rate
                }
            elif isinstance(limiter, SlidingWindowCounter):
                return {
                    "key": key,
                    "algorithm": "sliding_window",
                    "current_count": limiter.get_count(),
                    "max_requests": limiter.max_requests
                }
            return {"key": key, "status": "unknown"}
def rate_limit(
    requests_per_second: float = 10.0,
    burst_size: int = 20,
    key_func: Callable = None
) -> Callable:
    config = RateLimitConfig(
        requests_per_second=requests_per_second,
        burst_size=burst_size
    )
    limiter = RateLimiter(config)
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # key_func maps the call arguments to a rate-limit key, e.g. a user ID
            key = key_func(*args, **kwargs) if key_func else "default"
            if not limiter.is_allowed(key):
                raise Exception(f"Rate limit exceeded for key: {key}")
            return func(*args, **kwargs)
        wrapper._rate_limiter = limiter
        return wrapper
    return decorator
@rate_limit(requests_per_second=5.0, burst_size=10)
def api_endpoint(user_id: str) -> Dict:
    return {"status": "ok", "user_id": user_id}
30.6 Anti-Patterns and Best Practices
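30.6.1 Common Anti-Patterns
| Anti-pattern | Description | Consequence | Remedy |
|---|---|---|---|
| Hard-coded configuration | Configuration is written into the code | Hard to change; difficult to move between environments | Use ConfigMap/Secret |
| Unusable logs | No structured log output | Troubleshooting is difficult | Emit structured logs |
| No health checks | No probes configured | No automatic recovery | Configure liveness/readiness probes |
| Unbounded retries | Retries have no upper limit | Resource exhaustion | Cap retries and use backoff |
| Single point of failure | Deployed without replicas | Low availability | Multiple replicas + anti-affinity |
| Ignoring resource limits | No resource limits set | Resource contention, OOM | Set requests and limits |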
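The anti-pattern table recommends structured logging as the remedy for logs that cannot be searched or correlated. A minimal sketch with the standard `logging` module: every record is written to stdout as one JSON object per line, so a log collector can parse fields such as a trace ID. The `JsonFormatter` class, the `make_logger` helper, and the extra fields (`trace_id`, `order_id`) are illustrative assumptions, not a prescribed schema.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    # Hypothetical context fields copied from `extra=` when present
    EXTRA_FIELDS = ("trace_id", "order_id")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Keys passed via `extra=` become attributes on the record
        for key in self.EXTRA_FIELDS:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=False)


def make_logger(name: str) -> logging.Logger:
    handler = logging.StreamHandler(sys.stdout)  # containers conventionally log to stdout
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate, unstructured output via the root logger
    return logger


logger = make_logger("order-service")
logger.info("Order created", extra={"trace_id": "4bf92f3577b34da6", "order_id": "ORD-12345"})
```

Because the trace ID travels in a named field rather than inside the message text, log lines can be joined with the spans produced by the tracer in 30.4.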
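30.6.2 Best-Practice Checklist
class CloudNativeBestPractices:
    CONTAINER_RULES = [
        "Run a single process per container",
        "Use immutable infrastructure",
        "Minimize image size",
        "Do not run as the root user",
        "Handle signals correctly (SIGTERM)",
        "Keep health-check endpoints lightweight"
    ]
    OBSERVABILITY_RULES = [
        "Emit structured logs",
        "Propagate the trace ID along the entire request path",
        "Monitor key business metrics",
        "Make alert rules clear and actionable",
        "Keep sensitive data out of logs"
    ]
    RESILIENCE_RULES = [
        "Assume failures will happen",
        "Fail fast rather than wait for a long time",
        "Degrade gracefully while preserving core functionality",
        "Isolate failures to prevent cascades",
        "Bound retries and use backoff",
        "Protect downstream services with circuit breakers"
    ]
    SECURITY_RULES = [
        "Apply the principle of least privilege",
        "Never commit Secrets to the code repository",
        "Enable mTLS",
        "Restrict traffic with network policies",
        "Scan images for vulnerabilities regularly"
    ]
30.7 Decision Guide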
30.7.1 Choosing a Deployment Strategy
            ┌───────────────────────────────────────┐
            │ Is zero-downtime deployment required? │
            └───────────────────────────────────────┘
                                │
                ┌───────────────┴───────────────┐
                ▼                               ▼
        ┌───────────────┐               ┌───────────────┐
        │      No       │               │      Yes      │
        └───────────────┘               └───────────────┘
                │                               │
                ▼                               ▼
        ┌────────────────┐        ┌────────────────────────────┐
        │ Rolling update │        │ Is fast rollback required? │
        └────────────────┘        └────────────────────────────┘
                                                │
                                  ┌─────────────┴─────────────┐
                                  ▼                           ▼
                      ┌───────────────────────┐   ┌───────────────────────┐
                      │ Blue-green deployment │   │    Canary release     │
                      └───────────────────────┘   └───────────────────────┘
30.7.2 Choosing a Resilience Pattern
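| Scenario | Recommended pattern | Notes |
|---|---|---|
| External service calls | Circuit breaker + retry | Prevents cascading failures |
| API rate limiting | Token bucket / sliding window | Protects backend services |
| Database connections | Connection pool + timeout | Reuses resources |
| Message processing | Dead-letter queue + retry | Ensures reliability |
| Batch operations | Batch isolation (bulkhead) + timeout | Prevents blocking |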
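Isolation plus a timeout, recommended above for batch operations, can be sketched as a bulkhead: a per-workload semaphore caps concurrency and a bounded thread pool lets the caller stop waiting after a deadline. This is a minimal sketch under our own assumptions: `Bulkhead` and `BulkheadFullError` are hypothetical names, saturated calls are rejected rather than queued, and a timed-out task keeps running in the background (Python threads cannot be forcibly cancelled), holding its slot until it finishes.

```python
import concurrent.futures as cf
import threading
import time


class BulkheadFullError(Exception):
    """Raised when every slot is busy (the call is rejected, not queued)."""


class Bulkhead:
    """Caps concurrent calls for one workload and bounds how long each caller waits."""

    def __init__(self, max_concurrent: int, timeout: float):
        self._slots = threading.BoundedSemaphore(max_concurrent)
        self._pool = cf.ThreadPoolExecutor(max_workers=max_concurrent)
        self._timeout = timeout  # seconds a caller waits for the result

    def call(self, fn, *args, **kwargs):
        # Fail fast when the compartment is saturated instead of piling up work
        if not self._slots.acquire(blocking=False):
            raise BulkheadFullError("bulkhead is full")

        def run():
            try:
                return fn(*args, **kwargs)
            finally:
                # Freed only when the task really ends, even after a caller timeout
                self._slots.release()

        try:
            future = self._pool.submit(run)
        except BaseException:
            self._slots.release()  # submission failed, so run() will never release
            raise
        # Raises concurrent.futures.TimeoutError if fn does not finish in time
        return future.result(timeout=self._timeout)


if __name__ == "__main__":
    bulkhead = Bulkhead(max_concurrent=2, timeout=0.5)
    print(bulkhead.call(pow, 2, 10))  # 1024
    try:
        bulkhead.call(time.sleep, 2)
    except cf.TimeoutError:
        print("timed out; the task keeps its slot until it finishes")
```

Giving each batch job its own `Bulkhead` means a slow job exhausts only its own slots, while other workloads keep their capacity.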
30.8 Quick Reference Cards
30.8.1 Core Concepts at a Glance
┌──────────────────────────────────────────────────────────────┐
│            Cloud-Native Core Concepts Cheat Sheet            │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│ Containerization principles                                  │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 1. Single-process principle                              │ │
│ │ 2. Immutable infrastructure                              │ │
│ │ 3. Set resource limits                                   │ │
│ │ 4. Configure health checks                               │ │
│ └──────────────────────────────────────────────────────────┘ │
│                                                              │
│ Three pillars of observability                               │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Metrics: Prometheus, Grafana                             │ │
│ │ Logs:    ELK, Loki                                       │ │
│ │ Traces:  Jaeger, Zipkin                                  │ │
│ └──────────────────────────────────────────────────────────┘ │
│                                                              │
│ Resilience patterns                                          │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Circuit breaker: CLOSED → OPEN → HALF_OPEN → CLOSED      │ │
│ │ Retry: exponential backoff + jitter                      │ │
│ │ Rate limiting: token bucket / sliding window             │ │
│ └──────────────────────────────────────────────────────────┘ │
│                                                              │
└──────────────────────────────────────────────────────────────┘
30.8.2 Kubernetes Resource Quick Reference
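KUBERNETES_RESOURCES = {
    "workloads": ["Deployment", "StatefulSet", "DaemonSet", "Job", "CronJob"],
    # Gateway is defined by the Gateway API CRDs, which are installed separately from the core API
    "services": ["Service", "Ingress", "Gateway"],
    "config": ["ConfigMap", "Secret"],
    "storage": ["PersistentVolume", "PersistentVolumeClaim", "StorageClass"],
    # ServiceEntry is an Istio custom resource, not a built-in Kubernetes resource
    "network": ["NetworkPolicy", "ServiceEntry"],
    "rbac": ["Role", "RoleBinding", "ClusterRole", "ClusterRoleBinding"],
}
PROBE_CONFIGURATION = {
    "liveness": {
        "purpose": "Detect deadlocked or hung processes",
        "failure_action": "Restart the container",
        "recommended_path": "/health",
    },
    "readiness": {
        "purpose": "Detect whether the service is ready to receive traffic",
        "failure_action": "Remove the Pod from the Service endpoints",
        "recommended_path": "/ready",
    },
    "startup": {
        "purpose": "Protect slow-starting applications",
        "failure_action": "Restart the container",
        "recommended_for": "Applications that take more than 30 s to start",
    }
}
30.9 Summary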
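30.9.1 Key Takeaways
- Containerization: makes applications portable and isolated
- Service mesh: handles service-to-service communication uniformly
- Observability: metrics, logs, and traces working together
- Resilient design: circuit breakers, retries, and rate limiting protect the system
- Externalized configuration: manage configuration with ConfigMap/Secret
- Health checks: liveness/readiness probes safeguard availability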
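The health-check point can be illustrated with a standard-library HTTP server that exposes the `/health` (liveness) and `/ready` (readiness) paths suggested in 30.8.2. This is a sketch only: the `READY` flag, the default port 8080, and the `probe_status` helper (which mimics what a probe would observe) are assumptions, and a real service would usually serve these endpoints from its main web framework.

```python
import json
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# Set once startup work (connections, caches, ...) has finished
READY = threading.Event()


class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            # Liveness: the process can still answer; keep this check cheap
            self._reply(200, {"status": "alive"})
        elif self.path == "/ready":
            # Readiness: accept traffic only after startup has completed
            if READY.is_set():
                self._reply(200, {"status": "ready"})
            else:
                self._reply(503, {"status": "starting"})
        else:
            self._reply(404, {"error": "not found"})

    def _reply(self, code: int, body: dict) -> None:
        data = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, format, *args):
        pass  # silence per-request access logs generated by frequent probes


def start_probe_server(port: int = 8080) -> ThreadingHTTPServer:
    server = ThreadingHTTPServer(("0.0.0.0", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def probe_status(port: int, path: str) -> int:
    """Return the HTTP status a probe would see (proxy settings are bypassed)."""
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(f"http://127.0.0.1:{port}{path}", timeout=2) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code
```

Returning 503 from `/ready` during startup keeps the Pod out of the Service endpoints without triggering the restart that a failing liveness probe would cause.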
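30.9.2 When to Use Cloud-Native Patterns
| Suitable for | Not suitable for |
|---|---|
| Microservice architectures | Monolithic applications |
| High-availability requirements | Simple applications |
| Elastic scaling needs | Fixed resources |
| DevOps practices | Traditional operations |
| Multi-environment deployment | Single environment |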
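30.9.3 Implementation Advice
- Adopt incrementally: start with containerization, then introduce a service mesh step by step
- Observability first: establish monitoring before optimizing the system
- Build resilience in: consider failure scenarios during development
- Automate everything: CI/CD, testing, and deployment
- Shift security left: address security requirements during development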