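Chapter 20 The State Pattern
Learning Objectives
After completing this chapter, you will be able to:
- Understand the core concepts, mathematical definition, and finite state machine theory behind the State pattern
- Apply state-driven behavior changes and state transition mechanisms
- Implement finite, hierarchical, and concurrent state machines
- Identify where the State pattern applies and where it falls short
- Explain the essential difference between the State pattern and the Strategy pattern
20.1 Pattern Definition
20.1.1 Core Definition
The State pattern allows an object to alter its behavior when its internal state changes, so that the object appears to have changed its class. The pattern encapsulates state-specific behavior in separate state classes and changes behavior at run time by switching the current state object.
20.1.2 Formal Definition
Mathematically, the State pattern rests on the theory of finite state machines (FSMs):
Definition 20.1 (Finite state machine). A deterministic finite state machine is a five-tuple: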
$$M = (Q, \Sigma, \delta, q_0, F)$$
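where:
- $Q$: a finite set of states
- $\Sigma$: the input alphabet (the set of events or triggers)
- $\delta: Q \times \Sigma \rightarrow Q$: the state transition function
- $q_0 \in Q$: the initial state
- $F \subseteq Q$: the set of accepting (final) states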
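As a minimal sketch of Definition 20.1, the five-tuple can be written down directly as a Python data structure. The class name `DFA`, its `run` and `accepts` helpers, and the turnstile example are illustrative assumptions and are not used by the implementations later in the chapter.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, Iterable, Tuple


@dataclass(frozen=True)
class DFA:
    """Hypothetical container mirroring M = (Q, Sigma, delta, q0, F)."""
    states: FrozenSet[str]               # Q
    alphabet: FrozenSet[str]             # Sigma
    delta: Dict[Tuple[str, str], str]    # delta: Q x Sigma -> Q
    start: str                           # q0
    accepting: FrozenSet[str]            # F

    def run(self, events: Iterable[str]) -> str:
        """Apply delta to each event in turn and return the final state."""
        q = self.start
        for sigma in events:
            q = self.delta[(q, sigma)]
        return q

    def accepts(self, events: Iterable[str]) -> bool:
        """True if the machine ends in an accepting state."""
        return self.run(events) in self.accepting


# Illustrative turnstile: a coin unlocks it, a push locks it again.
turnstile = DFA(
    states=frozenset({"locked", "unlocked"}),
    alphabet=frozenset({"coin", "push"}),
    delta={
        ("locked", "coin"): "unlocked",
        ("locked", "push"): "locked",
        ("unlocked", "coin"): "unlocked",
        ("unlocked", "push"): "locked",
    },
    start="locked",
    accepting=frozenset({"unlocked"}),
)

print(turnstile.run(["coin", "push", "coin"]))   # unlocked
print(turnstile.accepts(["coin", "push"]))       # False
```

Because `delta` here is total (every state/event pair has an entry), `run` never fails on events from the alphabet; a partial table would need an explicit policy for undefined pairs.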
Definition 20.2 (State transition). A state transition can be written as:
$$q \xrightarrow{\sigma} q' \Leftrightarrow \delta(q, \sigma) = q'$$
Definition 20.3 (Reachability). A state $q'$ is reachable from a state $q$ if and only if there exists an event sequence $\sigma_1, \sigma_2, \ldots, \sigma_n$ such that:
$$q \xrightarrow{\sigma_1} q_1 \xrightarrow{\sigma_2} q_2 \xrightarrow{\sigma_3} \ldots \xrightarrow{\sigma_n} q'$$
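Reachability can be checked mechanically with a breadth-first search over the transition function. The sketch below is one possible way to do so; the function name `reachable` and the sample `delta` table are assumptions made for illustration, and the sketch treats every state as reachable from itself (the empty event sequence).

```python
from collections import deque
from typing import Dict, Set, Tuple


def reachable(delta: Dict[Tuple[str, str], str], start: str) -> Set[str]:
    """Return every state reachable from `start`, including `start` itself."""
    seen = {start}
    queue = deque([start])
    while queue:
        q = queue.popleft()
        # Follow every transition that leaves q.
        for (source, _event), target in delta.items():
            if source == q and target not in seen:
                seen.add(target)
                queue.append(target)
    return seen


# Hypothetical transition function used only for this example.
delta = {
    ("A", "e1"): "B",
    ("B", "e2"): "C",
    ("C", "e3"): "A",
    ("D", "e4"): "A",   # D can reach A, but nothing leads to D
}

print(sorted(reachable(delta, "A")))   # ['A', 'B', 'C']
print("D" in reachable(delta, "A"))    # False
```

A check of this kind is a simple way to spot states that can never be entered from the initial state.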
Definition 20.4 (State pattern semantics). In the State pattern, the behavior function $f$ of a context object $C$ is defined as:
$$f_C(e) = f_{\text{state}(C)}(e)$$
where $\text{state}(C)$ returns the current state of $C$, and $f_s$ is the behavior implemented by state $s$.
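The delegation in Definition 20.4 can be sketched in a few lines. The `Switch`, `On`, and `Off` classes below are hypothetical and exist only to show that the context forwards each event to whichever state object it currently holds.

```python
class Off:
    """Behavior f_s for the hypothetical state s = Off."""

    def handle(self, switch: "Switch", event: str) -> str:
        if event == "toggle":
            switch.state = On()
            return "turning on"
        return "ignored while off"


class On:
    """Behavior f_s for the hypothetical state s = On."""

    def handle(self, switch: "Switch", event: str) -> str:
        if event == "toggle":
            switch.state = Off()
            return "turning off"
        return "ignored while on"


class Switch:
    """Context C: handle(e) forwards to the current state."""

    def __init__(self) -> None:
        self.state = Off()                       # state(C)

    def handle(self, event: str) -> str:
        return self.state.handle(self, event)    # f_C(e) = f_state(C)(e)


switch = Switch()
print(switch.handle("toggle"))   # turning on
print(switch.handle("toggle"))   # turning off
print(switch.handle("dim"))      # ignored while off
```

The context contains no conditional on the state; the same call produces different results only because `switch.state` refers to a different object.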
20.1.3 State Space Analysis
Theorem 20.1 (State explosion). For a system with $n$ independent Boolean state variables, the size of the state space is:
$$|Q| = 2^n$$
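Corollary 20.1. The State pattern encodes each reachable combination of state variables as a state class. An operation then selects its behavior through a single dynamic dispatch on the current state object, instead of testing up to $n$ flags in nested conditionals whose branches can number $O(2^n)$. The number of state classes still grows with the number of reachable states, so the pattern relocates this complexity into separate classes rather than eliminating it.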
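To make the count in Theorem 20.1 concrete, the following sketch enumerates every combination of three Boolean flags and then filters out the combinations that violate some simple rules. The flag names and the `is_consistent` rules are assumptions chosen for illustration.

```python
from itertools import product

FLAGS = ("paid", "shipped", "delivered")   # hypothetical order flags

# Every assignment of True/False to the n flags: 2 ** n combinations.
all_combinations = list(product([False, True], repeat=len(FLAGS)))
print(len(all_combinations))               # 8


def is_consistent(paid: bool, shipped: bool, delivered: bool) -> bool:
    """Assumed rules: shipping requires payment, delivery requires shipping."""
    return (paid or not shipped) and (shipped or not delivered)


valid = [combo for combo in all_combinations if is_consistent(*combo)]
print(len(valid))                          # 4
for combo in valid:
    print(dict(zip(FLAGS, combo)))
```

Under these assumed rules only four of the eight combinations are meaningful, and each of them corresponds to one named state (for example new, paid, shipped, delivered) rather than to a conjunction of flags.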
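20.1.4 State Pattern vs. Strategy Pattern
| Aspect | State pattern | Strategy pattern |
|---|---|---|
| Purpose | Manage changes in an object's internal state | Switch algorithms or policies dynamically |
| State awareness | State classes may know about other states | Strategy classes are independent of one another |
| Transition control | State classes can trigger state transitions | The client controls which strategy is used |
| Lifecycle | States may have a lifecycle | Strategies are usually stateless |
| Typical uses | Order status, workflows | Sorting algorithms, payment methods |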
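The "transition control" row of the comparison can be sketched side by side. All class and function names below (`Sorter`, `Document`, `Draft`, and so on) are hypothetical; the point is only who decides what happens next.

```python
from typing import Callable, List


# Strategy: the client chooses the algorithm; strategies do not know each other.
class Sorter:
    def __init__(self, strategy: Callable[[List[int]], List[int]]) -> None:
        self.strategy = strategy

    def sort(self, data: List[int]) -> List[int]:
        return self.strategy(data)


def ascending(data: List[int]) -> List[int]:
    return sorted(data)


def descending(data: List[int]) -> List[int]:
    return sorted(data, reverse=True)


sorter = Sorter(ascending)
print(sorter.sort([3, 1, 2]))    # [1, 2, 3]
sorter.strategy = descending     # the switch is made by the client
print(sorter.sort([3, 1, 2]))    # [3, 2, 1]


# State: each state object decides which state comes next.
class Draft:
    def advance(self, doc: "Document") -> None:
        doc.state = InReview()


class InReview:
    def advance(self, doc: "Document") -> None:
        doc.state = Published()


class Published:
    def advance(self, doc: "Document") -> None:
        pass                     # terminal state: nothing further happens


class Document:
    def __init__(self) -> None:
        self.state = Draft()

    def advance(self) -> None:
        self.state.advance(self)

    @property
    def status(self) -> str:
        return type(self.state).__name__


doc = Document()
doc.advance()
print(doc.status)                # InReview
doc.advance()
print(doc.status)                # Published
```

The client of `Document` only ever calls `advance()`; it never names the next state, whereas the client of `Sorter` must pick the strategy itself.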
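20.2 Historical Background and Theoretical Origins
20.2.1 Timeline
| Year | Milestone | Contributors | Significance |
|---|---|---|---|
| 1943 | Finite automaton theory | Warren McCulloch & Walter Pitts | Proposed a finite-automaton model of neural networks |
| 1956 | Automata theory | Stephen Kleene | Equivalence of regular expressions and finite automata |
| 1987 | Statecharts | David Harel | Introduced statecharts, which support hierarchical and concurrent states |
| 1987 | State machine pattern | | State management in object-oriented design |
| 1994 | GoF State pattern | Gamma et al. | Standardized the State pattern as one of the 23 design patterns |
| 1997 | UML state machines | OMG | State machine modeling in the UML standard |
| 2010s | Reactive state management | Various | State management frameworks such as Redux and RxJS |
| 2010s | Finite state machine libraries | Various | Python transitions, XState, and others |
20.2.2 Theoretical Foundations
The State pattern draws on the following foundations:
- Finite automaton theory: the State pattern is an object-oriented realization of a finite state machine
- Open/closed principle (OCP): a new state is added as a new class, ideally without modifying the context; existing state classes change only where they must transition to the new state
- Single responsibility principle (SRP): each state class is responsible only for the behavior in that state
- Polymorphism: state-specific behavior is bound dynamically through polymorphic dispatch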
20.3 UML Structure Diagrams
20.3.1 Standard Structure
```text
┌─────────────────────────────────────────────────────────────────────┐
│                            <<interface>>                            │
│                                State                                │
├─────────────────────────────────────────────────────────────────────┤
│ + handle(context: Context): void                                    │
│ + enter(context: Context): void    ← on entering the state          │
│ + exit(context: Context): void     ← on leaving the state           │
└─────────────────────────────────────────────────────────────────────┘
                                   △
                                   │
           ┌───────────────────────┼───────────────────────┐
           │                       │                       │
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│   ConcreteStateA    │ │   ConcreteStateB    │ │   ConcreteStateC    │
├─────────────────────┤ ├─────────────────────┤ ├─────────────────────┤
│ - context: Context  │ │ - context: Context  │ │ - context: Context  │
├─────────────────────┤ ├─────────────────────┤ ├─────────────────────┤
│ + handle(context)   │ │ + handle(context)   │ │ + handle(context)   │
│ + enter(context)    │ │ + enter(context)    │ │ + enter(context)    │
│ + exit(context)     │ │ + exit(context)     │ │ + exit(context)     │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
           │                       │                       │
           └───────────────────────┼───────────────────────┘
                                   │ state transition
                                   ↓
┌─────────────────────────────────────────────────────────────────────┐
│                               Context                               │
├─────────────────────────────────────────────────────────────────────┤
│ - state: State                                                      │
├─────────────────────────────────────────────────────────────────────┤
│ + setState(state: State): void                                      │
│ + request(): void                                                   │
│ + getState(): State                                                 │
└─────────────────────────────────────────────────────────────────────┘
```
20.3.2 State Transition Diagram
```text
               State Diagram

┌─────────┐                   ┌─────────┐
│ StateA  │ ──── event1 ────→ │ StateB  │
└─────────┘                   └─────────┘
     ↑                             │
     │                             │ event2
     │ event3                      ↓
     │                        ┌─────────┐
     └─────────────────────── │ StateC  │
                              └─────────┘
                                   │
                                   │ event4
                                   ↓
                              ┌─────────┐
                              │ StateD  │
                              └─────────┘
```
State transition table:
| Current state | Event | Next state |
|---|---|---|
| StateA | event1 | StateB |
| StateB | event2 | StateC |
| StateC | event3 | StateA |
| StateC | event4 | StateD |
20.3.3 Hierarchical State Machine
```text
┌─────────────────────────────────────────────────────────────────┐
│ TopState                                                        │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ NestedState                                                 │ │
│ │   ┌─────────────┐  event1  ┌─────────────┐                  │ │
│ │   │  SubStateA  │ ───────→ │  SubStateB  │                  │ │
│ │   └─────────────┘          └─────────────┘                  │ │
│ │          ↑ event2                 │ event3                  │ │
│ │          └────────────────────────┘                         │ │
│ └─────────────────────────────────────────────────────────────┘ │
│                                ↑                                │
│                                │ event4                         │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ OtherNestedState                                            │ │
│ │   ┌─────────────┐          ┌─────────────┐                  │ │
│ │   │  SubStateC  │          │  SubStateD  │                  │ │
│ │   └─────────────┘          └─────────────┘                  │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
20.4 Python Implementation
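Before turning to class-based implementations, the transition table from Section 20.3.2 can be sketched as a plain dictionary lookup. The helper names `step` and `run` are assumptions, and so is the policy of ignoring events that have no entry in the table.

```python
from typing import Dict, Iterable, Tuple

# (current state, event) -> next state, copied from the table in Section 20.3.2.
TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("StateA", "event1"): "StateB",
    ("StateB", "event2"): "StateC",
    ("StateC", "event3"): "StateA",
    ("StateC", "event4"): "StateD",
}


def step(state: str, event: str) -> str:
    """Return the next state; undefined pairs leave the state unchanged."""
    return TRANSITIONS.get((state, event), state)


def run(start: str, events: Iterable[str]) -> str:
    """Feed a sequence of events through the table and return the final state."""
    state = start
    for event in events:
        state = step(state, event)
    return state


print(run("StateA", ["event1", "event2", "event4"]))   # StateD
print(run("StateA", ["event1", "event2", "event3"]))   # StateA
print(run("StateA", ["event2"]))                       # StateA (event ignored)
```

A table like this captures only the transitions; the State pattern implementations that follow also attach behavior to each state.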
20.4.1 Standard Implementation Based on ABC
```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class State(ABC):
    """Abstract base class for all concrete states."""

    @abstractmethod
    def handle(self, context: 'Context') -> None:
        """Handle a request while the context is in this state."""

    def enter(self, context: 'Context') -> None:
        """Hook called when the context enters this state (no-op by default)."""

    def exit(self, context: 'Context') -> None:
        """Hook called when the context leaves this state (no-op by default)."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Name recorded in the transition history."""


class Context:
    """Holds the current state and delegates requests to it."""

    def __init__(self, initial_state: State):
        self._state: State = initial_state
        self._state_history: list[str] = []
        self._data: Dict[str, Any] = {}
        self._transition_count: int = 0

    @property
    def state(self) -> State:
        return self._state

    def set_state(self, new_state: State) -> None:
        # Leave the old state and record it, then enter the new one.
        if self._state:
            self._state.exit(self)
            self._state_history.append(self._state.name)
        self._state = new_state
        self._transition_count += 1
        self._state.enter(self)

    def request(self) -> None:
        self._state.handle(self)

    def get_history(self) -> list[str]:
        return self._state_history.copy()

    def set_data(self, key: str, value: Any) -> None:
        self._data[key] = value

    def get_data(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    @property
    def transition_count(self) -> int:
        return self._transition_count


class IdleState(State):
    @property
    def name(self) -> str:
        return "IDLE"

    def handle(self, context: 'Context') -> None:
        print(f"[{self.name}] starting task...")
        context.set_state(ProcessingState())

    def enter(self, context: 'Context') -> None:
        print(f"[{self.name}] entered idle state")


class ProcessingState(State):
    @property
    def name(self) -> str:
        return "PROCESSING"

    def handle(self, context: 'Context') -> None:
        # Each request advances progress by 30 points.
        progress = context.get_data("progress", 0) + 30
        context.set_data("progress", progress)
        print(f"[{self.name}] progress: {progress}%")
        if progress >= 100:
            context.set_state(CompletedState())

    def enter(self, context: 'Context') -> None:
        print(f"[{self.name}] processing started")
        context.set_data("progress", 0)

    def exit(self, context: 'Context') -> None:
        print(f"[{self.name}] processing finished")


class CompletedState(State):
    @property
    def name(self) -> str:
        return "COMPLETED"

    def handle(self, context: 'Context') -> None:
        print(f"[{self.name}] task already completed, resetting...")
        context.set_data("progress", 0)
        context.set_state(IdleState())

    def enter(self, context: 'Context') -> None:
        print(f"[{self.name}] task completed!")


# Note: the initial state's enter() hook is not called by the constructor.
context = Context(IdleState())
context.request()   # IDLE -> PROCESSING
context.request()   # progress 30
context.request()   # progress 60
context.request()   # progress 90
context.request()   # progress 120 -> COMPLETED
print(f"\nState history: {context.get_history()}")
print(f"Transition count: {context.transition_count}")
```
20.4.2 Enum-Based State Machine
```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, Dict, Optional


class OrderStatus(Enum):
    NEW = auto()
    PAID = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()
    REFUNDED = auto()


@dataclass
class Transition:
    """Richer transition record; not used by EnumStateMachine below."""
    from_state: OrderStatus
    to_state: OrderStatus
    event: str
    guard: Optional[Callable[[], bool]] = None
    action: Optional[Callable[[], None]] = None


class EnumStateMachine:
    # (current state, event) -> next state
    TRANSITIONS: Dict[tuple[OrderStatus, str], OrderStatus] = {
        (OrderStatus.NEW, "pay"): OrderStatus.PAID,
        (OrderStatus.NEW, "cancel"): OrderStatus.CANCELLED,
        (OrderStatus.PAID, "ship"): OrderStatus.SHIPPED,
        (OrderStatus.PAID, "cancel"): OrderStatus.REFUNDED,
        (OrderStatus.SHIPPED, "deliver"): OrderStatus.DELIVERED,
        (OrderStatus.SHIPPED, "return"): OrderStatus.REFUNDED,
    }

    def __init__(self, initial_state: OrderStatus = OrderStatus.NEW):
        self._state = initial_state
        self._history: list[tuple[OrderStatus, str, OrderStatus]] = []
        self._callbacks: Dict[str, list[Callable]] = {}

    @property
    def state(self) -> OrderStatus:
        return self._state

    def can_transition(self, event: str) -> bool:
        return (self._state, event) in self.TRANSITIONS

    def trigger(self, event: str) -> bool:
        """Fire an event; return False if it is not valid in the current state."""
        key = (self._state, event)
        if key not in self.TRANSITIONS:
            print(f"Invalid transition: {self._state.name} + {event}")
            return False
        old_state = self._state
        new_state = self.TRANSITIONS[key]
        self._history.append((old_state, event, new_state))
        self._state = new_state
        self._notify_callbacks(event, old_state, new_state)
        return True

    def on(self, event: str, callback: Callable) -> None:
        """Register a callback invoked as callback(old_state, new_state)."""
        if event not in self._callbacks:
            self._callbacks[event] = []
        self._callbacks[event].append(callback)

    def _notify_callbacks(self, event: str, old_state: OrderStatus, new_state: OrderStatus) -> None:
        for callback in self._callbacks.get(event, []):
            callback(old_state, new_state)

    def get_history(self) -> list[tuple[OrderStatus, str, OrderStatus]]:
        return self._history.copy()

    def get_available_events(self) -> list[str]:
        return [event for (state, event) in self.TRANSITIONS if state == self._state]


sm = EnumStateMachine()
print(f"Initial state: {sm.state.name}")
print(f"Available events: {sm.get_available_events()}")
sm.on("pay", lambda o, n: print(f" [callback] state change: {o.name} -> {n.name}"))
sm.trigger("pay")
print(f"Current state: {sm.state.name}")
sm.trigger("ship")
print(f"Current state: {sm.state.name}")
sm.trigger("deliver")
print(f"Current state: {sm.state.name}")
print(f"\nTransition history: {[(o.name, e, n.name) for o, e, n in sm.get_history()]}")
```
20.4.3 State Machine Framework
```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Set


@dataclass
class StateConfig:
    name: str
    on_enter: Optional[Callable] = None   # called as on_enter(model)
    on_exit: Optional[Callable] = None    # called as on_exit(model)
    parent: Optional[str] = None
    initial: bool = False
    final: bool = False


@dataclass
class TransitionConfig:
    trigger: str
    source: str                           # state name, "a, b" list, or "*"
    dest: str
    conditions: List[Callable] = field(default_factory=list)
    before: Optional[Callable] = None
    after: Optional[Callable] = None
    prepare: Optional[Callable] = None


class MachineError(Exception):
    pass


class StateMachine:
    def __init__(
        self,
        model: Any = None,
        states: Optional[List[StateConfig]] = None,
        transitions: Optional[List[TransitionConfig]] = None,
        initial: Optional[str] = None,
        auto_transitions: bool = True,
    ):
        self._model = model if model is not None else self
        self._states: Dict[str, StateConfig] = {}
        self._transitions: Dict[str, List[TransitionConfig]] = {}
        self._current_state: Optional[str] = None
        self._history: List[str] = []
        self._before_state_change: List[Callable] = []
        self._after_state_change: List[Callable] = []
        if states:
            for state in states:
                self.add_state(state)
        if transitions:
            for transition in transitions:
                self.add_transition(transition)
        if initial:
            self._set_initial_state(initial)

    def add_state(self, state: StateConfig) -> None:
        self._states[state.name] = state
        if state.initial and self._current_state is None:
            self._set_initial_state(state.name)

    def add_transition(self, transition: TransitionConfig) -> None:
        if transition.trigger not in self._transitions:
            self._transitions[transition.trigger] = []
        self._transitions[transition.trigger].append(transition)
        # Expose each trigger as a convenience method, e.g. machine.next().
        if not hasattr(self, transition.trigger):
            def trigger_method(*args, **kwargs):
                return self.trigger(transition.trigger, *args, **kwargs)
            setattr(self, transition.trigger, trigger_method)

    def _set_initial_state(self, state_name: str) -> None:
        if state_name not in self._states:
            raise MachineError(f"State '{state_name}' not found")
        self._current_state = state_name
        self._history.append(state_name)
        state = self._states[state_name]
        if state.on_enter:
            state.on_enter(self._model)

    @property
    def state(self) -> str:
        return self._current_state

    @property
    def is_(self) -> 'StateChecker':
        return StateChecker(self._current_state)

    def trigger(self, trigger_name: str, *args, **kwargs) -> bool:
        if self._current_state is None:
            raise MachineError("No initial state set")
        if trigger_name not in self._transitions:
            raise MachineError(f"Unknown trigger: {trigger_name}")
        # The first transition whose source matches the current state wins.
        for transition in self._transitions[trigger_name]:
            if self._match_source(transition.source):
                return self._execute_transition(transition, *args, **kwargs)
        return False

    def _match_source(self, source: str) -> bool:
        if source == "*":
            return True
        if source == self._current_state:
            return True
        if "," in source:
            sources = [s.strip() for s in source.split(",")]
            return self._current_state in sources
        return False

    def _execute_transition(self, transition: TransitionConfig, *args, **kwargs) -> bool:
        # Every condition must hold, otherwise the transition is rejected.
        for condition in transition.conditions:
            if not condition(self._model):
                return False
        old_state = self._current_state
        new_state = transition.dest
        for callback in self._before_state_change:
            callback(self._model, old_state, new_state)
        if transition.prepare:
            transition.prepare(self._model, *args, **kwargs)
        old_state_config = self._states.get(old_state)
        if old_state_config and old_state_config.on_exit:
            old_state_config.on_exit(self._model)
        if transition.before:
            transition.before(self._model, *args, **kwargs)
        self._current_state = new_state
        self._history.append(new_state)
        if transition.after:
            transition.after(self._model, *args, **kwargs)
        new_state_config = self._states.get(new_state)
        if new_state_config and new_state_config.on_enter:
            new_state_config.on_enter(self._model)
        for callback in self._after_state_change:
            callback(self._model, old_state, new_state)
        return True

    def get_history(self) -> List[str]:
        return self._history.copy()

    def get_available_triggers(self) -> Set[str]:
        available = set()
        for trigger, transitions in self._transitions.items():
            for t in transitions:
                if self._match_source(t.source):
                    available.add(trigger)
        return available


class StateChecker:
    """Allows checks such as machine.is_.red."""

    def __init__(self, current_state: str):
        self._current_state = current_state

    def __getattr__(self, name: str) -> bool:
        return name == self._current_state


class TrafficLight:
    def __init__(self):
        self.color = "red"
        self.cars_passed = 0

    def on_enter_green(self):
        self.color = "green"
        print(" [TrafficLight] green light on, traffic may proceed")

    def on_enter_yellow(self):
        self.color = "yellow"
        print(" [TrafficLight] yellow light on, prepare to stop")

    def on_enter_red(self):
        self.color = "red"
        print(" [TrafficLight] red light on, stop")

    def count_car(self):
        self.cars_passed += 1


light = TrafficLight()
# The machine calls on_enter(model), so pass the plain functions rather than
# bound methods; the model (light) is then supplied as `self`.
states = [
    StateConfig("red", on_enter=TrafficLight.on_enter_red, initial=True),
    StateConfig("green", on_enter=TrafficLight.on_enter_green),
    StateConfig("yellow", on_enter=TrafficLight.on_enter_yellow),
]
transitions = [
    TransitionConfig("next", "red", "green"),
    TransitionConfig("next", "green", "yellow"),
    TransitionConfig("next", "yellow", "red"),
    TransitionConfig("emergency", "*", "red"),
]
machine = StateMachine(
    model=light,
    states=states,
    transitions=transitions,
)
print(f"Initial state: {machine.state}")
print(f"Available triggers: {machine.get_available_triggers()}")
machine.next()
machine.next()
machine.next()
print("\nEmergency:")
machine.emergency()
print(f"Current state: {machine.state}")
```
20.4.4 Hierarchical State Machine
```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set


@dataclass
class HState:
    name: str
    # Excluded from repr/eq to avoid infinite recursion through parent links.
    parent: Optional['HState'] = field(default=None, repr=False, compare=False)
    children: List['HState'] = field(default_factory=list)
    initial_child: Optional[str] = None
    on_enter: Optional[Callable] = None
    on_exit: Optional[Callable] = None
    final: bool = False

    def add_child(self, child: 'HState') -> 'HState':
        child.parent = self
        self.children.append(child)
        return self

    def is_active(self, current_state: str) -> bool:
        """True if current_state is this state or one of its descendants."""
        if self.name == current_state:
            return True
        return any(child.is_active(current_state) for child in self.children)

    def get_ancestors(self) -> List['HState']:
        """Return the ancestors, nearest parent first."""
        ancestors = []
        current = self.parent
        while current:
            ancestors.append(current)
            current = current.parent
        return ancestors


@dataclass
class HTransition:
    trigger: str
    source: str
    dest: str
    action: Optional[Callable] = None
    guard: Optional[Callable] = None


class HierarchicalStateMachine:
    def __init__(self):
        self._states: Dict[str, HState] = {}
        self._transitions: Dict[str, List[HTransition]] = {}
        self._current_state: Optional[str] = None
        self._active_states: Set[str] = set()

    def add_state(self, state: HState) -> None:
        """Register a state together with all of its descendants."""
        self._states[state.name] = state
        for child in state.children:
            self.add_state(child)

    def add_transition(self, transition: HTransition) -> None:
        if transition.trigger not in self._transitions:
            self._transitions[transition.trigger] = []
        self._transitions[transition.trigger].append(transition)

    def initialize(self, initial_state: str) -> None:
        state = self._states.get(initial_state)
        if state is None:
            raise ValueError(f"Unknown state: {initial_state}")
        # Enter the ancestors outermost first, then the state itself.
        for ancestor in reversed(state.get_ancestors()):
            if ancestor.on_enter:
                ancestor.on_enter()
        self._enter_state(state)
        self._update_active_states()

    def _update_active_states(self) -> None:
        """Active states are the current state plus all of its ancestors."""
        self._active_states = set()
        if self._current_state:
            state = self._states.get(self._current_state)
            while state:
                self._active_states.add(state.name)
                state = state.parent

    def _enter_state(self, state: HState) -> None:
        """Enter a state, then descend through initial children to a leaf."""
        if state.on_enter:
            state.on_enter()
        self._current_state = state.name
        if state.initial_child:
            child = self._states.get(state.initial_child)
            if child:
                self._enter_state(child)

    def trigger(self, trigger_name: str) -> bool:
        if trigger_name not in self._transitions:
            return False
        for transition in self._transitions[trigger_name]:
            if self._match_source(transition.source):
                if transition.guard and not transition.guard():
                    continue
                return self._execute_transition(transition)
        return False

    def _match_source(self, source: str) -> bool:
        # A transition declared on a composite state applies to its substates.
        return source == "*" or source in self._active_states

    def _execute_transition(self, transition: HTransition) -> bool:
        old_state = self._states.get(self._current_state)
        new_state = self._states.get(transition.dest)
        if not new_state:
            return False
        old_chain = [old_state] + old_state.get_ancestors() if old_state else []
        old_names = {s.name for s in old_chain}
        kept = {s.name for s in new_state.get_ancestors()}
        # Exit innermost first, stopping at the first shared ancestor.
        for state in old_chain:
            if state.name in kept:
                break
            if state.on_exit:
                state.on_exit()
        if transition.action:
            transition.action()
        # Enter the destination's ancestors that are not already active.
        for ancestor in reversed(new_state.get_ancestors()):
            if ancestor.name not in old_names and ancestor.on_enter:
                ancestor.on_enter()
        self._enter_state(new_state)
        self._update_active_states()
        return True

    @property
    def state(self) -> str:
        return self._current_state

    @property
    def active_states(self) -> Set[str]:
        return self._active_states.copy()


def on_device_on():
    print(" [Device] device started")


def on_device_off():
    print(" [Device] device shut down")


def on_operational():
    print(" [Operational] entering operating mode")


def on_maintenance():
    print(" [Maintenance] entering maintenance mode")


device_on = HState("DeviceOn", on_enter=on_device_on)
device_off = HState("DeviceOff", on_enter=on_device_off, final=True)
operational = HState("Operational", on_enter=on_operational)
maintenance = HState("Maintenance", on_enter=on_maintenance)
idle = HState("Idle")
running = HState("Running")
paused = HState("Paused")
operational.add_child(idle)
operational.add_child(running)
operational.add_child(paused)
operational.initial_child = "Idle"
maintenance.add_child(HState("Diagnosing"))
maintenance.add_child(HState("Repairing"))
device_on.add_child(operational)
device_on.add_child(maintenance)
device_on.initial_child = "Operational"

hsm = HierarchicalStateMachine()
hsm.add_state(device_on)
hsm.add_state(device_off)
hsm.add_transition(HTransition("turn_off", "DeviceOn", "DeviceOff"))
hsm.add_transition(HTransition("to_maintenance", "Operational", "Maintenance"))
hsm.add_transition(HTransition("to_operational", "Maintenance", "Operational"))
hsm.add_transition(HTransition("start", "Idle", "Running"))
hsm.add_transition(HTransition("pause", "Running", "Paused"))
hsm.add_transition(HTransition("resume", "Paused", "Running"))

hsm.initialize("DeviceOn")            # descends to DeviceOn > Operational > Idle
print(f"Current state: {hsm.state}")
print(f"Active states: {hsm.active_states}")
hsm.trigger("start")
print(f"Current state: {hsm.state}")
hsm.trigger("to_maintenance")         # declared on Operational, fired from Running
print(f"Current state: {hsm.state}")
print(f"Active states: {hsm.active_states}")
hsm.trigger("turn_off")
print(f"Current state: {hsm.state}")
```
20.4.5 Concurrent State Machine
```python
from dataclasses import dataclass, field
from threading import Lock
from typing import Callable, Dict, List, Optional, Set, Tuple


@dataclass
class Region:
    """An orthogonal region with its own independent current state."""
    name: str
    states: Dict[str, 'ConcurrentState'] = field(default_factory=dict)
    initial_state: Optional[str] = None
    current_state: Optional[str] = None


@dataclass
class ConcurrentState:
    name: str
    on_enter: Optional[Callable] = None
    on_exit: Optional[Callable] = None


class ConcurrentStateMachine:
    def __init__(self):
        self._regions: Dict[str, Region] = {}
        # (trigger, region, source, dest, action)
        self._transitions: List[Tuple[str, str, str, str, Optional[Callable]]] = []
        self._lock = Lock()

    def add_region(self, name: str, states: List[ConcurrentState], initial: str) -> None:
        region = Region(name=name, initial_state=initial, current_state=initial)
        for state in states:
            region.states[state.name] = state
        self._regions[name] = region

    def add_transition(
        self,
        trigger: str,
        region: str,
        source: str,
        dest: str,
        action: Optional[Callable] = None,
    ) -> None:
        self._transitions.append((trigger, region, source, dest, action))

    def trigger(self, trigger_name: str) -> bool:
        """Fire a trigger in every region; return True if any region moved."""
        with self._lock:
            fired: Set[str] = set()   # regions that already moved for this trigger
            for trigger, region_name, source, dest, action in self._transitions:
                if trigger != trigger_name or region_name in fired:
                    continue
                region = self._regions.get(region_name)
                if not region or region.current_state != source:
                    continue
                old_state = region.states.get(region.current_state)
                if old_state and old_state.on_exit:
                    old_state.on_exit()
                if action:
                    action()
                region.current_state = dest
                new_state = region.states.get(dest)
                if new_state and new_state.on_enter:
                    new_state.on_enter()
                fired.add(region_name)
            return bool(fired)

    def get_state(self, region_name: str) -> Optional[str]:
        with self._lock:
            region = self._regions.get(region_name)
            return region.current_state if region else None

    def get_all_states(self) -> Dict[str, str]:
        with self._lock:
            return {name: region.current_state for name, region in self._regions.items()}


def on_heating():
    print(" [HVAC] heating started")


def on_cooling():
    print(" [HVAC] cooling started")


def on_idle_hvac():
    print(" [HVAC] standby")


def on_low():
    print(" [Fan] running at low speed")


def on_high():
    print(" [Fan] running at high speed")


def on_idle_fan():
    print(" [Fan] stopped")


csm = ConcurrentStateMachine()
csm.add_region("hvac", [
    ConcurrentState("heating", on_enter=on_heating),
    ConcurrentState("cooling", on_enter=on_cooling),
    ConcurrentState("idle_hvac", on_enter=on_idle_hvac),
], "idle_hvac")
csm.add_region("fan", [
    ConcurrentState("low", on_enter=on_low),
    ConcurrentState("high", on_enter=on_high),
    ConcurrentState("idle_fan", on_enter=on_idle_fan),
], "idle_fan")
csm.add_transition("heat", "hvac", "idle_hvac", "heating")
csm.add_transition("cool", "hvac", "idle_hvac", "cooling")
csm.add_transition("hvac_off", "hvac", "heating", "idle_hvac")
csm.add_transition("hvac_off", "hvac", "cooling", "idle_hvac")
csm.add_transition("fan_low", "fan", "idle_fan", "low")
csm.add_transition("fan_high", "fan", "low", "high")
csm.add_transition("fan_high", "fan", "idle_fan", "high")
csm.add_transition("fan_off", "fan", "low", "idle_fan")
csm.add_transition("fan_off", "fan", "high", "idle_fan")

print("Initial states:", csm.get_all_states())
csm.trigger("heat")
csm.trigger("fan_high")
print("Current states:", csm.get_all_states())
# Neither trigger below is defined from the current states (heating, high),
# so both are ignored and the states stay unchanged.
csm.trigger("cool")
csm.trigger("fan_low")
print("Current states:", csm.get_all_states())
csm.trigger("hvac_off")
csm.trigger("fan_off")
print("Final states:", csm.get_all_states())
```
20.5 Enterprise Application Examples
20.5.1 Order Lifecycle Management
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Any, Callable, Dict, List
import uuid


@dataclass
class OrderItem:
    product_id: str
    product_name: str
    quantity: int
    unit_price: Decimal

    @property
    def total(self) -> Decimal:
        return self.unit_price * self.quantity


@dataclass
class OrderContext:
    """Plain data carried by an order, independent of its state."""
    order_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    customer_id: str = ""
    items: List[OrderItem] = field(default_factory=list)
    total_amount: Decimal = Decimal("0")
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)


class OrderState(ABC):
    """Each operation returns a message describing the outcome."""

    @property
    @abstractmethod
    def name(self) -> str:
        pass

    @abstractmethod
    def pay(self, context: 'Order', amount: Decimal) -> str:
        pass

    @abstractmethod
    def ship(self, context: 'Order', tracking_number: str) -> str:
        pass

    @abstractmethod
    def deliver(self, context: 'Order') -> str:
        pass

    @abstractmethod
    def cancel(self, context: 'Order', reason: str) -> str:
        pass

    @abstractmethod
    def refund(self, context: 'Order', reason: str) -> str:
        pass

    def enter(self, context: 'Order') -> None:
        pass

    def exit(self, context: 'Order') -> None:
        pass


class NewState(OrderState):
    @property
    def name(self) -> str:
        return "NEW"

    def pay(self, context: 'Order', amount: Decimal) -> str:
        if amount < context._data.total_amount:
            return f"Insufficient payment: required {context._data.total_amount}, received {amount}"
        context._data.metadata["paid_amount"] = amount
        context._data.metadata["paid_at"] = datetime.now()
        context._set_state(PaidState())
        return f"Payment successful: {amount}"

    def ship(self, context: 'Order', tracking_number: str) -> str:
        return "Error: order is unpaid and cannot be shipped"

    def deliver(self, context: 'Order') -> str:
        return "Error: order has not been shipped"

    def cancel(self, context: 'Order', reason: str) -> str:
        context._data.metadata["cancel_reason"] = reason
        context._set_state(CancelledState())
        return f"Order cancelled: {reason}"

    def refund(self, context: 'Order', reason: str) -> str:
        return "Error: order is unpaid, nothing to refund"


class PaidState(OrderState):
    @property
    def name(self) -> str:
        return "PAID"

    def pay(self, context: 'Order', amount: Decimal) -> str:
        return "Error: order is already paid"

    def ship(self, context: 'Order', tracking_number: str) -> str:
        context._data.metadata["tracking_number"] = tracking_number
        context._data.metadata["shipped_at"] = datetime.now()
        context._set_state(ShippedState())
        return f"Order shipped, tracking number: {tracking_number}"

    def deliver(self, context: 'Order') -> str:
        return "Error: order has not been shipped"

    def cancel(self, context: 'Order', reason: str) -> str:
        # Cancelling a paid order leads to a refund.
        context._data.metadata["cancel_reason"] = reason
        context._set_state(RefundedState())
        return f"Order cancelled, refund in progress: {reason}"

    def refund(self, context: 'Order', reason: str) -> str:
        context._data.metadata["refund_reason"] = reason
        context._set_state(RefundedState())
        return f"Refund in progress: {reason}"


class ShippedState(OrderState):
    @property
    def name(self) -> str:
        return "SHIPPED"

    def pay(self, context: 'Order', amount: Decimal) -> str:
        return "Error: order is already paid"

    def ship(self, context: 'Order', tracking_number: str) -> str:
        return "Error: order is already shipped"

    def deliver(self, context: 'Order') -> str:
        context._data.metadata["delivered_at"] = datetime.now()
        context._set_state(DeliveredState())
        return "Order delivered"

    def cancel(self, context: 'Order', reason: str) -> str:
        return "Error: order is already shipped and cannot be cancelled. Please request a return and refund."

    def refund(self, context: 'Order', reason: str) -> str:
        context._data.metadata["refund_reason"] = reason
        context._set_state(ReturnedState())
        return f"Return request submitted: {reason}"


class DeliveredState(OrderState):
    @property
    def name(self) -> str:
        return "DELIVERED"

    def pay(self, context: 'Order', amount: Decimal) -> str:
        return "Error: order is already completed"

    def ship(self, context: 'Order', tracking_number: str) -> str:
        return "Error: order is already completed"

    def deliver(self, context: 'Order') -> str:
        return "Error: order is already delivered"

    def cancel(self, context: 'Order', reason: str) -> str:
        return "Error: order is completed and cannot be cancelled"

    def refund(self, context: 'Order', reason: str) -> str:
        context._data.metadata["refund_reason"] = reason
        context._set_state(ReturnedState())
        return f"Return request submitted: {reason}"


class CancelledState(OrderState):
    @property
    def name(self) -> str:
        return "CANCELLED"

    def pay(self, context: 'Order', amount: Decimal) -> str:
        return "Error: order is already cancelled"

    def ship(self, context: 'Order', tracking_number: str) -> str:
        return "Error: order is already cancelled"

    def deliver(self, context: 'Order') -> str:
        return "Error: order is already cancelled"

    def cancel(self, context: 'Order', reason: str) -> str:
        return "Error: order is already cancelled"

    def refund(self, context: 'Order', reason: str) -> str:
        return "Error: order is cancelled, nothing to refund"


class RefundedState(OrderState):
    @property
    def name(self) -> str:
        return "REFUNDED"

    def pay(self, context: 'Order', amount: Decimal) -> str:
        return "Error: order is already refunded"

    def ship(self, context: 'Order', tracking_number: str) -> str:
        return "Error: order is already refunded"

    def deliver(self, context: 'Order') -> str:
        return "Error: order is already refunded"

    def cancel(self, context: 'Order', reason: str) -> str:
        return "Error: order is already refunded"

    def refund(self, context: 'Order', reason: str) -> str:
        return "Error: order is already refunded"


class ReturnedState(OrderState):
    @property
    def name(self) -> str:
        return "RETURNED"

    def pay(self, context: 'Order', amount: Decimal) -> str:
        return "Error: order return is in progress"

    def ship(self, context: 'Order', tracking_number: str) -> str:
        return "Error: order return is in progress"

    def deliver(self, context: 'Order') -> str:
        return "Error: order return is in progress"

    def cancel(self, context: 'Order', reason: str) -> str:
        return "Error: order return is in progress"

    def refund(self, context: 'Order', reason: str) -> str:
        return "Error: return is being processed"


class Order:
    # Allowed target states for each state; enforced in _set_state().
    VALID_TRANSITIONS = {
        "NEW": ["PAID", "CANCELLED"],
        "PAID": ["SHIPPED", "REFUNDED"],
        "SHIPPED": ["DELIVERED", "RETURNED"],
        "DELIVERED": ["RETURNED"],
        "CANCELLED": [],
        "REFUNDED": [],
        "RETURNED": ["REFUNDED"],
    }

    def __init__(self, customer_id: str):
        self._data = OrderContext(customer_id=customer_id)
        self._state: OrderState = NewState()
        self._observers: List[Callable] = []

    @property
    def order_id(self) -> str:
        return self._data.order_id

    @property
    def status(self) -> str:
        return self._state.name

    @property
    def total_amount(self) -> Decimal:
        return self._data.total_amount

    def add_item(self, product_id: str, name: str, price: Decimal, quantity: int = 1) -> None:
        item = OrderItem(product_id, name, quantity, price)
        self._data.items.append(item)
        self._recalculate_total()

    def _recalculate_total(self) -> None:
        # Start from Decimal("0") so the total stays a Decimal even with no items.
        self._data.total_amount = sum((item.total for item in self._data.items), Decimal("0"))

    def _set_state(self, new_state: OrderState) -> None:
        old_state = self._state
        if new_state.name not in self.VALID_TRANSITIONS[old_state.name]:
            raise ValueError(f"Invalid transition: {old_state.name} -> {new_state.name}")
        self._state.exit(self)
        self._state = new_state
        self._state.enter(self)
        self._data.updated_at = datetime.now()
        for observer in self._observers:
            observer(old_state.name, new_state.name, self._data.order_id)

    def add_observer(self, observer: Callable) -> None:
        self._observers.append(observer)

    # Every public operation is delegated to the current state.
    def pay(self, amount: Decimal) -> str:
        return self._state.pay(self, amount)

    def ship(self, tracking_number: str) -> str:
        return self._state.ship(self, tracking_number)

    def deliver(self) -> str:
        return self._state.deliver(self)

    def cancel(self, reason: str = "") -> str:
        return self._state.cancel(self, reason)

    def refund(self, reason: str = "") -> str:
        return self._state.refund(self, reason)

    def get_info(self) -> Dict[str, Any]:
        return {
            "order_id": self._data.order_id,
            "customer_id": self._data.customer_id,
            "status": self._state.name,
            "total_amount": str(self._data.total_amount),
            "items": [
                {"name": i.product_name, "qty": i.quantity, "price": str(i.unit_price)}
                for i in self._data.items
            ],
            "created_at": self._data.created_at.isoformat(),
            "metadata": self._data.metadata,
        }


def order_status_logger(old_status: str, new_status: str, order_id: str):
    print(f" [Logger] Order {order_id}: {old_status} -> {new_status}")


order = Order("customer_001")
order.add_observer(order_status_logger)
order.add_item("prod_001", "Advanced Python Programming", Decimal("89.00"), 2)
order.add_item("prod_002", "Design Patterns Explained", Decimal("69.00"), 1)
print(f"Order ID: {order.order_id}")
print(f"Total amount: {order.total_amount}")
print(f"Status: {order.status}")
print("\n--- Payment ---")
print(order.pay(Decimal("247.00")))
print(f"Status: {order.status}")
print("\n--- Shipping ---")
print(order.ship("SF1234567890"))
print(f"Status: {order.status}")
print("\n--- Delivery ---")
print(order.deliver())
print(f"Status: {order.status}")
print(f"\nOrder details: {order.get_info()}")
```
20.5.2 Game Character State System
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from enum import Enum, auto
import time


class CharacterAttribute(Enum):
    HEALTH = auto()
    MANA = auto()
    STAMINA = auto()


@dataclass
class CharacterStats:
    health: int = 100
    max_health: int = 100
    mana: int = 50
    max_mana: int = 50
    stamina: int = 100
    max_stamina: int = 100
    attack_power: int = 10
    defense: int = 5
    speed: float = 1.0

    def is_alive(self) -> bool:
        return self.health > 0

    def has_mana(self, amount: int) -> bool:
        return self.mana >= amount

    def has_stamina(self, amount: int) -> bool:
        return self.stamina >= amount

class CharacterState(ABC):
    """Interface that every character state must implement."""

    @property
    @abstractmethod
    def name(self) -> str:
        pass

    @abstractmethod
    def enter(self, character: 'Character') -> None:
        pass

    @abstractmethod
    def exit(self, character: 'Character') -> None:
        pass

    @abstractmethod
    def update(self, character: 'Character', delta_time: float) -> None:
        pass

    @abstractmethod
    def attack(self, character: 'Character', target: 'Character') -> str:
        pass

    @abstractmethod
    def take_damage(self, character: 'Character', amount: int) -> str:
        pass

    @abstractmethod
    def cast_spell(self, character: 'Character', spell_name: str) -> str:
        pass

    @abstractmethod
    def move(self, character: 'Character', direction: str) -> str:
        pass

class IdleState(CharacterState):
    @property
    def name(self) -> str:
        return "IDLE"

    def enter(self, character: 'Character') -> None:
        print(f"  [{character.name}] enters the idle state")

    def exit(self, character: 'Character') -> None:
        pass

    def update(self, character: 'Character', delta_time: float) -> None:
        # Resources regenerate while idle, capped at their maximums.
        stats = character.stats
        stats.health = min(stats.max_health, stats.health + int(5 * delta_time))
        stats.mana = min(stats.max_mana, stats.mana + int(2 * delta_time))
        stats.stamina = min(stats.max_stamina, stats.stamina + int(10 * delta_time))

    def attack(self, character: 'Character', target: 'Character') -> str:
        if not character.stats.has_stamina(10):
            return "Not enough stamina to attack"
        character.stats.stamina -= 10
        character.set_state(AttackingState())
        # Resolve the hit here: AttackingState.attack only reports that an
        # attack is already in progress, so delegating to it would never
        # damage the target.
        result = target.take_damage(character.stats.attack_power)
        return f"{character.name} attacks {target.name}: {result}"

    def take_damage(self, character: 'Character', amount: int) -> str:
        actual_damage = max(1, amount - character.stats.defense)
        # Clamp at zero so health never goes negative.
        character.stats.health = max(0, character.stats.health - actual_damage)
        if character.stats.health <= 0:
            character.set_state(DeadState())
            return f"{character.name} takes {actual_damage} damage and dies!"
        return f"{character.name} takes {actual_damage} damage"

    def cast_spell(self, character: 'Character', spell_name: str) -> str:
        spell_costs = {"fireball": 20, "heal": 15, "shield": 10}
        cost = spell_costs.get(spell_name, 0)
        if not character.stats.has_mana(cost):
            return f"Not enough mana to cast {spell_name}"
        character.stats.mana -= cost
        character.set_state(CastingState(spell_name))
        return f"{character.name} starts casting {spell_name}"

    def move(self, character: 'Character', direction: str) -> str:
        if not character.stats.has_stamina(5):
            return "Not enough stamina to move"
        character.set_state(MovingState(direction))
        return f"{character.name} moves {direction}"

class AttackingState(CharacterState):
    @property
    def name(self) -> str:
        return "ATTACKING"

    def enter(self, character: 'Character') -> None:
        print(f"  [{character.name}] enters the attacking state")
        character._attack_timer = 0.5

    def exit(self, character: 'Character') -> None:
        character._attack_timer = 0

    def update(self, character: 'Character', delta_time: float) -> None:
        # Return to idle once the attack animation time has elapsed.
        character._attack_timer -= delta_time
        if character._attack_timer <= 0:
            character.set_state(IdleState())

    def attack(self, character: 'Character', target: 'Character') -> str:
        return "Already attacking; cannot attack again"

    def take_damage(self, character: 'Character', amount: int) -> str:
        # Defense is halved while attacking.
        actual_damage = max(1, amount - character.stats.defense // 2)
        character.stats.health = max(0, character.stats.health - actual_damage)
        if character.stats.health <= 0:
            character.set_state(DeadState())
            return f"{character.name} takes {actual_damage} damage and dies!"
        return f"{character.name} takes {actual_damage} damage while attacking"

    def cast_spell(self, character: 'Character', spell_name: str) -> str:
        return "Cannot cast while attacking"

    def move(self, character: 'Character', direction: str) -> str:
        return "Cannot move while attacking"

class MovingState(CharacterState):
    def __init__(self, direction: str = "forward"):
        self._direction = direction
        self._move_time = 0.0

    @property
    def name(self) -> str:
        return "MOVING"

    def enter(self, character: 'Character') -> None:
        print(f"  [{character.name}] starts moving {self._direction}")
        self._move_time = 0.0

    def exit(self, character: 'Character') -> None:
        pass

    def update(self, character: 'Character', delta_time: float) -> None:
        # Moving drains stamina; stop after 2 seconds or when stamina runs out.
        character.stats.stamina = max(0, character.stats.stamina - int(5 * delta_time))
        self._move_time += delta_time
        if character.stats.stamina <= 0 or self._move_time >= 2.0:
            character.set_state(IdleState())

    def attack(self, character: 'Character', target: 'Character') -> str:
        character.set_state(AttackingState())
        # Resolve the hit here: AttackingState.attack only reports that an
        # attack is already in progress.
        result = target.take_damage(character.stats.attack_power)
        return f"{character.name} attacks {target.name}: {result}"

    def take_damage(self, character: 'Character', amount: int) -> str:
        actual_damage = max(1, amount - character.stats.defense // 2)
        character.stats.health = max(0, character.stats.health - actual_damage)
        if character.stats.health <= 0:
            character.set_state(DeadState())
            return f"{character.name} takes {actual_damage} damage and dies!"
        return f"{character.name} takes {actual_damage} damage while moving"

    def cast_spell(self, character: 'Character', spell_name: str) -> str:
        return "Cannot cast while moving"

    def move(self, character: 'Character', direction: str) -> str:
        self._direction = direction
        self._move_time = 0.0
        return f"{character.name} changes direction and moves {direction}"

class CastingState(CharacterState):
    def __init__(self, spell_name: str):
        self._spell_name = spell_name
        self._cast_time = 1.0

    @property
    def name(self) -> str:
        return "CASTING"

    def enter(self, character: 'Character') -> None:
        print(f"  [{character.name}] is casting {self._spell_name}")

    def exit(self, character: 'Character') -> None:
        pass

    def update(self, character: 'Character', delta_time: float) -> None:
        # The spell takes effect when the cast time has elapsed.
        self._cast_time -= delta_time
        if self._cast_time <= 0:
            self._complete_cast(character)
            character.set_state(IdleState())

    def _complete_cast(self, character: 'Character') -> None:
        if self._spell_name == "heal":
            heal_amount = 30
            character.stats.health = min(character.stats.max_health, character.stats.health + heal_amount)
            print(f"  [{character.name}] heal complete, restoring up to {heal_amount} health")
        elif self._spell_name == "shield":
            character.stats.defense += 10
            print(f"  [{character.name}] shield active, defense +10")

    def attack(self, character: 'Character', target: 'Character') -> str:
        return "Cannot attack while casting"

    def take_damage(self, character: 'Character', amount: int) -> str:
        actual_damage = max(1, amount - character.stats.defense)
        character.stats.health = max(0, character.stats.health - actual_damage)
        if character.stats.health <= 0:
            character.set_state(DeadState())
            return f"{character.name} takes {actual_damage} damage and dies!"
        return f"{character.name} takes {actual_damage} damage while casting"

    def cast_spell(self, character: 'Character', spell_name: str) -> str:
        return "Already casting"

    def move(self, character: 'Character', direction: str) -> str:
        return "Cannot move while casting"

class StunnedState(CharacterState):
    def __init__(self, duration: float = 2.0):
        self._duration = duration

    @property
    def name(self) -> str:
        return "STUNNED"

    def enter(self, character: 'Character') -> None:
        print(f"  [{character.name}] is stunned!")

    def exit(self, character: 'Character') -> None:
        print(f"  [{character.name}] is no longer stunned")

    def update(self, character: 'Character', delta_time: float) -> None:
        self._duration -= delta_time
        if self._duration <= 0:
            character.set_state(IdleState())

    def attack(self, character: 'Character', target: 'Character') -> str:
        return "Stunned; cannot act"

    def take_damage(self, character: 'Character', amount: int) -> str:
        actual_damage = max(1, amount - character.stats.defense // 2)
        character.stats.health = max(0, character.stats.health - actual_damage)
        if character.stats.health <= 0:
            character.set_state(DeadState())
            return f"{character.name} takes {actual_damage} damage and dies!"
        return f"{character.name} takes {actual_damage} damage while stunned"

    def cast_spell(self, character: 'Character', spell_name: str) -> str:
        return "Stunned; cannot cast"

    def move(self, character: 'Character', direction: str) -> str:
        return "Stunned; cannot move"

class DeadState(CharacterState):
    """Terminal state: every action is rejected until the character is revived."""

    @property
    def name(self) -> str:
        return "DEAD"

    def enter(self, character: 'Character') -> None:
        print(f"  [{character.name}] has died!")

    def exit(self, character: 'Character') -> None:
        print(f"  [{character.name}] is revived!")

    def update(self, character: 'Character', delta_time: float) -> None:
        pass

    def attack(self, character: 'Character', target: 'Character') -> str:
        return "Already dead"

    def take_damage(self, character: 'Character', amount: int) -> str:
        return "Already dead"

    def cast_spell(self, character: 'Character', spell_name: str) -> str:
        return "Already dead"

    def move(self, character: 'Character', direction: str) -> str:
        return "Already dead"

class Character:
    """Context: holds the current state and delegates every action to it."""

    def __init__(self, name: str):
        self.name = name
        self.stats = CharacterStats()
        self._state: CharacterState = IdleState()
        self._attack_timer: float = 0
        self._state.enter(self)

    @property
    def state(self) -> str:
        return self._state.name

    def set_state(self, new_state: CharacterState) -> None:
        self._state.exit(self)
        self._state = new_state
        self._state.enter(self)

    def update(self, delta_time: float) -> None:
        self._state.update(self, delta_time)

    def attack(self, target: 'Character') -> str:
        return self._state.attack(self, target)

    def take_damage(self, amount: int) -> str:
        return self._state.take_damage(self, amount)

    def cast_spell(self, spell_name: str) -> str:
        return self._state.cast_spell(self, spell_name)

    def move(self, direction: str) -> str:
        return self._state.move(self, direction)

    def stun(self, duration: float = 2.0) -> str:
        # A dead character must stay dead; only revive() may leave DEAD.
        if self._state.name == "DEAD":
            return f"{self.name} is dead and cannot be stunned"
        self.set_state(StunnedState(duration))
        return f"{self.name} is stunned for {duration} seconds"

    def revive(self) -> str:
        if self._state.name != "DEAD":
            return "Character is not dead"
        self.stats.health = self.stats.max_health // 2
        self.stats.mana = self.stats.max_mana // 2
        self.stats.stamina = self.stats.max_stamina
        self.set_state(IdleState())
        return f"{self.name} has been revived!"

hero = Character("Warrior")
enemy = Character("Goblin")

print(f"\n=== Character state: {hero.state} ===")
print(f"Health: {hero.stats.health}/{hero.stats.max_health}")

print("\n--- Attack ---")
print(hero.attack(enemy))
print(f"Character state: {hero.state}")

print("\n--- Update (0.6 s) ---")
hero.update(0.6)
print(f"Character state: {hero.state}")

print("\n--- Move ---")
print(hero.move("forward"))
print(f"Character state: {hero.state}")

# Casting is rejected while moving, so let the movement finish first.
print("\n--- Update (2.0 s) ---")
hero.update(2.0)
print(f"Character state: {hero.state}")

print("\n--- Cast spell ---")
print(hero.cast_spell("heal"))
print(f"Character state: {hero.state}")

print("\n--- Update (1.1 s) ---")
hero.update(1.1)
print(f"Character state: {hero.state}")
print(f"Health: {hero.stats.health}/{hero.stats.max_health}")

print("\n--- Take damage ---")
print(hero.take_damage(30))
print(f"Health: {hero.stats.health}/{hero.stats.max_health}")

print("\n--- Stun ---")
print(hero.stun(2.0))
print(f"Character state: {hero.state}")
print(hero.attack(enemy))

print("\n--- Fatal damage ---")
print(hero.take_damage(200))
print(f"Character state: {hero.state}")
print(f"Health: {hero.stats.health}/{hero.stats.max_health}")

print("\n--- Revive ---")
print(hero.revive())
print(f"Character state: {hero.state}")
print(f"Health: {hero.stats.health}/{hero.stats.max_health}")

20.5.3 Network Connection State Management
from abc import ABC, abstractmethod
from typing import Optional, Callable, Dict, Any, List
from dataclasses import dataclass, field
from datetime import datetime
import time
import random


@dataclass
class ConnectionStats:
    bytes_sent: int = 0
    bytes_received: int = 0
    messages_sent: int = 0
    messages_received: int = 0
    connect_time: Optional[datetime] = None
    last_activity: Optional[datetime] = None
    reconnect_attempts: int = 0

class ConnectionState(ABC):
    """Interface for connection states; enter/exit are optional hooks."""

    @property
    @abstractmethod
    def name(self) -> str:
        pass

    @abstractmethod
    def connect(self, connection: 'NetworkConnection') -> str:
        pass

    @abstractmethod
    def disconnect(self, connection: 'NetworkConnection') -> str:
        pass

    @abstractmethod
    def send(self, connection: 'NetworkConnection', data: bytes) -> str:
        pass

    @abstractmethod
    def receive(self, connection: 'NetworkConnection') -> str:
        pass

    @abstractmethod
    def timeout(self, connection: 'NetworkConnection') -> str:
        pass

    @abstractmethod
    def error(self, connection: 'NetworkConnection', error_msg: str) -> str:
        pass

    def enter(self, connection: 'NetworkConnection') -> None:
        pass

    def exit(self, connection: 'NetworkConnection') -> None:
        pass

class DisconnectedState(ConnectionState):
    @property
    def name(self) -> str:
        return "DISCONNECTED"

    def connect(self, connection: 'NetworkConnection') -> str:
        connection.set_state(ConnectingState())
        return "Connecting..."

    def disconnect(self, connection: 'NetworkConnection') -> str:
        return "Already disconnected"

    def send(self, connection: 'NetworkConnection', data: bytes) -> str:
        return "Error: not connected"

    def receive(self, connection: 'NetworkConnection') -> str:
        return "Error: not connected"

    def timeout(self, connection: 'NetworkConnection') -> str:
        return "Not connected; nothing to time out"

    def error(self, connection: 'NetworkConnection', error_msg: str) -> str:
        return f"Not connected: {error_msg}"

class ConnectingState(ConnectionState):
    MAX_RETRIES = 3
    RETRY_DELAY = 1.0

    @property
    def name(self) -> str:
        return "CONNECTING"

    def enter(self, connection: 'NetworkConnection') -> None:
        connection._retry_count = 0
        print(f"  [Connection] connecting to {connection.server_address}")

    def connect(self, connection: 'NetworkConnection') -> str:
        return "Connection already in progress..."

    def disconnect(self, connection: 'NetworkConnection') -> str:
        connection.set_state(DisconnectedState())
        return "Connection attempt cancelled"

    def send(self, connection: 'NetworkConnection', data: bytes) -> str:
        return "Error: connection in progress"

    def receive(self, connection: 'NetworkConnection') -> str:
        return "Error: connection in progress"

    def timeout(self, connection: 'NetworkConnection') -> str:
        # Each timeout counts as one retry; give up after MAX_RETRIES.
        connection._retry_count += 1
        connection._stats.reconnect_attempts += 1
        if connection._retry_count >= self.MAX_RETRIES:
            connection.set_state(DisconnectedState())
            return f"Connection timed out; maximum retries reached ({self.MAX_RETRIES})"
        print(f"  [Connection] timed out, retry {connection._retry_count}/{self.MAX_RETRIES}")
        return "Connection timed out, retrying..."

    def error(self, connection: 'NetworkConnection', error_msg: str) -> str:
        connection._retry_count += 1
        if connection._retry_count >= self.MAX_RETRIES:
            connection.set_state(DisconnectedState())
            return f"Connection failed: {error_msg}"
        return f"Connection error: {error_msg}, retrying..."

class ConnectedState(ConnectionState):
    IDLE_TIMEOUT = 30.0

    @property
    def name(self) -> str:
        return "CONNECTED"

    def enter(self, connection: 'NetworkConnection') -> None:
        connection._stats.connect_time = datetime.now()
        connection._stats.last_activity = datetime.now()
        connection._retry_count = 0
        print(f"  [Connection] connected to {connection.server_address}")

    def exit(self, connection: 'NetworkConnection') -> None:
        duration = 0
        if connection._stats.connect_time:
            duration = (datetime.now() - connection._stats.connect_time).total_seconds()
        print(f"  [Connection] leaving CONNECTED after {duration:.1f} s")

    def connect(self, connection: 'NetworkConnection') -> str:
        return "Already connected"

    def disconnect(self, connection: 'NetworkConnection') -> str:
        connection.set_state(DisconnectedState())
        return "Connection closed"

    def send(self, connection: 'NetworkConnection', data: bytes) -> str:
        connection._stats.bytes_sent += len(data)
        connection._stats.messages_sent += 1
        connection._stats.last_activity = datetime.now()
        return f"Sent {len(data)} bytes"

    def receive(self, connection: 'NetworkConnection') -> str:
        # Simulated receive: the payload size is random.
        received = random.randint(64, 1024)
        connection._stats.bytes_received += received
        connection._stats.messages_received += 1
        connection._stats.last_activity = datetime.now()
        return f"Received {received} bytes"

    def timeout(self, connection: 'NetworkConnection') -> str:
        connection.set_state(IdleState())
        return "Connection idle; entering the idle state"

    def error(self, connection: 'NetworkConnection', error_msg: str) -> str:
        connection.set_state(ReconnectingState())
        return f"Connection error: {error_msg}, trying to reconnect..."

class IdleState(ConnectionState):
    @property
    def name(self) -> str:
        return "IDLE"

    def enter(self, connection: 'NetworkConnection') -> None:
        print("  [Connection] entering the idle state")

    def exit(self, connection: 'NetworkConnection') -> None:
        pass

    def connect(self, connection: 'NetworkConnection') -> str:
        connection.set_state(ConnectedState())
        return "Connection reactivated"

    def disconnect(self, connection: 'NetworkConnection') -> str:
        connection.set_state(DisconnectedState())
        return "Connection closed"

    def send(self, connection: 'NetworkConnection', data: bytes) -> str:
        # Any traffic wakes the connection, then the request is forwarded.
        connection.set_state(ConnectedState())
        return connection._state.send(connection, data)

    def receive(self, connection: 'NetworkConnection') -> str:
        connection.set_state(ConnectedState())
        return connection._state.receive(connection)

    def timeout(self, connection: 'NetworkConnection') -> str:
        connection.set_state(DisconnectedState())
        return "Idle timeout; connection closed"

    def error(self, connection: 'NetworkConnection', error_msg: str) -> str:
        connection.set_state(ReconnectingState())
        return f"Error: {error_msg}, trying to reconnect..."

class ReconnectingState(ConnectionState):
    @property
    def name(self) -> str:
        return "RECONNECTING"

    def enter(self, connection: 'NetworkConnection') -> None:
        connection._stats.reconnect_attempts += 1
        print(f"  [Connection] reconnecting... (attempt {connection._stats.reconnect_attempts})")

    def connect(self, connection: 'NetworkConnection') -> str:
        return "Reconnection already in progress..."

    def disconnect(self, connection: 'NetworkConnection') -> str:
        connection.set_state(DisconnectedState())
        return "Reconnection cancelled"

    def send(self, connection: 'NetworkConnection', data: bytes) -> str:
        return "Error: reconnection in progress"

    def receive(self, connection: 'NetworkConnection') -> str:
        return "Error: reconnection in progress"

    def timeout(self, connection: 'NetworkConnection') -> str:
        # Give up once five reconnection attempts have been recorded.
        if connection._stats.reconnect_attempts >= 5:
            connection.set_state(DisconnectedState())
            return "Reconnection failed; disconnected"
        connection._stats.reconnect_attempts += 1
        return "Reconnection timed out, still trying..."

    def error(self, connection: 'NetworkConnection', error_msg: str) -> str:
        connection.set_state(DisconnectedState())
        return f"Reconnection failed: {error_msg}"

class NetworkConnection:
    """Context: holds the current connection state and delegates events to it."""

    def __init__(self, server_address: str):
        self.server_address = server_address
        self._state: ConnectionState = DisconnectedState()
        self._stats = ConnectionStats()
        self._retry_count: int = 0
        self._listeners: List[Callable] = []

    @property
    def state(self) -> str:
        return self._state.name

    @property
    def stats(self) -> ConnectionStats:
        return self._stats

    def set_state(self, new_state: ConnectionState) -> None:
        old_state = self._state
        self._state.exit(self)
        self._state = new_state
        self._state.enter(self)
        for listener in self._listeners:
            listener(old_state.name, new_state.name)

    def add_listener(self, listener: Callable) -> None:
        self._listeners.append(listener)

    def connect(self) -> str:
        return self._state.connect(self)

    def on_connected(self) -> str:
        # Simulated handshake-success event (an assumption of this example):
        # it moves CONNECTING or RECONNECTING to CONNECTED.
        if self._state.name not in ("CONNECTING", "RECONNECTING"):
            return "No connection attempt in progress"
        self.set_state(ConnectedState())
        return "Connection established"

    def disconnect(self) -> str:
        return self._state.disconnect(self)

    def send(self, data: bytes) -> str:
        return self._state.send(self, data)

    def receive(self) -> str:
        return self._state.receive(self)

    def timeout(self) -> str:
        return self._state.timeout(self)

    def error(self, error_msg: str) -> str:
        return self._state.error(self, error_msg)

    def get_stats_report(self) -> Dict[str, Any]:
        return {
            "state": self._state.name,
            "server": self.server_address,
            "bytes_sent": self._stats.bytes_sent,
            "bytes_received": self._stats.bytes_received,
            "messages_sent": self._stats.messages_sent,
            "messages_received": self._stats.messages_received,
            "reconnect_attempts": self._stats.reconnect_attempts,
            "connected_since": self._stats.connect_time.isoformat() if self._stats.connect_time else None,
        }


def state_change_logger(old_state: str, new_state: str):
    print(f"  [Listener] state change: {old_state} -> {new_state}")


conn = NetworkConnection("api.example.com:443")
conn.add_listener(state_change_logger)
print(f"Initial state: {conn.state}")

print("\n--- Connect ---")
print(conn.connect())
print(conn.on_connected())
print(f"Current state: {conn.state}")

print("\n--- Send data ---")
print(conn.send(b"Hello, Server!"))
print(conn.send(b"GET /api/data HTTP/1.1"))

print("\n--- Receive data ---")
print(conn.receive())

print("\n--- Simulated timeout ---")
print(conn.timeout())
print(f"Current state: {conn.state}")

print("\n--- Reactivate the connection ---")
print(conn.send(b"Keep-alive ping"))
print(f"Current state: {conn.state}")

print("\n--- Simulated error ---")
print(conn.error("Connection reset by peer"))
print(f"Current state: {conn.state}")

print("\n--- Disconnect ---")
print(conn.disconnect())
print(f"Current state: {conn.state}")
print(f"\nStats report: {conn.get_stats_report()}")

20.6 Pattern Variants
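20.6.1 Comparison of State Pattern Variants
| Variant | Characteristics | Suitable for | Complexity |
|---|---|---|---|
| Classic State pattern | One class per state | Simple state transitions | ★☆☆ |
| Enum state machine | States defined as enum members | Few states, simple transitions | ★☆☆ |
| Hierarchical state machine | Supports nested states | Complex state structures | ★★★ |
| Concurrent state machine | Several independent state regions | State that varies along multiple dimensions | ★★★ |
| Table-driven state machine | Transition rules stored in a table | Transitions that must be configured dynamically | ★★☆ |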
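The enum state machine row can be illustrated with a minimal sketch. The `LightState` enum, the `NEXT_STATE` mapping, and the `TrafficLight` class are hypothetical names chosen for this illustration and do not appear elsewhere in the chapter; the sketch assumes each state has exactly one successor.

```python
from enum import Enum, auto


class LightState(Enum):  # hypothetical example states
    RED = auto()
    GREEN = auto()
    YELLOW = auto()


# Allowed transitions: each state maps to the single state that follows it.
NEXT_STATE = {
    LightState.RED: LightState.GREEN,
    LightState.GREEN: LightState.YELLOW,
    LightState.YELLOW: LightState.RED,
}


class TrafficLight:
    def __init__(self) -> None:
        self.state = LightState.RED

    def advance(self) -> LightState:
        """Move to the next state in the cycle and return it."""
        self.state = NEXT_STATE[self.state]
        return self.state


light = TrafficLight()
print([light.advance().name for _ in range(4)])
```

With so few states and no per-state behavior, a dictionary lookup replaces the class hierarchy. Once states need their own `enter`/`exit` logic, the classic one-class-per-state form is likely the better fit.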
20.6.2 Table-Driven Implementation
from typing import Dict, List, Tuple, Callable, Any, Optional
from dataclasses import dataclass
from enum import Enum


class State(Enum):
    NEW = "new"
    PROCESSING = "processing"
    APPROVED = "approved"
    REJECTED = "rejected"
    COMPLETED = "completed"


@dataclass
class TransitionRule:
    trigger: str
    source: State
    dest: State
    guard: Optional[Callable[[], bool]] = None    # transition fires only if this returns True
    action: Optional[Callable[[], None]] = None   # side effect run during the transition


class TableDrivenStateMachine:
    def __init__(self, initial_state: State):
        self._state = initial_state
        self._rules: List[TransitionRule] = []
        self._state_handlers: Dict[State, Dict[str, Callable]] = {}
        self._history: List[Tuple[State, str, State]] = []

    def add_rule(self, rule: TransitionRule) -> None:
        self._rules.append(rule)

    def add_rules(self, rules: List[TransitionRule]) -> None:
        self._rules.extend(rules)

    def set_handler(self, state: State, event: str, handler: Callable) -> None:
        if state not in self._state_handlers:
            self._state_handlers[state] = {}
        self._state_handlers[state][event] = handler

    @property
    def state(self) -> State:
        return self._state

    def trigger(self, event: str, *args, **kwargs) -> bool:
        """Fire the first matching rule whose guard passes; return whether one fired."""
        for rule in self._rules:
            if rule.trigger == event and rule.source == self._state:
                if rule.guard and not rule.guard():
                    continue
                handler = self._state_handlers.get(self._state, {}).get(event)
                if handler:
                    handler(*args, **kwargs)
                if rule.action:
                    rule.action()
                old_state = self._state
                self._state = rule.dest
                self._history.append((old_state, event, rule.dest))
                return True
        return False

    def get_available_triggers(self) -> List[str]:
        # Guards are not evaluated here, so a listed trigger may still be refused.
        return [r.trigger for r in self._rules if r.source == self._state]

    def get_history(self) -> List[Tuple[State, str, State]]:
        return self._history.copy()

def log_transition():
    print("  [Action] performing state transition")


machine = TableDrivenStateMachine(State.NEW)
machine.add_rules([
    TransitionRule("submit", State.NEW, State.PROCESSING, action=log_transition),
    TransitionRule("approve", State.PROCESSING, State.APPROVED, action=log_transition),
    TransitionRule("reject", State.PROCESSING, State.REJECTED, action=log_transition),
    TransitionRule("resubmit", State.REJECTED, State.PROCESSING, action=log_transition),
    TransitionRule("complete", State.APPROVED, State.COMPLETED, action=log_transition),
    TransitionRule("restart", State.REJECTED, State.NEW),
])
machine.set_handler(State.PROCESSING, "approve", lambda: print("  [Handler] handling approval"))

print(f"Initial state: {machine.state.value}")
print(f"Available triggers: {machine.get_available_triggers()}")
machine.trigger("submit")
print(f"Current state: {machine.state.value}")
machine.trigger("approve")
print(f"Current state: {machine.state.value}")
machine.trigger("complete")
print(f"Current state: {machine.state.value}")
print(f"\nTransition history: {[(s.value, e, d.value) for s, e, d in machine.get_history()]}")

20.7 Anti-Patterns and Best Practices
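20.7.1 Common Anti-Patterns
Anti-pattern 1: State explosion
class BadExample:
    """A state name plus independent flags: the branches multiply with every flag."""

    def __init__(self):
        self._state = "A"
        self._flag1 = False
        self._flag2 = False
        self._flag3 = False

    def handle(self):
        if self._state == "A":
            if self._flag1:
                if self._flag2:
                    pass
                else:
                    pass
            else:
                if self._flag3:
                    pass
        elif self._state == "B":
            pass


class GoodExample:
    def __init__(self):
        # Assumes a hierarchical state machine implementation is available.
        self._state_machine = HierarchicalStateMachine()

Anti-pattern 2: State leakage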
class BadExample:
    def __init__(self):
        self._state = IdleState()
        # Context data stored on the state object is lost on the next switch.
        self._state._context_data = {}

    def change_state(self, new_state):
        # The exit/enter hooks are skipped.
        self._state = new_state


class GoodExample:
    def __init__(self):
        self._state = IdleState()
        self._context_data = {}  # context data lives on the context

    def change_state(self, new_state):
        self._state.exit(self)
        self._state = new_state
        self._state.enter(self)

Anti-pattern 3: Over-engineering
class BadExample:
    """State classes that only return constant data add indirection without behavior."""

    def __init__(self):
        self._state = StateA()

    def get_name(self):
        return self._state.get_name()

    def get_description(self):
        return self._state.get_description()

    def get_color(self):
        return self._state.get_color()


class GoodExample:
    # A plain lookup table is enough when states differ only in data.
    STATES = {
        "A": {"name": "State A", "description": "...", "color": "red"},
        "B": {"name": "State B", "description": "...", "color": "blue"},
    }

    def __init__(self):
        self._state = "A"

    @property
    def state_info(self):
        return self.STATES[self._state]

20.7.2 Best Practice Checklist
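| Practice | Description | Importance |
|---|---|---|
| Immutable states | State objects shared between contexts should be stateless or immutable; a state that carries per-entry data (a timer, a spell name) should be created fresh for each transition | ★★★ |
| Single responsibility | Each state class handles only the behavior of its own state | ★★★ |
| Encapsulated transitions | Transition logic lives inside the state classes | ★★★ |
| Avoid state explosion | Use a hierarchical state machine to manage complex state | ★★☆ |
| State queries | Provide `is_xxx` methods so callers can check the state easily | ★★☆ |
| State history | Record transitions for debugging and auditing | ★★☆ |
| Thread safety | Protect state transitions in multithreaded environments | ★★★ |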
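The thread-safety and state-history rows can be sketched together. This is a minimal illustration, not the chapter's implementation: `ThreadSafeContext`, its string state names, and the `_ALLOWED` table are hypothetical, and the sketch assumes that checking and applying a transition under one `threading.Lock` is sufficient for the use case.

```python
import threading


class ThreadSafeContext:
    """Hypothetical context whose transitions are serialized by a lock."""

    # Allowed transitions, keyed by the current state name.
    _ALLOWED = {
        "idle": {"running"},
        "running": {"idle", "stopped"},
        "stopped": set(),
    }

    def __init__(self) -> None:
        self._state = "idle"
        self._lock = threading.Lock()
        self.history = ["idle"]  # audit trail of every state entered

    @property
    def state(self) -> str:
        with self._lock:
            return self._state

    def transition(self, new_state: str) -> bool:
        """Check and apply the transition as one atomic step."""
        with self._lock:
            if new_state not in self._ALLOWED[self._state]:
                return False
            self._state = new_state
            self.history.append(new_state)
            return True


ctx = ThreadSafeContext()
ctx.transition("running")

# Eight threads race to stop the context; the lock lets exactly one succeed.
results = []
threads = [
    threading.Thread(target=lambda: results.append(ctx.transition("stopped")))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results.count(True), ctx.state)
```

If `enter`/`exit` hooks can themselves request a transition while the lock is held, a plain `Lock` would deadlock; `threading.RLock` or a queued-event design is a common alternative in that case.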
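20.8 Decision Guide
20.8.1 Whether to Use the State Pattern
Does the object's behavior depend on its state?
├─ No  → no pattern needed
└─ Yes → how many states?
         ├─ 2-3   → enum / conditionals
         ├─ 4-10  → State pattern
         └─ >10   → hierarchical state machine

20.8.2 Choosing an Implementation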
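Choose an implementation
├─ Simple scenario   → enum state machine → table-driven state machine
├─ Standard scenario → ABC-based State pattern → state machine framework
└─ Complex scenario  → hierarchical state machine
                       ├─ concurrent state machine
                       └─ state table

20.8.3 Technology Selection Reference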
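| Scenario | Recommended implementation | Rationale |
|---|---|---|
| Order workflow | ABC-based State pattern | Clear states, explicit transitions |
| Game character | Hierarchical state machine | Nested states, complex behavior |
| Network protocol | State machine framework | Standardized, testable |
| UI component | Enum state machine | Simple states, performance first |
| Workflow engine | Table-driven state machine | Dynamically configurable, extensible |
| Device control | Concurrent state machine | Independent state dimensions |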
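The device-control row relies on independent state regions. A minimal sketch of that idea follows, assuming a hypothetical `AirConditioner` with two regions (`Power` and `Mode`); none of these names come from the chapter.

```python
from enum import Enum
from typing import Tuple


class Power(Enum):  # first region
    OFF = "off"
    ON = "on"


class Mode(Enum):  # second region, independent of the first
    COOL = "cool"
    HEAT = "heat"


class AirConditioner:
    """Hypothetical device with two orthogonal state regions."""

    def __init__(self) -> None:
        self.power = Power.OFF
        self.mode = Mode.COOL

    def toggle_power(self) -> None:
        # Only the power region changes; the mode region is untouched.
        self.power = Power.ON if self.power is Power.OFF else Power.OFF

    def switch_mode(self) -> None:
        # Only the mode region changes; the power region is untouched.
        self.mode = Mode.HEAT if self.mode is Mode.COOL else Mode.COOL

    def snapshot(self) -> Tuple[str, str]:
        return (self.power.value, self.mode.value)


ac = AirConditioner()
ac.toggle_power()
ac.switch_mode()
ac.toggle_power()
print(ac.snapshot())
```

Two regions of two states each need 2 + 2 state definitions here, whereas a single flat machine would need one state per combination (2 × 2). The gap widens as regions are added.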
20.9 Relationships with Other Patterns
20.9.1 Pattern Combinations
State pattern + Observer pattern:
┌──────────────────────────────────────────────────┐
│ Context                                          │
│   State:     StateA | StateB | StateC            │
│       │                                          │
│       │ state-change notification                │
│       ↓                                          │
│   Observers: Observer1 | Observer2 | Observer3   │
└──────────────────────────────────────────────────┘
State pattern + Singleton pattern:
┌──────────────────────────────────────────────────┐
│ State (Singleton)                                │
│   _instance: State                               │
│   + get_instance(): State                        │
│                                                  │
│ Benefit: state objects are shared, saving memory │
│ Caveat: state objects must be stateless          │
└──────────────────────────────────────────────────┘

20.9.2 Pattern Comparison
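| Pattern | Relationship | Difference |
|---|---|---|
| Strategy | Similar structure | Strategies are independent of each other; states are linked by transitions |
| Observer | Composable | Observers listen for state changes |
| Chain of Responsibility | Both handle requests | The chain passes a request along; the State pattern switches behavior |
| Command | Composable | Commands trigger state transitions |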
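The Strategy row is easiest to see side by side. The sketch below is an illustration under assumed names (`Sorter`, `Turnstile`, `Locked`, `Unlocked` are all hypothetical): in the Strategy half the client picks the algorithm, while in the State half each state object decides its own successor.

```python
from typing import Callable, List


# Strategy: the client picks the algorithm; strategies know nothing of each other.
def sort_ascending(xs: List[int]) -> List[int]:
    return sorted(xs)


def sort_descending(xs: List[int]) -> List[int]:
    return sorted(xs, reverse=True)


class Sorter:
    def __init__(self, strategy: Callable[[List[int]], List[int]]) -> None:
        self.strategy = strategy

    def run(self, xs: List[int]) -> List[int]:
        return self.strategy(xs)


# State: each state knows its successor and performs the switch itself.
class Locked:
    def coin(self, turnstile: "Turnstile") -> str:
        turnstile.state = Unlocked()
        return "unlocked"

    def push(self, turnstile: "Turnstile") -> str:
        return "blocked"


class Unlocked:
    def coin(self, turnstile: "Turnstile") -> str:
        return "already unlocked"

    def push(self, turnstile: "Turnstile") -> str:
        turnstile.state = Locked()
        return "passed"


class Turnstile:
    def __init__(self) -> None:
        self.state = Locked()

    def coin(self) -> str:
        return self.state.coin(self)

    def push(self) -> str:
        return self.state.push(self)


print(Sorter(sort_descending).run([2, 3, 1]))
gate = Turnstile()
print(gate.push(), gate.coin(), gate.push())
```

The client never touches `gate.state`; the same `push()` call yields different results only because the state objects replaced one another. With `Sorter`, nothing changes unless the client supplies a different strategy.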
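20.10 Quick Reference Card
20.10.1 Core Concepts at a Glance
┌────────────────────────────────────────────────────────────────┐
│ State Pattern Quick Reference                                  │
├────────────────────────────────────────────────────────────────┤
│ Definition: an object changes behavior when its state changes  │
├────────────────────────────────────────────────────────────────┤
│ Core roles:                                                    │
│  • State: declares the state-specific behavior interface       │
│  • ConcreteState: implements one concrete state                │
│  • Context: holds the current state and delegates to it        │
├────────────────────────────────────────────────────────────────┤
│ Key methods:                                                   │
│  • handle(context): handle a request                           │
│  • enter(context): run on entering the state                   │
│  • exit(context): run on leaving the state                     │
│  • set_state(state): switch to another state                   │
├────────────────────────────────────────────────────────────────┤
│ Ways to drive transitions:                                     │
│  • State-driven: the state class picks the next state          │
│  • Context-driven: the context switches state on conditions    │
│  • Table-driven: a configuration table defines the rules       │
├────────────────────────────────────────────────────────────────┤
│ When to use:                                                   │
│  ✓ Behavior depends on the object's state                      │
│  ✓ Many conditionals branch on state                           │
│  ✓ Transition logic is complex                                 │
│  ✓ Large conditional statements need to be eliminated          │
├────────────────────────────────────────────────────────────────┤
│ Cautions:                                                      │
│  • Avoid state explosion (use a hierarchical state machine)    │
│  • Shared state objects should be stateless or immutable       │
│  • Keep state transitions atomic                               │
└────────────────────────────────────────────────────────────────┘

20.10.2 Python Implementation Template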
from abc import ABC, abstractmethod


class State(ABC):
    @property
    @abstractmethod
    def name(self) -> str:
        pass

    @abstractmethod
    def handle(self, context: 'Context') -> None:
        pass

    # Optional hooks; concrete states override them when needed.
    def enter(self, context: 'Context') -> None:
        pass

    def exit(self, context: 'Context') -> None:
        pass


class Context:
    def __init__(self, initial_state: State):
        self._state = initial_state
        self._state.enter(self)

    @property
    def state(self) -> State:
        return self._state

    def set_state(self, new_state: State) -> None:
        self._state.exit(self)
        self._state = new_state
        self._state.enter(self)

    def request(self) -> None:
        # Behavior is delegated to whichever state is current.
        self._state.handle(self)

20.11 Summary
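The State pattern is a core pattern for managing changes in an object's state. Building on finite state machine theory, it encapsulates state-dependent behavior in separate state classes. This chapter started from the formal definition and then examined the pattern's theoretical foundations and several ways to implement it.
Key points:
- Theoretical foundation: the State pattern rests on finite state machine theory, and the state transition function defines which transitions are legal
- Implementation options: from simple enum state machines to hierarchical and concurrent state machines, covering different levels of complexity
- Practical applications: order management, game characters, and network connections
- Pattern variants: hierarchical state machines address state explosion, and concurrent state machines manage state along several dimensions
- Best practices: shared state objects should be stateless, transitions should be atomic, and over-engineering should be avoided
The State pattern is an effective way to eliminate conditional branching. Understanding how it works and how to implement it is essential for building maintainable, extensible systems.
Exercises
1. The State and Strategy patterns are structurally very similar. What is the essential difference between them?
2. How would you design a state machine framework that supports persisting and restoring state?
3. In a hierarchical state machine, how should entry and exit actions propagate through the state hierarchy?
4. How can state explosion be avoided in the State pattern? Give concrete design strategies.
5. In a concurrent environment, how can state transitions be kept atomic and thread-safe?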