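Chapter 17: The Mediator Pattern
Learning Objectives
After completing this chapter, readers will be able to:
- Understand the mathematical foundations and formal definition of the Mediator pattern
- Explain how centralizing communication between objects decouples them
- Design and implement several kinds of mediators (centralized, distributed, event-driven, and so on)
- Apply the Mediator pattern to component-coordination problems in complex systems
- Recognize where the Mediator pattern fits and which anti-patterns to avoid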
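17.1 Formal Definition
17.1.1 Mathematical Definition
The Mediator pattern uses a mediator object to encapsulate how a set of objects interact. Because the objects no longer refer to each other explicitly, they are loosely coupled, and their interactions can be changed independently.
From a graph-theoretic perspective, the Mediator pattern turns a mesh topology into a star topology.
Object relationships without a mediator (mesh):
$$G_{mesh} = (V, E_{mesh})$$
where $V = \{c_1, c_2, \ldots, c_n\}$ is the set of components and $E_{mesh} \subseteq V \times V$ is the direct-dependency relation.
Object relationships with a mediator (star):
$$G_{star} = (V \cup \{m\}, E_{star})$$
where $m$ is the mediator and $E_{star} = \{(c_i, m), (m, c_i) \mid c_i \in V\}$.
Reduction in coupling: in the worst case every component depends on every other, so $|E_{mesh}| \le n(n-1)$, while $|E_{star}| = 2n$:
$$|E_{mesh}| = O(n^2) \rightarrow |E_{star}| = O(n)$$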
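The edge counts can be checked numerically. The sketch below is illustrative only: the helper names `mesh_edges` and `star_edges` are hypothetical, and it counts undirected links (one per pair) rather than the directed pairs used in the definition of $E_{star}$, which only changes the counts by a constant factor.

```python
from itertools import combinations


def mesh_edges(components):
    """Worst-case mesh: one undirected link for every pair of components."""
    return list(combinations(components, 2))


def star_edges(components, mediator="M"):
    """Star: one undirected link from each component to the mediator."""
    return [(c, mediator) for c in components]


components = ["A", "B", "C", "D"]
print(len(mesh_edges(components)))  # 6 links: A-B, A-C, A-D, B-C, B-D, C-D
print(len(star_edges(components)))  # 4 links: A-M, B-M, C-M, D-M

# The gap widens quadratically as the number of components grows.
for n in (4, 10, 100):
    names = [f"c{i}" for i in range(n)]
    print(n, len(mesh_edges(names)), len(star_edges(names)))
```

For four components this gives 6 links against 4; for 100 components it gives 4950 against 100.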
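17.1.2 Communication Model
A mediator can be formalized as a message-passing system:
$$\mathcal{M} = \langle C, S, \Sigma, \delta, \gamma \rangle$$
where:
- $C = \{c_1, c_2, \ldots, c_n\}$: the set of components
- $S$: the set of mediator states
- $\Sigma$: the event/message alphabet
- $\delta: S \times C \times \Sigma \rightarrow S$: the state-transition function
- $\gamma: S \times C \times \Sigma \rightarrow 2^{C \times \Sigma^*}$: the message-routing function
Routing rule:
$$\gamma(s, c_i, e) = \{(c_j, e') \mid c_j \text{ should receive } e' \text{ when } c_i \text{ sends } e \text{ in state } s\}$$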
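To make the roles of $\delta$ and $\gamma$ concrete, here is a minimal sketch under stated assumptions. The component names, the events (`ready`, `reset`, `enable`, `clear`), and the choice of state are all hypothetical. For simplicity each routed item is a single event rather than a sequence from $\Sigma^*$.

```python
from typing import FrozenSet, Set, Tuple

# Hypothetical example: the mediator state is the set of components that
# have reported "ready". None of these names come from the chapter.
State = FrozenSet[str]
COMPONENTS = frozenset({"form", "button", "label"})


def delta(s: State, c: str, e: str) -> State:
    """State transition: record which components have reported 'ready'."""
    if e == "ready":
        return s | {c}
    if e == "reset":
        return frozenset()
    return s


def gamma(s: State, c: str, e: str) -> Set[Tuple[str, str]]:
    """Routing: the (recipient, event) pairs triggered by e from c in state s."""
    if e == "ready" and c == "form":
        return {("button", "enable")}
    if e == "reset":
        return {(other, "clear") for other in COMPONENTS if other != c}
    return set()


def step(s: State, c: str, e: str):
    """One mediator step: route using the current state, then transition."""
    return delta(s, c, e), gamma(s, c, e)


state: State = frozenset()
state, outgoing = step(state, "form", "ready")
print(sorted(state), sorted(outgoing))  # ['form'] [('button', 'enable')]
state, outgoing = step(state, "label", "reset")
print(sorted(state), sorted(outgoing))  # [] [('button', 'clear'), ('form', 'clear')]
```

Keeping `delta` and `gamma` as pure functions mirrors the formal model; a real mediator would usually fold both into one `notify` method that updates its own state and calls the recipients.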
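17.1.3 Dependency Inversion Principle
The Mediator pattern inverts dependencies: components depend on the mediator abstraction rather than on each other.
Traditional dependencies (components depend on each other directly): $$\forall c_i, c_j \in C: \text{depends}(c_i, c_j) \Leftrightarrow c_i \text{ holds a direct reference to } c_j$$
Mediated dependencies (components depend on each other only through the mediator): $$\forall c_i \in C: \text{depends}(c_i, m) \land \text{depends}(m, c_i)$$
Coupling metrics:
| Metric | Without mediator | With mediator |
|---|---|---|
| Direct dependencies | $O(n^2)$ | $O(n)$ |
| Indirect dependencies | 0 | $O(n)$ |
| Components affected by a change | $O(n)$ | $O(1)$ |
| Cost of adding a component | $O(n)$ | $O(1)$ |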
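17.2 Historical Background and Evolution
17.2.1 Timeline
| Year | Milestone | Description |
|---|---|---|
| 1987 | Smalltalk MVC | The Controller in Model-View-Controller acts as a mediator |
| 1991 | Windows messaging | The Windows message queue mediates between UI components |
| 1994 | GoF design patterns | *Design Patterns* formally catalogs the Mediator pattern |
| 1996 | Java AWT | The event-dispatch mechanism adopts the mediator idea |
| 2000 | JMS specification | Java Message Service provides a distributed mediator |
| 2002 | .NET event model | Delegates and events support loosely coupled communication |
| 2004 | Ruby on Rails | ActionController serves as the MVC mediator |
| 2009 | Node.js EventEmitter | An event-driven mediator implementation |
| 2015 | Redux | A single state store acts as a mediator |
| 2016 | Microservice architecture | Message buses and service meshes act as distributed mediators |
| 2020 | CloudEvents | A cloud-native event specification standardizes mediator communication |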
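17.2.2 Design Motivation
The core motivations behind the Mediator pattern are:
- Reduced coupling: components no longer depend on each other directly, so a mesh of dependencies collapses into a star
- Centralized control: interaction logic lives in one place, which makes it easier to maintain and change
- Better reuse: a component that is independent of its peers is easier to reuse
- Simpler protocols: each component talks only to the mediator, so its protocol stays small
17.3 UML Structure Diagrams
17.3.1 Standard Structure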
┌─────────────────────────────────────────────────────────────────────┐
│                     Mediator Pattern Structure                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────────┐                                               │
│   │  <<interface>>  │                                               │
│   │    Mediator     │                                               │
│   ├─────────────────┤                                               │
│   │ + notify(sender,│                                               │
│   │          event) │                                               │
│   └────────┬────────┘                                               │
│            │                                                        │
│            │ implements                                             │
│            ▼                                                        │
│   ┌─────────────────┐         ┌─────────────────┐                   │
│   │ ConcreteMediator│────────>│    Component    │                   │
│   ├─────────────────┤  knows  │  <<abstract>>   │                   │
│   │ - component1    │         ├─────────────────┤                   │
│   │ - component2    │         │ - mediator      │                   │
│   │ - ...           │         ├─────────────────┤                   │
│   ├─────────────────┤         │ + set_mediator()│                   │
│   │ + notify()      │         └────────┬────────┘                   │
│   └─────────────────┘                  │ extends                    │
│                     ┌──────────────────┼──────────────────┐         │
│                     │                  │                  │         │
│                     ▼                  ▼                  ▼         │
│              ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │
│              │ ComponentA  │    │ ComponentB  │    │ ComponentC  │  │
│              ├─────────────┤    ├─────────────┤    ├─────────────┤  │
│              │ + operation1│    │ + operation2│    │ + operation3│  │
│              └─────────────┘    └─────────────┘    └─────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
17.3.2 Dependency Comparison
┌─────────────────────────────────────────────────────────────────────┐
│                        Dependency Comparison                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   [Without a mediator: mesh dependencies]                           │
│                                                                     │
│                             ┌─────────┐                             │
│                             │    A    │                             │
│                             └────┬────┘                             │
│                               ╱  │  ╲                               │
│                             ╱    │    ╲                             │
│                      ┌────┴────┐ │ ┌────┴────┐                      │
│                      │    B    │─┼─│    C    │                      │
│                      └────┬────┘ │ └────┬────┘                      │
│                             ╲    │    ╱                             │
│                               ╲  │  ╱                               │
│                             ┌────┴────┐                             │
│                             │    D    │                             │
│                             └─────────┘                             │
│                                                                     │
│   Dependencies: A-B, A-C, A-D, B-C, B-D, C-D (6 edges)              │
│                                                                     │
│  ─────────────────────────────────────────────────────────────────  │
│                                                                     │
│   [With a mediator: star dependencies]                              │
│                                                                     │
│                            ┌──────────┐                             │
│                            │ Mediator │                             │
│                            └┬────────┬┘                             │
│                          ╱  │        │  ╲                           │
│                       ╱     │        │     ╲                        │
│                 ┌──┴──┐  ┌──┴──┐  ┌──┴──┐  ┌──┴──┐                  │
│                 │  A  │  │  B  │  │  C  │  │  D  │                  │
│                 └─────┘  └─────┘  └─────┘  └─────┘                  │
│                                                                     │
│   Dependencies: A-M, B-M, C-M, D-M (4 edges)                        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
17.3.3 Message Flow Sequence Diagram
┌─────────────────────────────────────────────────────────────────────┐
│                        Message Flow Sequence                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ComponentA         Mediator         ComponentB        ComponentC  │
│        │                 │                 │                 │      │
│        │ do_action()     │                 │                 │      │
│        │ notify(A, event)│                 │                 │      │
│        │────────────────>│                 │                 │      │
│        │                 │                 │                 │      │
│        │                 │ handle_event()  │                 │      │
│        │                 │────────────────>│                 │      │
│        │                 │                 │                 │      │
│        │                 │ handle_event()  │                 │      │
│        │                 │──────────────────────────────────>│      │
│        │                 │                 │                 │      │
│        │                 │ response()      │                 │      │
│        │                 │<────────────────│                 │      │
│        │                 │                 │                 │      │
│        │ callback()      │                 │                 │      │
│        │<────────────────│                 │                 │      │
│        │                 │                 │                 │      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
17.4 Standard Implementations
17.4.1 ABC-Based Mediator Framework
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum, auto
from datetime import datetime


class EventType(Enum):
    """Event types."""
    CREATE = auto()
    UPDATE = auto()
    DELETE = auto()
    SELECT = auto()
    VALIDATE = auto()
    SUBMIT = auto()
    CANCEL = auto()


@dataclass
class Event:
    """Event payload passed between components and the mediator."""
    type: EventType
    source: str
    data: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

    def __repr__(self) -> str:
        return f"Event({self.type.name}, from={self.source})"


class Mediator(ABC):
    """Abstract mediator."""

    @abstractmethod
    def notify(self, event: Event) -> None:
        """Handle a notification from a component."""
        pass

    @abstractmethod
    def register(self, component: 'Component', name: Optional[str] = None) -> None:
        """Register a component; the name defaults to the component's own."""
        pass

    @abstractmethod
    def unregister(self, name: str) -> None:
        """Unregister a component."""
        pass


class Component(ABC):
    """Abstract component."""

    def __init__(self, name: str):
        self._name = name
        self._mediator: Optional[Mediator] = None

    @property
    def name(self) -> str:
        return self._name

    def set_mediator(self, mediator: Optional[Mediator]) -> None:
        # Passing None detaches the component from its mediator.
        self._mediator = mediator

    def send_event(self, event_type: EventType, data: Optional[Dict[str, Any]] = None) -> None:
        """Send an event to the mediator (a no-op when detached)."""
        if self._mediator:
            event = Event(
                type=event_type,
                source=self._name,
                data=data or {}
            )
            self._mediator.notify(event)

    @abstractmethod
    def receive_event(self, event: Event) -> None:
        """Receive an event routed by the mediator."""
        pass


class ConcreteMediator(Mediator):
    """Concrete mediator."""

    def __init__(self):
        self._components: Dict[str, Component] = {}
        self._event_handlers: Dict[EventType, List[Callable[[Event], None]]] = {}
        self._event_log: List[Event] = []

    def register(self, component: Component, name: Optional[str] = None) -> None:
        name = name or component.name
        self._components[name] = component
        component.set_mediator(self)

    def unregister(self, name: str) -> None:
        if name in self._components:
            self._components[name].set_mediator(None)
            del self._components[name]

    def subscribe(self, event_type: EventType, handler: Callable[[Event], None]) -> None:
        """Subscribe a handler to an event type."""
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    def notify(self, event: Event) -> None:
        """Log the event, run its subscribed handlers, then route it."""
        self._event_log.append(event)
        print(f"[Mediator] received event: {event}")
        if event.type in self._event_handlers:
            for handler in self._event_handlers[event.type]:
                handler(event)
        self._route_event(event)

    def _route_event(self, event: Event) -> None:
        """Routing hook: subclasses override this to forward events to components."""
        pass

    def get_event_log(self) -> List[Event]:
        """Return a copy of the event log."""
        return self._event_log.copy()

    def broadcast(self, event: Event, exclude: Optional[str] = None) -> None:
        """Deliver an event to every component except the source and `exclude`."""
        for name, component in self._components.items():
            if name != exclude and name != event.source:
                component.receive_event(event)


class ButtonComponent(Component):
    """Button component."""

    def __init__(self, name: str, label: str = ""):
        super().__init__(name)
        self._label = label
        self._enabled = True

    @property
    def enabled(self) -> bool:
        return self._enabled

    def set_enabled(self, enabled: bool) -> None:
        self._enabled = enabled
        state = "enabled" if enabled else "disabled"
        print(f"  [{self._name}] button state: {state}")

    def click(self) -> None:
        # A disabled button ignores clicks.
        if self._enabled:
            print(f"  [{self._name}] button clicked")
            self.send_event(EventType.SUBMIT, {"label": self._label})

    def receive_event(self, event: Event) -> None:
        if event.type == EventType.UPDATE:
            if "enabled" in event.data:
                self.set_enabled(event.data["enabled"])


class TextInputComponent(Component):
    """Text input component."""

    def __init__(self, name: str, placeholder: str = ""):
        super().__init__(name)
        self._value = ""
        self._placeholder = placeholder

    @property
    def value(self) -> str:
        return self._value

    def set_value(self, value: str) -> None:
        self._value = value
        print(f"  [{self._name}] value updated: '{value}'")
        self.send_event(EventType.UPDATE, {"value": value})

    def clear(self) -> None:
        self._value = ""
        print(f"  [{self._name}] cleared")
        self.send_event(EventType.UPDATE, {"value": ""})

    def receive_event(self, event: Event) -> None:
        if event.type == EventType.DELETE:
            self.clear()


class LabelComponent(Component):
    """Label component."""

    def __init__(self, name: str):
        super().__init__(name)
        self._text = ""

    def set_text(self, text: str) -> None:
        self._text = text
        print(f"  [{self._name}] label text: '{text}'")

    def receive_event(self, event: Event) -> None:
        if event.type == EventType.UPDATE:
            if "message" in event.data:
                self.set_text(event.data["message"])
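

print("Standard Mediator implementation:")
mediator = ConcreteMediator()
title_input = TextInputComponent("title_input", "Enter a title")
content_input = TextInputComponent("content_input", "Enter content")
save_button = ButtonComponent("save_button", "Save")
clear_button = ButtonComponent("clear_button", "Clear")
status_label = LabelComponent("status_label")
mediator.register(title_input)
mediator.register(content_input)
mediator.register(save_button)
mediator.register(clear_button)
mediator.register(status_label)


def validate_inputs(event: Event) -> None:
    # Enable "Save" only when both inputs are non-empty.
    has_title = bool(title_input.value)
    has_content = bool(content_input.value)
    save_button.set_enabled(has_title and has_content)


def handle_submit(event: Event) -> None:
    # ConcreteMediator._route_event is an empty hook, so button clicks are
    # wired up here; without this handler the clicks below have no effect.
    if event.source == "clear_button":
        title_input.clear()
        content_input.clear()
    elif event.source == "save_button":
        status_label.set_text("Saved")


mediator.subscribe(EventType.UPDATE, validate_inputs)
mediator.subscribe(EventType.SUBMIT, handle_submit)
print("\n--- Initial state ---")
save_button.set_enabled(False)
print("\n--- Enter a title ---")
title_input.set_value("My article")
print("\n--- Enter content ---")
content_input.set_value("This is the article body")
print("\n--- Click Save ---")
save_button.click()
print("\n--- Click Clear ---")
clear_button.click()
17.4.2 Event-Driven Mediator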
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import threading
from queue import Empty, Queue
import time


class EventPriority(Enum):
    """Event priority levels."""
    LOW = 1
    NORMAL = 5
    HIGH = 10
    CRITICAL = 20


@dataclass(order=True)
class PrioritizedEvent:
    """An event that is ordered by priority only."""
    priority: int
    event: Any = field(compare=False)
    timestamp: float = field(default_factory=time.time, compare=False)


class EventBus:
    """Event bus: a decentralized mediator."""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = {}
        self._once_subscribers: Dict[str, List[Callable]] = {}
        self._wildcard_subscribers: List[Callable] = []
        self._event_queue: Queue = Queue()
        self._running = False
        self._lock = threading.Lock()

    def on(self, event_type: str, handler: Callable) -> 'EventBus':
        """Subscribe to an event type."""
        with self._lock:
            if event_type not in self._subscribers:
                self._subscribers[event_type] = []
            self._subscribers[event_type].append(handler)
        return self

    def once(self, event_type: str, handler: Callable) -> 'EventBus':
        """Subscribe for a single delivery."""
        with self._lock:
            if event_type not in self._once_subscribers:
                self._once_subscribers[event_type] = []
            self._once_subscribers[event_type].append(handler)
        return self

    def on_any(self, handler: Callable) -> 'EventBus':
        """Subscribe to every event."""
        with self._lock:
            self._wildcard_subscribers.append(handler)
        return self

    def off(self, event_type: str, handler: Callable) -> 'EventBus':
        """Unsubscribe a handler."""
        with self._lock:
            if event_type in self._subscribers:
                if handler in self._subscribers[event_type]:
                    self._subscribers[event_type].remove(handler)
        return self

    def emit(self, event_type: str, data: Any = None) -> 'EventBus':
        """Queue an event; it is delivered only while start_processing() runs."""
        event = {
            'type': event_type,
            'data': data,
            'timestamp': datetime.now()
        }
        self._event_queue.put(event)
        return self

    def emit_sync(self, event_type: str, data: Any = None) -> 'EventBus':
        """Publish an event and deliver it immediately on the calling thread."""
        event = {
            'type': event_type,
            'data': data,
            'timestamp': datetime.now()
        }
        self._dispatch_event(event)
        return self

    def _dispatch_event(self, event: Dict) -> None:
        """Deliver an event to wildcard, regular, then one-shot subscribers."""
        event_type = event['type']
        # Snapshot the handler lists under the lock so that handlers can
        # subscribe or unsubscribe while the event is being delivered.
        with self._lock:
            wildcard = list(self._wildcard_subscribers)
            regular = list(self._subscribers.get(event_type, []))
            once = self._once_subscribers.pop(event_type, [])
        for handler in wildcard + regular + once:
            handler(event)

    def start_processing(self) -> None:
        """Process the event queue until stop_processing() is called (blocking)."""
        self._running = True
        while self._running:
            try:
                event = self._event_queue.get(timeout=0.1)
            except Empty:
                continue  # nothing queued; re-check the running flag
            try:
                self._dispatch_event(event)
            except Exception as exc:
                # Report handler failures instead of silently swallowing them.
                print(f"[EventBus] handler error: {exc!r}")

    def stop_processing(self) -> None:
        """Stop the processing loop."""
        self._running = False


class EventEmitter:
    """Mixin that lets a component publish events on an event bus."""

    def __init__(self):
        self._event_bus: Optional[EventBus] = None

    def set_event_bus(self, event_bus: EventBus) -> None:
        self._event_bus = event_bus

    def emit(self, event_type: str, data: Any = None) -> None:
        if self._event_bus:
            # Deliver synchronously: EventBus.emit() only queues the event,
            # which would never be delivered unless start_processing() is
            # running on another thread.
            self._event_bus.emit_sync(event_type, data)


class ComponentBase(EventEmitter):
    """Base class for components."""

    def __init__(self, name: str):
        super().__init__()
        self._name = name

    @property
    def name(self) -> str:
        return self._name


class SensorComponent(ComponentBase):
    """Sensor component."""

    def __init__(self, name: str):
        super().__init__(name)
        self._value = 0

    def update_value(self, value: float) -> None:
        self._value = value
        self.emit('sensor.update', {
            'source': self._name,
            'value': value
        })


class ActuatorComponent(ComponentBase):
    """Actuator component."""

    def __init__(self, name: str):
        super().__init__(name)
        self._state = False

    def activate(self) -> None:
        self._state = True
        print(f"  [{self._name}] actuator activated")

    def deactivate(self) -> None:
        self._state = False
        print(f"  [{self._name}] actuator deactivated")


class ControllerMediator:
    """Controller mediator: turns sensor readings into actuator commands."""

    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus
        self._threshold = 50.0
        self._actuator: Optional[ActuatorComponent] = None
        event_bus.on('sensor.update', self._on_sensor_update)

    def set_actuator(self, actuator: ActuatorComponent) -> None:
        self._actuator = actuator

    def set_threshold(self, threshold: float) -> None:
        self._threshold = threshold

    def _on_sensor_update(self, event: Dict) -> None:
        data = event['data']
        value = data['value']
        source = data['source']
        print(f"  [Controller] received reading from sensor {source}: {value}")
        if self._actuator:
            if value > self._threshold:
                self._actuator.activate()
            else:
                self._actuator.deactivate()


print("\nEvent-driven mediator example:")
event_bus = EventBus()
sensor = SensorComponent("temperature sensor")
actuator = ActuatorComponent("cooling fan")
controller = ControllerMediator(event_bus)
sensor.set_event_bus(event_bus)
actuator.set_event_bus(event_bus)
controller.set_actuator(actuator)
print("  Temperature below threshold:")
sensor.update_value(30.0)
print("\n  Temperature above threshold:")
sensor.update_value(60.0)
17.4.3 Hierarchical Mediator
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
from abc import ABC, abstractmethod


@dataclass
class Message:
    """Message structure; target=None means broadcast."""
    source: str
    target: Optional[str]
    content: Any
    metadata: Dict[str, Any] = field(default_factory=dict)


class HierarchicalMediator(ABC):
    """Base class for hierarchical mediators."""

    def __init__(self, name: str, parent: Optional['HierarchicalMediator'] = None):
        self._name = name
        self._parent = parent
        self._children: Dict[str, 'HierarchicalMediator'] = {}
        self._local_components: Dict[str, Any] = {}

    def add_child(self, child: 'HierarchicalMediator') -> None:
        self._children[child._name] = child

    def remove_child(self, name: str) -> None:
        if name in self._children:
            del self._children[name]

    def register_component(self, component: Any, name: str) -> None:
        self._local_components[name] = component

    @abstractmethod
    def send(self, message: Message) -> bool:
        """Send a message on behalf of a local component."""
        pass

    @abstractmethod
    def receive(self, message: Message) -> None:
        """Receive a message from another mediator."""
        pass

    def broadcast_up(self, message: Message) -> None:
        """Pass the message to the parent mediator, if any."""
        if self._parent:
            self._parent.receive(message)

    def broadcast_down(self, message: Message, exclude: Optional[str] = None) -> None:
        """Pass the message to every child mediator except `exclude`."""
        for name, child in self._children.items():
            if name != exclude:
                child.receive(message)

    def broadcast_local(self, message: Message, exclude: Optional[str] = None) -> None:
        """Deliver the message to local components, skipping the sender."""
        for name, component in self._local_components.items():
            if name != exclude and name != message.source:
                if hasattr(component, 'receive'):
                    component.receive(message)


class RootMediator(HierarchicalMediator):
    """Root mediator."""

    def __init__(self, name: str = "root"):
        super().__init__(name, None)
        self._routing_table: Dict[str, str] = {}

    def register_route(self, component_name: str, mediator_name: str) -> None:
        """Record which child mediator owns a component."""
        self._routing_table[component_name] = mediator_name

    def send(self, message: Message) -> bool:
        if message.target:
            if message.target in self._local_components:
                component = self._local_components[message.target]
                if hasattr(component, 'receive'):
                    component.receive(message)
                return True
            mediator_name = self._routing_table.get(message.target)
            if mediator_name in self._children:
                self._children[mediator_name].receive(message)
                return True
            # Unknown target: report failure. Asking each child to send()
            # would bounce the message back up to the root and recurse forever.
            return False
        # No target: broadcast. Child mediators skip the sender themselves.
        self.broadcast_down(message)
        return True

    def receive(self, message: Message) -> None:
        self.send(message)


class LocalMediator(HierarchicalMediator):
    """Local mediator."""

    def __init__(self, name: str, parent: HierarchicalMediator):
        super().__init__(name, parent)
        self._event_handlers: Dict[str, List[Any]] = {}

    def subscribe(self, event_type: str, handler: Any) -> None:
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    def send(self, message: Message) -> bool:
        if message.target and message.target in self._local_components:
            component = self._local_components[message.target]
            if hasattr(component, 'receive'):
                component.receive(message)
            return True
        if message.target:
            # The target is not local: escalate to the parent mediator.
            self.broadcast_up(message)
            return True
        # No target: broadcast within this mediator only.
        self.broadcast_local(message)
        return True

    def receive(self, message: Message) -> None:
        if not message.target:
            self.broadcast_local(message)
            return
        # A targeted message for an unknown component is dropped rather
        # than broadcast to every local component.
        component = self._local_components.get(message.target)
        if component is not None and hasattr(component, 'receive'):
            component.receive(message)


class LocalComponent:
    """A component attached to one mediator in the hierarchy."""

    def __init__(self, name: str, mediator: HierarchicalMediator):
        self._name = name
        self._mediator = mediator
        mediator.register_component(self, name)

    def send_to(self, target: str, content: Any) -> None:
        message = Message(
            source=self._name,
            target=target,
            content=content
        )
        self._mediator.send(message)

    def broadcast(self, content: Any) -> None:
        message = Message(
            source=self._name,
            target=None,
            content=content
        )
        self._mediator.send(message)

    def receive(self, message: Message) -> None:
        print(f"  [{self._name}] received message from [{message.source}]: {message.content}")


print("\nHierarchical mediator example:")
root = RootMediator("root")
department_a = LocalMediator("department_a", root)
department_b = LocalMediator("department_b", root)
root.add_child(department_a)
root.add_child(department_b)
root.register_route("employee_a1", "department_a")
root.register_route("employee_b1", "department_b")
employee_a1 = LocalComponent("employee_a1", department_a)
employee_a2 = LocalComponent("employee_a2", department_a)
employee_b1 = LocalComponent("employee_b1", department_b)
employee_b2 = LocalComponent("employee_b2", department_b)
print("  Within a department:")
employee_a1.send_to("employee_a2", "intra-department message")
print("\n  Across departments:")
employee_a1.send_to("employee_b1", "cross-department message")
# LocalMediator.send() broadcasts locally, so only department_a receives this.
print("\n  Broadcast (within department_a):")
employee_a1.broadcast("notice to everyone")
17.5 Enterprise Application Examples
17.5.1 Chat Room Mediator
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto


class MessageType(Enum):
    """Message types."""
    PUBLIC = auto()
    PRIVATE = auto()
    SYSTEM = auto()
    GROUP = auto()


class UserStatus(Enum):
    """User presence states."""
    ONLINE = auto()
    AWAY = auto()
    BUSY = auto()
    OFFLINE = auto()


@dataclass
class ChatMessage:
    """Chat message."""
    sender: str
    content: str
    msg_type: MessageType
    recipient: Optional[str] = None
    group_name: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict:
        return {
            'sender': self.sender,
            'content': self.content,
            'type': self.msg_type.name,
            'recipient': self.recipient,
            'group': self.group_name,
            'timestamp': self.timestamp.isoformat()
        }


@dataclass
class User:
    """Chat user."""
    name: str
    status: UserStatus = UserStatus.ONLINE
    groups: Set[str] = field(default_factory=set)

    def join_group(self, group: str) -> None:
        self.groups.add(group)

    def leave_group(self, group: str) -> None:
        self.groups.discard(group)


class ChatRoomMediator:
    """Chat room mediator: users never reference each other directly."""

    def __init__(self):
        self._users: Dict[str, User] = {}
        self._groups: Dict[str, Set[str]] = {}
        self._message_history: List[ChatMessage] = []
        self._max_history = 100

    def register_user(self, user: User) -> None:
        """Register a user and announce the arrival."""
        self._users[user.name] = user
        self._broadcast_system(f"{user.name} joined the chat room")
        print(f"  [System] {user.name} joined the chat room")

    def unregister_user(self, user_name: str) -> None:
        """Remove a user from the room and from every group."""
        if user_name in self._users:
            user = self._users[user_name]
            for group in user.groups:
                if group in self._groups:
                    self._groups[group].discard(user_name)
            del self._users[user_name]
            self._broadcast_system(f"{user_name} left the chat room")
            print(f"  [System] {user_name} left the chat room")

    def create_group(self, group_name: str) -> None:
        """Create a group."""
        if group_name not in self._groups:
            self._groups[group_name] = set()
            print(f"  [System] group '{group_name}' created")

    def join_group(self, user_name: str, group_name: str) -> None:
        """Add a user to a group and notify the other members."""
        if user_name in self._users and group_name in self._groups:
            self._users[user_name].join_group(group_name)
            self._groups[group_name].add(user_name)
            self._broadcast_to_group(
                group_name,
                f"{user_name} joined the group",
                exclude=user_name
            )
            print(f"  [System] {user_name} joined group '{group_name}'")

    def leave_group(self, user_name: str, group_name: str) -> None:
        """Remove a user from a group."""
        if user_name in self._users:
            self._users[user_name].leave_group(group_name)
        if group_name in self._groups:
            self._groups[group_name].discard(user_name)
            print(f"  [System] {user_name} left group '{group_name}'")

    def send_public(self, sender: str, content: str) -> None:
        """Send a public message to everyone except the sender."""
        if sender not in self._users:
            return
        message = ChatMessage(
            sender=sender,
            content=content,
            msg_type=MessageType.PUBLIC
        )
        self._message_history.append(message)
        self._trim_history()
        for name in self._users:
            if name != sender:
                self._deliver_message(name, message)

    def send_private(self, sender: str, recipient: str, content: str) -> None:
        """Send a private message."""
        if sender not in self._users or recipient not in self._users:
            return
        message = ChatMessage(
            sender=sender,
            content=content,
            msg_type=MessageType.PRIVATE,
            recipient=recipient
        )
        self._message_history.append(message)
        self._trim_history()
        self._deliver_message(recipient, message)

    def send_group(self, sender: str, group_name: str, content: str) -> None:
        """Send a message to a group."""
        if sender not in self._users:
            return
        if group_name not in self._groups:
            return
        message = ChatMessage(
            sender=sender,
            content=content,
            msg_type=MessageType.GROUP,
            group_name=group_name
        )
        self._message_history.append(message)
        self._trim_history()
        self._broadcast_to_group(group_name, message, exclude=sender)

    def set_user_status(self, user_name: str, status: UserStatus) -> None:
        """Set a user's presence status."""
        if user_name in self._users:
            self._users[user_name].status = status
            print(f"  [System] {user_name} is now {status.name}")

    def get_online_users(self) -> List[str]:
        """Return every user who is not OFFLINE."""
        return [
            name for name, user in self._users.items()
            if user.status != UserStatus.OFFLINE
        ]

    def get_message_history(self, user_name: str, limit: int = 50) -> List[ChatMessage]:
        """Return up to `limit` recent messages that `user_name` may see."""
        user = self._users.get(user_name)
        if user is None or limit <= 0:
            return []

        def visible(msg: ChatMessage) -> bool:
            # Private messages are visible only to the two parties, and
            # group messages only to current members of the group.
            if msg.msg_type == MessageType.PRIVATE:
                return user_name in (msg.sender, msg.recipient)
            if msg.msg_type == MessageType.GROUP:
                return msg.group_name in user.groups
            return True

        return [m for m in self._message_history if visible(m)][-limit:]

    def _deliver_message(self, recipient: str, message: ChatMessage) -> None:
        """Deliver a message to one user (offline users are skipped)."""
        if recipient in self._users:
            user = self._users[recipient]
            if user.status != UserStatus.OFFLINE:
                prefix = self._get_message_prefix(message)
                print(f"  [{recipient}] {prefix}{message.sender}: {message.content}")

    def _broadcast_system(self, content: str) -> None:
        """Broadcast a system message to every user."""
        message = ChatMessage(
            sender="System",
            content=content,
            msg_type=MessageType.SYSTEM
        )
        for name in self._users:
            self._deliver_message(name, message)

    def _broadcast_to_group(
        self,
        group_name: str,
        content_or_message,
        exclude: Optional[str] = None
    ) -> None:
        """Broadcast to a group; a plain string is wrapped as a system notice."""
        if group_name not in self._groups:
            return
        if isinstance(content_or_message, str):
            message = ChatMessage(
                sender="System",
                content=content_or_message,
                msg_type=MessageType.GROUP,
                group_name=group_name
            )
        else:
            message = content_or_message
        for member in self._groups[group_name]:
            if member != exclude:
                self._deliver_message(member, message)

    def _get_message_prefix(self, message: ChatMessage) -> str:
        """Return the display prefix for a message type."""
        if message.msg_type == MessageType.PRIVATE:
            return "[PM] "
        elif message.msg_type == MessageType.GROUP:
            return f"[Group:{message.group_name}] "
        return ""

    def _trim_history(self) -> None:
        """Keep only the most recent `_max_history` messages."""
        if len(self._message_history) > self._max_history:
            self._message_history = self._message_history[-self._max_history:]


print("\nChat room mediator example:")
chat_room = ChatRoomMediator()
alice = User("Alice")
bob = User("Bob")
charlie = User("Charlie")
chat_room.register_user(alice)
chat_room.register_user(bob)
chat_room.register_user(charlie)
print("\n--- Alice sends a public message ---")
chat_room.send_public("Alice", "Hello, everyone!")
print("\n--- Bob sends Alice a private message ---")
chat_room.send_private("Bob", "Alice", "Hi Alice")
print("\n--- Create a group and send a group message ---")
chat_room.create_group("Tech Talk")
chat_room.join_group("Alice", "Tech Talk")
chat_room.join_group("Bob", "Tech Talk")
chat_room.join_group("Charlie", "Tech Talk")
chat_room.send_group("Alice", "Tech Talk", "Anyone up for discussing Python?")
17.5.2 Air Traffic Control Mediator
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
import math


class FlightStatus(Enum):
    """Flight states."""
    GROUND = auto()
    TAXIING = auto()
    TAKEOFF = auto()
    AIRBORNE = auto()
    LANDING = auto()
    EMERGENCY = auto()


class RunwayStatus(Enum):
    """Runway states."""
    AVAILABLE = auto()
    OCCUPIED = auto()
    MAINTENANCE = auto()


@dataclass
class Position:
    """Position in metres."""
    x: float
    y: float
    altitude: float = 0.0

    def distance_to(self, other: 'Position') -> float:
        """Straight-line 3D distance to another position."""
        horizontal = math.sqrt((self.x - other.x)**2 + (self.y - other.y)**2)
        return math.sqrt(horizontal**2 + (self.altitude - other.altitude)**2)

    def __repr__(self) -> str:
        return f"({self.x:.1f}, {self.y:.1f}, {self.altitude:.0f}m)"


@dataclass
class Aircraft:
    """Aircraft."""
    flight_id: str
    position: Position
    status: FlightStatus = FlightStatus.GROUND
    speed: float = 0.0
    heading: float = 0.0
    destination: Optional[str] = None

    def update_position(self, new_position: Position) -> None:
        self.position = new_position


@dataclass
class Runway:
    """Runway."""
    runway_id: str
    status: RunwayStatus = RunwayStatus.AVAILABLE
    current_user: Optional[str] = None

    def is_available(self) -> bool:
        return self.status == RunwayStatus.AVAILABLE

    def occupy(self, flight_id: str) -> bool:
        """Claim the runway; returns False if it is not available."""
        if self.is_available():
            self.status = RunwayStatus.OCCUPIED
            self.current_user = flight_id
            return True
        return False

    def release(self) -> None:
        self.status = RunwayStatus.AVAILABLE
        self.current_user = None


class AirTrafficControl:
    """Air traffic control mediator: aircraft coordinate only through ATC."""
    MIN_SEPARATION = 1000.0          # metres, 3D distance
    MIN_ALTITUDE_SEPARATION = 300.0  # metres; not used by the simple check below

    def __init__(self):
        self._aircrafts: Dict[str, Aircraft] = {}
        self._runways: Dict[str, Runway] = {}
        self._flight_queue: List[str] = []
        self._alerts: List[str] = []
        self._communication_log: List[str] = []

    def register_aircraft(self, aircraft: Aircraft) -> None:
        """Register an aircraft."""
        self._aircrafts[aircraft.flight_id] = aircraft
        self._log(f"{aircraft.flight_id} registered")

    def unregister_aircraft(self, flight_id: str) -> None:
        """Unregister an aircraft."""
        if flight_id in self._aircrafts:
            del self._aircrafts[flight_id]
            self._log(f"{flight_id} unregistered")

    def add_runway(self, runway: Runway) -> None:
        """Add a runway."""
        self._runways[runway.runway_id] = runway
        self._log(f"Runway {runway.runway_id} added")

    def request_takeoff(self, flight_id: str, runway_id: str) -> bool:
        """Request takeoff clearance."""
        if flight_id not in self._aircrafts:
            return False
        aircraft = self._aircrafts[flight_id]
        if aircraft.status != FlightStatus.GROUND:
            self._log(f"{flight_id}: takeoff not allowed in current state")
            return False
        if runway_id not in self._runways:
            self._log(f"{flight_id}: runway {runway_id} does not exist")
            return False
        runway = self._runways[runway_id]
        if not runway.is_available():
            self._log(f"{flight_id}: runway {runway_id} is occupied")
            # Queue the flight once, even if it asks repeatedly.
            if flight_id not in self._flight_queue:
                self._flight_queue.append(flight_id)
            return False
        runway.occupy(flight_id)
        aircraft.status = FlightStatus.TAKEOFF
        self._log(f"{flight_id}: cleared for takeoff from runway {runway_id}")
        return True

    def confirm_takeoff(self, flight_id: str) -> None:
        """Confirm that takeoff is complete and release the runway."""
        if flight_id not in self._aircrafts:
            return
        aircraft = self._aircrafts[flight_id]
        aircraft.status = FlightStatus.AIRBORNE
        for runway in self._runways.values():
            if runway.current_user == flight_id:
                runway.release()
                self._log(f"{flight_id}: airborne, runway {runway.runway_id} released")
        if self._flight_queue:
            next_flight = self._flight_queue.pop(0)
            self._log(f"Notify {next_flight}: may request takeoff")

    def request_landing(self, flight_id: str, runway_id: str) -> bool:
        """Request landing clearance."""
        if flight_id not in self._aircrafts:
            return False
        aircraft = self._aircrafts[flight_id]
        if aircraft.status != FlightStatus.AIRBORNE:
            self._log(f"{flight_id}: landing not allowed in current state")
            return False
        if runway_id not in self._runways:
            self._log(f"{flight_id}: runway {runway_id} does not exist")
            return False
        runway = self._runways[runway_id]
        if not runway.is_available():
            self._log(f"{flight_id}: runway {runway_id} is occupied, hold")
            return False
        runway.occupy(flight_id)
        aircraft.status = FlightStatus.LANDING
        self._log(f"{flight_id}: cleared to land on runway {runway_id}")
        return True

    def confirm_landing(self, flight_id: str) -> None:
        """Confirm that landing is complete and release the runway."""
        if flight_id not in self._aircrafts:
            return
        aircraft = self._aircrafts[flight_id]
        aircraft.status = FlightStatus.GROUND
        for runway in self._runways.values():
            if runway.current_user == flight_id:
                runway.release()
                self._log(f"{flight_id}: landed, runway {runway.runway_id} released")
        # A landing also frees a runway, so notify the next queued departure.
        if self._flight_queue:
            next_flight = self._flight_queue.pop(0)
            self._log(f"Notify {next_flight}: may request takeoff")

    def report_position(self, flight_id: str, position: Position) -> None:
        """Record a position report and check separation."""
        if flight_id not in self._aircrafts:
            return
        self._aircrafts[flight_id].update_position(position)
        self._check_collisions(flight_id)

    def declare_emergency(self, flight_id: str) -> None:
        """Declare an emergency and assign the first available runway."""
        if flight_id not in self._aircrafts:
            return
        aircraft = self._aircrafts[flight_id]
        aircraft.status = FlightStatus.EMERGENCY
        self._log(f"EMERGENCY: {flight_id}")
        for runway in self._runways.values():
            if runway.is_available():
                runway.occupy(flight_id)
                self._log(f"{flight_id}: emergency runway {runway.runway_id} assigned")
                break
        else:
            self._log(f"{flight_id}: no runway currently available")

    def _check_collisions(self, flight_id: str) -> None:
        """Warn when another aircraft is closer than MIN_SEPARATION."""
        if flight_id not in self._aircrafts:
            return
        aircraft1 = self._aircrafts[flight_id]
        for other_id, aircraft2 in self._aircrafts.items():
            if other_id == flight_id:
                continue
            distance = aircraft1.position.distance_to(aircraft2.position)
            if distance < self.MIN_SEPARATION:
                alert = f"Collision warning: {flight_id} and {other_id} are {distance:.0f}m apart"
                self._alerts.append(alert)
                self._log(alert)

    def _log(self, message: str) -> None:
        """Record and print a timestamped log entry."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self._communication_log.append(log_entry)
        print(f"  {log_entry}")

    def get_status(self) -> Dict:
        """Return a snapshot of aircraft, runways, recent alerts, and the queue."""
        return {
            'aircrafts': {
                fid: {
                    'status': ac.status.name,
                    'position': str(ac.position)
                }
                for fid, ac in self._aircrafts.items()
            },
            'runways': {
                rid: {
                    'status': rw.status.name,
                    'user': rw.current_user
                }
                for rid, rw in self._runways.items()
            },
            'alerts': self._alerts[-5:],
            'queue': self._flight_queue
        }


print("\nAir traffic control mediator example:")
atc = AirTrafficControl()
atc.add_runway(Runway("RW01"))
atc.add_runway(Runway("RW02"))
flight_ca123 = Aircraft(
    flight_id="CA123",
    position=Position(0, 0, 0),
    status=FlightStatus.GROUND
)
flight_mu456 = Aircraft(
    flight_id="MU456",
    position=Position(5000, 5000, 10000),
    status=FlightStatus.AIRBORNE
)
atc.register_aircraft(flight_ca123)
atc.register_aircraft(flight_mu456)
print("\n--- CA123 requests takeoff ---")
atc.request_takeoff("CA123", "RW01")
print("\n--- CA123 confirms takeoff ---")
atc.confirm_takeoff("CA123")
print("\n--- MU456 requests landing ---")
atc.request_landing("MU456", "RW01")
print("\n--- MU456 confirms landing ---")
atc.confirm_landing("MU456")
17.5.3 Microservice Coordination Mediator
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import json
import uuid


class ServiceStatus(Enum):
    """Lifecycle status of a service."""
    STARTING = auto()
    RUNNING = auto()
    STOPPING = auto()
    STOPPED = auto()
    ERROR = auto()


class MessageType(Enum):
    """Kind of message passed through the mediator."""
    REQUEST = auto()
    RESPONSE = auto()
    EVENT = auto()
    COMMAND = auto()


@dataclass
class ServiceMessage:
    """Envelope for every message exchanged between services."""
    message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    message_type: MessageType = MessageType.REQUEST
    source: str = ""
    target: Optional[str] = None
    action: str = ""
    payload: Dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)

    def to_json(self) -> str:
        return json.dumps({
            'id': self.message_id,
            'type': self.message_type.name,
            'source': self.source,
            'target': self.target,
            'action': self.action,
            'payload': self.payload,
            'correlation_id': self.correlation_id,
            'timestamp': self.timestamp.isoformat()
        })


@dataclass
class ServiceInfo:
    """Registration record describing one service."""
    name: str
    endpoint: str
    status: ServiceStatus = ServiceStatus.STOPPED
    capabilities: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

class ServiceRegistry:
    """Service registry with a capability index."""

    def __init__(self):
        self._services: Dict[str, ServiceInfo] = {}
        self._capability_index: Dict[str, List[str]] = {}

    def register(self, service: ServiceInfo) -> None:
        self._services[service.name] = service
        for cap in service.capabilities:
            providers = self._capability_index.setdefault(cap, [])
            # Avoid duplicate index entries when a service registers again.
            if service.name not in providers:
                providers.append(service.name)
        print(f" [Registry] service {service.name} registered")

    def unregister(self, name: str) -> None:
        if name in self._services:
            service = self._services[name]
            for cap in service.capabilities:
                if cap in self._capability_index:
                    self._capability_index[cap] = [
                        s for s in self._capability_index[cap] if s != name
                    ]
            del self._services[name]
            print(f" [Registry] service {name} unregistered")

    def get_service(self, name: str) -> Optional[ServiceInfo]:
        return self._services.get(name)

    def find_by_capability(self, capability: str) -> List[str]:
        return self._capability_index.get(capability, [])

    def get_all_services(self) -> List[ServiceInfo]:
        return list(self._services.values())

class ServiceMediator:
    """Mediator that routes requests, events, and commands between services."""

    def __init__(self, registry: ServiceRegistry):
        self._registry = registry
        self._handlers: Dict[str, Callable] = {}
        self._pending_requests: Dict[str, ServiceMessage] = {}
        self._event_subscribers: Dict[str, List[str]] = {}
        self._message_log: List[ServiceMessage] = []

    def register_handler(
        self,
        service_name: str,
        handler: Callable[[ServiceMessage], ServiceMessage]
    ) -> None:
        self._handlers[service_name] = handler

    def subscribe_event(self, event_type: str, subscriber: str) -> None:
        if event_type not in self._event_subscribers:
            self._event_subscribers[event_type] = []
        self._event_subscribers[event_type].append(subscriber)

    def send_request(self, message: ServiceMessage) -> Optional[ServiceMessage]:
        """Send a request to its target, or to the first service offering the action."""
        self._message_log.append(message)
        if not message.target:
            # No explicit target: resolve one through the capability index.
            # Resolving in place (instead of recursing) logs the request only once.
            services = self._registry.find_by_capability(message.action)
            if not services:
                return None
            message.target = services[0]
        handler = self._handlers.get(message.target)
        if handler is None:
            print(f" [Mediator] service {message.target} unavailable")
            return None
        response = handler(message)
        response.correlation_id = message.message_id
        response.message_type = MessageType.RESPONSE
        self._message_log.append(response)
        return response

    def broadcast_event(self, message: ServiceMessage) -> None:
        """Deliver an event to every subscriber of its action name."""
        message.message_type = MessageType.EVENT
        self._message_log.append(message)
        event_type = message.action
        if event_type in self._event_subscribers:
            for subscriber in self._event_subscribers[event_type]:
                if subscriber in self._handlers:
                    handler = self._handlers[subscriber]
                    handler(message)

    def send_command(self, message: ServiceMessage) -> bool:
        """Send a fire-and-forget command; return whether it was delivered."""
        message.message_type = MessageType.COMMAND
        self._message_log.append(message)
        if message.target and message.target in self._handlers:
            handler = self._handlers[message.target]
            handler(message)
            return True
        return False

class Microservice:
    """Base class for services; all traffic goes through the mediator."""

    def __init__(self, info: ServiceInfo, mediator: ServiceMediator):
        self._info = info
        self._mediator = mediator
        mediator.register_handler(info.name, self.handle_message)

    @property
    def name(self) -> str:
        return self._info.name

    def send_request(
        self,
        target: str,
        action: str,
        payload: Dict[str, Any]
    ) -> Optional[ServiceMessage]:
        message = ServiceMessage(
            source=self._info.name,
            target=target,
            action=action,
            payload=payload
        )
        return self._mediator.send_request(message)

    def broadcast_event(self, event_type: str, payload: Dict[str, Any]) -> None:
        message = ServiceMessage(
            source=self._info.name,
            action=event_type,
            payload=payload
        )
        self._mediator.broadcast_event(message)

    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        # Default behavior: acknowledge any message the subclass does not handle.
        return ServiceMessage(
            source=self._info.name,
            target=message.source,
            action=f"{message.action}_response",
            payload={'status': 'acknowledged'}
        )

class OrderService(Microservice):
    """Order service."""

    def __init__(self, mediator: ServiceMediator):
        info = ServiceInfo(
            name="order-service",
            endpoint="http://order-service:8080",
            capabilities=["create_order", "cancel_order", "get_order"]
        )
        super().__init__(info, mediator)
        self._orders: Dict[str, Dict] = {}

    def create_order(self, order_data: Dict) -> str:
        order_id = str(uuid.uuid4())[:8]
        self._orders[order_id] = order_data
        self.broadcast_event("order_created", {
            'order_id': order_id,
            'customer': order_data.get('customer')
        })
        return order_id

    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        if message.action == "create_order":
            order_id = self.create_order(message.payload)
            return ServiceMessage(
                source=self._info.name,
                target=message.source,
                action="create_order_response",
                payload={'order_id': order_id, 'status': 'created'}
            )
        return super().handle_message(message)


class PaymentService(Microservice):
    """Payment service."""

    def __init__(self, mediator: ServiceMediator):
        info = ServiceInfo(
            name="payment-service",
            endpoint="http://payment-service:8080",
            capabilities=["process_payment", "refund_payment"]
        )
        super().__init__(info, mediator)
        self._payments: Dict[str, Dict] = {}

    def process_payment(self, order_id: str, amount: float) -> bool:
        payment_id = str(uuid.uuid4())[:8]
        self._payments[payment_id] = {
            'order_id': order_id,
            'amount': amount,
            'status': 'completed'
        }
        self.broadcast_event("payment_completed", {
            'payment_id': payment_id,
            'order_id': order_id,
            'amount': amount
        })
        return True

    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        if message.action == "process_payment":
            success = self.process_payment(
                message.payload.get('order_id'),
                message.payload.get('amount')
            )
            return ServiceMessage(
                source=self._info.name,
                target=message.source,
                action="process_payment_response",
                payload={'success': success}
            )
        return super().handle_message(message)


class NotificationService(Microservice):
    """Notification service; reacts to events published by other services."""

    def __init__(self, mediator: ServiceMediator):
        info = ServiceInfo(
            name="notification-service",
            endpoint="http://notification-service:8080",
            capabilities=["send_notification"]
        )
        super().__init__(info, mediator)
        self._notifications: List[Dict] = []
        mediator.subscribe_event("order_created", self._info.name)
        mediator.subscribe_event("payment_completed", self._info.name)

    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        if message.message_type == MessageType.EVENT:
            event_type = message.action
            payload = message.payload
            notification = {
                'type': event_type,
                'data': payload,
                'timestamp': datetime.now().isoformat()
            }
            self._notifications.append(notification)
            print(f" [Notification service] sending notification: {event_type} -> {payload}")
            return ServiceMessage(
                source=self._info.name,
                action="notification_sent",
                payload={'status': 'sent'}
            )
        return super().handle_message(message)

print("\nMicroservice coordination mediator example:")
registry = ServiceRegistry()
mediator = ServiceMediator(registry)
order_service = OrderService(mediator)
payment_service = PaymentService(mediator)
notification_service = NotificationService(mediator)
registry.register(ServiceInfo(
    name="order-service",
    endpoint="http://order-service:8080",
    capabilities=["create_order"]
))
registry.register(ServiceInfo(
    name="payment-service",
    endpoint="http://payment-service:8080",
    capabilities=["process_payment"]
))
registry.register(ServiceInfo(
    name="notification-service",
    endpoint="http://notification-service:8080",
    capabilities=["send_notification"]
))
print("\n--- Create order ---")
response = order_service.send_request(
    "order-service",
    "create_order",
    {'customer': '张三', 'items': ['Item A', 'Item B']}
)
print(f" Order response: {response.payload}")
print("\n--- Process payment ---")
response = payment_service.send_request(
    "payment-service",
    "process_payment",
    {'order_id': response.payload.get('order_id'), 'amount': 299.0}
)
print(f" Payment response: {response.payload}")

17.6 Pattern Variants
17.6.1 Observer Mediator
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass, field


@dataclass
class ObserverEvent:
    """Event delivered to observers."""
    name: str
    data: Any = None
    source: Optional[str] = None


class ObserverMediator:
    """Observer mediator: the Mediator pattern combined with Observer."""

    def __init__(self):
        self._observers: Dict[str, List[Callable]] = {}
        self._subjects: Dict[str, Any] = {}

    def register_subject(self, name: str, subject: Any) -> None:
        self._subjects[name] = subject

    def register_observer(
        self,
        event_name: str,
        observer: Callable[[ObserverEvent], None]
    ) -> None:
        if event_name not in self._observers:
            self._observers[event_name] = []
        self._observers[event_name].append(observer)

    def notify(self, event: ObserverEvent) -> None:
        """Notify only the observers registered for this event name."""
        if event.name in self._observers:
            for observer in self._observers[event.name]:
                observer(event)

    def notify_all(self, event: ObserverEvent) -> None:
        """Notify every observer, whatever event name it registered for."""
        for observers in self._observers.values():
            for observer in observers:
                observer(event)


class ObservableComponent:
    """Component that emits events through the mediator."""

    def __init__(self, name: str, mediator: ObserverMediator):
        self._name = name
        self._mediator = mediator
        mediator.register_subject(name, self)

    def emit(self, event_name: str, data: Any = None) -> None:
        event = ObserverEvent(
            name=event_name,
            data=data,
            source=self._name
        )
        self._mediator.notify(event)

print("\nObserver mediator example:")
observer_mediator = ObserverMediator()


def log_event(event: ObserverEvent) -> None:
    print(f" [Log] event: {event.name}, data: {event.data}")


def alert_event(event: ObserverEvent) -> None:
    # Only dict payloads carry a 'level'; ignore anything else.
    if isinstance(event.data, dict) and event.data.get('level') == 'error':
        print(f" [Alert] error event: {event.name}")


observer_mediator.register_observer('update', log_event)
observer_mediator.register_observer('update', alert_event)
observer_mediator.register_observer('delete', log_event)
component = ObservableComponent("data-component", observer_mediator)
print(" Emitting an update event:")
component.emit('update', {'level': 'info', 'message': 'data updated'})
print("\n Emitting an error event:")
component.emit('update', {'level': 'error', 'message': 'an error occurred'})

17.6.2 Command Mediator
from typing import Dict, List, Callable, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum, auto


class CommandType(Enum):
    """Command types."""
    EXECUTE = auto()
    UNDO = auto()
    REDO = auto()


@dataclass
class Command:
    """A named command with its parameters and originating component."""
    name: str
    params: Dict[str, Any]
    source: str


class CommandMediator:
    """Command mediator: the Mediator pattern combined with Command."""

    def __init__(self):
        # Each entry pairs the handler with its optional undo handler.
        self._handlers: Dict[str, Tuple[Callable, Optional[Callable]]] = {}
        self._history: List[tuple] = []
        self._undo_stack: List[tuple] = []

    def register_handler(
        self,
        command_name: str,
        handler: Callable,
        undo_handler: Optional[Callable] = None
    ) -> None:
        self._handlers[command_name] = (handler, undo_handler)

    def execute(self, command: Command) -> Any:
        if command.name not in self._handlers:
            raise ValueError(f"Unknown command: {command.name}")
        handler, undo_handler = self._handlers[command.name]
        result = handler(**command.params)
        self._history.append((command, result))
        if undo_handler:
            self._undo_stack.append((command, undo_handler))
        return result

    def undo(self) -> Optional[Any]:
        """Undo the most recent undoable command.

        The undo handler receives the same keyword arguments as the original
        command, so it must accept all of them.
        """
        if not self._undo_stack:
            return None
        command, undo_handler = self._undo_stack.pop()
        return undo_handler(**command.params)

    def get_history(self) -> List[tuple]:
        return self._history.copy()


class CommandComponent:
    """Component that issues commands through the mediator."""

    def __init__(self, name: str, mediator: CommandMediator):
        self._name = name
        self._mediator = mediator

    def send_command(self, command_name: str, params: Dict[str, Any]) -> Any:
        command = Command(
            name=command_name,
            params=params,
            source=self._name
        )
        return self._mediator.execute(command)

print("\nCommand mediator example:")
command_mediator = CommandMediator()


def create_file(name: str, content: str = "") -> str:
    print(f" Creating file: {name}")
    return f"file:{name}"


def delete_file(name: str, content: str = "") -> str:
    # `content` is unused, but undo() replays the params of the original
    # 'create' command, so the signature must accept it (otherwise TypeError).
    print(f" Deleting file: {name}")
    return f"deleted:{name}"


command_mediator.register_handler('create', create_file, delete_file)
command_mediator.register_handler('delete', delete_file, create_file)
file_manager = CommandComponent("file-manager", command_mediator)
print(" Executing the create command:")
file_manager.send_command('create', {'name': 'test.txt', 'content': 'Hello'})
print("\n Undoing:")
command_mediator.undo()

17.6.3 State Mediator
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from enum import Enum, auto
import copy


class StateAction(Enum):
    """State actions."""
    ENTER = auto()
    EXIT = auto()
    TRANSITION = auto()


@dataclass
class StateChange:
    """Record of one state change."""
    component: str
    old_state: Any
    new_state: Any
    action: StateAction


class StateMediator:
    """State mediator: owns component states and their transition rules."""

    def __init__(self):
        self._states: Dict[str, Any] = {}
        self._transitions: Dict[str, Dict[Any, List[Any]]] = {}
        self._listeners: List[Callable[[StateChange], None]] = []
        self._history: List[StateChange] = []

    def register_component(
        self,
        name: str,
        initial_state: Any,
        allowed_transitions: Optional[Dict[Any, List[Any]]] = None
    ) -> None:
        self._states[name] = initial_state
        if allowed_transitions:
            self._transitions[name] = allowed_transitions

    def get_state(self, component: str) -> Any:
        return self._states.get(component)

    def set_state(self, component: str, new_state: Any) -> bool:
        """Apply a transition if the rules allow it; return whether it happened."""
        if component not in self._states:
            return False
        old_state = self._states[component]
        # Components registered without rules may move to any state.
        if component in self._transitions:
            allowed = self._transitions[component].get(old_state, [])
            if new_state not in allowed:
                print(f" [State mediator] {component}: transition {old_state} -> {new_state} not allowed")
                return False
        self._states[component] = new_state
        change = StateChange(
            component=component,
            old_state=old_state,
            new_state=new_state,
            action=StateAction.TRANSITION
        )
        self._history.append(change)
        print(f" [State mediator] {component}: {old_state} -> {new_state}")
        for listener in self._listeners:
            listener(change)
        return True

    def add_listener(self, listener: Callable[[StateChange], None]) -> None:
        self._listeners.append(listener)

    def get_history(self) -> List[StateChange]:
        return self._history.copy()

    def get_all_states(self) -> Dict[str, Any]:
        return copy.deepcopy(self._states)


class StatefulComponent:
    """Component whose state lives in the mediator."""

    def __init__(self, name: str, mediator: StateMediator):
        self._name = name
        self._mediator = mediator

    def get_state(self) -> Any:
        return self._mediator.get_state(self._name)

    def set_state(self, new_state: Any) -> bool:
        return self._mediator.set_state(self._name, new_state)

print("\nState mediator example:")
state_mediator = StateMediator()
state_mediator.register_component(
    "order",
    "pending",
    {
        "pending": ["confirmed", "cancelled"],
        "confirmed": ["shipped", "cancelled"],
        "shipped": ["delivered", "returned"],
        "delivered": ["returned"],
        "cancelled": [],
        "returned": []
    }
)
state_mediator.register_component(
    "payment",
    "waiting",
    {
        "waiting": ["processing", "failed"],
        "processing": ["completed", "failed"],
        "completed": ["refunded"],
        "failed": ["waiting"],
        "refunded": []
    }
)


def on_state_change(change: StateChange) -> None:
    if change.new_state == "failed":
        print(f" [Alert] {change.component} entered the failed state!")


state_mediator.add_listener(on_state_change)
order = StatefulComponent("order", state_mediator)
payment = StatefulComponent("payment", state_mediator)
print(" Order state changes:")
order.set_state("confirmed")
order.set_state("shipped")
order.set_state("delivered")
print("\n Payment state changes:")
payment.set_state("processing")
payment.set_state("completed")
print("\n Attempting an illegal transition:")
order.set_state("pending")

17.7 Anti-Patterns and Best Practices
17.7.1 Common Anti-Patterns
from typing import Dict, List, Any, Optional


class MediatorAntiPatterns:
    """Examples of mediator anti-patterns."""

    @staticmethod
    def anti_pattern_1_god_mediator():
        """Anti-pattern 1: the god mediator."""

        class GodMediator:
            def __init__(self):
                self._components: Dict[str, Any] = {}

            def handle_everything(self, event: str, data: Dict) -> None:
                # One method handles every unrelated concern in the system.
                if event == "user_login":
                    pass
                elif event == "user_logout":
                    pass
                elif event == "create_order":
                    pass
                elif event == "process_payment":
                    pass
                elif event == "send_notification":
                    pass
                elif event == "update_inventory":
                    pass
                elif event == "generate_report":
                    pass
                elif event == "backup_data":
                    pass

        print(" Problem: the mediator takes on too many responsibilities and violates the single responsibility principle")
        print(" Fix: split it into several specialized mediators")

    @staticmethod
    def anti_pattern_2_tight_coupling():
        """Anti-pattern 2: the tightly coupled mediator."""

        class TightMediator:
            def __init__(self):
                self._component_a = None
                self._component_b = None
                self._component_c = None

            def notify(self, sender: str, event: str) -> None:
                # Hard-wired knowledge of concrete components and their methods.
                if sender == "A" and event == "click":
                    self._component_b.do_something()
                    self._component_c.do_other()
                elif sender == "B" and event == "update":
                    self._component_a.refresh()

        print(" Problem: the mediator references concrete component classes directly and is hard to extend")
        print(" Fix: depend on interfaces or abstract classes and look components up by name or ID")

    @staticmethod
    def anti_pattern_3_circular_dependency():
        """Anti-pattern 3: circular calls."""
        print(" Problem: component A -> mediator -> component B -> mediator -> component A")
        print(" Fix: use an event queue or asynchronous processing to break the call cycle")

    @staticmethod
    def anti_pattern_4_missing_mediator():
        """Anti-pattern 4: the missing mediator."""

        class ComponentA:
            def __init__(self):
                self._b = None
                self._c = None
                self._d = None

            def do_something(self) -> None:
                # A talks to every peer directly.
                self._b.react()
                self._c.react()
                self._d.react()

        print(" Problem: components depend on each other directly and form a mesh")
        print(" Fix: introduce a mediator to turn the mesh into a star")

print("\nMediator anti-pattern examples:")
print(" Anti-pattern 1 - god mediator:")
MediatorAntiPatterns.anti_pattern_1_god_mediator()
print("\n Anti-pattern 2 - tightly coupled mediator:")
MediatorAntiPatterns.anti_pattern_2_tight_coupling()
print("\n Anti-pattern 3 - circular calls:")
MediatorAntiPatterns.anti_pattern_3_circular_dependency()
print("\n Anti-pattern 4 - missing mediator:")
MediatorAntiPatterns.anti_pattern_4_missing_mediator()

17.7.2 Best Practices
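Anti-pattern 3 in Section 17.7.1 names its fix (an event queue) without showing one. The following is a minimal sketch of that idea, not part of the chapter's own examples: `QueuedMediator`, its `max_events` cap, and the `ping`/`pong` handlers are hypothetical names chosen for illustration. Events published from inside a handler are appended to a queue and drained by the outermost `publish` call, so the call stack never nests; the cap is an assumed safeguard against endless ping-pong.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Tuple


class QueuedMediator:
    """Hypothetical mediator that queues events instead of dispatching recursively."""

    def __init__(self, max_events: int = 100):
        self._handlers: Dict[str, List[Callable[[Any], None]]] = {}
        self._queue: Deque[Tuple[str, Any]] = deque()
        self._draining = False
        self._max_events = max_events  # assumed safety cap per drain

    def subscribe(self, event_type: str, handler: Callable[[Any], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def publish(self, event_type: str, data: Any = None) -> None:
        self._queue.append((event_type, data))
        if self._draining:
            return  # called from a handler: the outer loop will deliver it
        self._draining = True
        processed = 0
        try:
            while self._queue:
                if processed >= self._max_events:
                    self._queue.clear()
                    raise RuntimeError("event cycle suspected: cap reached")
                current_type, current_data = self._queue.popleft()
                processed += 1
                for handler in self._handlers.get(current_type, []):
                    handler(current_data)
        finally:
            self._draining = False


queued = QueuedMediator(max_events=10)
trace: List[str] = []


def on_ping(count: int) -> None:
    trace.append(f"ping {count}")
    if count < 3:
        queued.publish("pong", count)


def on_pong(count: int) -> None:
    trace.append(f"pong {count}")
    queued.publish("ping", count + 1)


queued.subscribe("ping", on_ping)
queued.subscribe("pong", on_pong)
queued.publish("ping", 1)
print(trace)  # ['ping 1', 'pong 1', 'ping 2', 'pong 2', 'ping 3']
```

Because delivery is deferred, handlers observe events in publication order, and a genuine cycle surfaces as a `RuntimeError` instead of a stack overflow.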
from typing import Dict, List, Callable, Any, Optional
from abc import ABC, abstractmethod
from dataclasses import dataclass


class MediatorBestPractices:
    """Mediator best practices."""

    @staticmethod
    def practice_1_single_responsibility():
        """Practice 1: single responsibility."""

        class OrderMediator:
            def __init__(self):
                self._handlers: Dict[str, Callable] = {}

            def register(self, event: str, handler: Callable) -> None:
                self._handlers[event] = handler

            def notify(self, event: str, data: Dict) -> None:
                if event in self._handlers:
                    self._handlers[event](data)

        class PaymentMediator:
            def __init__(self):
                self._handlers: Dict[str, Callable] = {}

            def register(self, event: str, handler: Callable) -> None:
                self._handlers[event] = handler

            def notify(self, event: str, data: Dict) -> None:
                if event in self._handlers:
                    self._handlers[event](data)

        print(" Give each area of responsibility its own mediator")

    @staticmethod
    def practice_2_event_driven():
        """Practice 2: event-driven communication."""

        @dataclass
        class Event:
            type: str
            data: Any
            source: str

        class EventMediator:
            def __init__(self):
                self._subscribers: Dict[str, List[Callable]] = {}

            def subscribe(self, event_type: str, handler: Callable) -> None:
                if event_type not in self._subscribers:
                    self._subscribers[event_type] = []
                self._subscribers[event_type].append(handler)

            def publish(self, event: Event) -> None:
                if event.type in self._subscribers:
                    for handler in self._subscribers[event.type]:
                        handler(event)

        print(" Use events to keep communication loosely coupled")

    @staticmethod
    def practice_3_async_processing():
        """Practice 3: asynchronous processing."""
        from queue import Empty, Queue
        import threading

        class AsyncMediator:
            def __init__(self):
                self._queue: Queue = Queue()
                self._handlers: Dict[str, Callable] = {}
                self._running = False

            def register(self, event_type: str, handler: Callable) -> None:
                self._handlers[event_type] = handler

            def start(self) -> None:
                self._running = True
                thread = threading.Thread(target=self._process_queue, daemon=True)
                thread.start()

            def stop(self) -> None:
                self._running = False

            def send(self, event_type: str, data: Any) -> None:
                self._queue.put((event_type, data))

            def _process_queue(self) -> None:
                while self._running:
                    try:
                        event_type, data = self._queue.get(timeout=0.1)
                    except Empty:
                        continue  # nothing queued; re-check the running flag
                    handler = self._handlers.get(event_type)
                    if handler is None:
                        continue
                    try:
                        handler(data)
                    except Exception as exc:
                        # Report handler failures instead of silently hiding
                        # them, and keep the worker thread alive.
                        print(f" [AsyncMediator] handler for {event_type} failed: {exc}")

        print(" Use a queue for asynchronous processing so senders never block")

    @staticmethod
    def practice_4_logging_monitoring():
        """Practice 4: logging and monitoring."""
        from datetime import datetime

        class MonitoredMediator:
            def __init__(self):
                self._handlers: Dict[str, Callable] = {}
                self._event_log: List[Dict] = []
                self._metrics: Dict[str, int] = {}

            def notify(self, event_type: str, data: Dict) -> None:
                self._event_log.append({
                    'type': event_type,
                    'data': data,
                    'timestamp': datetime.now().isoformat()
                })
                # Count events per type.
                self._metrics[event_type] = self._metrics.get(event_type, 0) + 1
                if event_type in self._handlers:
                    self._handlers[event_type](data)

            def get_metrics(self) -> Dict[str, int]:
                return self._metrics.copy()

        print(" Add logging and metrics to make debugging and analysis easier")

print("\nMediator best-practice examples:")
print(" Practice 1 - single responsibility:")
MediatorBestPractices.practice_1_single_responsibility()
print("\n Practice 2 - event-driven:")
MediatorBestPractices.practice_2_event_driven()
print("\n Practice 3 - asynchronous processing:")
MediatorBestPractices.practice_3_async_processing()
print("\n Practice 4 - logging and monitoring:")
MediatorBestPractices.practice_4_logging_monitoring()

17.8 Decision Guide
17.8.1 Decision Tree for Choosing a Mediator Type
┌─────────────────────────────────────────────────────────────────────┐
│                     Mediator Type Decision Tree                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│              Need to coordinate multiple components?                │
│                                 │                                   │
│                      ┌──────────┴──────────┐                        │
│                      │                     │                        │
│                     Yes                    No                       │
│                      │                     │                        │
│                      ▼                     ▼                        │
│            ┌───────────────────┐  No mediator needed                │
│            │ Component count?  │                                    │
│            └─────────┬─────────┘                                    │
│                      │                                              │
│        ┌─────────────┼─────────────┐                                │
│        │             │             │                                │
│       Few         Medium          Many                              │
│        │             │             │                                │
│        ▼             ▼             ▼                                │
│     Simple     Event-driven   Distributed                           │
│    mediator      mediator      mediator                             │
│                                                                     │
│                     Communication style?                            │
│                               │                                     │
│                      ┌────────┴────────┐                            │
│                      │                 │                            │
│                     Sync             Async                          │
│                      │                 │                            │
│                      ▼                 ▼                            │
│                 Direct-call      Message-queue                      │
│                  mediator          mediator                         │
│                                                                     │
│                    State management needed?                         │
│                                │                                    │
│                      ┌─────────┴─────────┐                          │
│                      │                   │                          │
│                     Yes                  No                         │
│                      │                   │                          │
│                      ▼                   ▼                          │
│               State mediator    Stateless mediator                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

17.8.2 Choosing an Implementation
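| Scenario | Recommended implementation | Rationale |
|---|---|---|
| UI component coordination | Centralized mediator | Few components; keeping the logic in one place eases maintenance |
| Chat system | Event-driven mediator | Supports broadcast and private messages; easy to extend |
| Microservice communication | Message bus / service mesh | Distributed environment; supports asynchronous delivery and retries |
| State machine management | State mediator | Transition rules are managed in one place |
| Command execution | Command mediator | Supports undo/redo and a command history |
| IoT devices | Hierarchical mediator | Supports device grouping and tiered management |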
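The table lists undo/redo for the command mediator, whereas the `CommandMediator` in Section 17.6.2 implements only `undo`. The sketch below shows one way redo could be layered on with a second stack. It is an illustration under assumptions, not the chapter's implementation: `UndoRedoMediator` and the `add`/`subtract` handlers are hypothetical, and every command is assumed to have an undo handler that accepts the same keyword arguments.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Handler = Callable[..., Any]
Entry = Tuple[str, Dict[str, Any]]  # (command name, keyword arguments)


class UndoRedoMediator:
    """Hypothetical command mediator with both undo and redo stacks."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Tuple[Handler, Handler]] = {}
        self._undo_stack: List[Entry] = []
        self._redo_stack: List[Entry] = []

    def register_handler(self, name: str, do: Handler, undo: Handler) -> None:
        self._handlers[name] = (do, undo)

    def execute(self, name: str, params: Dict[str, Any]) -> Any:
        do, _ = self._handlers[name]
        result = do(**params)
        self._undo_stack.append((name, params))
        self._redo_stack.clear()  # a fresh command invalidates the redo history
        return result

    def undo(self) -> Optional[Any]:
        if not self._undo_stack:
            return None
        name, params = self._undo_stack.pop()
        _, undo = self._handlers[name]
        result = undo(**params)
        self._redo_stack.append((name, params))
        return result

    def redo(self) -> Optional[Any]:
        if not self._redo_stack:
            return None
        name, params = self._redo_stack.pop()
        do, _ = self._handlers[name]
        result = do(**params)
        self._undo_stack.append((name, params))
        return result


counter = {"value": 0}


def add(amount: int) -> int:
    counter["value"] += amount
    return counter["value"]


def subtract(amount: int) -> int:
    counter["value"] -= amount
    return counter["value"]


undo_redo = UndoRedoMediator()
undo_redo.register_handler("add", add, subtract)
undo_redo.execute("add", {"amount": 5})  # value is 5
undo_redo.execute("add", {"amount": 2})  # value is 7
undo_redo.undo()                         # value is 5
undo_redo.redo()                         # value is 7 again
print(counter["value"])
```

Clearing the redo stack on every new `execute` is the usual design choice: once the user branches off with a new command, the undone commands no longer describe a reachable future.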
17.9 Quick Reference Card
17.9.1 Core Concepts at a Glance
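┌─────────────────────────────────────────────────────────────────────┐
│                  Mediator Pattern Quick Reference                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  [Definition]                                                       │
│  Encapsulate a set of object interactions in one mediator object    │
│  so that the objects need not reference each other explicitly.      │
│                                                                     │
│  [Core participants]                                                │
│  • Mediator: interface that declares the communication methods      │
│  • ConcreteMediator: coordinates the components                     │
│  • Component: interface for talking to the mediator                 │
│  • ConcreteComponent: implements the business logic                 │
│                                                                     │
│  [Dependency structure]                                             │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │ Without mediator: mesh dependencies, O(n²)                   │   │
│  │ With mediator:    star dependencies, O(n)                    │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  [Common methods]                                                   │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │ notify(sender, event)     # component notifies the mediator  │   │
│  │ send(target, message)     # send a message to one component  │   │
│  │ broadcast(message)        # send a message to all components │   │
│  │ subscribe(event, handler) # subscribe to an event            │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  [When to use]                                                      │
│  ✓ Objects reference each other in complex ways                     │
│  ✓ One intermediate class should wrap the behavior of several       │
│  ✓ Interaction logic between objects must be customizable           │
│  ✓ Components must be reusable on their own                         │
│                                                                     │
│  [Advantages]                                                       │
│  + Lower coupling (mesh → star)                                     │
│  + Interaction logic is controlled in one place                     │
│  + Components are easier to reuse                                   │
│  + Simpler protocols between components                             │
│                                                                     │
│  [Disadvantages]                                                    │
│  - The mediator itself can grow complex                             │
│  - The mediator becomes a single point of failure                   │
│  - May affect performance                                           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

17.9.2 Code Template Quick Reference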
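The reference card in Section 17.9.1 lists four common methods: `notify`, `send`, `broadcast`, and `subscribe`. As a rough illustration of how they could coexist on one class, here is a minimal sketch. The class name `QuickMediator`, the `register` helper, and the convention that a component is represented by a single receive callback are all assumptions made for this example, not an API defined elsewhere in the chapter.

```python
from typing import Any, Callable, Dict, List


class QuickMediator:
    """Hypothetical mediator exposing the four methods from the reference card."""

    def __init__(self) -> None:
        # Component name -> callback that receives a message.
        self._components: Dict[str, Callable[[Any], None]] = {}
        # Event name -> handlers that receive the sender's name.
        self._subscribers: Dict[str, List[Callable[[str], None]]] = {}

    def register(self, name: str, receive: Callable[[Any], None]) -> None:
        self._components[name] = receive

    def subscribe(self, event: str, handler: Callable[[str], None]) -> None:
        self._subscribers.setdefault(event, []).append(handler)

    def notify(self, sender: str, event: str) -> None:
        """A component tells the mediator that something happened."""
        for handler in self._subscribers.get(event, []):
            handler(sender)

    def send(self, target: str, message: Any) -> bool:
        """Deliver a message to one component; False if it is unknown."""
        receive = self._components.get(target)
        if receive is None:
            return False
        receive(message)
        return True

    def broadcast(self, message: Any) -> None:
        """Deliver a message to every registered component."""
        for receive in self._components.values():
            receive(message)


inbox: Dict[str, List[Any]] = {"editor": [], "preview": []}
hub = QuickMediator()
hub.register("editor", inbox["editor"].append)
hub.register("preview", inbox["preview"].append)
# The interaction rule lives in the mediator, not in either component.
hub.subscribe("saved", lambda sender: hub.send("preview", f"refresh after {sender} saved"))
hub.notify("editor", "saved")
hub.broadcast("shutdown")
print(inbox)
```

Neither component knows the other exists: the editor only announces `saved`, and the rule that the preview should refresh is registered on the mediator.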
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass


@dataclass
class Event:
    type: str
    source: str
    data: Dict[str, Any]


class MediatorTemplate(ABC):
    """Mediator template."""

    @abstractmethod
    def notify(self, event: Event) -> None:
        pass

    @abstractmethod
    def register(self, component: 'ComponentTemplate', name: str) -> None:
        pass


class ComponentTemplate(ABC):
    """Component template."""

    def __init__(self, name: str):
        self._name = name
        self._mediator: Optional[MediatorTemplate] = None

    def set_mediator(self, mediator: MediatorTemplate) -> None:
        self._mediator = mediator

    def send_event(self, event_type: str, data: Dict[str, Any]) -> None:
        # Silently does nothing until a mediator has been attached.
        if self._mediator:
            self._mediator.notify(Event(
                type=event_type,
                source=self._name,
                data=data
            ))

    @abstractmethod
    def receive_event(self, event: Event) -> None:
        pass


class ConcreteMediatorTemplate(MediatorTemplate):
    """Concrete mediator template."""

    def __init__(self):
        self._components: Dict[str, ComponentTemplate] = {}

    def register(self, component: ComponentTemplate, name: str) -> None:
        self._components[name] = component
        component.set_mediator(self)

    def notify(self, event: Event) -> None:
        # Forward the event to every component except its sender.
        for name, component in self._components.items():
            if name != event.source:
                component.receive_event(event)

17.10 Summary
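The Mediator pattern introduces a mediator object that coordinates the interactions between objects. This turns mesh dependencies into star dependencies and reduces the number of direct dependencies from $O(n^2)$ to $O(n)$. Starting from a formal definition, this chapter covered the pattern's mathematical basis, the dependency transformation, and the communication model.
Key points:
- Formal view: the mediator turns the mesh $G_{mesh} = (V, E_{mesh})$ into the star $G_{star} = (V \cup \{m\}, E_{star})$, reducing $O(n^2)$ dependencies to $O(n)$.
- Core implementations: an ABC-based mediator framework, an event-driven mediator, a hierarchical mediator, and other variants.
- Enterprise applications: the chat room mediator, the air traffic control mediator, and the microservice coordination mediator.
- Pattern variants: the observer mediator, the command mediator, and the state mediator, each combining Mediator with another pattern.
- Best practices: single responsibility, event-driven communication, asynchronous processing, and logging with monitoring.
The Mediator pattern combines well with the Observer, Command, and State patterns to form richer coordination mechanisms. In a complex system, a mediator scoped to one clear responsibility makes the system easier to maintain and extend.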
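As a worked check of the $O(n^2)$ versus $O(n)$ claim, the sketch below counts directed dependency edges in the worst case. A fully connected mesh has $n(n-1)$ edges (the upper bound for $E_{mesh} \subseteq V \times V$ without self-loops), while the star has $2n$: one edge each way between every component and the mediator, as in the definition of $E_{star}$. The function names are illustrative only.

```python
from itertools import permutations
from typing import List, Set, Tuple

Edge = Tuple[str, str]


def mesh_edges(components: List[str]) -> Set[Edge]:
    """Every ordered pair of distinct components: the fully connected mesh."""
    return set(permutations(components, 2))


def star_edges(components: List[str], mediator: str = "m") -> Set[Edge]:
    """One edge each way between every component and the mediator."""
    edges: Set[Edge] = set()
    for component in components:
        edges.add((component, mediator))
        edges.add((mediator, component))
    return edges


for n in (3, 10, 50):
    names = [f"c{i}" for i in range(1, n + 1)]
    print(n, len(mesh_edges(names)), len(star_edges(names)))
# 3 6 6
# 10 90 20
# 50 2450 100
```

At three components the two structures cost the same; the mediator pays off as the component count grows.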