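Chapter 19 The Observer Pattern
Learning Objectives
After completing this chapter, readers will be able to:
- Understand the core concepts, formal definition, and theoretical basis of the Observer pattern
- Apply several strategies for implementing publish-subscribe mechanisms
- Implement synchronous, asynchronous, and reactive variants of the Observer pattern in Python
- Identify where the Observer pattern fits and where it falls short
- Explain how the Observer pattern relates to modern reactive programming
19.1 Pattern Definition
19.1.1 Core Definition
The Observer pattern defines a one-to-many dependency between objects: when one object (the subject, or observable) changes state, all objects that depend on it (the observers) are notified and updated automatically. It is one of the most widely used design patterns and underlies most event-driven architectures.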
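19.1.2 Formal Definition
Mathematically, the Observer pattern can be formalized as follows.
Definition 19.1 (Observer system). An observer system is a six-tuple:
$$\mathcal{O} = \langle S, O, \Sigma, \delta, \gamma, \phi \rangle$$
where:
- $S$: the finite set of states of the subject
- $O = \{o_1, o_2, \ldots, o_n\}$: the set of observers
- $\Sigma$: the event alphabet, i.e. the events that can trigger a state change
- $\delta: S \times \Sigma \rightarrow S$: the state transition function
- $\gamma: S \times \Sigma \rightarrow 2^O$: the notification selection function, which determines the observers to notify
- $\phi: O \times S \times S \times \Sigma \rightarrow \text{Action}$: the observer response function
Definition 19.2 (Notification semantics). When event $\sigma$ moves the subject from state $s$ to state $s' = \delta(s, \sigma)$, observer $o_i$ is notified if and only if:
$$\text{Notify}(o_i, s, s', \sigma) \Leftrightarrow o_i \in \gamma(s, \sigma)$$
Definition 19.3 (Dependency degree). The dependency degree of observer $o_i$ on the subject is the fraction of (state, event) pairs for which $o_i$ is notified. Because $s'$ is determined by $s$ and $\sigma$, counting pairs is equivalent to counting transitions:
$$D(o_i) = \frac{|\{(s, \sigma) \in S \times \Sigma : o_i \in \gamma(s, \sigma)\}|}{|S| \times |\Sigma|}$$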
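The definitions above can be made concrete with a small executable model. The sketch below is only an illustration: the states, events, and observer labels (`"idle"`, `"start"`, `"logger"`, and so on) are invented for the example, and the dictionaries `delta` and `gamma` stand in for $\delta$ and $\gamma$.

```python
from itertools import product

# Hypothetical finite model: two states, two events, two observers.
S = {"idle", "busy"}
SIGMA = {"start", "stop"}
O = {"logger", "alarm"}

# delta: S x Sigma -> S (state transition function)
delta = {
    ("idle", "start"): "busy",
    ("idle", "stop"): "idle",
    ("busy", "start"): "busy",
    ("busy", "stop"): "idle",
}

# gamma: S x Sigma -> 2^O (which observers are notified)
gamma = {
    ("idle", "start"): {"logger", "alarm"},
    ("idle", "stop"): set(),
    ("busy", "start"): {"logger"},
    ("busy", "stop"): {"logger"},
}


def notify(observer, state, event):
    """Definition 19.2: the observer is notified iff it is in gamma(s, sigma)."""
    return observer in gamma[(state, event)]


def dependency(observer):
    """Definition 19.3: fraction of (state, event) pairs that notify the observer."""
    hits = sum(1 for s, e in product(S, SIGMA) if notify(observer, s, e))
    return hits / (len(S) * len(SIGMA))


print(dependency("logger"))  # 0.75
print(dependency("alarm"))   # 0.25
```

Here `logger` is notified on three of the four (state, event) pairs and `alarm` on one, which gives the two ratios printed.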
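19.1.3 Information Flow Model
The information flow of the Observer pattern can be represented as a directed graph $G = (V, E)$, where $s$ here denotes the subject node:
- $V = \{s\} \cup O$: the node set, containing the subject and all observers
- $E \subseteq \{s\} \times O$: the edge set, representing subscriptions
Theorem 19.1 (Decoupling). Let the coupling $C$ of the subject be the number of distinct types its code must reference in order to deliver notifications. Then:
$$C_{\text{observer}} = O(1) \ll C_{\text{direct}} = O(n)$$
where $n$ is the number of observers. With direct dependencies the subject must call each dependent object explicitly, whereas with the Observer pattern it references only the observer interface. The bound concerns static dependencies; the runtime cost of one round of notifications is still $O(n)$ in both designs.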
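One way to read this bound is to count the types that the subject's source code has to name. In the hypothetical sketch below (all class names are invented for illustration), `DirectSensor` names every dependent explicitly, while `ObservedSensor` names only the `Listener` protocol, however many listeners are attached.

```python
from typing import List, Protocol


class Listener(Protocol):
    def update(self, value: float) -> None: ...


class Display:
    def __init__(self) -> None:
        self.shown: List[float] = []

    def update(self, value: float) -> None:
        self.shown.append(value)


class Logger:
    def __init__(self) -> None:
        self.lines: List[str] = []

    def update(self, value: float) -> None:
        self.lines.append(f"value={value}")


class DirectSensor:
    """Direct dependency: every dependent type appears in this class."""

    def __init__(self, display: Display, logger: Logger) -> None:
        self._display = display
        self._logger = logger

    def set_value(self, value: float) -> None:
        self._display.update(value)
        self._logger.update(value)


class ObservedSensor:
    """Observer style: only the Listener interface appears in this class."""

    def __init__(self) -> None:
        self._listeners: List[Listener] = []

    def attach(self, listener: Listener) -> None:
        self._listeners.append(listener)

    def set_value(self, value: float) -> None:
        for listener in self._listeners:
            listener.update(value)


display, logger = Display(), Logger()
sensor = ObservedSensor()
sensor.attach(display)
sensor.attach(logger)
sensor.set_value(21.5)
print(display.shown, logger.lines)  # [21.5] ['value=21.5']
```

Adding a third kind of listener would require a new constructor parameter and a new call in `DirectSensor`, but no change at all in `ObservedSensor`.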
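19.1.4 Publish-Subscribe Semantics
The Observer pattern and the publish-subscribe pattern compare as follows:
| Aspect | Observer pattern | Publish-subscribe pattern |
|---|---|---|
| Coupling | Loose (the subject knows the observer interface) | Fully decoupled (via a message broker) |
| Communication | Direct method call | Message queue |
| Synchrony | Usually synchronous | Usually asynchronous |
| Filtering | On the subject side | On the broker side |
| Typical scale | Small to medium | Large-scale distributed systems |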
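19.2 Historical Background and Theoretical Roots
19.2.1 Timeline
| Year | Milestone | Contributor | Significance |
|---|---|---|---|
| 1970s | Smalltalk MVC | Trygve Reenskaug | First application of the observer idea in a UI framework |
| 1988 | Model-View-Controller | Krasner & Pope | Documented the MVC pattern, with the observer mechanism at its core |
| 1994 | GoF Design Patterns | Gamma et al. | Catalogued Observer as one of the 23 design patterns |
| 1997 | JavaBeans | Sun Microsystems | PropertyChangeSupport implements the Observer pattern |
| 2002 | C# Events | Microsoft | Language-level event support built on delegates |
| 2010 | Rx (Reactive Extensions) | Microsoft | Reactive programming paradigm; a functional extension of the Observer pattern |
| 2014 | React Flux | Facebook | Unidirectional data flow architecture; a variant of the Observer pattern |
| 2015 | Reactive Streams | Lightbend et al. | Standard for asynchronous stream processing; a standardization of the Observer pattern |
| 2020 | Project Loom | Oracle | Virtual threads, which simplify asynchronous observer implementations |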
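19.2.2 Theoretical Basis
The Observer pattern rests on the following principles:
- Dependency Inversion Principle (DIP): the high-level module (the subject) does not depend on low-level modules (concrete observers); both depend on an abstraction
- Open-Closed Principle (OCP): new observers can be added without modifying the subject's code
- Information hiding: the subject does not need to know how any observer is implemented
- Inversion of Control (IoC): observers do not poll the subject; they register once, and the notification mechanism calls them back when the state changes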
19.3 UML Structure Diagrams
19.3.1 Standard Structure
┌───────────────────────────────────────────────────┐
│                   <<interface>>                   │
│                     Observer                      │
├───────────────────────────────────────────────────┤
│ + update(subject: Subject): void                  │
│ + update(data: Any): void  ← push model           │
└───────────────────────────────────────────────────┘
                          △
                          │ implements
            ┌─────────────┴─────────────┐
            │                           │
┌────────────────────────┐ ┌────────────────────────┐
│ ConcreteObserverA      │ │ ConcreteObserverB      │
├────────────────────────┤ ├────────────────────────┤
│ - observerState: Any   │ │ - observerState: Any   │
├────────────────────────┤ ├────────────────────────┤
│ + update(subject): void│ │ + update(subject): void│
└────────────────────────┘ └────────────────────────┘
            ↑                           ↑
            │                           │
            └─────────────┬─────────────┘
                          │ notifies
┌───────────────────────────────────────────────────┐
│                      Subject                      │
├───────────────────────────────────────────────────┤
│ - observers: List[Observer]                       │
│ - state: Any                                      │
├───────────────────────────────────────────────────┤
│ + attach(observer: Observer): void                │
│ + detach(observer: Observer): void                │
│ + notify(): void                                  │
│ + getState(): Any                                 │
│ + setState(state: Any): void                      │
└───────────────────────────────────────────────────┘
                          △
                          │ extends
┌───────────────────────────────────────────────────┐
│                  ConcreteSubject                  │
├───────────────────────────────────────────────────┤
│ - subjectState: Any                               │
├───────────────────────────────────────────────────┤
│ + getSubjectState(): Any                          │
│ + setSubjectState(state: Any): void               │
└───────────────────────────────────────────────────┘
19.3.2 Push Model vs Pull Model
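The two notification styles can be contrasted in a few lines of Python. This is a minimal sketch with invented names (`Stock`, `PushObserver`, `PullObserver`), not a reference implementation: in the push variant the subject passes the changed value to `update`, while in the pull variant it passes only itself and the observer reads what it needs.

```python
from typing import List


class PushObserver:
    def __init__(self) -> None:
        self.received: List[float] = []

    def update(self, price: float) -> None:
        # Push: the changed data arrives as an argument.
        self.received.append(price)


class PullObserver:
    def __init__(self) -> None:
        self.received: List[str] = []

    def update(self, subject: "Stock") -> None:
        # Pull: the observer queries the subject for the fields it needs.
        self.received.append(f"{subject.symbol}@{subject.price}")


class Stock:
    """Hypothetical subject that supports both notification styles."""

    def __init__(self, symbol: str) -> None:
        self.symbol = symbol
        self.price = 0.0
        self._push_observers: List[PushObserver] = []
        self._pull_observers: List[PullObserver] = []

    def attach_push(self, observer: PushObserver) -> None:
        self._push_observers.append(observer)

    def attach_pull(self, observer: PullObserver) -> None:
        self._pull_observers.append(observer)

    def set_price(self, price: float) -> None:
        self.price = price
        for observer in self._push_observers:
            observer.update(price)   # subject decides what data to send
        for observer in self._pull_observers:
            observer.update(self)    # observer decides what data to read


stock = Stock("ACME")
push, pull = PushObserver(), PullObserver()
stock.attach_push(push)
stock.attach_pull(pull)
stock.set_price(10.5)
print(push.received)  # [10.5]
print(pull.received)  # ['ACME@10.5']
```

Push keeps observers independent of the subject's type but fixes the payload in advance; pull lets each observer pick its own data at the cost of depending on the subject's interface.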
Push model:
┌──────────┐    update(data)    ┌──────────┐
│ Subject  │ ─────────────────→ │ Observer │
└──────────┘                    └──────────┘
The subject actively pushes the changed data.
Pull model:
┌──────────┐      notify()      ┌──────────┐
│ Subject  │ ─────────────────→ │ Observer │
└──────────┘                    └────┬─────┘
     ↑                               │
     └────── subject.getState() ─────┘
The observer actively fetches the data it needs.
19.3.3 Event Bus Architecture
┌───────────────────────────────────────────────────┐
│                     EventBus                      │
├───────────────────────────────────────────────────┤
│ - listeners: Dict[str, List[Callable]]            │
├───────────────────────────────────────────────────┤
│ + subscribe(event: str, handler: Callable): void  │
│ + unsubscribe(event: str, handler: Callable): void│
│ + publish(event: str, data: Any): void            │
└───────────────────────────────────────────────────┘
         ↑                                 ↑
         │ subscribe                       │ publish
         │                                 │
┌─────────────────┐               ┌─────────────────┐
│ Listener A      │               │ Publisher       │
├─────────────────┤               ├─────────────────┤
│ + on_event()    │               │ + do_something()│
└─────────────────┘               └─────────────────┘
┌─────────────────┐
│ Listener B      │
├─────────────────┤
│ + on_event()    │
└─────────────────┘
19.4 Python Implementation
19.4.1 Standard Implementation Based on ABC
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Generic, List, Optional, TypeVar

T = TypeVar('T')


class Observer(ABC, Generic[T]):
    """Abstract observer; receives the new state (push model)."""

    @abstractmethod
    def update(self, data: T) -> None:
        pass


class Subject(Generic[T]):
    """Subject that notifies attached observers on every state assignment."""

    def __init__(self, name: str = "Subject"):
        self._name = name
        self._observers: List[Observer[T]] = []
        self._state: Optional[T] = None
        self._version: int = 0

    @property
    def name(self) -> str:
        return self._name

    @property
    def state(self) -> Optional[T]:
        return self._state

    @state.setter
    def state(self, value: T) -> None:
        old_state = self._state
        self._state = value
        self._version += 1
        self._notify(old_state, value)

    @property
    def version(self) -> int:
        return self._version

    def attach(self, observer: Observer[T]) -> 'Subject[T]':
        if observer not in self._observers:
            self._observers.append(observer)
        return self  # returning self allows chained calls

    def detach(self, observer: Observer[T]) -> 'Subject[T]':
        if observer in self._observers:
            self._observers.remove(observer)
        return self

    def _notify(self, old_state: Optional[T], new_state: T) -> None:
        # Iterate over a copy so observers may detach themselves during update.
        for observer in self._observers[:]:
            try:
                observer.update(new_state)
            except Exception as e:
                # One failing observer must not block the others.
                print(f"Observer notification failed: {e}")

    def observer_count(self) -> int:
        return len(self._observers)


@dataclass
class StateChange:
    subject_name: str
    old_value: Any
    new_value: Any
    timestamp: datetime = field(default_factory=datetime.now)
    version: int = 0


class LoggingObserver(Observer[Any]):
    """Records every update it receives."""

    def __init__(self, name: str):
        self._name = name
        self._history: List[StateChange] = []

    def update(self, data: Any) -> None:
        # The push interface carries only the new value, so the subject name,
        # old value and version are not available here.
        change = StateChange(
            subject_name="",
            old_value=None,
            new_value=data,
            version=0
        )
        self._history.append(change)
        print(f"[{self._name}] Received update: {data}")

    @property
    def history(self) -> List[StateChange]:
        return self._history.copy()


class ThresholdObserver(Observer[float]):
    """Alerts once when the value crosses the threshold, then re-arms when it drops below."""

    def __init__(self, threshold: float, name: str = "ThresholdObserver"):
        self._threshold = threshold
        self._name = name
        self._triggered = False

    def update(self, data: float) -> None:
        if data >= self._threshold and not self._triggered:
            print(f"[{self._name}] Alert! Value {data} >= threshold {self._threshold}")
            self._triggered = True
        elif data < self._threshold:
            self._triggered = False


subject = Subject[float]("TemperatureSensor")
subject.attach(LoggingObserver("Logger"))
subject.attach(ThresholdObserver(30.0, "HighTempAlert"))
subject.state = 25.0
subject.state = 28.0
subject.state = 32.0  # crosses the threshold: alert fires
subject.state = 35.0  # still above: no repeated alert
subject.state = 29.0  # back below: alert re-armed
19.4.2 Weak-Reference Observers
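A subject that stores strong references keeps its observers alive even after the rest of the program has dropped them (the lapsed-listener problem). The sketch below uses only the standard `weakref` module and an invented `Listener` class to show how `weakref.WeakMethod` lets a bound-method callback expire together with its owner. Prompt collection after `del` relies on CPython's reference counting; `gc.collect()` is called to make the example less dependent on that.

```python
import gc
import weakref


class Listener:
    """Hypothetical observer with a bound-method callback."""

    def on_change(self, value):
        return f"got {value}"


listener = Listener()

# A plain weakref.ref to a bound method dies at once, because the bound-method
# object is a temporary. WeakMethod tracks the owning object instead.
weak_cb = weakref.WeakMethod(listener.on_change)

callback = weak_cb()      # strong reference to the bound method, or None
print(callback(1))        # got 1

del callback              # drop the temporary strong reference
del listener              # drop the last strong reference to the owner
gc.collect()

print(weak_cb() is None)  # True: a subject can now prune this entry
```

A subject built this way has to check each weak reference before calling it and discard the dead ones, which is the role of the pruning step in the implementation that follows.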
import inspect
from typing import Any, Callable
from weakref import WeakMethod, WeakSet


class WeakSubject:
    """Subject that holds only weak references to its observers."""

    def __init__(self):
        self._weak_observers: WeakSet = WeakSet()  # functions and callable objects
        self._weak_methods: list = []              # WeakMethod wrappers for bound methods
        self._state: Any = None

    @property
    def state(self) -> Any:
        return self._state

    @state.setter
    def state(self, value: Any) -> None:
        self._state = value
        self._notify()

    def attach(self, observer: Any) -> None:
        # Bound methods need WeakMethod: a plain weak reference to a bound
        # method dies immediately because the method object is temporary.
        if hasattr(observer, '__func__') and hasattr(observer, '__self__'):
            self._weak_methods.append(WeakMethod(observer))
        else:
            self._weak_observers.add(observer)

    def attach_callback(self, callback: Callable) -> None:
        if inspect.ismethod(callback):
            self._weak_methods.append(WeakMethod(callback))
        else:
            # The caller must keep the function alive elsewhere; an inline
            # lambda would be collected right away.
            self._weak_observers.add(callback)

    def _notify(self) -> None:
        # Snapshot the set so it cannot change size while we iterate.
        for observer in list(self._weak_observers):
            try:
                if callable(observer):
                    observer(self._state)
            except Exception as e:
                print(f"Weak observer notification failed: {e}")
        # Call the methods that are still alive and prune the dead ones.
        alive_methods = []
        for weak_method in self._weak_methods:
            method = weak_method()
            if method is not None:
                alive_methods.append(weak_method)
                try:
                    method(self._state)
                except Exception as e:
                    print(f"Weak method notification failed: {e}")
        self._weak_methods = alive_methods


class Sensor:
    def __init__(self, name: str):
        self._name = name
        self._subject = WeakSubject()

    @property
    def value(self) -> Any:
        return self._subject.state

    @value.setter
    def value(self, val: Any) -> None:
        self._subject.state = val

    def bind(self, callback: Callable) -> None:
        self._subject.attach_callback(callback)


class Display:
    def __init__(self, name: str):
        self._name = name

    def on_value_change(self, value: Any) -> None:
        print(f"[{self._name}] Display: {value}")


sensor = Sensor("Temperature")
display = Display("MainDisplay")
sensor.bind(display.on_value_change)
sensor.value = 25.5
sensor.value = 26.0
del display  # in CPython the object is freed as soon as its last reference goes
sensor.value = 27.0  # nothing is printed: the dead entry is pruned
print("Display destroyed; its observer entry was removed automatically")
19.4.3 Event Bus Implementation
import threading
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional


class EventPriority(Enum):
    LOW = auto()
    NORMAL = auto()
    HIGH = auto()
    CRITICAL = auto()


@dataclass
class Event:
    name: str
    data: Any = None
    timestamp: datetime = field(default_factory=datetime.now)
    source: Any = None
    cancelled: bool = False

    def cancel(self) -> None:
        self.cancelled = True


@dataclass
class Subscription:
    event_name: str
    handler: Callable
    priority: EventPriority = EventPriority.NORMAL
    once: bool = False


class EventBus:
    """Process-wide singleton event bus (double-checked locking on creation)."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls) -> 'EventBus':
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every EventBus() call; initialize only once.
        if self._initialized:
            return
        self._initialized = True
        self._subscriptions: Dict[str, List[Subscription]] = {}
        self._global_handlers: List[Callable] = []
        self._event_history: List[Event] = []
        self._max_history = 100
        self._middleware: List[Callable] = []

    def subscribe(
        self,
        event_name: str,
        handler: Callable,
        priority: EventPriority = EventPriority.NORMAL,
        once: bool = False
    ) -> Subscription:
        subscription = Subscription(
            event_name=event_name,
            handler=handler,
            priority=priority,
            once=once
        )
        if event_name not in self._subscriptions:
            self._subscriptions[event_name] = []
        self._subscriptions[event_name].append(subscription)
        # Higher priority first; the sort is stable, so equal priorities
        # keep their subscription order.
        self._subscriptions[event_name].sort(
            key=lambda s: s.priority.value,
            reverse=True
        )
        return subscription

    def on(self, event_name: str) -> Callable:
        """Decorator form of subscribe()."""
        def decorator(handler: Callable) -> Callable:
            self.subscribe(event_name, handler)
            return handler
        return decorator

    def once(self, event_name: str) -> Callable:
        """Decorator that subscribes a handler for a single delivery."""
        def decorator(handler: Callable) -> Callable:
            self.subscribe(event_name, handler, once=True)
            return handler
        return decorator

    def unsubscribe(self, subscription: Subscription) -> bool:
        if subscription.event_name in self._subscriptions:
            try:
                self._subscriptions[subscription.event_name].remove(subscription)
                return True
            except ValueError:
                return False
        return False

    def publish(self, event_name: str, data: Any = None, source: Any = None) -> Event:
        event = Event(
            name=event_name,
            data=data,
            source=source
        )
        # Middleware may transform the event, cancel it, or return None to drop it.
        for middleware in self._middleware:
            result = middleware(event)
            if result is None:
                event.cancel()
                return event
            event = result
            if event.cancelled:
                return event
        for handler in self._global_handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Global handler error: {e}")
        if event_name in self._subscriptions:
            to_remove = []
            # Iterate over a copy so handlers may subscribe or unsubscribe.
            for subscription in list(self._subscriptions[event_name]):
                if event.cancelled:
                    break
                try:
                    subscription.handler(event)
                except Exception as e:
                    print(f"Handler error for {event_name}: {e}")
                if subscription.once:
                    to_remove.append(subscription)
            for sub in to_remove:
                if sub in self._subscriptions[event_name]:
                    self._subscriptions[event_name].remove(sub)
        self._add_to_history(event)
        return event

    def emit(self, event_name: str, data: Any = None, source: Any = None) -> Event:
        """Alias for publish()."""
        return self.publish(event_name, data, source)

    def _add_to_history(self, event: Event) -> None:
        self._event_history.append(event)
        if len(self._event_history) > self._max_history:
            self._event_history.pop(0)

    def add_middleware(self, middleware: Callable) -> None:
        self._middleware.append(middleware)

    def add_global_handler(self, handler: Callable) -> None:
        self._global_handlers.append(handler)

    def get_history(self, event_name: Optional[str] = None) -> List[Event]:
        if event_name:
            return [e for e in self._event_history if e.name == event_name]
        return self._event_history.copy()

    def clear(self) -> None:
        # Middleware and global handlers are left in place.
        self._subscriptions.clear()
        self._event_history.clear()


bus = EventBus()


@bus.on("user.login")
def on_user_login(event: Event):
    user = event.data
    print(f"User logged in: {user.get('name', 'Unknown')}")


@bus.on("user.logout")
def on_user_logout(event: Event):
    user = event.data
    print(f"User logged out: {user.get('name', 'Unknown')}")


@bus.once("system.shutdown")
def on_shutdown(event: Event):
    print("System is shutting down (fires only once)")


def logging_middleware(event: Event) -> Event:
    print(f"[Middleware] Event: {event.name}")
    return event


bus.add_middleware(logging_middleware)
bus.emit("user.login", {"name": "Zhang San", "id": 1001})
bus.emit("user.logout", {"name": "Zhang San", "id": 1001})
bus.emit("system.shutdown")
bus.emit("system.shutdown")  # the once-handler is gone; only the middleware logs
19.4.4 Asynchronous Observer Implementation
import asyncio
import inspect
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Set


class AsyncObserver(ABC):
    @abstractmethod
    async def update(self, data: Any) -> None:
        pass


@dataclass
class AsyncEvent:
    name: str
    data: Any
    timestamp: datetime = field(default_factory=datetime.now)
    source: Any = None


class AsyncSubject:
    def __init__(self, name: str = "AsyncSubject"):
        self._name = name
        self._observers: List[AsyncObserver] = []
        self._async_callbacks: List[Callable[..., Awaitable]] = []
        self._state: Any = None
        # Strong references to in-flight notification tasks. Tracking the tasks
        # themselves (rather than a counter incremented inside the coroutine)
        # means a notification counts as pending from the moment it is scheduled.
        self._pending_tasks: Set[asyncio.Task] = set()

    @property
    def state(self) -> Any:
        return self._state

    @state.setter
    def state(self, value: Any) -> None:
        self._state = value
        # Must be called while an event loop is running.
        task = asyncio.create_task(self._async_notify(value))
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    def attach(self, observer: AsyncObserver) -> None:
        self._observers.append(observer)

    def attach_async_callback(self, callback: Callable[..., Awaitable]) -> None:
        self._async_callbacks.append(callback)

    async def _async_notify(self, data: Any) -> None:
        tasks = []
        for observer in self._observers:
            tasks.append(self._safe_observer_update(observer, data))
        for callback in self._async_callbacks:
            tasks.append(self._safe_callback_update(callback, data))
        if tasks:
            # Observers run concurrently; one failure does not cancel the rest.
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _safe_observer_update(self, observer: AsyncObserver, data: Any) -> None:
        try:
            await observer.update(data)
        except Exception as e:
            print(f"Async observer update failed: {e}")

    async def _safe_callback_update(self, callback: Callable, data: Any) -> None:
        try:
            result = callback(data)
            if inspect.isawaitable(result):
                await result
        except Exception as e:
            print(f"Async callback update failed: {e}")

    async def wait_for_notifications(self) -> None:
        # Loop in case new notifications are scheduled while we wait.
        while self._pending_tasks:
            await asyncio.gather(*list(self._pending_tasks))


class AsyncEventBus:
    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}
        self._async_handlers: Dict[str, List[Callable[..., Awaitable]]] = {}

    def on(self, event_name: str, handler: Callable) -> None:
        if event_name not in self._handlers:
            self._handlers[event_name] = []
        self._handlers[event_name].append(handler)

    def on_async(self, event_name: str, handler: Callable[..., Awaitable]) -> None:
        if event_name not in self._async_handlers:
            self._async_handlers[event_name] = []
        self._async_handlers[event_name].append(handler)

    async def emit(self, event_name: str, data: Any = None) -> None:
        tasks = []
        if event_name in self._handlers:
            for handler in self._handlers[event_name]:
                tasks.append(self._run_sync_handler(handler, data))
        if event_name in self._async_handlers:
            for handler in self._async_handlers[event_name]:
                tasks.append(self._run_async_handler(handler, data))
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _run_sync_handler(self, handler: Callable, data: Any) -> None:
        # Runs on the event loop thread, so a slow handler blocks the loop.
        try:
            handler(data)
        except Exception as e:
            print(f"Sync handler error: {e}")

    async def _run_async_handler(self, handler: Callable[..., Awaitable], data: Any) -> None:
        try:
            await handler(data)
        except Exception as e:
            print(f"Async handler error: {e}")


class EmailService(AsyncObserver):
    async def update(self, data: Any) -> None:
        await asyncio.sleep(0.1)
        print(f"[Email] Sending email notification: {data}")


class PushService(AsyncObserver):
    async def update(self, data: Any) -> None:
        await asyncio.sleep(0.05)
        print(f"[Push] Sending push notification: {data}")


async def log_notification(data: Any) -> None:
    await asyncio.sleep(0.02)
    print(f"[Log] Recording notification: {data}")


async def main_async_observer():
    subject = AsyncSubject("NotificationSubject")
    subject.attach(EmailService())
    subject.attach(PushService())
    subject.attach_async_callback(log_notification)
    print("Sending notifications...")
    subject.state = {"message": "System maintenance notice", "level": "info"}
    await subject.wait_for_notifications()
    print("All notifications have been sent")


if __name__ == "__main__":
    asyncio.run(main_async_observer())
19.4.5 Reactive Stream Implementation
import time
from abc import ABC, abstractmethod
from typing import Any, Callable, Generic, List, TypeVar

T = TypeVar('T')
R = TypeVar('R')


class Subscriber(ABC, Generic[T]):
    @abstractmethod
    def on_next(self, value: T) -> None:
        pass

    @abstractmethod
    def on_error(self, error: Exception) -> None:
        pass

    @abstractmethod
    def on_complete(self) -> None:
        pass


class Observable(ABC, Generic[T]):
    @abstractmethod
    def subscribe(self, subscriber: Subscriber[T]) -> 'Disposable':
        pass


class Disposable:
    def __init__(self, dispose_func: Callable):
        self._dispose_func = dispose_func
        self._disposed = False

    def dispose(self) -> None:
        if not self._disposed:
            self._dispose_func()
            self._disposed = True


class Flowable(Observable[T]):
    """Cold, synchronous stream: the source runs once per subscribe() call."""

    def __init__(self, source: Callable[[Subscriber[T]], None]):
        self._source = source

    def subscribe(self, subscriber: Subscriber[T]) -> Disposable:
        disposed = [False]

        class SafeSubscriber(Subscriber[T]):
            def on_next(self, value: T) -> None:
                if not disposed[0]:
                    subscriber.on_next(value)

            def on_error(self, error: Exception) -> None:
                if not disposed[0]:
                    subscriber.on_error(error)

            def on_complete(self) -> None:
                if not disposed[0]:
                    subscriber.on_complete()

        # A synchronous source has finished emitting by the time this returns,
        # so disposal only matters for sources that emit later.
        self._source(SafeSubscriber())
        return Disposable(lambda: disposed.__setitem__(0, True))

    def map(self, mapper: Callable[[T], R]) -> 'Flowable[R]':
        def new_source(subscriber: Subscriber[R]) -> None:
            class MapSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    try:
                        result = mapper(value)
                        subscriber.on_next(result)
                    except Exception as e:
                        subscriber.on_error(e)

                def on_error(self, error: Exception) -> None:
                    subscriber.on_error(error)

                def on_complete(self) -> None:
                    subscriber.on_complete()

            self._source(MapSubscriber())
        return Flowable(new_source)

    def filter(self, predicate: Callable[[T], bool]) -> 'Flowable[T]':
        def new_source(subscriber: Subscriber[T]) -> None:
            class FilterSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    try:
                        if predicate(value):
                            subscriber.on_next(value)
                    except Exception as e:
                        subscriber.on_error(e)

                def on_error(self, error: Exception) -> None:
                    subscriber.on_error(error)

                def on_complete(self) -> None:
                    subscriber.on_complete()

            self._source(FilterSubscriber())
        return Flowable(new_source)

    def take(self, count: int) -> 'Flowable[T]':
        def new_source(subscriber: Subscriber[T]) -> None:
            taken = [0]
            done = [False]  # ensures exactly one terminal signal goes downstream

            class TakeSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    if done[0] or taken[0] >= count:
                        return
                    taken[0] += 1
                    subscriber.on_next(value)
                    if taken[0] >= count:
                        done[0] = True
                        subscriber.on_complete()

                def on_error(self, error: Exception) -> None:
                    if not done[0]:
                        done[0] = True
                        subscriber.on_error(error)

                def on_complete(self) -> None:
                    # The upstream still runs to its end (there is no
                    # cancellation); ignore its completion if already done.
                    if not done[0]:
                        done[0] = True
                        subscriber.on_complete()

            self._source(TakeSubscriber())
        return Flowable(new_source)

    def debounce(self, delay: float) -> 'Flowable[T]':
        # Note: this forwards the first value and drops values arriving within
        # `delay` seconds of the last forwarded one, which is leading-edge
        # throttling rather than a trailing-edge debounce.
        def new_source(subscriber: Subscriber[T]) -> None:
            last_time = [float("-inf")]

            class DebounceSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    current_time = time.monotonic()
                    if current_time - last_time[0] >= delay:
                        last_time[0] = current_time
                        subscriber.on_next(value)

                def on_error(self, error: Exception) -> None:
                    subscriber.on_error(error)

                def on_complete(self) -> None:
                    subscriber.on_complete()

            self._source(DebounceSubscriber())
        return Flowable(new_source)

    @staticmethod
    def just(value: T) -> 'Flowable[T]':
        def source(subscriber: Subscriber[T]) -> None:
            subscriber.on_next(value)
            subscriber.on_complete()
        return Flowable(source)

    @staticmethod
    def from_iterable(values: List[T]) -> 'Flowable[T]':
        def source(subscriber: Subscriber[T]) -> None:
            try:
                for value in values:
                    subscriber.on_next(value)
                subscriber.on_complete()
            except Exception as e:
                subscriber.on_error(e)
        return Flowable(source)


class PrintSubscriber(Subscriber[Any]):
    def __init__(self, name: str = "PrintSubscriber"):
        self._name = name

    def on_next(self, value: Any) -> None:
        print(f"[{self._name}] {value}")

    def on_error(self, error: Exception) -> None:
        print(f"[{self._name}] Error: {error}")

    def on_complete(self) -> None:
        print(f"[{self._name}] Complete")


# Even numbers, squared, first three: prints 4, 16, 36, then Complete.
(
    Flowable.from_iterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * x)
    .take(3)
    .subscribe(PrintSubscriber("FilteredSquared"))
)
19.5 Enterprise Application Examples
19.5.1 Real-Time Data Monitoring System
import json
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from enum import Enum, auto
from typing import Any, Dict, List, Optional


class AlertLevel(Enum):
    INFO = auto()
    WARNING = auto()
    ERROR = auto()
    CRITICAL = auto()


@dataclass
class Metric:
    name: str
    value: Decimal
    unit: str
    timestamp: datetime = field(default_factory=datetime.now)
    tags: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "value": str(self.value),
            "unit": self.unit,
            "timestamp": self.timestamp.isoformat(),
            "tags": self.tags
        }


@dataclass
class Alert:
    metric_name: str
    level: AlertLevel
    message: str
    value: Decimal
    threshold: Decimal
    timestamp: datetime = field(default_factory=datetime.now)


class MetricObserver(ABC):
    @abstractmethod
    def on_metric(self, metric: Metric) -> None:
        pass

    @abstractmethod
    def on_alert(self, alert: Alert) -> None:
        pass


class AlertRule:
    """Edge-triggered rule: fires once when the condition becomes true."""

    def __init__(
        self,
        metric_name: str,
        threshold: Decimal,
        comparison: str = ">",
        level: AlertLevel = AlertLevel.WARNING,
        message_template: Optional[str] = None
    ):
        self._metric_name = metric_name
        self._threshold = threshold
        self._comparison = comparison
        self._level = level
        # The template may use the {comparison} and {value} placeholders.
        self._message_template = message_template or f"{metric_name} {{comparison}} {threshold}"
        self._triggered = False

    def evaluate(self, metric: Metric) -> Optional[Alert]:
        if metric.name != self._metric_name:
            return None
        triggered = self._compare(metric.value)
        if triggered and not self._triggered:
            self._triggered = True
            return Alert(
                metric_name=metric.name,
                level=self._level,
                message=self._message_template.format(
                    comparison=self._comparison,
                    value=metric.value
                ),
                value=metric.value,
                threshold=self._threshold
            )
        elif not triggered:
            self._triggered = False  # re-arm once the condition clears
        return None

    def _compare(self, value: Decimal) -> bool:
        comparisons = {
            ">": lambda v, t: v > t,
            ">=": lambda v, t: v >= t,
            "<": lambda v, t: v < t,
            "<=": lambda v, t: v <= t,
            "==": lambda v, t: v == t,
            "!=": lambda v, t: v != t
        }
        # An unknown operator never triggers.
        return comparisons.get(self._comparison, lambda v, t: False)(value, self._threshold)


class MetricSubject:
    def __init__(self, name: str):
        self._name = name
        self._observers: List[MetricObserver] = []
        self._rules: List[AlertRule] = []
        self._metrics: Dict[str, List[Metric]] = {}
        self._lock = threading.Lock()

    def attach(self, observer: MetricObserver) -> None:
        with self._lock:
            self._observers.append(observer)

    def detach(self, observer: MetricObserver) -> None:
        with self._lock:
            if observer in self._observers:
                self._observers.remove(observer)

    def add_rule(self, rule: AlertRule) -> None:
        with self._lock:
            self._rules.append(rule)

    def record(self, metric: Metric) -> None:
        # Update shared state under the lock, then notify on snapshots taken
        # under it, so observers never run while the lock is held.
        with self._lock:
            if metric.name not in self._metrics:
                self._metrics[metric.name] = []
            self._metrics[metric.name].append(metric)
            observers = self._observers.copy()
            rules = self._rules.copy()
        for observer in observers:
            try:
                observer.on_metric(metric)
            except Exception as e:
                print(f"Observer error: {e}")
        for rule in rules:
            alert = rule.evaluate(metric)
            if alert:
                for observer in observers:
                    try:
                        observer.on_alert(alert)
                    except Exception as e:
                        print(f"Alert observer error: {e}")

    def get_metrics(self, name: Optional[str] = None) -> List[Metric]:
        with self._lock:
            if name:
                return self._metrics.get(name, []).copy()
            return [m for ms in self._metrics.values() for m in ms]


class ConsoleMetricObserver(MetricObserver):
    def on_metric(self, metric: Metric) -> None:
        print(f"[Metric] {metric.name} = {metric.value} {metric.unit}")

    def on_alert(self, alert: Alert) -> None:
        level_name = alert.level.name
        print(f"[{level_name}] {alert.message} (value: {alert.value})")


class LogFileObserver(MetricObserver):
    """Appends metrics and alerts to a file, one JSON object per line."""

    def __init__(self, file_path: str):
        self._file_path = file_path

    def on_metric(self, metric: Metric) -> None:
        self._write(json.dumps(metric.to_dict()))

    def on_alert(self, alert: Alert) -> None:
        self._write(json.dumps({
            "type": "alert",
            "metric_name": alert.metric_name,
            "level": alert.level.name,
            "message": alert.message,
            "value": str(alert.value),
            "timestamp": alert.timestamp.isoformat()
        }))

    def _write(self, content: str) -> None:
        with open(self._file_path, 'a', encoding='utf-8') as f:
            f.write(content + '\n')


class AggregationObserver(MetricObserver):
    """Keeps min/max/avg over the last `window_size` samples of each metric."""

    def __init__(self, window_size: int = 60):
        self._window_size = window_size
        self._data: Dict[str, List[Decimal]] = {}
        self._aggregations: Dict[str, Dict[str, Decimal]] = {}

    def on_metric(self, metric: Metric) -> None:
        if metric.name not in self._data:
            self._data[metric.name] = []
        self._data[metric.name].append(metric.value)
        if len(self._data[metric.name]) > self._window_size:
            self._data[metric.name].pop(0)
        values = self._data[metric.name]
        self._aggregations[metric.name] = {
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
            "count": Decimal(len(values))
        }

    def on_alert(self, alert: Alert) -> None:
        pass

    def get_aggregation(self, metric_name: str) -> Optional[Dict[str, Decimal]]:
        return self._aggregations.get(metric_name)


monitor = MetricSubject("SystemMonitor")
monitor.attach(ConsoleMetricObserver())
monitor.attach(AggregationObserver(window_size=100))
monitor.add_rule(AlertRule("cpu_usage", Decimal("80"), ">", AlertLevel.WARNING, "CPU usage too high"))
monitor.add_rule(AlertRule("cpu_usage", Decimal("95"), ">", AlertLevel.CRITICAL, "CPU usage critically high"))
monitor.add_rule(AlertRule("memory_usage", Decimal("90"), ">", AlertLevel.ERROR, "Memory usage too high"))
monitor.record(Metric("cpu_usage", Decimal("75.5"), "%", tags={"host": "server1"}))      # no alert
monitor.record(Metric("cpu_usage", Decimal("85.2"), "%", tags={"host": "server1"}))      # WARNING
monitor.record(Metric("cpu_usage", Decimal("96.8"), "%", tags={"host": "server1"}))      # CRITICAL
monitor.record(Metric("memory_usage", Decimal("92.1"), "%", tags={"host": "server1"}))   # ERROR
19.5.2 Order Status Tracking System
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import uuid
class OrderStatus(Enum):
PENDING = auto()
CONFIRMED = auto()
PAID = auto()
PROCESSING = auto()
SHIPPED = auto()
DELIVERED = auto()
CANCELLED = auto()
REFUNDED = auto()
@dataclass
class OrderEvent:
order_id: str
old_status: Optional[OrderStatus]
new_status: OrderStatus
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Order:
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
customer_id: str = ""
items: List[Dict[str, Any]] = field(default_factory=list)
total_amount: float = 0.0
status: OrderStatus = OrderStatus.PENDING
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def add_item(self, product_id: str, name: str, price: float, quantity: int = 1) -> None:
self.items.append({
"product_id": product_id,
"name": name,
"price": price,
"quantity": quantity
})
self._recalculate_total()
def _recalculate_total(self) -> None:
self.total_amount = sum(
item["price"] * item["quantity"]
for item in self.items
)
class OrderObserver(ABC):
@abstractmethod
def on_status_change(self, event: OrderEvent) -> None:
pass
class OrderSubject:
VALID_TRANSITIONS: Dict[OrderStatus, Set[OrderStatus]] = {
OrderStatus.PENDING: {OrderStatus.CONFIRMED, OrderStatus.CANCELLED},
OrderStatus.CONFIRMED: {OrderStatus.PAID, OrderStatus.CANCELLED},
OrderStatus.PAID: {OrderStatus.PROCESSING, OrderStatus.CANCELLED, OrderStatus.REFUNDED},
OrderStatus.PROCESSING: {OrderStatus.SHIPPED, OrderStatus.CANCELLED},
OrderStatus.SHIPPED: {OrderStatus.DELIVERED, OrderStatus.REFUNDED},
OrderStatus.DELIVERED: {OrderStatus.REFUNDED},
OrderStatus.CANCELLED: set(),
OrderStatus.REFUNDED: set()
}
def __init__(self):
self._orders: Dict[str, Order] = {}
self._observers: List[OrderObserver] = []
def attach(self, observer: OrderObserver) -> None:
self._observers.append(observer)
def detach(self, observer: OrderObserver) -> None:
if observer in self._observers:
self._observers.remove(observer)
def create_order(self, customer_id: str) -> Order:
order = Order(customer_id=customer_id)
self._orders[order.id] = order
return order
def update_status(self, order_id: str, new_status: OrderStatus, metadata: Dict[str, Any] = None) -> bool:
if order_id not in self._orders:
return False
order = self._orders[order_id]
old_status = order.status
if new_status not in self.VALID_TRANSITIONS.get(old_status, set()):
print(f"Invalid transition: {old_status.name} -> {new_status.name}")
return False
order.status = new_status
order.updated_at = datetime.now()
event = OrderEvent(
order_id=order_id,
old_status=old_status,
new_status=new_status,
metadata=metadata or {}
)
self._notify_observers(event)
return True
def _notify_observers(self, event: OrderEvent) -> None:
for observer in self._observers:
try:
observer.on_status_change(event)
except Exception as e:
print(f"Observer error: {e}")
def get_order(self, order_id: str) -> Optional[Order]:
return self._orders.get(order_id)


class EmailNotificationObserver(OrderObserver):
    def __init__(self, email_service: Any = None):
        self._email_service = email_service

    def on_status_change(self, event: OrderEvent) -> None:
        status_messages = {
            OrderStatus.CONFIRMED: "Your order is confirmed; please complete payment soon",
            OrderStatus.PAID: "Payment received; we are preparing your items",
            OrderStatus.SHIPPED: "Your order has shipped",
            OrderStatus.DELIVERED: "Your order has been delivered",
            OrderStatus.CANCELLED: "Your order has been cancelled",
            OrderStatus.REFUNDED: "Your refund has been processed"
        }
        # Statuses without a message (for example PROCESSING) send no email.
        message = status_messages.get(event.new_status)
        if message:
            print(f"[Email] Order {event.order_id}: {message}")


class InventoryObserver(OrderObserver):
    def __init__(self):
        self._reserved_items: Dict[str, List[str]] = {}

    def on_status_change(self, event: OrderEvent) -> None:
        if event.new_status == OrderStatus.CONFIRMED:
            print(f"[Inventory] Reserve stock - order {event.order_id}")
        elif event.new_status == OrderStatus.CANCELLED:
            print(f"[Inventory] Release stock - order {event.order_id}")
        elif event.new_status == OrderStatus.REFUNDED:
            print(f"[Inventory] Return stock - order {event.order_id}")


class LogisticsObserver(OrderObserver):
    def on_status_change(self, event: OrderEvent) -> None:
        if event.new_status == OrderStatus.PAID:
            tracking_number = f"TRK{event.order_id.upper()}"
            print(f"[Logistics] Create waybill: {tracking_number}")
        elif event.new_status == OrderStatus.SHIPPED:
            tracking_number = event.metadata.get("tracking_number", "N/A")
            print(f"[Logistics] Update shipping status - tracking number: {tracking_number}")


class AnalyticsObserver(OrderObserver):
    def __init__(self):
        self._status_counts: Dict[OrderStatus, int] = {}
        self._transition_times: List[float] = []

    def on_status_change(self, event: OrderEvent) -> None:
        self._status_counts[event.new_status] = self._status_counts.get(event.new_status, 0) + 1
        counts = {s.name: c for s, c in self._status_counts.items()}
        print(f"[Analytics] Status counts: {counts}")

    def get_report(self) -> Dict[str, Any]:
        return {
            "status_counts": {s.name: c for s, c in self._status_counts.items()},
            "total_transitions": sum(self._status_counts.values())
        }


# Demo: attach four independent observers, then walk one order through its lifecycle.
order_system = OrderSubject()
order_system.attach(EmailNotificationObserver())
order_system.attach(InventoryObserver())
order_system.attach(LogisticsObserver())
order_system.attach(AnalyticsObserver())

order = order_system.create_order("customer_001")
order.add_item("prod_001", "Advanced Python Programming", 89.0, 2)
order.add_item("prod_002", "Design Patterns Explained", 69.0, 1)
print(f"\nOrder created: {order.id}, total: {order.total_amount}")

order_system.update_status(order.id, OrderStatus.CONFIRMED)
order_system.update_status(order.id, OrderStatus.PAID)
order_system.update_status(order.id, OrderStatus.PROCESSING)
order_system.update_status(order.id, OrderStatus.SHIPPED, {"tracking_number": "SF1234567890"})
order_system.update_status(order.id, OrderStatus.DELIVERED)

19.5.3 Configuration Center Hot Reload
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime
import json
import hashlib
import threading


@dataclass
class ConfigChangeEvent:
    key: str
    old_value: Any
    new_value: Any
    namespace: str = "default"
    timestamp: datetime = field(default_factory=datetime.now)


class ConfigObserver(ABC):
    @abstractmethod
    def on_config_change(self, event: ConfigChangeEvent) -> None:
        pass


class ConfigCenter:
    def __init__(self):
        self._configs: Dict[str, Dict[str, Any]] = {"default": {}}
        self._observers: List[ConfigObserver] = []                  # notified for every key
        self._key_observers: Dict[str, List[ConfigObserver]] = {}   # notified for one key
        self._version: int = 0
        self._lock = threading.RLock()
        self._checksums: Dict[str, str] = {}

    def register_observer(self, observer: ConfigObserver,
                          keys: Optional[List[str]] = None) -> None:
        with self._lock:
            if keys is None:
                self._observers.append(observer)
            else:
                for key in keys:
                    self._key_observers.setdefault(key, []).append(observer)

    def unregister_observer(self, observer: ConfigObserver) -> None:
        with self._lock:
            if observer in self._observers:
                self._observers.remove(observer)
            for subscribers in self._key_observers.values():
                if observer in subscribers:
                    subscribers.remove(observer)

    def set(self, key: str, value: Any, namespace: str = "default") -> None:
        with self._lock:
            namespace_config = self._configs.setdefault(namespace, {})
            old_value = namespace_config.get(key)
            namespace_config[key] = value
            self._version += 1
            self._update_checksum(namespace)
            event = ConfigChangeEvent(
                key=key,
                old_value=old_value,
                new_value=value,
                namespace=namespace
            )
            targets = self._targets_for(key)
        # Notify outside the lock so a slow observer cannot block other writers.
        self._dispatch(event, targets)

    def get(self, key: str, default: Any = None, namespace: str = "default") -> Any:
        with self._lock:
            return self._configs.get(namespace, {}).get(key, default)

    def get_all(self, namespace: str = "default") -> Dict[str, Any]:
        with self._lock:
            return self._configs.get(namespace, {}).copy()

    def delete(self, key: str, namespace: str = "default") -> bool:
        with self._lock:
            if key not in self._configs.get(namespace, {}):
                return False
            old_value = self._configs[namespace].pop(key)
            self._version += 1
            # Keep the checksum in step with the data, exactly as set() does.
            self._update_checksum(namespace)
            event = ConfigChangeEvent(
                key=key,
                old_value=old_value,
                new_value=None,
                namespace=namespace
            )
            # Key-specific observers must hear about deletions too.
            targets = self._targets_for(key)
        self._dispatch(event, targets)
        return True

    def _targets_for(self, key: str) -> List[ConfigObserver]:
        # Snapshot of global observers followed by key observers; call with the lock held.
        return self._observers.copy() + self._key_observers.get(key, []).copy()

    def _dispatch(self, event: ConfigChangeEvent, targets: List[ConfigObserver]) -> None:
        for observer in targets:
            try:
                observer.on_config_change(event)
            except Exception as e:
                print(f"Config observer error: {e}")

    def _update_checksum(self, namespace: str) -> None:
        # MD5 serves only as a cheap change fingerprint here, not as a security measure.
        config_str = json.dumps(self._configs.get(namespace, {}), sort_keys=True, default=str)
        self._checksums[namespace] = hashlib.md5(config_str.encode()).hexdigest()

    def get_checksum(self, namespace: str = "default") -> str:
        return self._checksums.get(namespace, "")

    @property
    def version(self) -> int:
        return self._version


class DatabaseConfigObserver(ConfigObserver):
    def __init__(self):
        self._pool_size = 10
        self._timeout = 30

    def on_config_change(self, event: ConfigChangeEvent) -> None:
        if event.key == "db.pool_size":
            old_pool = self._pool_size
            self._pool_size = event.new_value
            print(f"[Database] Pool size: {old_pool} -> {self._pool_size}")
        elif event.key == "db.timeout":
            self._timeout = event.new_value
            print(f"[Database] Timeout updated to: {self._timeout}s")


class CacheConfigObserver(ConfigObserver):
    def __init__(self):
        self._ttl = 300
        self._max_size = 1000

    def on_config_change(self, event: ConfigChangeEvent) -> None:
        if event.key == "cache.ttl":
            self._ttl = event.new_value
            print(f"[Cache] TTL updated to: {self._ttl}s")
        elif event.key == "cache.max_size":
            self._max_size = event.new_value
            print(f"[Cache] Max size updated to: {self._max_size}")


class FeatureFlagObserver(ConfigObserver):
    PREFIX = "feature."

    def __init__(self):
        self._flags: Dict[str, bool] = {}

    def on_config_change(self, event: ConfigChangeEvent) -> None:
        # Registered for all keys; reacts only to those under the "feature." prefix.
        if event.key.startswith(self.PREFIX):
            flag_name = event.key[len(self.PREFIX):]
            self._flags[flag_name] = bool(event.new_value)
            status = "enabled" if event.new_value else "disabled"
            print(f"[FeatureFlag] {flag_name}: {status}")

    def is_enabled(self, flag_name: str) -> bool:
        return self._flags.get(flag_name, False)


class LoggingConfigObserver(ConfigObserver):
    def __init__(self):
        self._level = "INFO"
        self._format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    def on_config_change(self, event: ConfigChangeEvent) -> None:
        if event.key == "logging.level":
            self._level = event.new_value
            print(f"[Logging] Level updated to: {self._level}")
        elif event.key == "logging.format":
            self._format = event.new_value
            print("[Logging] Format updated")


# Demo: key-scoped observers for db/cache/logging, one global observer for feature flags.
config_center = ConfigCenter()
db_observer = DatabaseConfigObserver()
cache_observer = CacheConfigObserver()
feature_observer = FeatureFlagObserver()
logging_observer = LoggingConfigObserver()
config_center.register_observer(db_observer, ["db.pool_size", "db.timeout"])
config_center.register_observer(cache_observer, ["cache.ttl", "cache.max_size"])
config_center.register_observer(feature_observer)
config_center.register_observer(logging_observer, ["logging.level", "logging.format"])

print("=== Initial configuration ===")
config_center.set("db.pool_size", 10)
config_center.set("db.timeout", 30)
config_center.set("cache.ttl", 300)
config_center.set("cache.max_size", 1000)
config_center.set("logging.level", "INFO")

print("\n=== Hot update ===")
config_center.set("db.pool_size", 20)
config_center.set("cache.ttl", 600)
config_center.set("feature.new_ui", True)
config_center.set("feature.beta_api", True)
config_center.set("logging.level", "DEBUG")

print(f"\nFeature 'new_ui' enabled: {feature_observer.is_enabled('new_ui')}")
print(f"Config version: {config_center.version}")

19.6 Pattern Variants
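19.6.1 Comparison of Observer Pattern Variants
| Variant | Characteristics | Typical use | Complexity |
|---|---|---|---|
| Classic observer | Subject notifies observers directly | Simple event notification | ★☆☆ |
| Event bus | Decouples publishers from subscribers | Communication between modules | ★★☆ |
| Publish-subscribe | Communicates through a message broker | Distributed systems | ★★★ |
| Reactive streams | Supports chained operators | Data stream processing | ★★★ |
| Property binding | Two-way data binding | UI frameworks | ★★☆ |
| Signal-slot | Type-safe observer | Qt and similar frameworks | ★★☆ |
19.6.2 Property Binding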
from dataclasses import dataclass
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar('T')


@dataclass
class Binding:
    source: 'BindableProperty'
    target: 'BindableProperty'
    converter: Optional[Callable] = None
    mode: str = "one_way"


class BindableProperty(Generic[T]):
    def __init__(self, value: Optional[T] = None):
        self._value: Optional[T] = value
        # Callbacks are held strongly. A WeakSet would silently drop lambdas and bound
        # methods, which have no other strong reference once observe() returns.
        self._observers: List[Callable[[T, T], None]] = []
        self._bindings: List[Binding] = []

    @property
    def value(self) -> T:
        return self._value

    @value.setter
    def value(self, new_value: T) -> None:
        # Notify only on a real change; this check also stops two-way bindings from looping.
        if self._value != new_value:
            old_value = self._value
            self._value = new_value
            self._notify(old_value, new_value)

    def bind_to(self, other: 'BindableProperty', converter: Optional[Callable] = None) -> None:
        binding = Binding(
            source=self,
            target=other,
            converter=converter,
            mode="one_way"
        )
        self._bindings.append(binding)
        # Synchronize the target immediately with the current value.
        other.value = converter(self._value) if converter else self._value

    def bind_two_way(self, other: 'BindableProperty',
                     converter: Optional[Callable] = None,
                     back_converter: Optional[Callable] = None) -> None:
        # Forward direction: self -> other (also performs the initial sync).
        self.bind_to(other, converter)
        self._bindings[-1].mode = "two_way"
        # The reverse direction needs the inverse conversion, not the forward one.
        # The two converters should be mutual inverses so that updates settle.
        other._bindings.append(Binding(
            source=other,
            target=self,
            converter=back_converter,
            mode="two_way"
        ))

    def observe(self, callback: Callable[[T, T], None]) -> None:
        if callback not in self._observers:
            self._observers.append(callback)

    def _notify(self, old_value: T, new_value: T) -> None:
        for observer in list(self._observers):
            try:
                observer(old_value, new_value)
            except Exception as e:
                print(f"Observer error: {e}")
        for binding in self._bindings:
            try:
                target_value = binding.converter(new_value) if binding.converter else new_value
                if binding.target.value != target_value:
                    binding.target.value = target_value
            except Exception as e:
                print(f"Binding error: {e}")


class ViewModel:
    def __init__(self):
        self.username = BindableProperty[str]("")
        self.email = BindableProperty[str]("")
        self.age = BindableProperty[int](0)
        self.is_adult = BindableProperty[bool](False)
        # Derived property: is_adult follows age through a converter.
        self.age.bind_to(self.is_adult, converter=lambda x: x >= 18)


vm = ViewModel()


def on_username_change(old: str, new: str) -> None:
    print(f"Username changed: '{old}' -> '{new}'")


vm.username.observe(on_username_change)
vm.username.value = "Zhang San"
vm.email.value = "zhangsan@example.com"
vm.age.value = 16
print(f"Is adult: {vm.is_adult.value}")
vm.age.value = 20
print(f"Is adult: {vm.is_adult.value}")

19.6.3 Signal-Slot
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple
import inspect


@dataclass(eq=False)  # connections are compared by identity
class Connection:
    signal: 'Signal'
    slot: Callable
    disconnected: bool = False


class Signal:
    def __init__(self, *param_types: type):
        self._param_types = param_types
        self._connections: List[Connection] = []

    def connect(self, slot: Callable) -> Connection:
        self._validate_slot(slot)
        connection = Connection(signal=self, slot=slot)
        self._connections.append(connection)
        return connection

    def disconnect(self, connection: Connection) -> bool:
        if connection in self._connections:
            connection.disconnected = True
            self._connections.remove(connection)
            return True
        return False

    def emit(self, *args: Any) -> None:
        self._validate_args(args)
        # Iterate over a copy so slots may connect or disconnect during emission.
        for connection in self._connections[:]:
            if not connection.disconnected:
                try:
                    connection.slot(*args)
                except Exception as e:
                    print(f"Slot execution error: {e}")

    def _validate_slot(self, slot: Callable) -> None:
        # Only annotated parameters are checked; unannotated slots are accepted as-is.
        sig = inspect.signature(slot)
        params = list(sig.parameters.values())
        for i, expected_type in enumerate(self._param_types):
            if i >= len(params):
                break
            annotation = params[i].annotation
            if annotation is not inspect.Parameter.empty and annotation != expected_type:
                raise TypeError(
                    f"Slot parameter {i} type mismatch: "
                    f"expected {expected_type}, got {annotation}"
                )

    def _validate_args(self, args: Tuple) -> None:
        if len(args) != len(self._param_types):
            raise ValueError(
                f"Expected {len(self._param_types)} arguments, got {len(args)}"
            )
        for i, (arg, expected_type) in enumerate(zip(args, self._param_types)):
            if not isinstance(arg, expected_type):
                raise TypeError(
                    f"Argument {i} type mismatch: expected {expected_type}, got {type(arg)}"
                )


class Button:
    def __init__(self, name: str):
        self._name = name
        self.clicked = Signal()
        self.pressed = Signal()
        self.released = Signal()

    def click(self) -> None:
        print(f"[{self._name}] Button clicked")
        self.clicked.emit()

    def press(self) -> None:
        self.pressed.emit()

    def release(self) -> None:
        self.released.emit()


class Slider:
    def __init__(self, name: str):
        self._name = name
        self._value = 0
        self.value_changed = Signal(int)

    @property
    def value(self) -> int:
        return self._value

    @value.setter
    def value(self, val: int) -> None:
        if self._value != val:
            self._value = val
            self.value_changed.emit(val)


class TextBox:
    def __init__(self, name: str):
        self._name = name
        self._text = ""
        self.text_changed = Signal(str)

    @property
    def text(self) -> str:
        return self._text

    @text.setter
    def text(self, value: str) -> None:
        if self._text != value:
            self._text = value
            self.text_changed.emit(value)


class Controller:
    def __init__(self):
        self._click_count = 0

    def on_button_clicked(self) -> None:
        self._click_count += 1
        print(f"[Controller] Button clicked {self._click_count} times")

    def on_slider_changed(self, value: int) -> None:
        print(f"[Controller] Slider value: {value}")

    def on_text_changed(self, text: str) -> None:
        print(f"[Controller] Text: {text}")


button = Button("SubmitButton")
slider = Slider("VolumeSlider")
textbox = TextBox("SearchBox")
controller = Controller()
button.clicked.connect(controller.on_button_clicked)
slider.value_changed.connect(controller.on_slider_changed)
textbox.text_changed.connect(controller.on_text_changed)
button.click()
slider.value = 50
slider.value = 75
textbox.text = "Hello"
textbox.text = "Hello World"
button.click()

19.7 Anti-Patterns and Best Practices
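19.7.1 Common Anti-Patterns
Anti-pattern 1: Observer leaks
from weakref import WeakSet


class BadExample:
    # The list holds strong references, so an observer that is never detached
    # stays alive for as long as the subject does.
    def __init__(self):
        self._observers = []

    def attach(self, observer):
        self._observers.append(observer)


class GoodExample:
    # Weak references let an observer be collected once nothing else refers to it.
    # Callers must therefore keep their own strong reference to each observer.
    def __init__(self):
        self._observers = WeakSet()

    def attach(self, observer):
        self._observers.add(observer)

Anti-pattern 2: Notification storms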
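class BadExample:
    # Every assignment notifies every observer, even when the value is unchanged
    # or when many updates arrive back to back.
    def __init__(self):
        self._value = 0
        self._observers = []

    def set_value(self, value):
        self._value = value
        for observer in self._observers:
            observer.update(self)


class GoodExample:
    # Skips no-op updates and lets callers coalesce a burst of changes into one
    # notification by wrapping them in begin_batch() / end_batch().
    def __init__(self):
        self._value = 0
        self._observers = []
        self._batch_depth = 0
        self._notify_pending = False

    def set_value(self, value):
        if value == self._value:
            return  # nothing changed, nothing to announce
        self._value = value
        self._schedule_notify()

    def begin_batch(self):
        self._batch_depth += 1

    def end_batch(self):
        self._batch_depth -= 1
        if self._batch_depth == 0 and self._notify_pending:
            self._do_notify()

    def _schedule_notify(self):
        if self._batch_depth > 0:
            self._notify_pending = True  # flushed once by the outermost end_batch()
        else:
            self._do_notify()

    def _do_notify(self):
        self._notify_pending = False
        for observer in self._observers:
            observer.update(self)

Anti-pattern 3: Circular dependencies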
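class BadExample:
    # The guard flag is not reset when an observer raises, so after one failure
    # every later update is silently dropped. Iterating the live list also breaks
    # if an observer detaches itself during notification.
    def __init__(self):
        self._value = 0
        self._observers = []
        self._is_notifying = False

    def set_value(self, value):
        if self._is_notifying:
            return
        self._is_notifying = True
        self._value = value
        for observer in self._observers:
            observer.update(self)
        self._is_notifying = False


class GoodExample:
    # try/finally always clears the re-entrancy guard, and the loop runs over a copy.
    def __init__(self):
        self._value = 0
        self._observers = []
        self._notification_stack = []

    def set_value(self, value):
        # Already notifying for this subject: the call came back around a cycle.
        if id(self) in self._notification_stack:
            return
        self._notification_stack.append(id(self))
        try:
            self._value = value
            for observer in self._observers[:]:
                observer.update(self)
        finally:
            self._notification_stack.pop()

19.7.2 Best Practices Checklist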
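| Practice | Description | Importance |
|---|---|---|
| Use weak references | Keeps the subject from preventing garbage collection of its observers | ★★★ |
| Exception isolation | An exception in one observer must not affect the others | ★★★ |
| Thread safety | Protect the observer list in multithreaded environments | ★★★ |
| Batched notification | Merge several state changes into a single notification | ★★☆ |
| Priority support | Allow the notification order of observers to be configured | ★★☆ |
| Lifecycle management | Provide an unsubscribe mechanism | ★★★ |
| Event filtering | Support conditional notification | ★★☆ |
| Performance monitoring | Record notification latency and duration | ★☆☆ |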
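
Several rows of this checklist can be combined in one small subject. The following is a minimal sketch rather than a reference implementation: `PrioritySubject`, `Subscription`, and `last_duration` are hypothetical names introduced here for illustration, and thread safety is deliberately left out to keep the example short. It shows priority ordering, event filtering, exception isolation, an unsubscribe handle, and per-callback timing.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass(eq=False)  # identity comparison, so removal targets exactly one subscription
class Subscription:
    callback: Callable[[Any], None]
    priority: int = 0                                  # lower value is notified earlier
    predicate: Optional[Callable[[Any], bool]] = None  # event filter; None accepts all


class PrioritySubject:
    def __init__(self) -> None:
        self._subscriptions: List[Subscription] = []
        self.last_duration: Dict[str, float] = {}  # seconds spent in each callback

    def attach(self, callback: Callable[[Any], None], priority: int = 0,
               predicate: Optional[Callable[[Any], bool]] = None) -> Callable[[], None]:
        sub = Subscription(callback, priority, predicate)
        self._subscriptions.append(sub)
        # list.sort is stable, so equal priorities keep their registration order.
        self._subscriptions.sort(key=lambda s: s.priority)

        def unsubscribe() -> None:
            # Lifecycle management: the caller keeps this handle to detach later.
            if sub in self._subscriptions:
                self._subscriptions.remove(sub)

        return unsubscribe

    def notify(self, event: Any) -> None:
        for sub in list(self._subscriptions):
            # Event filtering: skip observers whose predicate rejects the event.
            if sub.predicate is not None and not sub.predicate(event):
                continue
            name = getattr(sub.callback, "__name__", repr(sub.callback))
            started = time.perf_counter()
            try:
                sub.callback(event)
            except Exception as exc:
                # Exception isolation: report and continue with the next observer.
                print(f"Observer {name} failed: {exc!r}")
            finally:
                # Performance monitoring: remember how long the callback took.
                self.last_duration[name] = time.perf_counter() - started
```

A caller would typically keep the function returned by `attach` and invoke it when the observer is no longer interested, which avoids having to pass the original callback back to a `detach` method.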
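19.8 Decision Guide
19.8.1 Whether to Use the Observer Pattern
         Need communication between objects?
                          │
        ┌─────────────────┴─────────────────┐
        │                                   │
       No                                  Yes
        │                                   │
        ↓                                   ↓
No pattern needed             How many objects communicate?
                                            │
                              ┌─────────────┼─────────────┐
                              │             │             │
                           1-to-1       1-to-many   many-to-many
                              │             │             │
                              ↓             ↓             ↓
                         Direct call    Observer  Publish-subscribe
                                         pattern    or event bus

19.8.2 Choosing an Implementation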
                     Choose an implementation
                                 │
          ┌──────────────────────┼──────────────────────┐
          │                      │                      │
          ↓                      ↓                      ↓
   Simple scenario    Enterprise application   Distributed system
          │                      │                      │
          ↓                      ↓                      ↓
    ABC observer             Event bus            Message queue
          │                      │                      │
          │               ┌──────┴──────┐               │
          │               │             │               │
          │             Sync          Async             │
          │               │             │               │
          ↓               ↓             ↓               ↓
  Subject/Observer    EventBus    AsyncEventBus  Kafka/RabbitMQ

19.8.3 Technology Selection Table
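| Scenario | Recommended implementation | Rationale |
|---|---|---|
| UI event handling | Signal-slot | Type-safe, supports connection management |
| Configuration hot reload | Event bus | Decouples the configuration source from its consumers |
| Real-time data monitoring | Reactive streams | Supports filtering and aggregation operators |
| Microservice communication | Message queue | Reliable delivery with persistence |
| State synchronization | Property binding | Two-way binding with automatic updates |
| Log collection | Asynchronous observer | Non-blocking, high throughput |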
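
To make the last row concrete, here is one way an asynchronous observer for log collection might look. It is a sketch under stated assumptions: `AsyncLogSubject`, `emit`, `run`, and `drain` are hypothetical names, handlers are assumed to be coroutine functions, and a single `asyncio.Queue` with one worker task stands in for whatever transport a real collector would use. The producer only enqueues, so it never waits on a slow sink.

```python
import asyncio
from typing import Awaitable, Callable, List

LogHandler = Callable[[str], Awaitable[None]]


class AsyncLogSubject:
    def __init__(self) -> None:
        self._handlers: List[LogHandler] = []
        # Create the subject inside a running event loop so the queue binds to it.
        self._queue: asyncio.Queue = asyncio.Queue()

    def attach(self, handler: LogHandler) -> None:
        self._handlers.append(handler)

    def emit(self, record: str) -> None:
        # Non-blocking for the producer: the record is only placed on the queue.
        self._queue.put_nowait(record)

    async def run(self) -> None:
        # Worker loop: deliver each record to all handlers concurrently.
        while True:
            record = await self._queue.get()
            try:
                results = await asyncio.gather(
                    *(handler(record) for handler in self._handlers),
                    return_exceptions=True,  # one failing sink does not stop the others
                )
                for result in results:
                    if isinstance(result, Exception):
                        print(f"Log handler error: {result!r}")
            finally:
                self._queue.task_done()

    async def drain(self) -> None:
        # Wait until every queued record has been processed.
        await self._queue.join()


async def main() -> List[str]:
    collected: List[str] = []

    async def to_memory(record: str) -> None:
        collected.append(record)

    async def broken(record: str) -> None:
        raise RuntimeError("sink unavailable")

    subject = AsyncLogSubject()
    subject.attach(broken)
    subject.attach(to_memory)
    worker = asyncio.create_task(subject.run())
    subject.emit("service started")
    subject.emit("request handled")
    await subject.drain()
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return collected


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The queue decouples the rate at which records are produced from the rate at which sinks consume them; an unbounded queue is used here for brevity, and a bounded one would be the usual choice when memory pressure matters.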
19.9 Relationships with Other Patterns
19.9.1 Pattern Combinations
Observer + Strategy:
┌─────────────────────────────────────────────────────────────┐
│                           Subject                           │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                NotificationStrategy                 │   │
│   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │   │
│   │  │ SyncNotify  │  │ AsyncNotify │  │ BatchNotify │  │   │
│   │  └─────────────┘  └─────────────┘  └─────────────┘  │   │
│   └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
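
The diagram above can be sketched in code as follows. This is an illustrative outline, not the chapter's canonical implementation: `StrategySubject`, `dispatch`, and `set_strategy` are assumed names, observers are modeled as plain callables, and only `SyncNotify` and `BatchNotify` are shown (an `AsyncNotify` would follow the same interface). The subject delegates the question of how to deliver an event to an interchangeable strategy object.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, List

Observer = Callable[[Any], None]


class NotificationStrategy(ABC):
    @abstractmethod
    def dispatch(self, observers: List[Observer], event: Any) -> None:
        """Deliver `event` to `observers` according to this strategy."""


class SyncNotify(NotificationStrategy):
    def dispatch(self, observers: List[Observer], event: Any) -> None:
        # Deliver immediately, one observer after another.
        for observer in observers:
            observer(event)


class BatchNotify(NotificationStrategy):
    def __init__(self, batch_size: int = 3) -> None:
        self._batch_size = batch_size
        self._buffer: List[Any] = []

    def dispatch(self, observers: List[Observer], event: Any) -> None:
        # Buffer events and deliver them as one list once the batch is full.
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            batch, self._buffer = self._buffer, []
            for observer in observers:
                observer(batch)


class StrategySubject:
    def __init__(self, strategy: NotificationStrategy) -> None:
        self._strategy = strategy
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        self._observers.append(observer)

    def set_strategy(self, strategy: NotificationStrategy) -> None:
        # The delivery policy can be swapped at runtime without touching observers.
        self._strategy = strategy

    def notify(self, event: Any) -> None:
        self._strategy.dispatch(list(self._observers), event)
```

With `SyncNotify` an observer receives each event as it happens; after `set_strategy(BatchNotify(batch_size=2))` the same observer receives a list of two events every second call to `notify`.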
Observer + Mediator:
┌─────────────────────────────────────────────────────────────┐
│                          Mediator                           │
│    ┌───────────┐       ┌───────────┐       ┌───────────┐    │
│    │ Subject A │       │ Subject B │       │ Subject C │    │
│    └─────┬─────┘       └─────┬─────┘       └─────┬─────┘    │
│          │                   │                   │          │
│          └───────────────────┼───────────────────┘          │
│                              ↓                              │
│                     ┌─────────────────┐                     │
│                     │    Observer     │                     │
│                     └─────────────────┘                     │
└─────────────────────────────────────────────────────────────┘

19.9.2 Pattern Comparison
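| Pattern | Relationship | Difference |
|---|---|---|
| Mediator | Both handle communication between objects | A mediator centralizes control; observer notification is decentralized |
| Publish-subscribe | An extension of Observer | Publish-subscribe decouples both sides completely by communicating through a broker |
| Chain of Responsibility | Both involve a chain of objects | A chain passes a request along; an observer subject broadcasts a notification |
| Iterator | Both traverse a collection | An iterator is driven by its consumer; an observer passively receives updates |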
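
The Chain of Responsibility row is the one most easily confused in practice, so a small contrast may help. The functions below are a hypothetical sketch (`run_chain`, `broadcast`, and the three handlers are names invented for this example): the chain stops at the first handler that returns a result, whereas the observer-style broadcast calls every subscriber.

```python
from typing import Callable, List, Optional

Handler = Callable[[str], Optional[str]]

log: List[str] = []


def audit(message: str) -> Optional[str]:
    log.append(f"audit:{message}")
    return None  # declines, so a chain moves on to the next handler


def billing(message: str) -> Optional[str]:
    log.append(f"billing:{message}")
    return "handled by billing"


def fallback(message: str) -> Optional[str]:
    log.append(f"fallback:{message}")
    return "handled by fallback"


def run_chain(handlers: List[Handler], request: str) -> Optional[str]:
    # Chain of Responsibility: stop at the first handler that produces a result.
    for handler in handlers:
        result = handler(request)
        if result is not None:
            return result
    return None


def broadcast(observers: List[Handler], event: str) -> None:
    # Observer: every subscriber is called; return values are ignored.
    for observer in observers:
        observer(event)
```

Calling `run_chain([audit, billing, fallback], "refund")` never reaches `fallback`, while `broadcast` with the same list invokes all three.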
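19.10 Quick Reference Card
19.10.1 Core Concepts at a Glance
┌──────────────────────────────────────────────────────────────────────┐
│                   Observer Pattern Quick Reference                   │
├──────────────────────────────────────────────────────────────────────┤
│ Definition: one-to-many dependency; state changes notify observers   │
├──────────────────────────────────────────────────────────────────────┤
│ Core roles:                                                          │
│   • Subject: maintains the observer list, notifies on state change   │
│   • Observer: defines the update interface, receives notifications   │
│   • ConcreteSubject: concrete subject, stores the state              │
│   • ConcreteObserver: concrete observer, implements update logic     │
├──────────────────────────────────────────────────────────────────────┤
│ Key methods:                                                         │
│   • attach(observer): add an observer                                │
│   • detach(observer): remove an observer                             │
│   • notify(): notify all observers                                   │
│   • update(data): the observer's update method                       │
├──────────────────────────────────────────────────────────────────────┤
│ Push model vs. pull model:                                           │
│   • Push model: the subject sends data to the observers              │
│   • Pull model: observers fetch data from the subject                │
├──────────────────────────────────────────────────────────────────────┤
│ When to use:                                                         │
│   ✓ Objects have a one-to-many dependency                            │
│   ✓ A change in one object must be propagated to many others         │
│   ✓ An event-driven architecture is needed                           │
│   ✓ A publish-subscribe mechanism is needed                          │
├──────────────────────────────────────────────────────────────────────┤
│ Pitfalls to avoid:                                                   │
│   ✗ Observer leaks (use weak references)                             │
│   ✗ Circular dependencies (add re-entrancy guards)                   │
│   ✗ Performance problems (batch notifications, go asynchronous)      │
└──────────────────────────────────────────────────────────────────────┘

19.10.2 Python Implementation Template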
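
Before the template itself, the push and pull models named in the quick reference card can be contrasted in a few lines. The template in this subsection passes the state to `update`, which is the push model. The sketch below is illustrative only: `Thermometer`, `subscribe_push`, and `subscribe_pull` are hypothetical names, and observers are plain callables.

```python
from typing import Callable, List


class Thermometer:
    def __init__(self) -> None:
        self._celsius = 0.0
        self._push_observers: List[Callable[[float], None]] = []
        self._pull_observers: List[Callable[["Thermometer"], None]] = []

    @property
    def celsius(self) -> float:
        return self._celsius

    def subscribe_push(self, callback: Callable[[float], None]) -> None:
        self._push_observers.append(callback)

    def subscribe_pull(self, callback: Callable[["Thermometer"], None]) -> None:
        self._pull_observers.append(callback)

    def set_celsius(self, value: float) -> None:
        self._celsius = value
        # Push model: the new data travels with the notification.
        for callback in list(self._push_observers):
            callback(value)
        # Pull model: the observer receives the subject and reads what it needs.
        for callback in list(self._pull_observers):
            callback(self)
```

Push keeps observers simple but fixes the payload in the interface; pull lets each observer decide what to read, at the cost of coupling it to the subject's accessors.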
from abc import ABC, abstractmethod
from typing import Any
from weakref import WeakSet


class Observer(ABC):
    @abstractmethod
    def update(self, data: Any) -> None:
        pass


class Subject:
    def __init__(self):
        # Weak references: callers must keep their own reference to each observer.
        self._observers: "WeakSet[Observer]" = WeakSet()
        self._state: Any = None

    def attach(self, observer: Observer) -> None:
        self._observers.add(observer)

    def detach(self, observer: Observer) -> None:
        self._observers.discard(observer)

    def notify(self) -> None:
        # Snapshot first: an observer may detach itself while being notified.
        for observer in list(self._observers):
            observer.update(self._state)

    @property
    def state(self) -> Any:
        return self._state

    @state.setter
    def state(self, value: Any) -> None:
        self._state = value
        self.notify()

19.11 Summary
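The Observer pattern is a core pattern for loosely coupled communication between objects and the foundation of event-driven architecture. Starting from a formal definition, this chapter examined the pattern's theoretical basis and several ways to implement it.
Key points:
- Theoretical basis: the Observer pattern builds on the dependency inversion principle and decouples the subject from its observers through an abstract interface.
- Implementation: from the classic ABC-based implementation to modern reactive streams, Python offers a wide range of options.
- Enterprise applications: the pattern plays an important role in real-time monitoring, order tracking, and configuration management.
- Variants: event buses, reactive streams, property binding, and other variants extend the range of situations in which the pattern applies.
- Best practices: use weak references to prevent memory leaks, isolate exceptions to keep notification reliable, and make the observer list thread-safe for concurrent use.
The Observer pattern is one of the cornerstones of modern software architecture; understanding its principles and implementation is essential for building scalable, maintainable systems.
Discussion Questions
- What is the core difference between the Observer pattern and the publish-subscribe pattern? Under what circumstances should the latter be chosen?
- How would you design an observer framework that supports priorities and asynchronous notification?
- In a microservice architecture, how can the Observer pattern be combined with message queues?
- How does reactive programming (Rx) extend the capabilities of the traditional Observer pattern?
- How can circular dependencies in the Observer pattern be resolved? Design a mechanism that detects and prevents them.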