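Chapter 17: The Mediator Pattern

Learning Objectives

After completing this chapter, readers will be able to:

  • Understand the mathematical basis and formal definition of the Mediator pattern
  • Explain how communication between objects is centralized and decoupled
  • Design and implement several kinds of mediator (centralized, distributed, event-driven, and others)
  • Apply the Mediator pattern to component-coordination problems in complex systems
  • Recognize where the Mediator pattern applies and which anti-patterns to avoid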

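17.1 Formal Definition

17.1.1 Mathematical Definition

The Mediator pattern uses a mediator object to encapsulate how a set of objects interact. Because the objects no longer refer to one another explicitly, they are loosely coupled, and their interaction can be changed independently of the objects themselves.

In graph-theoretic terms, the Mediator pattern converts a mesh topology into a star topology.

Object relationships without a mediator (mesh):

$$G_{mesh} = (V, E_{mesh})$$

where $V = \{c_1, c_2, \ldots, c_n\}$ is the set of components and $E_{mesh} \subseteq V \times V$ is the direct-dependency relation.

Object relationships with a mediator (star):

$$G_{star} = (V \cup \{m\}, E_{star})$$

where $m$ is the mediator and $E_{star} = \{(c_i, m), (m, c_i) \mid c_i \in V\}$.

Reduction in coupling: in the worst case every ordered pair of distinct components is connected, so $|E_{mesh}| = n(n-1)$, whereas $|E_{star}| = 2n$:

$$|E_{mesh}| = O(n^2) \rightarrow |E_{star}| = O(n)$$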
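
The edge counts can be checked numerically. The following is a minimal sketch; the function names `mesh_edges` and `star_edges` are illustrative and are not used elsewhere in this chapter. It counts directed edges, following the definitions of $E_{mesh}$ (worst case) and $E_{star}$ above.

```python
def mesh_edges(n: int) -> int:
    """Worst-case directed edges when every component references every other."""
    return n * (n - 1)


def star_edges(n: int) -> int:
    """Directed edges (c_i, m) and (m, c_i) for n components and one mediator."""
    return 2 * n


for n in (4, 10, 100):
    print(n, mesh_edges(n), star_edges(n))
# 4 12 8
# 10 90 20
# 100 9900 200
```

Counting undirected links instead halves both numbers, which gives 6 and 4 for four components.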

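17.1.2 Communication Model

A mediator can be formalized as a message-passing system:

$$\mathcal{M} = \langle C, S, \Sigma, \delta, \gamma \rangle$$

where:

  • $C = \{c_1, c_2, \ldots, c_n\}$: the set of components
  • $S$: the set of mediator states
  • $\Sigma$: the alphabet of events/messages
  • $\delta: S \times C \times \Sigma \rightarrow S$: the state-transition function
  • $\gamma: S \times C \times \Sigma \rightarrow 2^{C \times \Sigma^*}$: the message-routing function

Routing rule: given the mediator state $s$, the sender $c_i$, and the incoming event $e$, the routing function returns the set of recipients paired with the events they should receive:

$$\gamma(s, c_i, e) = \{(c_j, e') \mid c_j \text{ should receive event } e'\}$$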
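
The tuple above can be made concrete with a small example. This is only a sketch under assumptions: the state is a dictionary recording which form fields are filled, the event names (`filled`, `cleared`, `enable`, `disable`) and the `step` helper are invented for illustration, and applying $\gamma$ to the state after the transition is a choice made here, since the formal model does not fix that ordering.

```python
from typing import Dict, Set, Tuple

State = Dict[str, bool]         # a mediator state s in S: which fields are filled
Routed = Set[Tuple[str, str]]   # a subset of C x Sigma*: (recipient, outgoing event)


def delta(state: State, sender: str, event: str) -> State:
    """State transition: record whether the sending field is filled."""
    new_state = dict(state)
    if event in ("filled", "cleared"):
        new_state[sender] = (event == "filled")
    return new_state


def gamma(state: State, sender: str, event: str) -> Routed:
    """Routing: tell the save button whether every field is filled."""
    if event not in ("filled", "cleared"):
        return set()
    return {("save_button", "enable" if all(state.values()) else "disable")}


def step(state: State, sender: str, event: str) -> Tuple[State, Routed]:
    """Apply delta, then route using the updated state (an assumption)."""
    new_state = delta(state, sender, event)
    return new_state, gamma(new_state, sender, event)


s0 = {"title": False, "content": False}
s1, out1 = step(s0, "title", "filled")
s2, out2 = step(s1, "content", "filled")
print(out1)  # {('save_button', 'disable')}
print(out2)  # {('save_button', 'enable')}
```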

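17.1.3 Dependency Inversion Principle

The Mediator pattern applies dependency inversion:

Traditional dependencies (components depend on one another directly):

$$\forall c_i, c_j \in C: \text{depends}(c_i, c_j) \Leftrightarrow c_i \text{ directly references } c_j$$

Mediated dependencies (components depend on one another only indirectly, through the mediator):

$$\forall c_i \in C: \text{depends}(c_i, m) \land \text{depends}(m, c_i)$$

Coupling metrics

| Metric | Without mediator | With mediator |
|---|---|---|
| Direct dependencies | $O(n^2)$ | $O(n)$ |
| Indirect dependencies | 0 | $O(n)$ |
| Scope affected by a change | $O(n)$ | $O(1)$ |
| Cost of adding a component | $O(n)$ | $O(1)$ |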

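17.2 Historical Background and Evolution

17.2.1 Timeline

| Era | Milestone | Description |
|---|---|---|
| 1987 | Smalltalk MVC | The Controller in Model-View-Controller acts as a mediator |
| 1991 | Windows message mechanism | The Windows message queue acts as a mediator among UI components |
| 1994 | GoF design patterns | "Design Patterns" formally catalogs the Mediator pattern |
| 1996 | Java AWT | The event-dispatch mechanism adopts the mediator idea |
| 2000 | JMS specification | Java Message Service implements a distributed mediator |
| 2002 | .NET event model | Delegates and events support loosely coupled communication |
| 2006 | Ruby on Rails | ActionController acts as the MVC mediator |
| 2010 | Node.js EventEmitter | An event-driven mediator implementation |
| 2015 | Redux | A single state store acts as a mediator |
| 2016 | Microservice architecture | Message buses and service meshes act as distributed mediators |
| 2020 | CloudEvents | A cloud-native event specification standardizes mediator communication |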

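17.2.2 Design Motivation

The core motivations behind the Mediator pattern are:

  1. Lower coupling: components no longer depend on one another directly, so a mesh of dependencies becomes a star
  2. Centralized control: interaction logic lives in one place, which makes it easier to maintain and change
  3. Better reuse: a component that does not depend on its peers is easier to reuse
  4. Simpler protocols: each component talks only to the mediator, so its protocol is simpler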

17.3 UML Structure Diagrams

17.3.1 Standard Structure

┌─────────────────────────────────────────────────────────────────────┐
│                       Mediator Pattern Structure                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────────┐                                                │
│  │  <<interface>>  │                                                │
│  │    Mediator     │                                                │
│  ├─────────────────┤                                                │
│  │ + notify(sender,│                                                │
│  │          event) │                                                │
│  └────────┬────────┘                                                │
│           │                                                          │
│           │ implements                                               │
│           ▼                                                          │
│  ┌─────────────────┐         ┌─────────────────┐                   │
│  │ ConcreteMediator│────────>│    Component    │                   │
│  ├─────────────────┤  knows  │  <<abstract>>   │                   │
│  │ - component1    │         ├─────────────────┤                   │
│  │ - component2    │         │ - mediator      │                   │
│  │ - ...           │         ├─────────────────┤                   │
│  ├─────────────────┤         │ + set_mediator()│                   │
│  │ + notify()      │         └────────┬────────┘                   │
│  └─────────────────┘                  │ extends                     │
│                    ┌──────────────────┼──────────────────┐         │
│                    │                  │                  │         │
│                    ▼                  ▼                  ▼         │
│           ┌─────────────┐    ┌─────────────┐    ┌─────────────┐   │
│           │ ComponentA  │    │ ComponentB  │    │ ComponentC  │   │
│           ├─────────────┤    ├─────────────┤    ├─────────────┤   │
│           │ + operation1│    │ + operation2│    │ + operation3│   │
│           └─────────────┘    └─────────────┘    └─────────────┘   │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

17.3.2 Dependency Comparison

┌─────────────────────────────────────────────────────────────────────┐
│                    Dependency Comparison                             │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  [Without a mediator: mesh dependencies]                             │
│                                                                      │
│           ┌─────────┐                                               │
│           │    A    │                                               │
│           └────┬────┘                                               │
│           ╱    │    ╲                                               │
│          ╱     │     ╲                                              │
│    ┌────┴────┐ │ ┌────┴────┐                                       │
│    │    B    │─┼─│    C    │                                       │
│    └────┬────┘ │ └────┬────┘                                       │
│          ╲     │     ╱                                              │
│           ╲    │    ╱                                               │
│           ┌────┴────┐                                               │
│           │    D    │                                               │
│           └─────────┘                                               │
│                                                                      │
│  Dependencies: A-B, A-C, A-D, B-C, B-D, C-D (6 links)                │
│                                                                      │
│  ─────────────────────────────────────────────────────────────────  │
│                                                                      │
│  [With a mediator: star dependencies]                                │
│                                                                      │
│              ┌─────────┐                                            │
│              │Mediator │                                            │
│              └────┬────┘                                            │
│              ╱  │  │  ╲                                             │
│             ╱   │  │   ╲                                            │
│        ┌───┴─┐ ┌┴┐ ┌┴┐ ┌─┴───┐                                    │
│        │  A  │ │B│ │C│ │  D  │                                    │
│        └─────┘ └─┘ └─┘ └─────┘                                    │
│                                                                      │
│  Dependencies: A-M, B-M, C-M, D-M (4 links)                          │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

17.3.3 Message-Flow Sequence Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                      Message Flow Sequence                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ComponentA      Mediator       ComponentB       ComponentC         │
│      │              │               │               │               │
│      │  do_action() │               │               │               │
│      │──────────────>               │               │               │
│      │              │               │               │               │
│      │              │  notify(A, event)             │               │
│      │              │<──────────────┘               │               │
│      │              │               │               │               │
│      │              │  handle_event()               │               │
│      │              │───────────────>               │               │
│      │              │               │               │               │
│      │              │  handle_event()               │               │
│      │              │───────────────────────────────>               │
│      │              │               │               │               │
│      │              │  response()   │               │               │
│      │              │<──────────────┘               │               │
│      │              │               │               │               │
│      │  callback()  │               │               │               │
│      │<──────────────               │               │               │
│      │              │               │               │               │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
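
One reading of the sequence above can be traced in code. The sketch below is an interpretation, not the chapter's framework: the classes `TraceMediator` and `TraceComponent` and the shared `trace` list are hypothetical, and the method names are taken from the labels in the diagram.

```python
from typing import Dict, List, Optional

trace: List[str] = []  # records calls in the order they happen


class TraceMediator:
    def __init__(self) -> None:
        self.components: Dict[str, "TraceComponent"] = {}

    def register(self, component: "TraceComponent") -> None:
        self.components[component.name] = component
        component.mediator = self

    def notify(self, sender: "TraceComponent", event: str) -> None:
        trace.append(f"Mediator.notify({sender.name}, {event})")
        # Forward the event to every component except the sender.
        for component in self.components.values():
            if component is not sender:
                component.handle_event(event)
        # Report back to the sender once the others have been told.
        sender.callback()


class TraceComponent:
    def __init__(self, name: str) -> None:
        self.name = name
        self.mediator: Optional[TraceMediator] = None

    def do_action(self) -> None:
        trace.append(f"{self.name}.do_action()")
        if self.mediator is not None:
            self.mediator.notify(self, "event")

    def handle_event(self, event: str) -> None:
        trace.append(f"{self.name}.handle_event({event})")

    def callback(self) -> None:
        trace.append(f"{self.name}.callback()")


hub = TraceMediator()
a, b, c = TraceComponent("A"), TraceComponent("B"), TraceComponent("C")
for component in (a, b, c):
    hub.register(component)

a.do_action()
print("\n".join(trace))
```

Component A never calls B or C itself; every call between components passes through `notify`.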

17.4 Standard Implementations

17.4.1 An ABC-Based Mediator Framework

python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum, auto
from datetime import datetime

class EventType(Enum):
    """Kinds of event a component can send."""
    CREATE = auto()
    UPDATE = auto()
    DELETE = auto()
    SELECT = auto()
    VALIDATE = auto()
    SUBMIT = auto()
    CANCEL = auto()

@dataclass
class Event:
    """Event record passed between components and the mediator."""
    type: EventType
    source: str
    data: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    
    def __repr__(self) -> str:
        return f"Event({self.type.name}, from={self.source})"

class Mediator(ABC):
    """Abstract base class for mediators."""

    @abstractmethod
    def notify(self, event: Event) -> None:
        """Handle a notification sent by a component."""
        pass

    @abstractmethod
    def register(self, component: 'Component', name: Optional[str] = None) -> None:
        """Register a component, by default under its own name."""
        pass

    @abstractmethod
    def unregister(self, name: str) -> None:
        """Remove a registered component."""
        pass

class Component(ABC):
    """Abstract base class for components."""

    def __init__(self, name: str):
        self._name = name
        self._mediator: Optional[Mediator] = None

    @property
    def name(self) -> str:
        return self._name

    def set_mediator(self, mediator: Optional[Mediator]) -> None:
        # Passing None detaches the component from its mediator.
        self._mediator = mediator

    def send_event(self, event_type: EventType, data: Optional[Dict[str, Any]] = None) -> None:
        """Send an event to the mediator, if one is attached."""
        if self._mediator:
            event = Event(
                type=event_type,
                source=self._name,
                data=data or {}
            )
            self._mediator.notify(event)

    @abstractmethod
    def receive_event(self, event: Event) -> None:
        """Handle an event delivered by the mediator."""
        pass

class ConcreteMediator(Mediator):
    """Concrete mediator with an event log and per-type subscriptions."""

    def __init__(self):
        self._components: Dict[str, Component] = {}
        self._event_handlers: Dict[EventType, List[Callable[[Event], None]]] = {}
        self._event_log: List[Event] = []

    def register(self, component: Component, name: Optional[str] = None) -> None:
        name = name or component.name
        self._components[name] = component
        component.set_mediator(self)

    def unregister(self, name: str) -> None:
        if name in self._components:
            self._components[name].set_mediator(None)
            del self._components[name]

    def subscribe(self, event_type: EventType, handler: Callable[[Event], None]) -> None:
        """Subscribe a handler to one event type."""
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    def notify(self, event: Event) -> None:
        """Log the event, run subscribed handlers, then route it."""
        self._event_log.append(event)
        print(f"[Mediator] received event: {event}")

        if event.type in self._event_handlers:
            for handler in self._event_handlers[event.type]:
                handler(event)

        self._route_event(event)

    def _route_event(self, event: Event) -> None:
        """Hook for routing an event to specific components.

        The default does nothing; subclasses override it.
        """
        pass

    def get_event_log(self) -> List[Event]:
        """Return a copy of the event log."""
        return self._event_log.copy()

    def broadcast(self, event: Event, exclude: Optional[str] = None) -> None:
        """Deliver an event to every component except the source and `exclude`."""
        for name, component in self._components.items():
            if name != exclude and name != event.source:
                component.receive_event(event)

class ButtonComponent(Component):
    """Button component."""

    def __init__(self, name: str, label: str = ""):
        super().__init__(name)
        self._label = label
        self._enabled = True

    @property
    def enabled(self) -> bool:
        return self._enabled

    def set_enabled(self, enabled: bool) -> None:
        self._enabled = enabled
        state = "enabled" if enabled else "disabled"
        print(f"  [{self._name}] button state: {state}")

    def click(self) -> None:
        # A disabled button ignores clicks.
        if self._enabled:
            print(f"  [{self._name}] button clicked")
            self.send_event(EventType.SUBMIT, {"label": self._label})

    def receive_event(self, event: Event) -> None:
        if event.type == EventType.UPDATE:
            if "enabled" in event.data:
                self.set_enabled(event.data["enabled"])

class TextInputComponent(Component):
    """Text input component."""

    def __init__(self, name: str, placeholder: str = ""):
        super().__init__(name)
        self._value = ""
        self._placeholder = placeholder

    @property
    def value(self) -> str:
        return self._value

    def set_value(self, value: str) -> None:
        self._value = value
        print(f"  [{self._name}] value updated: '{value}'")
        self.send_event(EventType.UPDATE, {"value": value})

    def clear(self) -> None:
        self._value = ""
        print(f"  [{self._name}] cleared")
        self.send_event(EventType.UPDATE, {"value": ""})

    def receive_event(self, event: Event) -> None:
        if event.type == EventType.DELETE:
            self.clear()

class LabelComponent(Component):
    """Label component."""

    def __init__(self, name: str):
        super().__init__(name)
        self._text = ""

    def set_text(self, text: str) -> None:
        self._text = text
        print(f"  [{self._name}] label text: '{text}'")

    def receive_event(self, event: Event) -> None:
        if event.type == EventType.UPDATE:
            if "message" in event.data:
                self.set_text(event.data["message"])

print("Standard Mediator implementation:")

mediator = ConcreteMediator()

title_input = TextInputComponent("title_input", "Enter a title")
content_input = TextInputComponent("content_input", "Enter the content")
save_button = ButtonComponent("save_button", "Save")
clear_button = ButtonComponent("clear_button", "Clear")
status_label = LabelComponent("status_label")

mediator.register(title_input)
mediator.register(content_input)
mediator.register(save_button)
mediator.register(clear_button)
mediator.register(status_label)

def validate_inputs(event: Event) -> None:
    # Enable the save button only when both inputs are non-empty.
    has_title = bool(title_input.value)
    has_content = bool(content_input.value)
    save_button.set_enabled(has_title and has_content)

def handle_submit(event: Event) -> None:
    # Both buttons send SUBMIT, so dispatch on the event source.
    if event.source == "clear_button":
        title_input.clear()
        content_input.clear()
    elif event.source == "save_button":
        status_label.set_text("Saved")

mediator.subscribe(EventType.UPDATE, validate_inputs)
mediator.subscribe(EventType.SUBMIT, handle_submit)

print("\n--- Initial state ---")
save_button.set_enabled(False)

print("\n--- Enter a title ---")
title_input.set_value("My article")

print("\n--- Enter the content ---")
content_input.set_value("This is the article body")

print("\n--- Click Save ---")
save_button.click()

print("\n--- Click Clear ---")
clear_button.click()
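
In the listing above, `_route_event` is an empty hook, so `receive_event` is only ever reached through `broadcast`. One way to fill in the hook is a routing table keyed by source and event type. The sketch below is self-contained and deliberately simplified: `RoutingMediator`, `Recorder`, and `add_route` are hypothetical names, and events are plain strings rather than `Event` objects.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple


class Recorder:
    """Stand-in component that records every event delivered to it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.received: List[Tuple[str, str]] = []

    def receive_event(self, source: str, event_type: str, data: Dict[str, Any]) -> None:
        self.received.append((source, event_type))


class RoutingMediator:
    """Mediator whose routing hook consults a (source, event type) table."""

    def __init__(self) -> None:
        self._components: Dict[str, Recorder] = {}
        self._routes: Dict[Tuple[str, str], List[str]] = defaultdict(list)

    def register(self, component: Recorder) -> None:
        self._components[component.name] = component

    def add_route(self, source: str, event_type: str, target: str) -> None:
        self._routes[(source, event_type)].append(target)

    def notify(self, source: str, event_type: str,
               data: Optional[Dict[str, Any]] = None) -> None:
        self._route_event(source, event_type, data or {})

    def _route_event(self, source: str, event_type: str, data: Dict[str, Any]) -> None:
        # .get() avoids creating empty table entries for unrouted events.
        for target in self._routes.get((source, event_type), []):
            component = self._components.get(target)
            if component is not None:
                component.receive_event(source, event_type, data)


router = RoutingMediator()
title_recorder = Recorder("title_input")
label_recorder = Recorder("status_label")
router.register(title_recorder)
router.register(label_recorder)

# Only the clear button's SUBMIT event is routed to the title input.
router.add_route("clear_button", "SUBMIT", "title_input")

router.notify("clear_button", "SUBMIT")
router.notify("save_button", "SUBMIT")

print(title_recorder.received)  # [('clear_button', 'SUBMIT')]
print(label_recorder.received)  # []
```

Keeping the table inside the mediator means that changing who reacts to an event touches only the `add_route` calls, not the components.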

17.4.2 Event-Driven Mediator

python
from typing import Dict, List, Callable, Any, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import threading
from queue import Queue, Empty
import time

class EventPriority(Enum):
    """Event priority levels."""
    LOW = 1
    NORMAL = 5
    HIGH = 10
    CRITICAL = 20

@dataclass(order=True)
class PrioritizedEvent:
    """An event tagged with a priority; ordering compares the priority only."""
    priority: int
    event: Any = field(compare=False)
    timestamp: float = field(default_factory=time.time, compare=False)

class EventBus:
    """Event bus acting as a decentralized mediator."""

    def __init__(self):
        self._subscribers: Dict[str, List[Callable]] = {}
        self._once_subscribers: Dict[str, List[Callable]] = {}
        self._wildcard_subscribers: List[Callable] = []
        self._event_queue: Queue = Queue()
        self._running = False
        self._lock = threading.Lock()

    def on(self, event_type: str, handler: Callable) -> 'EventBus':
        """Subscribe a handler to one event type."""
        with self._lock:
            if event_type not in self._subscribers:
                self._subscribers[event_type] = []
            self._subscribers[event_type].append(handler)
        return self

    def once(self, event_type: str, handler: Callable) -> 'EventBus':
        """Subscribe a handler that is removed after its first call."""
        with self._lock:
            if event_type not in self._once_subscribers:
                self._once_subscribers[event_type] = []
            self._once_subscribers[event_type].append(handler)
        return self

    def on_any(self, handler: Callable) -> 'EventBus':
        """Subscribe a handler to every event."""
        with self._lock:
            self._wildcard_subscribers.append(handler)
        return self

    def off(self, event_type: str, handler: Callable) -> 'EventBus':
        """Unsubscribe a handler registered with on()."""
        with self._lock:
            if event_type in self._subscribers:
                if handler in self._subscribers[event_type]:
                    self._subscribers[event_type].remove(handler)
        return self

    def emit(self, event_type: str, data: Any = None) -> 'EventBus':
        """Queue an event; it is delivered only while start_processing() runs."""
        event = {
            'type': event_type,
            'data': data,
            'timestamp': datetime.now()
        }
        self._event_queue.put(event)
        return self

    def emit_sync(self, event_type: str, data: Any = None) -> 'EventBus':
        """Deliver an event immediately in the calling thread."""
        event = {
            'type': event_type,
            'data': data,
            'timestamp': datetime.now()
        }
        self._dispatch_event(event)
        return self

    def _dispatch_event(self, event: Dict) -> None:
        """Deliver an event to wildcard, regular, and one-shot subscribers."""
        event_type = event['type']

        # Snapshot the handler lists under the lock so that handlers may
        # subscribe or unsubscribe while the event is being dispatched.
        with self._lock:
            handlers = list(self._wildcard_subscribers)
            handlers += self._subscribers.get(event_type, [])
            handlers += self._once_subscribers.pop(event_type, [])

        for handler in handlers:
            handler(event)

    def start_processing(self) -> None:
        """Dispatch queued events until stop_processing() is called.

        This call blocks, so run it in a dedicated thread.
        """
        self._running = True
        while self._running:
            try:
                event = self._event_queue.get(timeout=0.1)
            except Empty:
                continue  # nothing queued; re-check the running flag
            self._dispatch_event(event)

    def stop_processing(self) -> None:
        """Ask the processing loop to stop."""
        self._running = False

class EventEmitter:
    """Mixin that lets a class publish events on an event bus."""

    def __init__(self):
        self._event_bus: Optional[EventBus] = None

    def set_event_bus(self, event_bus: EventBus) -> None:
        self._event_bus = event_bus

    def emit(self, event_type: str, data: Any = None) -> None:
        # Dispatch synchronously: EventBus.emit() only queues the event, and
        # nothing is delivered unless start_processing() runs in another thread.
        if self._event_bus:
            self._event_bus.emit_sync(event_type, data)

class ComponentBase(EventEmitter):
    """Base class for named components."""

    def __init__(self, name: str):
        super().__init__()
        self._name = name

    @property
    def name(self) -> str:
        return self._name

class SensorComponent(ComponentBase):
    """Sensor component."""

    def __init__(self, name: str):
        super().__init__(name)
        self._value = 0.0

    def update_value(self, value: float) -> None:
        self._value = value
        self.emit('sensor.update', {
            'source': self._name,
            'value': value
        })

class ActuatorComponent(ComponentBase):
    """Actuator component."""

    def __init__(self, name: str):
        super().__init__(name)
        self._state = False

    def activate(self) -> None:
        self._state = True
        print(f"  [{self._name}] actuator activated")

    def deactivate(self) -> None:
        self._state = False
        print(f"  [{self._name}] actuator deactivated")

class ControllerMediator:
    """Controller mediator: turns sensor readings into actuator commands."""

    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus
        self._threshold = 50.0
        self._actuator: Optional[ActuatorComponent] = None

        event_bus.on('sensor.update', self._on_sensor_update)

    def set_actuator(self, actuator: ActuatorComponent) -> None:
        self._actuator = actuator

    def set_threshold(self, threshold: float) -> None:
        self._threshold = threshold

    def _on_sensor_update(self, event: Dict) -> None:
        data = event['data']
        value = data['value']
        source = data['source']

        print(f"  [Controller] received reading from sensor {source}: {value}")

        # Activate the actuator only above the threshold.
        if self._actuator:
            if value > self._threshold:
                self._actuator.activate()
            else:
                self._actuator.deactivate()

print("\nEvent-driven mediator example:")

event_bus = EventBus()

sensor = SensorComponent("Temperature sensor")
actuator = ActuatorComponent("Cooling fan")
controller = ControllerMediator(event_bus)

sensor.set_event_bus(event_bus)
actuator.set_event_bus(event_bus)
controller.set_actuator(actuator)

print("  Temperature below the threshold:")
sensor.update_value(30.0)

print("\n  Temperature above the threshold:")
sensor.update_value(60.0)
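
`EventBus.emit` in the listing above only places events on a queue, so a consumer has to drain that queue before any handler runs. The following is a minimal, self-contained sketch of one way to do that with a worker thread; `QueuedBus` and its methods are hypothetical and much smaller than `EventBus`. It assumes all handlers are registered before `start()` is called.

```python
import queue
import threading
from typing import Any, Callable, Dict, List, Tuple


class QueuedBus:
    """Event bus that delivers queued events on a background thread."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[Any], None]]] = {}
        self._queue: "queue.Queue[Tuple[str, Any]]" = queue.Queue()
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)

    def on(self, event_type: str, handler: Callable[[Any], None]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event_type: str, data: Any = None) -> None:
        self._queue.put((event_type, data))

    def start(self) -> None:
        self._worker.start()

    def stop(self) -> None:
        self._queue.join()   # wait until every queued event has been handled
        self._stop.set()
        self._worker.join()

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                event_type, data = self._queue.get(timeout=0.05)
            except queue.Empty:
                continue  # nothing queued; re-check the stop flag
            try:
                for handler in self._handlers.get(event_type, []):
                    handler(data)
            finally:
                self._queue.task_done()


readings: List[float] = []
bus = QueuedBus()
bus.on("sensor.update", readings.append)
bus.start()
bus.emit("sensor.update", 30.0)
bus.emit("sensor.update", 60.0)
bus.stop()
print(readings)  # [30.0, 60.0]
```

A single worker thread preserves the order in which events were queued, and `stop()` returns only after the queue is empty.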

17.4.3 Hierarchical Mediator

python
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, field
from abc import ABC, abstractmethod

@dataclass
class Message:
    """Message passed between components; target None means broadcast."""
    source: str
    target: Optional[str]
    content: Any
    metadata: Dict[str, Any] = field(default_factory=dict)

class HierarchicalMediator(ABC):
    """Base class for mediators arranged in a tree."""

    def __init__(self, name: str, parent: Optional['HierarchicalMediator'] = None):
        self._name = name
        self._parent = parent
        self._children: Dict[str, 'HierarchicalMediator'] = {}
        self._local_components: Dict[str, Any] = {}

    def add_child(self, child: 'HierarchicalMediator') -> None:
        self._children[child._name] = child

    def remove_child(self, name: str) -> None:
        if name in self._children:
            del self._children[name]

    def register_component(self, component: Any, name: str) -> None:
        self._local_components[name] = component

    @abstractmethod
    def send(self, message: Message) -> bool:
        """Send a message on behalf of a local component."""
        pass

    @abstractmethod
    def receive(self, message: Message) -> None:
        """Accept a message from a parent or child mediator."""
        pass

    def broadcast_up(self, message: Message) -> None:
        """Pass the message to the parent mediator, if any."""
        if self._parent:
            self._parent.receive(message)

    def broadcast_down(self, message: Message, exclude: Optional[str] = None) -> None:
        """Pass the message to every child mediator except `exclude`."""
        for name, child in self._children.items():
            if name != exclude:
                child.receive(message)

    def broadcast_local(self, message: Message, exclude: Optional[str] = None) -> None:
        """Deliver the message to local components other than the source."""
        for name, component in self._local_components.items():
            if name != exclude and name != message.source:
                if hasattr(component, 'receive'):
                    component.receive(message)

class RootMediator(HierarchicalMediator):
    """Root mediator: routes messages between child mediators."""

    def __init__(self, name: str = "root"):
        super().__init__(name, None)
        self._routing_table: Dict[str, str] = {}

    def register_route(self, component_name: str, mediator_name: str) -> None:
        """Record which child mediator owns a component."""
        self._routing_table[component_name] = mediator_name

    def send(self, message: Message) -> bool:
        if message.target is None:
            # Broadcast: skip the child mediator the message came through,
            # which has already delivered it locally.
            self.broadcast_down(message, exclude=message.metadata.get("via"))
            return True

        if message.target in self._local_components:
            component = self._local_components[message.target]
            if hasattr(component, 'receive'):
                component.receive(message)
            return True

        mediator_name = self._routing_table.get(message.target)
        if mediator_name in self._children:
            self._children[mediator_name].receive(message)
            return True

        # No route registered: look for the child that owns the target.
        # Calling child.send() here would bounce the message back up to the
        # root and recurse forever, so inspect the child's components directly.
        for child in self._children.values():
            if message.target in child._local_components:
                child.receive(message)
                return True

        return False  # unknown target

    def receive(self, message: Message) -> None:
        self.send(message)

class LocalMediator(HierarchicalMediator):
    """Local mediator for one group of components."""

    def __init__(self, name: str, parent: HierarchicalMediator):
        super().__init__(name, parent)
        self._event_handlers: Dict[str, List[Any]] = {}

    def subscribe(self, event_type: str, handler: Any) -> None:
        if event_type not in self._event_handlers:
            self._event_handlers[event_type] = []
        self._event_handlers[event_type].append(handler)

    def send(self, message: Message) -> bool:
        if message.target is None:
            # Broadcast locally, then pass the message up so that the other
            # subtrees receive it too; "via" lets the parent skip this one.
            self.broadcast_local(message)
            message.metadata["via"] = self._name
            self.broadcast_up(message)
            return True

        if message.target in self._local_components:
            component = self._local_components[message.target]
            if hasattr(component, 'receive'):
                component.receive(message)
            return True

        # Not a local target: escalate to the parent mediator.
        self.broadcast_up(message)
        return True

    def receive(self, message: Message) -> None:
        if message.target is None:
            self.broadcast_local(message)
        elif message.target in self._local_components:
            component = self._local_components[message.target]
            if hasattr(component, 'receive'):
                component.receive(message)

class LocalComponent:
    """Component attached to a mediator in the hierarchy."""

    def __init__(self, name: str, mediator: HierarchicalMediator):
        self._name = name
        self._mediator = mediator
        mediator.register_component(self, name)

    def send_to(self, target: str, content: Any) -> None:
        message = Message(
            source=self._name,
            target=target,
            content=content
        )
        self._mediator.send(message)

    def broadcast(self, content: Any) -> None:
        message = Message(
            source=self._name,
            target=None,
            content=content
        )
        self._mediator.send(message)

    def receive(self, message: Message) -> None:
        print(f"  [{self._name}] received a message from [{message.source}]: {message.content}")

print("\nHierarchical mediator example:")

root = RootMediator("root")

department_a = LocalMediator("department_a", root)
department_b = LocalMediator("department_b", root)

root.add_child(department_a)
root.add_child(department_b)

root.register_route("employee_a1", "department_a")
root.register_route("employee_b1", "department_b")

employee_a1 = LocalComponent("employee_a1", department_a)
employee_a2 = LocalComponent("employee_a2", department_a)
employee_b1 = LocalComponent("employee_b1", department_b)
employee_b2 = LocalComponent("employee_b2", department_b)

print("  Within a department:")
employee_a1.send_to("employee_a2", "intra-department message")

print("\n  Across departments:")
employee_a1.send_to("employee_b1", "cross-department message")

print("\n  Broadcast:")
employee_a1.broadcast("notice to all staff")

17.5 Enterprise Application Examples

17.5.1 Chat-Room Mediator

python
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import json

class MessageType(Enum):
    """Kinds of chat message."""
    PUBLIC = auto()
    PRIVATE = auto()
    SYSTEM = auto()
    GROUP = auto()

class UserStatus(Enum):
    """User presence status."""
    ONLINE = auto()
    AWAY = auto()
    BUSY = auto()
    OFFLINE = auto()

@dataclass
class ChatMessage:
    """A single chat message."""
    sender: str
    content: str
    msg_type: MessageType
    recipient: Optional[str] = None
    group_name: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)
    
    def to_dict(self) -> Dict:
        return {
            'sender': self.sender,
            'content': self.content,
            'type': self.msg_type.name,
            'recipient': self.recipient,
            'group': self.group_name,
            'timestamp': self.timestamp.isoformat()
        }

@dataclass
class User:
    """A chat user."""
    name: str
    status: UserStatus = UserStatus.ONLINE
    groups: Set[str] = field(default_factory=set)
    
    def join_group(self, group: str) -> None:
        self.groups.add(group)
    
    def leave_group(self, group: str) -> None:
        self.groups.discard(group)

class ChatRoomMediator:
    """Chat-room mediator: users talk to the room, never to each other."""

    def __init__(self):
        self._users: Dict[str, User] = {}
        self._groups: Dict[str, Set[str]] = {}
        self._message_history: List[ChatMessage] = []
        self._max_history = 100

    def register_user(self, user: User) -> None:
        """Add a user to the room and announce it."""
        self._users[user.name] = user
        self._broadcast_system(f"{user.name} joined the chat room")
        print(f"  [System] {user.name} joined the chat room")

    def unregister_user(self, user_name: str) -> None:
        """Remove a user from the room and from every group."""
        if user_name in self._users:
            user = self._users[user_name]
            for group in user.groups:
                if group in self._groups:
                    self._groups[group].discard(user_name)
            del self._users[user_name]
            self._broadcast_system(f"{user_name} left the chat room")
            print(f"  [System] {user_name} left the chat room")

    def create_group(self, group_name: str) -> None:
        """Create a group if it does not exist yet."""
        if group_name not in self._groups:
            self._groups[group_name] = set()
            print(f"  [System] group '{group_name}' created")

    def join_group(self, user_name: str, group_name: str) -> None:
        """Add a registered user to an existing group."""
        if user_name in self._users and group_name in self._groups:
            self._users[user_name].join_group(group_name)
            self._groups[group_name].add(user_name)
            self._broadcast_to_group(
                group_name,
                f"{user_name} joined the group",
                exclude=user_name
            )
            print(f"  [System] {user_name} joined group '{group_name}'")

    def leave_group(self, user_name: str, group_name: str) -> None:
        """Remove a user from a group."""
        if user_name in self._users:
            self._users[user_name].leave_group(group_name)
            if group_name in self._groups:
                self._groups[group_name].discard(user_name)
            print(f"  [System] {user_name} left group '{group_name}'")
    
    def send_public(self, sender: str, content: str) -> None:
        """Send a public message to everyone except the sender."""
        if sender not in self._users:
            return
        
        message = ChatMessage(
            sender=sender,
            content=content,
            msg_type=MessageType.PUBLIC
        )
        self._message_history.append(message)
        self._trim_history()
        
        for name in self._users:
            if name != sender:
                self._deliver_message(name, message)
    
    def send_private(self, sender: str, recipient: str, content: str) -> None:
        """Send a private message to a single recipient."""
        if sender not in self._users or recipient not in self._users:
            return
        
        message = ChatMessage(
            sender=sender,
            content=content,
            msg_type=MessageType.PRIVATE,
            recipient=recipient
        )
        self._message_history.append(message)
        self._trim_history()
        
        self._deliver_message(recipient, message)
    
    def send_group(self, sender: str, group_name: str, content: str) -> None:
        """Send a message to the members of a group."""
        if sender not in self._users:
            return
        if group_name not in self._groups:
            return
        
        message = ChatMessage(
            sender=sender,
            content=content,
            msg_type=MessageType.GROUP,
            group_name=group_name
        )
        self._message_history.append(message)
        self._trim_history()
        
        self._broadcast_to_group(group_name, message, exclude=sender)
    
    def set_user_status(self, user_name: str, status: UserStatus) -> None:
        """Set a user's status."""
        if user_name in self._users:
            self._users[user_name].status = status
            print(f"  [System] {user_name} status changed to {status.name}")
    
    def get_online_users(self) -> List[str]:
        """Return the names of all users who are not offline."""
        return [
            name for name, user in self._users.items()
            if user.status != UserStatus.OFFLINE
        ]
    
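    def get_message_history(self, user_name: str, limit: int = 50) -> List[ChatMessage]:
        """Return the last `limit` messages that `user_name` is allowed to see."""
        def visible(msg: ChatMessage) -> bool:
            # Private messages are visible only to the two parties involved.
            if msg.msg_type == MessageType.PRIVATE:
                return user_name in (msg.sender, msg.recipient)
            # Group messages are filtered by current (not historical) membership.
            if msg.msg_type == MessageType.GROUP:
                return user_name in self._groups.get(msg.group_name, set())
            return True
        
        history = [m for m in self._message_history if visible(m)]
        return history[-limit:] if limit > 0 else []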
    
    def _deliver_message(self, recipient: str, message: ChatMessage) -> None:
        """Deliver a message to one recipient, skipping offline users."""
        if recipient in self._users:
            user = self._users[recipient]
            if user.status != UserStatus.OFFLINE:
                prefix = self._get_message_prefix(message)
                print(f"  [{recipient}] {prefix}{message.sender}: {message.content}")
    
    def _broadcast_system(self, content: str) -> None:
        """Broadcast a system message to every user."""
        message = ChatMessage(
            sender="System",
            content=content,
            msg_type=MessageType.SYSTEM
        )
        for name in self._users:
            self._deliver_message(name, message)
    
    def _broadcast_to_group(
        self,
        group_name: str,
        content_or_message,
        exclude: Optional[str] = None
    ) -> None:
        """Broadcast to every member of a group."""
        if group_name not in self._groups:
            return
        
        if isinstance(content_or_message, str):
            message = ChatMessage(
                sender="System",
                content=content_or_message,
                msg_type=MessageType.GROUP,
                group_name=group_name
            )
        else:
            message = content_or_message
        
        for member in self._groups[group_name]:
            if member != exclude:
                self._deliver_message(member, message)
    
    def _get_message_prefix(self, message: ChatMessage) -> str:
        """Return the display prefix for a message type."""
        if message.msg_type == MessageType.PRIVATE:
            return "[DM] "
        elif message.msg_type == MessageType.GROUP:
            return f"[Group:{message.group_name}] "
        # System and public messages carry no prefix.
        return ""
    
    def _trim_history(self) -> None:
        """Trim the history to the most recent `_max_history` messages."""
        if len(self._message_history) > self._max_history:
            self._message_history = self._message_history[-self._max_history:]

print("\nChat room mediator example:")

chat_room = ChatRoomMediator()

alice = User("Alice")
bob = User("Bob")
charlie = User("Charlie")

chat_room.register_user(alice)
chat_room.register_user(bob)
chat_room.register_user(charlie)

print("\n--- Alice sends a public message ---")
chat_room.send_public("Alice", "Hello, everyone!")

print("\n--- Bob sends Alice a private message ---")
chat_room.send_private("Bob", "Alice", "Hi, Alice")

print("\n--- Create a group and send a group message ---")
chat_room.create_group("Tech Talk")
chat_room.join_group("Alice", "Tech Talk")
chat_room.join_group("Bob", "Tech Talk")
chat_room.join_group("Charlie", "Tech Talk")

chat_room.send_group("Alice", "Tech Talk", "Anyone up for discussing Python?")

17.5.2 Air Traffic Control Mediator

python
from typing import Dict, List, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import math

class FlightStatus(Enum):
    """Flight status."""
    GROUND = auto()
    TAXIING = auto()
    TAKEOFF = auto()
    AIRBORNE = auto()
    LANDING = auto()
    EMERGENCY = auto()

class RunwayStatus(Enum):
    """Runway status."""
    AVAILABLE = auto()
    OCCUPIED = auto()
    MAINTENANCE = auto()

@dataclass
class Position:
    """A position: planar coordinates plus altitude, in metres."""
    x: float
    y: float
    altitude: float = 0.0
    
    def distance_to(self, other: 'Position') -> float:
        horizontal = math.sqrt((self.x - other.x)**2 + (self.y - other.y)**2)
        return math.sqrt(horizontal**2 + (self.altitude - other.altitude)**2)
    
    def __repr__(self) -> str:
        return f"({self.x:.1f}, {self.y:.1f}, {self.altitude:.0f}m)"

@dataclass
class Aircraft:
    """An aircraft."""
    flight_id: str
    position: Position
    status: FlightStatus = FlightStatus.GROUND
    speed: float = 0.0
    heading: float = 0.0
    destination: Optional[str] = None
    
    def update_position(self, new_position: Position) -> None:
        self.position = new_position

@dataclass
class Runway:
    """A runway."""
    runway_id: str
    status: RunwayStatus = RunwayStatus.AVAILABLE
    current_user: Optional[str] = None
    
    def is_available(self) -> bool:
        return self.status == RunwayStatus.AVAILABLE
    
    def occupy(self, flight_id: str) -> bool:
        if self.is_available():
            self.status = RunwayStatus.OCCUPIED
            self.current_user = flight_id
            return True
        return False
    
    def release(self) -> None:
        self.status = RunwayStatus.AVAILABLE
        self.current_user = None

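class AirTrafficControl:
    """Air traffic control mediator."""
    
    MIN_SEPARATION = 1000.0           # minimum 3D distance between aircraft, in metres
    MIN_ALTITUDE_SEPARATION = 300.0   # vertical minimum in metres; declared but not used by the checks below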
    
    def __init__(self):
        self._aircrafts: Dict[str, Aircraft] = {}
        self._runways: Dict[str, Runway] = {}
        self._flight_queue: List[str] = []
        self._alerts: List[str] = []
        self._communication_log: List[str] = []
    
    def register_aircraft(self, aircraft: Aircraft) -> None:
        """Register an aircraft."""
        self._aircrafts[aircraft.flight_id] = aircraft
        self._log(f"{aircraft.flight_id} registered")
    
    def unregister_aircraft(self, flight_id: str) -> None:
        """Unregister an aircraft."""
        if flight_id in self._aircrafts:
            del self._aircrafts[flight_id]
            self._log(f"{flight_id} unregistered")
    
    def add_runway(self, runway: Runway) -> None:
        """Add a runway."""
        self._runways[runway.runway_id] = runway
        self._log(f"Runway {runway.runway_id} added")
    
    def request_takeoff(self, flight_id: str, runway_id: str) -> bool:
        """Request takeoff clearance."""
        if flight_id not in self._aircrafts:
            return False
        
        aircraft = self._aircrafts[flight_id]
        
        if aircraft.status != FlightStatus.GROUND:
            self._log(f"{flight_id}: takeoff not allowed in current state")
            return False
        
        if runway_id not in self._runways:
            self._log(f"{flight_id}: runway {runway_id} does not exist")
            return False
        
        runway = self._runways[runway_id]
        
        if not runway.is_available():
            self._log(f"{flight_id}: runway {runway_id} is occupied")
            # Queue each waiting flight only once, even if it asks repeatedly.
            if flight_id not in self._flight_queue:
                self._flight_queue.append(flight_id)
            return False
        
        runway.occupy(flight_id)
        aircraft.status = FlightStatus.TAKEOFF
        self._log(f"{flight_id}: cleared for takeoff from runway {runway_id}")
        return True
    
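    def confirm_takeoff(self, flight_id: str) -> None:
        """Confirm that takeoff is complete."""
        if flight_id not in self._aircrafts:
            return
        
        aircraft = self._aircrafts[flight_id]
        # Only a flight that was cleared for takeoff can become airborne.
        if aircraft.status != FlightStatus.TAKEOFF:
            self._log(f"{flight_id}: no takeoff clearance to confirm")
            return
        aircraft.status = FlightStatus.AIRBORNE
        
        for runway in self._runways.values():
            if runway.current_user == flight_id:
                runway.release()
                self._log(f"{flight_id}: airborne, runway {runway.runway_id} released")
        
        if self._flight_queue:
            next_flight = self._flight_queue.pop(0)
            self._log(f"Notify {next_flight}: may request takeoff")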
    
    def request_landing(self, flight_id: str, runway_id: str) -> bool:
        """Request landing clearance."""
        if flight_id not in self._aircrafts:
            return False
        
        aircraft = self._aircrafts[flight_id]
        
        if aircraft.status != FlightStatus.AIRBORNE:
            self._log(f"{flight_id}: landing not allowed in current state")
            return False
        
        if runway_id not in self._runways:
            self._log(f"{flight_id}: runway {runway_id} does not exist")
            return False
        
        runway = self._runways[runway_id]
        
        if not runway.is_available():
            # The flight is told to hold; landing requests are not queued.
            self._log(f"{flight_id}: runway {runway_id} is occupied, hold")
            return False
        
        runway.occupy(flight_id)
        aircraft.status = FlightStatus.LANDING
        self._log(f"{flight_id}: cleared to land on runway {runway_id}")
        return True
    
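    def confirm_landing(self, flight_id: str) -> None:
        """Confirm that landing is complete."""
        if flight_id not in self._aircrafts:
            return
        
        aircraft = self._aircrafts[flight_id]
        # Only a flight that is landing (normally or in an emergency) can be grounded.
        if aircraft.status not in (FlightStatus.LANDING, FlightStatus.EMERGENCY):
            self._log(f"{flight_id}: no landing in progress to confirm")
            return
        aircraft.status = FlightStatus.GROUND
        
        for runway in self._runways.values():
            if runway.current_user == flight_id:
                runway.release()
                self._log(f"{flight_id}: landed, runway {runway.runway_id} released")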
    
    def report_position(self, flight_id: str, position: Position) -> None:
        """Report a new position and re-check separation."""
        if flight_id not in self._aircrafts:
            return
        
        self._aircrafts[flight_id].update_position(position)
        self._check_collisions(flight_id)
    
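    def declare_emergency(self, flight_id: str) -> None:
        """Declare an emergency and assign the first available runway."""
        if flight_id not in self._aircrafts:
            return
        
        aircraft = self._aircrafts[flight_id]
        aircraft.status = FlightStatus.EMERGENCY
        
        self._log(f"Emergency: {flight_id}")
        
        for runway in self._runways.values():
            if runway.is_available():
                runway.occupy(flight_id)
                self._log(f"{flight_id}: emergency runway {runway.runway_id} assigned")
                break
        else:
            # The loop finished without finding a free runway.
            self._log(f"{flight_id}: no runway currently available")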
    
    def _check_collisions(self, flight_id: str) -> None:
        """Check for collision risk against every other aircraft."""
        if flight_id not in self._aircrafts:
            return
        
        aircraft1 = self._aircrafts[flight_id]
        
        for other_id, aircraft2 in self._aircrafts.items():
            if other_id == flight_id:
                continue
            
            distance = aircraft1.position.distance_to(aircraft2.position)
            
            if distance < self.MIN_SEPARATION:
                alert = f"Collision warning: {flight_id} <-> {other_id} distance {distance:.0f}m"
                self._alerts.append(alert)
                self._log(alert)
    
    def _log(self, message: str) -> None:
        """Record a timestamped log entry and print it."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self._communication_log.append(log_entry)
        print(f"  {log_entry}")
    
    def get_status(self) -> Dict:
        """Return a snapshot of aircraft, runways, recent alerts, and the queue."""
        return {
            'aircrafts': {
                fid: {
                    'status': ac.status.name,
                    'position': str(ac.position)
                }
                for fid, ac in self._aircrafts.items()
            },
            'runways': {
                rid: {
                    'status': rw.status.name,
                    'user': rw.current_user
                }
                for rid, rw in self._runways.items()
            },
            'alerts': self._alerts[-5:],
            'queue': self._flight_queue
        }

print("\nAir traffic control mediator example:")

atc = AirTrafficControl()

atc.add_runway(Runway("RW01"))
atc.add_runway(Runway("RW02"))

flight_ca123 = Aircraft(
    flight_id="CA123",
    position=Position(0, 0, 0),
    status=FlightStatus.GROUND
)
flight_mu456 = Aircraft(
    flight_id="MU456",
    position=Position(5000, 5000, 10000),
    status=FlightStatus.AIRBORNE
)

atc.register_aircraft(flight_ca123)
atc.register_aircraft(flight_mu456)

print("\n--- CA123 requests takeoff ---")
atc.request_takeoff("CA123", "RW01")

print("\n--- CA123 confirms takeoff ---")
atc.confirm_takeoff("CA123")

print("\n--- MU456 requests landing ---")
atc.request_landing("MU456", "RW01")

print("\n--- MU456 confirms landing ---")
atc.confirm_landing("MU456")
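
The `AirTrafficControl` class declares `MIN_ALTITUDE_SEPARATION`, but `_check_collisions` compares only a single 3D distance against `MIN_SEPARATION`. One possible refinement, sketched here under assumptions, flags a conflict only when two aircraft are closer than both a horizontal and a vertical minimum. The names `Point`, `MIN_HORIZONTAL_SEPARATION`, and `in_conflict` are hypothetical, and the thresholds are illustrative values rather than real separation standards.

```python
import math
from typing import NamedTuple

class Point(NamedTuple):
    """Hypothetical stand-in for the chapter's Position dataclass."""
    x: float
    y: float
    altitude: float = 0.0

# Illustrative thresholds in metres (assumed values, not real standards).
MIN_HORIZONTAL_SEPARATION = 1000.0
MIN_ALTITUDE_SEPARATION = 300.0

def in_conflict(a: Point, b: Point) -> bool:
    """Return True when both the horizontal and the vertical minimum are violated."""
    horizontal = math.hypot(a.x - b.x, a.y - b.y)
    vertical = abs(a.altitude - b.altitude)
    return horizontal < MIN_HORIZONTAL_SEPARATION and vertical < MIN_ALTITUDE_SEPARATION

# 500 m apart horizontally, 100 m apart vertically: both minima violated.
print(in_conflict(Point(0, 0, 3000), Point(500, 0, 3100)))
# Same horizontal gap, but 600 m of vertical separation.
print(in_conflict(Point(0, 0, 3000), Point(500, 0, 3600)))
```

Under this rule the second pair is not flagged, whereas a single 3D distance check against 1000 m would flag it (the 3D distance is about 781 m). Which behavior is appropriate depends on the separation policy being modeled.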

17.5.3 Microservice Coordination Mediator

python
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import json
import uuid

class ServiceStatus(Enum):
    """Service status."""
    STARTING = auto()
    RUNNING = auto()
    STOPPING = auto()
    STOPPED = auto()
    ERROR = auto()

class MessageType(Enum):
    """Message type."""
    REQUEST = auto()
    RESPONSE = auto()
    EVENT = auto()
    COMMAND = auto()

@dataclass
class ServiceMessage:
    """A message exchanged between services."""
    message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    message_type: MessageType = MessageType.REQUEST
    source: str = ""
    target: Optional[str] = None
    action: str = ""
    payload: Dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)
    
    def to_json(self) -> str:
        return json.dumps({
            'id': self.message_id,
            'type': self.message_type.name,
            'source': self.source,
            'target': self.target,
            'action': self.action,
            'payload': self.payload,
            'correlation_id': self.correlation_id,
            'timestamp': self.timestamp.isoformat()
        })

@dataclass
class ServiceInfo:
    """Service metadata."""
    name: str
    endpoint: str
    status: ServiceStatus = ServiceStatus.STOPPED
    capabilities: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

class ServiceRegistry:
    """Service registry."""
    
    def __init__(self):
        self._services: Dict[str, ServiceInfo] = {}
        self._capability_index: Dict[str, List[str]] = {}
    
    def register(self, service: ServiceInfo) -> None:
        self._services[service.name] = service
        for cap in service.capabilities:
            if cap not in self._capability_index:
                self._capability_index[cap] = []
            self._capability_index[cap].append(service.name)
        print(f"  [Registry] service {service.name} registered")
    
    def unregister(self, name: str) -> None:
        if name in self._services:
            service = self._services[name]
            for cap in service.capabilities:
                if cap in self._capability_index:
                    self._capability_index[cap] = [
                        s for s in self._capability_index[cap] if s != name
                    ]
            del self._services[name]
            print(f"  [Registry] service {name} unregistered")
    
    def get_service(self, name: str) -> Optional[ServiceInfo]:
        return self._services.get(name)
    
    def find_by_capability(self, capability: str) -> List[str]:
        return self._capability_index.get(capability, [])
    
    def get_all_services(self) -> List[ServiceInfo]:
        return list(self._services.values())

class ServiceMediator:
    """Service mediator."""
    
    def __init__(self, registry: ServiceRegistry):
        self._registry = registry
        self._handlers: Dict[str, Callable] = {}
        self._pending_requests: Dict[str, ServiceMessage] = {}
        self._event_subscribers: Dict[str, List[str]] = {}
        self._message_log: List[ServiceMessage] = []
    
    def register_handler(
        self,
        service_name: str,
        handler: Callable[[ServiceMessage], ServiceMessage]
    ) -> None:
        self._handlers[service_name] = handler
    
    def subscribe_event(self, event_type: str, subscriber: str) -> None:
        if event_type not in self._event_subscribers:
            self._event_subscribers[event_type] = []
        self._event_subscribers[event_type].append(subscriber)
    
    def send_request(self, message: ServiceMessage) -> Optional[ServiceMessage]:
        """Route a request to its target and return the response, if any."""
        self._message_log.append(message)
        
        # No explicit target: pick the first service that advertises this action.
        # Resolving in place (rather than recursing) logs the request only once.
        if not message.target:
            services = self._registry.find_by_capability(message.action)
            if not services:
                return None
            message.target = services[0]
        
        handler = self._handlers.get(message.target)
        if handler is None:
            print(f"  [Mediator] service {message.target} is unavailable")
            return None
        
        response = handler(message)
        response.correlation_id = message.message_id
        response.message_type = MessageType.RESPONSE
        self._message_log.append(response)
        return response
    
    def broadcast_event(self, message: ServiceMessage) -> None:
        """Broadcast an event to its subscribers."""
        self._message_log.append(message)
        message.message_type = MessageType.EVENT
        
        event_type = message.action
        if event_type in self._event_subscribers:
            for subscriber in self._event_subscribers[event_type]:
                if subscriber in self._handlers:
                    handler = self._handlers[subscriber]
                    handler(message)
    
    def send_command(self, message: ServiceMessage) -> bool:
        """Send a command to a specific service."""
        self._message_log.append(message)
        message.message_type = MessageType.COMMAND
        
        if message.target and message.target in self._handlers:
            handler = self._handlers[message.target]
            handler(message)
            return True
        return False

class Microservice:
    """Base class for microservices."""
    
    def __init__(self, info: ServiceInfo, mediator: ServiceMediator):
        self._info = info
        self._mediator = mediator
        mediator.register_handler(info.name, self.handle_message)
    
    @property
    def name(self) -> str:
        return self._info.name
    
    def send_request(
        self,
        target: str,
        action: str,
        payload: Dict[str, Any]
    ) -> Optional[ServiceMessage]:
        message = ServiceMessage(
            source=self._info.name,
            target=target,
            action=action,
            payload=payload
        )
        return self._mediator.send_request(message)
    
    def broadcast_event(self, event_type: str, payload: Dict[str, Any]) -> None:
        message = ServiceMessage(
            source=self._info.name,
            action=event_type,
            payload=payload
        )
        self._mediator.broadcast_event(message)
    
    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        return ServiceMessage(
            source=self._info.name,
            target=message.source,
            action=f"{message.action}_response",
            payload={'status': 'acknowledged'}
        )

class OrderService(Microservice):
    """Order service."""
    
    def __init__(self, mediator: ServiceMediator):
        info = ServiceInfo(
            name="order-service",
            endpoint="http://order-service:8080",
            capabilities=["create_order", "cancel_order", "get_order"]
        )
        super().__init__(info, mediator)
        self._orders: Dict[str, Dict] = {}
    
    def create_order(self, order_data: Dict) -> str:
        order_id = str(uuid.uuid4())[:8]
        self._orders[order_id] = order_data
        
        self.broadcast_event("order_created", {
            'order_id': order_id,
            'customer': order_data.get('customer')
        })
        
        return order_id
    
    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        if message.action == "create_order":
            order_id = self.create_order(message.payload)
            return ServiceMessage(
                source=self._info.name,
                target=message.source,
                action="create_order_response",
                payload={'order_id': order_id, 'status': 'created'}
            )
        return super().handle_message(message)

class PaymentService(Microservice):
    """Payment service."""
    
    def __init__(self, mediator: ServiceMediator):
        info = ServiceInfo(
            name="payment-service",
            endpoint="http://payment-service:8080",
            capabilities=["process_payment", "refund_payment"]
        )
        super().__init__(info, mediator)
        self._payments: Dict[str, Dict] = {}
    
    def process_payment(self, order_id: str, amount: float) -> bool:
        payment_id = str(uuid.uuid4())[:8]
        self._payments[payment_id] = {
            'order_id': order_id,
            'amount': amount,
            'status': 'completed'
        }
        
        self.broadcast_event("payment_completed", {
            'payment_id': payment_id,
            'order_id': order_id,
            'amount': amount
        })
        
        return True
    
    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        if message.action == "process_payment":
            success = self.process_payment(
                message.payload.get('order_id'),
                message.payload.get('amount')
            )
            return ServiceMessage(
                source=self._info.name,
                target=message.source,
                action="process_payment_response",
                payload={'success': success}
            )
        return super().handle_message(message)

class NotificationService(Microservice):
    """Notification service."""
    
    def __init__(self, mediator: ServiceMediator):
        info = ServiceInfo(
            name="notification-service",
            endpoint="http://notification-service:8080",
            capabilities=["send_notification"]
        )
        super().__init__(info, mediator)
        self._notifications: List[Dict] = []
        
        mediator.subscribe_event("order_created", self._info.name)
        mediator.subscribe_event("payment_completed", self._info.name)
    
    def handle_message(self, message: ServiceMessage) -> ServiceMessage:
        if message.message_type == MessageType.EVENT:
            event_type = message.action
            payload = message.payload
            
            notification = {
                'type': event_type,
                'data': payload,
                'timestamp': datetime.now().isoformat()
            }
            self._notifications.append(notification)
            print(f"  [NotificationService] sending notification: {event_type} -> {payload}")
            
            return ServiceMessage(
                source=self._info.name,
                action="notification_sent",
                payload={'status': 'sent'}
            )
        
        return super().handle_message(message)

print("\nMicroservice coordination mediator example:")

registry = ServiceRegistry()
mediator = ServiceMediator(registry)

order_service = OrderService(mediator)
payment_service = PaymentService(mediator)
notification_service = NotificationService(mediator)

registry.register(ServiceInfo(
    name="order-service",
    endpoint="http://order-service:8080",
    capabilities=["create_order"]
))
registry.register(ServiceInfo(
    name="payment-service",
    endpoint="http://payment-service:8080",
    capabilities=["process_payment"]
))
registry.register(ServiceInfo(
    name="notification-service",
    endpoint="http://notification-service:8080",
    capabilities=["send_notification"]
))

print("\n--- Create order ---")
response = order_service.send_request(
    "order-service",
    "create_order",
    {'customer': 'Zhang San', 'items': ['Item A', 'Item B']}
)
print(f"  Order response: {response.payload}")

print("\n--- Process payment ---")
# The order service reaches the payment service only through the mediator.
response = order_service.send_request(
    "payment-service",
    "process_payment",
    {'order_id': response.payload.get('order_id'), 'amount': 299.0}
)
print(f"  Payment response: {response.payload}")
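
Both requests in the demo name their target explicitly. `ServiceMediator.send_request` can also resolve a target from `message.action` through `ServiceRegistry.find_by_capability` when `target` is `None`. The sketch below isolates that lookup in a self-contained form. `CapabilityRouter`, `register`, and `route` are hypothetical names used only for illustration, and choosing the first registered provider is an assumption carried over from the chapter's code, not a load-balancing recommendation.

```python
from typing import Callable, Dict, List, Optional

Handler = Callable[[dict], dict]

class CapabilityRouter:
    """Hypothetical minimal router: maps an action name to the services offering it."""

    def __init__(self) -> None:
        self._providers: Dict[str, List[str]] = {}   # action -> service names
        self._handlers: Dict[str, Handler] = {}      # service name -> handler

    def register(self, service: str, capabilities: List[str], handler: Handler) -> None:
        self._handlers[service] = handler
        for cap in capabilities:
            self._providers.setdefault(cap, []).append(service)

    def route(self, action: str, payload: dict, target: Optional[str] = None) -> Optional[dict]:
        # Resolve the target from the action when the caller did not name one.
        if target is None:
            providers = self._providers.get(action, [])
            if not providers:
                return None
            target = providers[0]   # assumption: first registered provider wins
        handler = self._handlers.get(target)
        if handler is None:
            return None
        return handler({"action": action, "payload": payload, "target": target})

router = CapabilityRouter()
router.register(
    "payment-service",
    ["process_payment"],
    lambda msg: {"handled_by": msg["target"], "success": True},
)

print(router.route("process_payment", {"amount": 299.0}))
print(router.route("ship_order", {}))
```

The caller here knows only the action name; which service handles it is decided by the router, which is the decoupling the mediator is meant to provide.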

17.6 Pattern Variants

17.6.1 Observer Mediator

python
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass, field

@dataclass
class ObserverEvent:
    """An event delivered to observers."""
    name: str
    data: Any = None
    source: Optional[str] = None

class ObserverMediator:
    """Observer mediator: combines the Mediator and Observer patterns."""
    
    def __init__(self):
        self._observers: Dict[str, List[Callable]] = {}
        self._subjects: Dict[str, Any] = {}
    
    def register_subject(self, name: str, subject: Any) -> None:
        self._subjects[name] = subject
    
    def register_observer(
        self,
        event_name: str,
        observer: Callable[[ObserverEvent], None]
    ) -> None:
        if event_name not in self._observers:
            self._observers[event_name] = []
        self._observers[event_name].append(observer)
    
    def notify(self, event: ObserverEvent) -> None:
        if event.name in self._observers:
            for observer in self._observers[event.name]:
                observer(event)
    
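    def notify_all(self, event: ObserverEvent) -> None:
        # Call each observer once, even if it subscribed to several event names.
        seen: List[Callable] = []
        for observers in self._observers.values():
            for observer in observers:
                if observer not in seen:
                    seen.append(observer)
                    observer(event)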

class ObservableComponent:
    """A component that emits events through the mediator."""
    
    def __init__(self, name: str, mediator: ObserverMediator):
        self._name = name
        self._mediator = mediator
        mediator.register_subject(name, self)
    
    def emit(self, event_name: str, data: Any = None) -> None:
        event = ObserverEvent(
            name=event_name,
            data=data,
            source=self._name
        )
        self._mediator.notify(event)

print("\nObserver mediator example:")

observer_mediator = ObserverMediator()

def log_event(event: ObserverEvent) -> None:
    print(f"  [Log] event: {event.name}, data: {event.data}")

def alert_event(event: ObserverEvent) -> None:
    if event.data and event.data.get('level') == 'error':
        print(f"  [Alert] error event: {event.name}")

observer_mediator.register_observer('update', log_event)
observer_mediator.register_observer('update', alert_event)
observer_mediator.register_observer('delete', log_event)

component = ObservableComponent("data-component", observer_mediator)

print("  Emit an update event:")
component.emit('update', {'level': 'info', 'message': 'Data updated'})

print("\n  Emit an error event:")
component.emit('update', {'level': 'error', 'message': 'An error occurred'})

17.6.2 Command Mediator

python
from typing import Dict, List, Callable, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum, auto

class CommandType(Enum):
    """Command type."""
    EXECUTE = auto()
    UNDO = auto()
    REDO = auto()

@dataclass
class Command:
    """A command."""
    name: str
    params: Dict[str, Any]
    source: str

class CommandMediator:
    """Command mediator: combines the Mediator and Command patterns."""
    
    def __init__(self):
        # Maps a command name to its (handler, optional undo handler) pair.
        self._handlers: Dict[str, Tuple[Callable, Optional[Callable]]] = {}
        self._history: List[tuple] = []
        self._undo_stack: List[tuple] = []
    
    def register_handler(
        self,
        command_name: str,
        handler: Callable,
        undo_handler: Optional[Callable] = None
    ) -> None:
        self._handlers[command_name] = (handler, undo_handler)
    
    def execute(self, command: Command) -> Any:
        if command.name not in self._handlers:
            raise ValueError(f"Unknown command: {command.name}")
        
        handler, undo_handler = self._handlers[command.name]
        result = handler(**command.params)
        
        self._history.append((command, result))
        if undo_handler:
            self._undo_stack.append((command, undo_handler))
        
        return result
    
    def undo(self) -> Optional[Any]:
        if not self._undo_stack:
            return None
        
        command, undo_handler = self._undo_stack.pop()
        return undo_handler(**command.params)
    
    def get_history(self) -> List[tuple]:
        return self._history.copy()

class CommandComponent:
    """A component that issues commands through the mediator."""
    
    def __init__(self, name: str, mediator: CommandMediator):
        self._name = name
        self._mediator = mediator
    
    def send_command(self, command_name: str, params: Dict[str, Any]) -> Any:
        command = Command(
            name=command_name,
            params=params,
            source=self._name
        )
        return self._mediator.execute(command)

print("\nCommand mediator example:")

command_mediator = CommandMediator()

def create_file(name: str, content: str = "") -> str:
    print(f"  Create file: {name}")
    return f"file:{name}"

def delete_file(name: str, content: str = "") -> str:
    # Accepts `content` too, because undo replays the original command's params.
    print(f"  Delete file: {name}")
    return f"deleted:{name}"

command_mediator.register_handler('create', create_file, delete_file)
command_mediator.register_handler('delete', delete_file, create_file)

file_manager = CommandComponent("file-manager", command_mediator)

print("  Run the create command:")
file_manager.send_command('create', {'name': 'test.txt', 'content': 'Hello'})

print("\n  Undo:")
command_mediator.undo()
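
`CommandType` lists `REDO`, but `CommandMediator` implements only `execute` and `undo`. A common way to add redo is a second stack that receives each undone command and is cleared whenever a new command runs. The sketch below shows this under assumptions: `UndoRedoMediator` and the file-list example are hypothetical, and each command is stored as a pair of zero-argument callables rather than the chapter's `Command` dataclass.

```python
from typing import Callable, List, Tuple

Action = Callable[[], None]

class UndoRedoMediator:
    """Hypothetical mediator that keeps separate undo and redo stacks."""

    def __init__(self) -> None:
        self._undo_stack: List[Tuple[Action, Action]] = []  # (do, undo) pairs
        self._redo_stack: List[Tuple[Action, Action]] = []

    def execute(self, do: Action, undo: Action) -> None:
        do()
        self._undo_stack.append((do, undo))
        self._redo_stack.clear()   # a new command invalidates the redo history

    def undo(self) -> bool:
        if not self._undo_stack:
            return False
        do, undo = self._undo_stack.pop()
        undo()
        self._redo_stack.append((do, undo))
        return True

    def redo(self) -> bool:
        if not self._redo_stack:
            return False
        do, undo = self._redo_stack.pop()
        do()
        self._undo_stack.append((do, undo))
        return True

files: List[str] = []
mediator = UndoRedoMediator()
mediator.execute(lambda: files.append("test.txt"), lambda: files.remove("test.txt"))

mediator.undo()
after_undo = list(files)   # the file is gone again
mediator.redo()
after_redo = list(files)   # the file is back
print(after_undo, after_redo)
```

Clearing the redo stack in `execute` is the usual design choice: once a new command runs, replaying previously undone commands could act on state they were never written for.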

17.6.3 State Mediator

python
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from enum import Enum, auto
import copy

class StateAction(Enum):
    """State action."""
    ENTER = auto()
    EXIT = auto()
    TRANSITION = auto()

@dataclass
class StateChange:
    """A record of one state change."""
    component: str
    old_state: Any
    new_state: Any
    action: StateAction

class StateMediator:
    """State mediator: manages component state."""
    
    def __init__(self):
        self._states: Dict[str, Any] = {}
        self._transitions: Dict[str, Dict[Any, List[Any]]] = {}
        self._listeners: List[Callable[[StateChange], None]] = []
        self._history: List[StateChange] = []
    
    def register_component(
        self,
        name: str,
        initial_state: Any,
        allowed_transitions: Optional[Dict[Any, List[Any]]] = None
    ) -> None:
        self._states[name] = initial_state
        if allowed_transitions:
            self._transitions[name] = allowed_transitions
    
    def get_state(self, component: str) -> Any:
        return self._states.get(component)
    
    def set_state(self, component: str, new_state: Any) -> bool:
        if component not in self._states:
            return False
        
        old_state = self._states[component]
        
        if component in self._transitions:
            allowed = self._transitions[component].get(old_state, [])
            if new_state not in allowed:
                print(f"  [StateMediator] {component}: transition {old_state} -> {new_state} is not allowed")
                return False
        
        self._states[component] = new_state
        
        change = StateChange(
            component=component,
            old_state=old_state,
            new_state=new_state,
            action=StateAction.TRANSITION
        )
        self._history.append(change)
        
        for listener in self._listeners:
            listener(change)
        
        print(f"  [StateMediator] {component}: {old_state} -> {new_state}")
        return True
    
    def add_listener(self, listener: Callable[[StateChange], None]) -> None:
        self._listeners.append(listener)
    
    def get_history(self) -> List[StateChange]:
        return self._history.copy()
    
    def get_all_states(self) -> Dict[str, Any]:
        return copy.deepcopy(self._states)

class StatefulComponent:
    """A component whose state is held by the mediator."""
    
    def __init__(self, name: str, mediator: StateMediator):
        self._name = name
        self._mediator = mediator
    
    def get_state(self) -> Any:
        return self._mediator.get_state(self._name)
    
    def set_state(self, new_state: Any) -> bool:
        return self._mediator.set_state(self._name, new_state)

print("\nState mediator example:")

state_mediator = StateMediator()

state_mediator.register_component(
    "order",
    "pending",
    {
        "pending": ["confirmed", "cancelled"],
        "confirmed": ["shipped", "cancelled"],
        "shipped": ["delivered", "returned"],
        "delivered": ["returned"],
        "cancelled": [],
        "returned": []
    }
)

state_mediator.register_component(
    "payment",
    "waiting",
    {
        "waiting": ["processing", "failed"],
        "processing": ["completed", "failed"],
        "completed": ["refunded"],
        "failed": ["waiting"],
        "refunded": []
    }
)

def on_state_change(change: StateChange) -> None:
    if change.new_state == "failed":
        print(f"    [Alert] {change.component} entered the failed state!")

state_mediator.add_listener(on_state_change)

order = StatefulComponent("order", state_mediator)
payment = StatefulComponent("payment", state_mediator)

print("  订单状态变更:")
order.set_state("confirmed")
order.set_state("shipped")
order.set_state("delivered")

print("\n  Payment state changes:")
payment.set_state("processing")
payment.set_state("completed")

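print("\n  Attempting an illegal state transition:")
# "delivered" -> "pending" is not listed in the transition table registered above
order.set_state("pending")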

17.7 Anti-Patterns and Best Practices

17.7.1 Common Anti-Patterns

python
from typing import Dict, List, Any, Optional

class MediatorAntiPatterns:
    """Examples of mediator anti-patterns"""
    
    @staticmethod
    def anti_pattern_1_god_mediator():
        """Anti-pattern 1: the god mediator"""
        
        class GodMediator:
            def __init__(self):
                self._components: Dict[str, Any] = {}
            
            def handle_everything(self, event: str, data: Dict) -> None:
                if event == "user_login":
                    pass
                elif event == "user_logout":
                    pass
                elif event == "create_order":
                    pass
                elif event == "process_payment":
                    pass
                elif event == "send_notification":
                    pass
                elif event == "update_inventory":
                    pass
                elif event == "generate_report":
                    pass
                elif event == "backup_data":
                    pass
        
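        print("    Problem: the mediator takes on too many responsibilities and violates the Single Responsibility Principle")
        print("    Fix: split it into several specialized mediators")
    
    @staticmethod
    def anti_pattern_2_tight_coupling():
        """Anti-pattern 2: tightly coupled mediator"""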
        
        class TightMediator:
            def __init__(self):
                self._component_a = None
                self._component_b = None
                self._component_c = None
            
            def notify(self, sender: str, event: str) -> None:
                if sender == "A" and event == "click":
                    self._component_b.do_something()
                    self._component_c.do_other()
                elif sender == "B" and event == "update":
                    self._component_a.refresh()
        
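        print("    Problem: the mediator references concrete component classes directly, so it is hard to extend")
        print("    Fix: depend on interfaces/abstract classes and look components up by name or ID")
    
    @staticmethod
    def anti_pattern_3_circular_dependency():
        """Anti-pattern 3: circular dependency"""
        
        print("    Problem: component A -> mediator -> component B -> mediator -> component A")
        print("    Fix: use an event queue or asynchronous processing to avoid circular calls")
    
    @staticmethod
    def anti_pattern_4_missing_mediator():
        """Anti-pattern 4: missing mediator"""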
        
        class ComponentA:
            def __init__(self):
                self._b = None
                self._c = None
                self._d = None
            
            def do_something(self) -> None:
                self._b.react()
                self._c.react()
                self._d.react()
        
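        print("    Problem: components depend on each other directly and form a mesh")
        print("    Fix: introduce a mediator to turn the mesh dependencies into star dependencies")

print("\nMediator anti-pattern examples:")
print("  Anti-pattern 1 - god mediator:")
MediatorAntiPatterns.anti_pattern_1_god_mediator()

print("\n  Anti-pattern 2 - tightly coupled mediator:")
MediatorAntiPatterns.anti_pattern_2_tight_coupling()

print("\n  Anti-pattern 3 - circular dependency:")
MediatorAntiPatterns.anti_pattern_3_circular_dependency()

print("\n  Anti-pattern 4 - missing mediator:")
MediatorAntiPatterns.anti_pattern_4_missing_mediator()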
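
Anti-pattern 3 only names its fix (an event queue), so the following is a minimal sketch of what that could look like. `QueuedMediator`, its `max_events` budget, and the handler names are all hypothetical and chosen for illustration. The idea is that a `notify` call made from inside a handler is queued instead of being dispatched recursively, so the call stack stays flat, and a simple event budget turns an endless A -> B -> A loop into an explicit error.

```python
from collections import deque
from typing import Callable, Deque, Dict, List, Tuple


class QueuedMediator:
    """Hypothetical mediator that drains events from a FIFO queue."""

    def __init__(self, max_events: int = 100) -> None:
        self._handlers: Dict[str, List[Callable[[dict], None]]] = {}
        self._pending: Deque[Tuple[str, dict]] = deque()
        self._dispatching = False
        self._max_events = max_events  # assumed guard against endless ping-pong

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        self._handlers.setdefault(event, []).append(handler)

    def notify(self, event: str, data: dict) -> None:
        self._pending.append((event, data))
        if self._dispatching:
            return  # called from inside a handler: defer instead of recursing
        self._dispatching = True
        processed = 0
        try:
            while self._pending:
                if processed >= self._max_events:
                    self._pending.clear()
                    raise RuntimeError("event budget exceeded: possible cycle")
                name, payload = self._pending.popleft()
                processed += 1
                for handler in self._handlers.get(name, []):
                    handler(payload)
        finally:
            self._dispatching = False


mediator = QueuedMediator(max_events=10)
trace: List[str] = []


def on_a(data: dict) -> None:
    trace.append("a")
    mediator.notify("b", data)  # A reacts by notifying B


def on_b(data: dict) -> None:
    trace.append("b")
    mediator.notify("a", data)  # B reacts by notifying A, closing the cycle


mediator.subscribe("a", on_a)
mediator.subscribe("b", on_b)

cycle_error = ""
try:
    mediator.notify("a", {})
except RuntimeError as exc:
    cycle_error = str(exc)

print(len(trace), cycle_error)
```

With a budget of 10 the loop is cut off after ten handled events rather than ending in a `RecursionError`. A real system would more likely break the cycle in the component logic itself; the budget is only a safety net.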

17.7.2 Best Practices

python
from typing import Dict, List, Callable, Any, Optional
from abc import ABC, abstractmethod
from dataclasses import dataclass

class MediatorBestPractices:
    """Mediator best practices"""
    
    @staticmethod
    def practice_1_single_responsibility():
        """Practice 1: single responsibility"""
        
        class OrderMediator:
            def __init__(self):
                self._handlers: Dict[str, Callable] = {}
            
            def register(self, event: str, handler: Callable) -> None:
                self._handlers[event] = handler
            
            def notify(self, event: str, data: Dict) -> None:
                if event in self._handlers:
                    self._handlers[event](data)
        
        class PaymentMediator:
            def __init__(self):
                self._handlers: Dict[str, Callable] = {}
            
            def register(self, event: str, handler: Callable) -> None:
                self._handlers[event] = handler
            
            def notify(self, event: str, data: Dict) -> None:
                if event in self._handlers:
                    self._handlers[event](data)
        
        print("    Separate different responsibilities into different mediators")
    
    @staticmethod
    def practice_2_event_driven():
        """Practice 2: event-driven"""
        
        @dataclass
        class Event:
            type: str
            data: Any
            source: str
        
        class EventMediator:
            def __init__(self):
                self._subscribers: Dict[str, List[Callable]] = {}
            
            def subscribe(self, event_type: str, handler: Callable) -> None:
                if event_type not in self._subscribers:
                    self._subscribers[event_type] = []
                self._subscribers[event_type].append(handler)
            
            def publish(self, event: Event) -> None:
                if event.type in self._subscribers:
                    for handler in self._subscribers[event.type]:
                        handler(event)
        
        print("    Use events to achieve loosely coupled communication")
    
    @staticmethod
    def practice_3_async_processing():
        """Practice 3: asynchronous processing"""
        
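        from queue import Queue, Empty
        import threading
        
        class AsyncMediator:
            def __init__(self):
                self._queue: Queue = Queue()
                self._handlers: Dict[str, Callable] = {}
                self._running = False
            
            def register(self, event_type: str, handler: Callable) -> None:
                # Without registration the worker would never find a handler
                self._handlers[event_type] = handler
            
            def start(self) -> None:
                self._running = True
                thread = threading.Thread(target=self._process_queue)
                thread.daemon = True
                thread.start()
            
            def stop(self) -> None:
                self._running = False
            
            def send(self, event_type: str, data: Any) -> None:
                self._queue.put((event_type, data))
            
            def _process_queue(self) -> None:
                while self._running:
                    try:
                        event_type, data = self._queue.get(timeout=0.1)
                    except Empty:
                        # Nothing arrived within the timeout; re-check _running
                        continue
                    handler = self._handlers.get(event_type)
                    if handler is None:
                        continue
                    try:
                        handler(data)
                    except Exception as exc:
                        # Keep the worker alive, but do not hide the failure
                        print(f"    [AsyncMediator] handler for {event_type!r} failed: {exc}")
        
        print("    Use a queue for asynchronous processing so that senders are not blocked")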
    
    @staticmethod
    def practice_4_logging_monitoring():
        """Practice 4: logging and monitoring"""
        
        class MonitoredMediator:
            def __init__(self):
                self._handlers: Dict[str, Callable] = {}
                self._event_log: List[Dict] = []
                self._metrics: Dict[str, int] = {}
            
            def notify(self, event_type: str, data: Dict) -> None:
                from datetime import datetime
                
                self._event_log.append({
                    'type': event_type,
                    'data': data,
                    'timestamp': datetime.now().isoformat()
                })
                
                self._metrics[event_type] = self._metrics.get(event_type, 0) + 1
                
                if event_type in self._handlers:
                    self._handlers[event_type](data)
            
            def get_metrics(self) -> Dict[str, int]:
                return self._metrics.copy()
        
        print("    Add logging and monitoring to simplify debugging and analysis")

print("\nMediator best-practice examples:")
print("  Practice 1 - single responsibility:")
MediatorBestPractices.practice_1_single_responsibility()

print("\n  Practice 2 - event-driven:")
MediatorBestPractices.practice_2_event_driven()

print("\n  Practice 3 - asynchronous processing:")
MediatorBestPractices.practice_3_async_processing()

print("\n  Practice 4 - logging and monitoring:")
MediatorBestPractices.practice_4_logging_monitoring()
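
The practices above define their classes inside static methods, so none of them is actually exercised. As an illustration of practice 3, here is a small self-contained sketch of a queue-backed mediator run end to end. `AsyncMediatorSketch`, `drain`, and the event names are hypothetical. It assumes one worker thread, uses `Queue.join()` to wait until all queued events have been handled, and uses a `threading.Event` for shutdown.

```python
import threading
from queue import Empty, Queue
from typing import Any, Callable, Dict, List


class AsyncMediatorSketch:
    """Hypothetical queue-backed mediator with a single worker thread."""

    def __init__(self) -> None:
        self._queue: Queue = Queue()
        self._handlers: Dict[str, Callable[[Any], None]] = {}
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.errors: List[Exception] = []

    def register(self, event_type: str, handler: Callable[[Any], None]) -> None:
        self._handlers[event_type] = handler

    def start(self) -> None:
        self._thread.start()

    def send(self, event_type: str, data: Any) -> None:
        self._queue.put((event_type, data))  # returns immediately

    def drain(self) -> None:
        self._queue.join()  # blocks until every queued event was processed

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                event_type, data = self._queue.get(timeout=0.05)
            except Empty:
                continue  # nothing queued; re-check the stop flag
            try:
                handler = self._handlers.get(event_type)
                if handler is not None:
                    handler(data)
            except Exception as exc:  # keep the worker alive
                self.errors.append(exc)
            finally:
                self._queue.task_done()


received: List[Any] = []
mediator = AsyncMediatorSketch()
mediator.register("order_created", received.append)
mediator.start()
mediator.send("order_created", {"id": 1})
mediator.send("unknown_event", {"id": 2})  # no handler registered: dropped
mediator.drain()
mediator.stop()
print(received)  # [{'id': 1}]
```

Pairing every `get` with `task_done` in a `finally` block is what makes `drain` safe to call even when a handler fails.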

17.8 Decision Guide

17.8.1 Decision Tree for Choosing a Mediator Type

┌──────────────────────────────────────────────────────────────────────┐
│                     Mediator Type Decision Tree                      │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│               Need to coordinate multiple components?                │
│                                  │                                   │
│                    ┌─────────────┴─────────────┐                     │
│                    │                           │                     │
│                   Yes                          No                    │
│                    │                           │                     │
│                    ▼                           ▼                     │
│         ┌──────────────────────┐      No mediator needed             │
│         │ How many components? │                                     │
│         └──────────┬───────────┘                                     │
│                    │                                                 │
│      ┌─────────────┼─────────────┐                                   │
│      │             │             │                                   │
│     Few         Medium         Many                                  │
│      │             │             │                                   │
│      ▼             ▼             ▼                                   │
│   Simple     Event-driven   Distributed                              │
│  mediator      mediator      mediator                                │
│                                                                      │
│             Communication mode?                                      │
│                      │                                               │
│          ┌───────────┴───────────┐                                   │
│          │                       │                                   │
│     Synchronous            Asynchronous                              │
│          │                       │                                   │
│          ▼                       ▼                                   │
│     Direct-call            Message-queue                             │
│      mediator                mediator                                │
│                                                                      │
│           Need state management?                                     │
│                      │                                               │
│          ┌───────────┴───────────┐                                   │
│          │                       │                                   │
│         Yes                     No                                   │
│          │                       │                                   │
│          ▼                       ▼                                   │
│   State mediator        Stateless mediator                           │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

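17.8.2 Choosing an Implementation

| Scenario | Recommended implementation | Reason |
|---|---|---|
| UI component coordination | Centralized mediator | Limited number of components; centralized logic is easy to maintain |
| Chat system | Event-driven mediator | Supports broadcast and private messages; scales well |
| Microservice communication | Message bus / service mesh | Distributed environment; supports asynchrony and retries |
| State machine management | State mediator | Centralizes the state-transition rules |
| Command execution | Command mediator | Supports undo/redo and a command history |
| IoT devices | Hierarchical mediator | Supports device grouping and tiered management |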

17.9 Quick Reference Card

17.9.1 Core Concepts at a Glance

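┌──────────────────────────────────────────────────────────────────────┐
│                   Mediator Pattern Quick Reference                   │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  [Definition]                                                        │
│  Encapsulate a set of object interactions in one mediator object so  │
│  that objects need not reference each other explicitly.              │
│                                                                      │
│  [Core participants]                                                 │
│  • Mediator: mediator interface; defines the communication methods   │
│  • ConcreteMediator: concrete mediator; coordinates the components   │
│  • Component: component interface for interacting with the mediator  │
│  • ConcreteComponent: concrete component; implements business logic  │
│                                                                      │
│  [Dependency transformation]                                         │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │  Without a mediator: mesh dependencies, O(n²)                  │  │
│  │  With a mediator:    star dependencies, O(n)                   │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  [Common methods]                                                    │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │  notify(sender, event)      # component notifies the mediator  │  │
│  │  send(target, message)      # send message to target component │  │
│  │  broadcast(message)         # broadcast to all components      │  │
│  │  subscribe(event, handler)  # subscribe to an event            │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  [When to use]                                                       │
│  ✓ Objects reference each other in complex ways                      │
│  ✓ Behavior of several classes should live in one intermediary       │
│  ✓ Interaction logic between objects needs to be customizable        │
│  ✓ Components must be reusable independently                         │
│                                                                      │
│  [Advantages]                                                        │
│  + Lower coupling (mesh → star)                                      │
│  + Centralized control of interaction logic                          │
│  + Better component reusability                                      │
│  + Simpler protocols between components                              │
│                                                                      │
│  [Drawbacks]                                                         │
│  - The mediator itself can grow complex                              │
│  - The mediator becomes a single point of failure                    │
│  - Extra indirection may cost performance                            │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘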

17.9.2 Code Template Quick Reference

python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass

@dataclass
class Event:
    type: str
    source: str
    data: Dict[str, Any]

class MediatorTemplate(ABC):
    """Mediator template"""
    
    @abstractmethod
    def notify(self, event: Event) -> None:
        pass
    
    @abstractmethod
    def register(self, component: 'ComponentTemplate', name: str) -> None:
        pass

class ComponentTemplate(ABC):
    """Component template"""
    
    def __init__(self, name: str):
        self._name = name
        self._mediator: Optional[MediatorTemplate] = None
    
    def set_mediator(self, mediator: MediatorTemplate) -> None:
        self._mediator = mediator
    
    def send_event(self, event_type: str, data: Dict[str, Any]) -> None:
        if self._mediator:
            self._mediator.notify(Event(
                type=event_type,
                source=self._name,
                data=data
            ))
    
    @abstractmethod
    def receive_event(self, event: Event) -> None:
        pass

class ConcreteMediatorTemplate(MediatorTemplate):
    """Concrete mediator template: broadcasts each event to every component except its source"""
    
    def __init__(self):
        self._components: Dict[str, ComponentTemplate] = {}
    
    def register(self, component: ComponentTemplate, name: str) -> None:
        self._components[name] = component
        component.set_mediator(self)
    
    def notify(self, event: Event) -> None:
        for name, component in self._components.items():
            if name != event.source:
                component.receive_event(event)
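
The template broadcasts every event to all components other than the sender. The sketch below shows one way it might be specialized and used. It re-declares compact stand-ins (`Component`, `RoutingMediator`, `Recorder`, all hypothetical names) so that it runs on its own, and it adds an assumed routing table that sends an event type to selected targets, falling back to broadcast when no route is configured.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Event:
    type: str
    source: str
    data: Dict[str, Any]


class Component(ABC):
    """Compact stand-in for ComponentTemplate."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.mediator: Optional["RoutingMediator"] = None

    def send_event(self, event_type: str, data: Dict[str, Any]) -> None:
        if self.mediator is not None:
            self.mediator.notify(Event(event_type, self.name, data))

    @abstractmethod
    def receive_event(self, event: Event) -> None:
        ...


class RoutingMediator:
    """Hypothetical variant: route by event type, otherwise broadcast."""

    def __init__(self) -> None:
        self._components: Dict[str, Component] = {}
        self._routes: Dict[str, List[str]] = {}

    def register(self, component: Component) -> None:
        self._components[component.name] = component
        component.mediator = self

    def route(self, event_type: str, *targets: str) -> None:
        self._routes[event_type] = list(targets)

    def notify(self, event: Event) -> None:
        # Fall back to every registered component when no route exists
        targets = self._routes.get(event.type, list(self._components))
        for name in targets:
            if name != event.source and name in self._components:
                self._components[name].receive_event(event)


class Recorder(Component):
    """Demo component that remembers what it received."""

    def __init__(self, name: str) -> None:
        super().__init__(name)
        self.received: List[str] = []

    def receive_event(self, event: Event) -> None:
        self.received.append(f"{event.source}:{event.type}")


mediator = RoutingMediator()
form, button, logger = Recorder("form"), Recorder("button"), Recorder("logger")
for component in (form, button, logger):
    mediator.register(component)

mediator.route("click", "form")        # clicks are delivered to the form only
button.send_event("click", {"x": 1})   # routed
form.send_event("submitted", {})       # no route: broadcast to the others

print(form.received)    # ['button:click']
print(button.received)  # ['form:submitted']
print(logger.received)  # ['form:submitted']
```

Components still know only the mediator; the routing table lives in one place, so changing who reacts to `click` does not touch any component class.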

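17.10 Summary

The Mediator pattern introduces a mediator object that coordinates the interactions between objects, turning mesh dependencies into star dependencies and significantly reducing coupling. Starting from a formal definition, this chapter examined the mediator's mathematical basis, the dependency transformation, and the communication model.

Key points

  1. Formal view: the mediator turns the mesh structure $G_{mesh} = (V, E_{mesh})$ into the star structure $G_{star} = (V \cup \{m\}, E_{star})$, reducing the number of dependencies from $O(n^2)$ to $O(n)$.

  2. Core implementations: an ABC-based mediator framework, the event-driven mediator, the hierarchical mediator, and other implementation styles.

  3. Enterprise applications: practical scenarios such as a chat-room mediator, an air-traffic-control mediator, and a microservice-coordination mediator.

  4. Pattern variants: the observer mediator, command mediator, state mediator, and other variants that combine Mediator with other patterns.

  5. Best practices: single responsibility, event-driven design, asynchronous processing, and logging and monitoring.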
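
To make the $O(n^2)$ versus $O(n)$ claim in key point 1 concrete, the following small sketch counts directed edges. It assumes the worst case for the mesh (every component references every other one, so $|E_{mesh}| = n(n-1)$) and uses the chapter's definition of $E_{star}$, which contains both $(c_i, m)$ and $(m, c_i)$ for each component, giving $2n$ edges. The function names are illustrative.

```python
def mesh_edges(n: int) -> int:
    """Directed edges when each of n components references all others."""
    return n * (n - 1)


def star_edges(n: int) -> int:
    """Edges (c_i, m) and (m, c_i) for each of the n components."""
    return 2 * n


for n in (3, 10, 50):
    print(f"n={n}: mesh={mesh_edges(n)}, star={star_edges(n)}")
```

At three components the two structures tie at 6 edges; at ten components the star needs 20 edges against the mesh's 90, and the gap widens quadratically from there.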

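The Mediator pattern can be combined with the Observer, Command, and State patterns to form more powerful coordination mechanisms. When designing complex systems, judicious use of the Mediator pattern can significantly improve maintainability and extensibility.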

Python Technical Book Series - 江苏省宿城中等专业学校