Skip to content

第19章 观察者模式

学习目标

完成本章学习后,读者将能够:

  • 理解观察者模式的核心概念、数学定义与理论基础
  • 掌握发布-订阅机制的多种实现策略
  • 使用Python实现同步、异步及响应式观察者模式
  • 识别观察者模式的适用场景与局限性
  • 理解观察者模式与现代响应式编程的关系

19.1 模式定义

19.1.1 核心定义

观察者模式(Observer Pattern) 定义对象间的一种一对多的依赖关系,当一个对象(被观察者/主题)的状态发生改变时,所有依赖于它的对象(观察者)都得到通知并被自动更新。该模式是软件设计模式中应用最广泛的模式之一,是事件驱动架构的基础。

19.1.2 形式化定义

从数学角度,观察者模式可以形式化定义为:

定义 19.1(观察者系统) 观察者系统是一个六元组:

$$\mathcal{O} = \langle S, O, \Sigma, \delta, \gamma, \phi \rangle$$

其中:

  • $S$:主题(Subject)的有限状态集合
  • $O = {o_1, o_2, \ldots, o_n}$:观察者集合
  • $\Sigma$:事件字母表,表示可能的状态变化事件
  • $\delta: S \times \Sigma \rightarrow S$:状态转移函数
  • $\gamma: S \times \Sigma \rightarrow 2^O$:通知选择函数,确定哪些观察者应被通知
  • $\phi: O \times S \times S \times \Sigma \rightarrow \text{Action}$:观察者响应函数

定义 19.2(通知语义) 当主题状态从 $s$ 转移到 $s'$ 时,观察者 $o_i$ 的通知条件为:

$$\text{Notify}(o_i, s, s', \sigma) \Leftrightarrow o_i \in \gamma(s, \sigma)$$

定义 19.3(依赖关系) 观察者 $o_i$ 对主题 $s$ 的依赖度定义为:

$$D(o_i, s) = \frac{|{(s, s', \sigma) : o_i \in \gamma(s, \sigma)}|}{|\Sigma| \times |S|}$$

19.1.3 信息流模型

观察者模式的信息流可以用有向图 $G = (V, E)$ 表示:

  • $V = {s} \cup O$:节点集合,包含主题和所有观察者
  • $E \subseteq {s} \times O$:边集合,表示订阅关系

定理 19.1(解耦性) 在观察者模式中,主题与观察者之间的耦合度 $C$ 满足:

$$C_{\text{observer}} = O(1) \ll C_{\text{direct}} = O(n)$$

其中 $n$ 为观察者数量,直接依赖需要主题显式调用每个依赖对象。

19.1.4 发布-订阅语义

观察者模式与发布-订阅模式的关系:

特性观察者模式发布-订阅模式
耦合度松耦合(主题知道观察者接口)完全解耦(通过消息代理)
通信方式直接调用消息队列
同步性通常同步通常异步
过滤主题端过滤代理端过滤
适用规模小到中等规模大规模分布式系统

19.2 历史背景与理论渊源

19.2.1 发展历程

年份里程碑贡献者意义
1970sSmalltalk MVCTrygve Reenskaug首次将观察者概念应用于UI框架
1987Model-View-ControllerKrasner & Pope正式确立MVC模式,观察者为核心
1994GoF设计模式Gamma et al.将观察者模式标准化为23种设计模式之一
1997JavaBeansSun MicrosystemsPropertyChangeSupport实现观察者模式
2002C# EventsMicrosoft语言级事件支持,委托机制
2010Rx (Reactive Extensions)Microsoft响应式编程范式,观察者模式的函数式扩展
2013React FluxFacebook单向数据流架构,观察者模式的变体
2015Reactive StreamsLightbend等异步流处理标准,观察者模式的标准化
2020Project LoomOracle虚拟线程,简化异步观察者实现

19.2.2 理论基础

观察者模式的理论基础源于:

  1. 依赖倒置原则(DIP):高层模块(主题)不依赖低层模块(观察者),两者都依赖抽象

  2. 开闭原则(OCP):新增观察者无需修改主题代码

  3. 信息隐藏原则:主题不需要了解观察者的具体实现

  4. 控制反转(IoC):控制权从主题转移到观察者框架


19.3 UML结构图

19.3.1 标准结构

┌─────────────────────────────────────────────────────────────────┐
│                        <<interface>>                             │
│                         Observer                                 │
├─────────────────────────────────────────────────────────────────┤
│ + update(subject: Subject): void                                │
│ + update(data: Any): void                    ← 推模型           │
└─────────────────────────────────────────────────────────────────┘


              ┌───────────────┴───────────────┐
              │                               │
┌─────────────────────────┐     ┌─────────────────────────┐
│   ConcreteObserverA     │     │   ConcreteObserverB     │
├─────────────────────────┤     ├─────────────────────────┤
│ - observerState: any    │     │ - observerState: any    │
├─────────────────────────┤     ├─────────────────────────┤
│ + update(subject): void │     │ + update(subject): void │
└─────────────────────────┘     └─────────────────────────┘
              △                               △
              │                               │
              └───────────────┬───────────────┘
                              │ 通知
┌─────────────────────────────────────────────────────────────────┐
│                          Subject                                 │
├─────────────────────────────────────────────────────────────────┤
│ - observers: List[Observer]                                      │
│ - state: Any                                                     │
├─────────────────────────────────────────────────────────────────┤
│ + attach(observer: Observer): void                               │
│ + detach(observer: Observer): void                               │
│ + notify(): void                                                 │
│ + getState(): Any                                                │
│ + setState(state: Any): void                                     │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│                      ConcreteSubject                             │
├─────────────────────────────────────────────────────────────────┤
│ - subjectState: Any                                              │
├─────────────────────────────────────────────────────────────────┤
│ + getSubjectState(): Any                                         │
│ + setSubjectState(state: Any): void                              │
└─────────────────────────────────────────────────────────────────┘

19.3.2 推模型 vs 拉模型

推模型(Push Model):
┌──────────┐    update(data)    ┌──────────┐
│  Subject │ ─────────────────→ │ Observer │
└──────────┘                    └──────────┘
主题主动推送变化数据

拉模型(Pull Model):
┌──────────┐    notify()    ┌──────────┐
│  Subject │ ─────────────→ │ Observer │
└──────────┘                └──────────┘

                                  │ subject.getState()

                            ┌──────────┐
                            │  Subject │
                            └──────────┘
观察者主动获取所需数据

19.3.3 事件总线架构

┌─────────────────────────────────────────────────────────────────┐
│                        EventBus                                  │
├─────────────────────────────────────────────────────────────────┤
│ - listeners: Dict[str, List[Callable]]                          │
├─────────────────────────────────────────────────────────────────┤
│ + subscribe(event: str, handler: Callable): void                │
│ + unsubscribe(event: str, handler: Callable): void              │
│ + publish(event: str, data: Any): void                          │
└─────────────────────────────────────────────────────────────────┘
         △                              △
         │ 订阅                          │ 发布
         │                              │
┌─────────────────┐            ┌─────────────────┐
│    Listener A   │            │    Publisher    │
├─────────────────┤            ├─────────────────┤
│ + on_event()    │            │ + do_something()│
└─────────────────┘            └─────────────────┘
┌─────────────────┐
│    Listener B   │
├─────────────────┤
│ + on_event()    │
└─────────────────┘

19.4 Python实现

19.4.1 基于ABC的标准实现

python
from abc import ABC, abstractmethod
from typing import List, Any, Optional, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime
import weakref

T = TypeVar('T')

class Observer(ABC, Generic[T]):
    @abstractmethod
    def update(self, data: T) -> None:
        pass

class Subject(Generic[T]):
    def __init__(self, name: str = "Subject"):
        self._name = name
        self._observers: List[Observer[T]] = []
        self._state: Optional[T] = None
        self._version: int = 0
    
    @property
    def name(self) -> str:
        return self._name
    
    @property
    def state(self) -> Optional[T]:
        return self._state
    
    @state.setter
    def state(self, value: T) -> None:
        old_state = self._state
        self._state = value
        self._version += 1
        self._notify(old_state, value)
    
    @property
    def version(self) -> int:
        return self._version
    
    def attach(self, observer: Observer[T]) -> 'Subject[T]':
        if observer not in self._observers:
            self._observers.append(observer)
        return self
    
    def detach(self, observer: Observer[T]) -> 'Subject[T]':
        if observer in self._observers:
            self._observers.remove(observer)
        return self
    
    def _notify(self, old_state: Optional[T], new_state: T) -> None:
        for observer in self._observers[:]:
            try:
                observer.update(new_state)
            except Exception as e:
                print(f"Observer notification failed: {e}")
    
    def observer_count(self) -> int:
        return len(self._observers)

@dataclass
class StateChange:
    subject_name: str
    old_value: Any
    new_value: Any
    timestamp: datetime = field(default_factory=datetime.now)
    version: int = 0

class LoggingObserver(Observer[Any]):
    def __init__(self, name: str):
        self._name = name
        self._history: List[StateChange] = []
    
    def update(self, data: Any) -> None:
        change = StateChange(
            subject_name="",
            old_value=None,
            new_value=data,
            version=0
        )
        self._history.append(change)
        print(f"[{self._name}] Received update: {data}")
    
    @property
    def history(self) -> List[StateChange]:
        return self._history.copy()

class ThresholdObserver(Observer[float]):
    def __init__(self, threshold: float, name: str = "ThresholdObserver"):
        self._threshold = threshold
        self._name = name
        self._triggered = False
    
    def update(self, data: float) -> None:
        if data >= self._threshold and not self._triggered:
            print(f"[{self._name}] Alert! Value {data} >= threshold {self._threshold}")
            self._triggered = True
        elif data < self._threshold:
            self._triggered = False

subject = Subject[float]("TemperatureSensor")
subject.attach(LoggingObserver("Logger"))
subject.attach(ThresholdObserver(30.0, "HighTempAlert"))

subject.state = 25.0
subject.state = 28.0
subject.state = 32.0
subject.state = 35.0
subject.state = 29.0

19.4.2 弱引用观察者

python
from weakref import WeakSet, WeakMethod, ref
from typing import Callable, Set, Any, Optional
from dataclasses import dataclass
import inspect

class WeakSubject:
    def __init__(self):
        self._weak_observers: Set = WeakSet()
        self._weak_methods: list = []
        self._state: Any = None
    
    @property
    def state(self) -> Any:
        return self._state
    
    @state.setter
    def state(self, value: Any) -> None:
        self._state = value
        self._notify()
    
    def attach(self, observer: Any) -> None:
        if hasattr(observer, '__func__') and hasattr(observer, '__self__'):
            self._weak_methods.append(WeakMethod(observer))
        else:
            self._weak_observers.add(observer)
    
    def attach_callback(self, callback: Callable) -> None:
        if inspect.ismethod(callback):
            self._weak_methods.append(WeakMethod(callback))
        else:
            self._weak_observers.add(callback)
    
    def _notify(self) -> None:
        for observer in self._weak_observers:
            try:
                if callable(observer):
                    observer(self._state)
            except Exception as e:
                print(f"Weak observer notification failed: {e}")
        
        alive_methods = []
        for weak_method in self._weak_methods:
            method = weak_method()
            if method is not None:
                alive_methods.append(weak_method)
                try:
                    method(self._state)
                except Exception as e:
                    print(f"Weak method notification failed: {e}")
        self._weak_methods = alive_methods

class Sensor:
    def __init__(self, name: str):
        self._name = name
        self._subject = WeakSubject()
    
    @property
    def value(self) -> Any:
        return self._subject.state
    
    @value.setter
    def value(self, val: Any) -> None:
        self._subject.state = val
    
    def bind(self, callback: Callable) -> None:
        self._subject.attach_callback(callback)

class Display:
    def __init__(self, name: str):
        self._name = name
    
    def on_value_change(self, value: Any) -> None:
        print(f"[{self._name}] Display: {value}")

sensor = Sensor("Temperature")
display = Display("MainDisplay")

sensor.bind(display.on_value_change)
sensor.value = 25.5
sensor.value = 26.0

del display
sensor.value = 27.0
print("Display已销毁,观察者自动移除")

19.4.3 事件总线实现

python
from typing import Callable, Dict, List, Any, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import threading
from functools import wraps

class EventPriority(Enum):
    LOW = auto()
    NORMAL = auto()
    HIGH = auto()
    CRITICAL = auto()

@dataclass
class Event:
    name: str
    data: Any = None
    timestamp: datetime = field(default_factory=datetime.now)
    source: Any = None
    cancelled: bool = False
    
    def cancel(self) -> None:
        self.cancelled = True

@dataclass
class Subscription:
    event_name: str
    handler: Callable
    priority: EventPriority = EventPriority.NORMAL
    once: bool = False

class EventBus:
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls) -> 'EventBus':
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if self._initialized:
            return
        self._initialized = True
        self._subscriptions: Dict[str, List[Subscription]] = {}
        self._global_handlers: List[Callable] = []
        self._event_history: List[Event] = []
        self._max_history = 100
        self._middleware: List[Callable] = []
    
    def subscribe(
        self,
        event_name: str,
        handler: Callable,
        priority: EventPriority = EventPriority.NORMAL,
        once: bool = False
    ) -> Subscription:
        subscription = Subscription(
            event_name=event_name,
            handler=handler,
            priority=priority,
            once=once
        )
        
        if event_name not in self._subscriptions:
            self._subscriptions[event_name] = []
        
        self._subscriptions[event_name].append(subscription)
        self._subscriptions[event_name].sort(
            key=lambda s: s.priority.value,
            reverse=True
        )
        
        return subscription
    
    def on(self, event_name: str) -> Callable:
        def decorator(handler: Callable) -> Callable:
            self.subscribe(event_name, handler)
            return handler
        return decorator
    
    def once(self, event_name: str) -> Callable:
        def decorator(handler: Callable) -> Callable:
            self.subscribe(event_name, handler, once=True)
            return handler
        return decorator
    
    def unsubscribe(self, subscription: Subscription) -> bool:
        if subscription.event_name in self._subscriptions:
            try:
                self._subscriptions[subscription.event_name].remove(subscription)
                return True
            except ValueError:
                return False
        return False
    
    def publish(self, event_name: str, data: Any = None, source: Any = None) -> Event:
        event = Event(
            name=event_name,
            data=data,
            source=source
        )
        
        for middleware in self._middleware:
            event = middleware(event)
            if event is None or event.cancelled:
                return event
        
        for handler in self._global_handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Global handler error: {e}")
        
        if event_name in self._subscriptions:
            to_remove = []
            for subscription in self._subscriptions[event_name]:
                if event.cancelled:
                    break
                try:
                    subscription.handler(event)
                except Exception as e:
                    print(f"Handler error for {event_name}: {e}")
                
                if subscription.once:
                    to_remove.append(subscription)
            
            for sub in to_remove:
                self._subscriptions[event_name].remove(sub)
        
        self._add_to_history(event)
        return event
    
    def emit(self, event_name: str, data: Any = None, source: Any = None) -> Event:
        return self.publish(event_name, data, source)
    
    def _add_to_history(self, event: Event) -> None:
        self._event_history.append(event)
        if len(self._event_history) > self._max_history:
            self._event_history.pop(0)
    
    def add_middleware(self, middleware: Callable) -> None:
        self._middleware.append(middleware)
    
    def add_global_handler(self, handler: Callable) -> None:
        self._global_handlers.append(handler)
    
    def get_history(self, event_name: str = None) -> List[Event]:
        if event_name:
            return [e for e in self._event_history if e.name == event_name]
        return self._event_history.copy()
    
    def clear(self) -> None:
        self._subscriptions.clear()
        self._event_history.clear()

bus = EventBus()

@bus.on("user.login")
def on_user_login(event: Event):
    user = event.data
    print(f"用户登录: {user.get('name', 'Unknown')}")

@bus.on("user.logout")
def on_user_logout(event: Event):
    user = event.data
    print(f"用户登出: {user.get('name', 'Unknown')}")

@bus.once("system.shutdown")
def on_shutdown(event: Event):
    print("系统即将关闭(只触发一次)")

def logging_middleware(event: Event) -> Event:
    print(f"[Middleware] Event: {event.name}")
    return event

bus.add_middleware(logging_middleware)

bus.emit("user.login", {"name": "张三", "id": 1001})
bus.emit("user.logout", {"name": "张三", "id": 1001})
bus.emit("system.shutdown")
bus.emit("system.shutdown")

19.4.4 异步观察者实现

python
import asyncio
from typing import List, Callable, Any, Optional, Awaitable
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC, abstractmethod
import inspect

class AsyncObserver(ABC):
    @abstractmethod
    async def update(self, data: Any) -> None:
        pass

@dataclass
class AsyncEvent:
    name: str
    data: Any
    timestamp: datetime = field(default_factory=datetime.now)
    source: Any = None

class AsyncSubject:
    def __init__(self, name: str = "AsyncSubject"):
        self._name = name
        self._observers: List[AsyncObserver] = []
        self._async_callbacks: List[Callable[..., Awaitable]] = []
        self._state: Any = None
        self._pending_notifications: int = 0
    
    @property
    def state(self) -> Any:
        return self._state
    
    @state.setter
    def state(self, value: Any) -> None:
        self._state = value
        asyncio.create_task(self._async_notify(value))
    
    def attach(self, observer: AsyncObserver) -> None:
        self._observers.append(observer)
    
    def attach_async_callback(self, callback: Callable[..., Awaitable]) -> None:
        self._async_callbacks.append(callback)
    
    async def _async_notify(self, data: Any) -> None:
        self._pending_notifications += 1
        tasks = []
        
        for observer in self._observers:
            tasks.append(self._safe_observer_update(observer, data))
        
        for callback in self._async_callbacks:
            tasks.append(self._safe_callback_update(callback, data))
        
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        
        self._pending_notifications -= 1
    
    async def _safe_observer_update(self, observer: AsyncObserver, data: Any) -> None:
        try:
            await observer.update(data)
        except Exception as e:
            print(f"Async observer update failed: {e}")
    
    async def _safe_callback_update(self, callback: Callable, data: Any) -> None:
        try:
            result = callback(data)
            if inspect.isawaitable(result):
                await result
        except Exception as e:
            print(f"Async callback update failed: {e}")
    
    async def wait_for_notifications(self) -> None:
        while self._pending_notifications > 0:
            await asyncio.sleep(0.01)

class AsyncEventBus:
    def __init__(self):
        self._handlers: dict[str, List[Callable]] = {}
        self._async_handlers: dict[str, List[Callable[..., Awaitable]]] = {}
    
    def on(self, event_name: str, handler: Callable) -> None:
        if event_name not in self._handlers:
            self._handlers[event_name] = []
        self._handlers[event_name].append(handler)
    
    def on_async(self, event_name: str, handler: Callable[..., Awaitable]) -> None:
        if event_name not in self._async_handlers:
            self._async_handlers[event_name] = []
        self._async_handlers[event_name].append(handler)
    
    async def emit(self, event_name: str, data: Any = None) -> None:
        tasks = []
        
        if event_name in self._handlers:
            for handler in self._handlers[event_name]:
                tasks.append(self._run_sync_handler(handler, data))
        
        if event_name in self._async_handlers:
            for handler in self._async_handlers[event_name]:
                tasks.append(self._run_async_handler(handler, data))
        
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _run_sync_handler(self, handler: Callable, data: Any) -> None:
        try:
            handler(data)
        except Exception as e:
            print(f"Sync handler error: {e}")
    
    async def _run_async_handler(self, handler: Callable[..., Awaitable], data: Any) -> None:
        try:
            await handler(data)
        except Exception as e:
            print(f"Async handler error: {e}")

class EmailService(AsyncObserver):
    async def update(self, data: Any) -> None:
        await asyncio.sleep(0.1)
        print(f"[Email] 发送邮件通知: {data}")

class PushService(AsyncObserver):
    async def update(self, data: Any) -> None:
        await asyncio.sleep(0.05)
        print(f"[Push] 发送推送通知: {data}")

async def log_notification(data: Any) -> None:
    await asyncio.sleep(0.02)
    print(f"[Log] 记录通知: {data}")

async def main_async_observer():
    subject = AsyncSubject("NotificationSubject")
    
    subject.attach(EmailService())
    subject.attach(PushService())
    subject.attach_async_callback(log_notification)
    
    print("发送通知...")
    subject.state = {"message": "系统维护通知", "level": "info"}
    
    await subject.wait_for_notifications()
    print("所有通知已发送完成")

if __name__ == "__main__":
    asyncio.run(main_async_observer())

19.4.5 响应式流实现

python
from typing import TypeVar, Generic, Callable, Optional, List, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
from functools import wraps
import time

T = TypeVar('T')
R = TypeVar('R')

class Subscriber(ABC, Generic[T]):
    @abstractmethod
    def on_next(self, value: T) -> None:
        pass
    
    @abstractmethod
    def on_error(self, error: Exception) -> None:
        pass
    
    @abstractmethod
    def on_complete(self) -> None:
        pass

class Observable(ABC, Generic[T]):
    @abstractmethod
    def subscribe(self, subscriber: Subscriber[T]) -> 'Disposable':
        pass

class Disposable:
    def __init__(self, dispose_func: Callable):
        self._dispose_func = dispose_func
        self._disposed = False
    
    def dispose(self) -> None:
        if not self._disposed:
            self._dispose_func()
            self._disposed = True

class Flowable(Generic[T], Observable[T]):
    def __init__(self, source: Callable[[Subscriber[T]], None]):
        self._source = source
    
    def subscribe(self, subscriber: Subscriber[T]) -> Disposable:
        disposed = [False]
        
        class SafeSubscriber(Subscriber[T]):
            def on_next(self, value: T) -> None:
                if not disposed[0]:
                    subscriber.on_next(value)
            
            def on_error(self, error: Exception) -> None:
                if not disposed[0]:
                    subscriber.on_error(error)
            
            def on_complete(self) -> None:
                if not disposed[0]:
                    subscriber.on_complete()
        
        self._source(SafeSubscriber())
        
        return Disposable(lambda: disposed.__setitem__(0, True))
    
    def map(self, mapper: Callable[[T], R]) -> 'Flowable[R]':
        def new_source(subscriber: Subscriber[R]) -> None:
            class MapSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    try:
                        result = mapper(value)
                        subscriber.on_next(result)
                    except Exception as e:
                        subscriber.on_error(e)
                
                def on_error(self, error: Exception) -> None:
                    subscriber.on_error(error)
                
                def on_complete(self) -> None:
                    subscriber.on_complete()
            
            self._source(MapSubscriber())
        
        return Flowable(new_source)
    
    def filter(self, predicate: Callable[[T], bool]) -> 'Flowable[T]':
        def new_source(subscriber: Subscriber[T]) -> None:
            class FilterSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    try:
                        if predicate(value):
                            subscriber.on_next(value)
                    except Exception as e:
                        subscriber.on_error(e)
                
                def on_error(self, error: Exception) -> None:
                    subscriber.on_error(error)
                
                def on_complete(self) -> None:
                    subscriber.on_complete()
            
            self._source(FilterSubscriber())
        
        return Flowable(new_source)
    
    def take(self, count: int) -> 'Flowable[T]':
        def new_source(subscriber: Subscriber[T]) -> None:
            taken = [0]
            
            class TakeSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    if taken[0] < count:
                        taken[0] += 1
                        subscriber.on_next(value)
                    if taken[0] >= count:
                        subscriber.on_complete()
                
                def on_error(self, error: Exception) -> None:
                    subscriber.on_error(error)
                
                def on_complete(self) -> None:
                    subscriber.on_complete()
            
            self._source(TakeSubscriber())
        
        return Flowable(new_source)
    
    def debounce(self, delay: float) -> 'Flowable[T]':
        def new_source(subscriber: Subscriber[T]) -> None:
            last_time = [0.0]
            
            class DebounceSubscriber(Subscriber[T]):
                def on_next(self, value: T) -> None:
                    current_time = time.time()
                    if current_time - last_time[0] >= delay:
                        last_time[0] = current_time
                        subscriber.on_next(value)
                
                def on_error(self, error: Exception) -> None:
                    subscriber.on_error(error)
                
                def on_complete(self) -> None:
                    subscriber.on_complete()
            
            self._source(DebounceSubscriber())
        
        return Flowable(new_source)
    
    @staticmethod
    def just(value: T) -> 'Flowable[T]':
        def source(subscriber: Subscriber[T]) -> None:
            subscriber.on_next(value)
            subscriber.on_complete()
        return Flowable(source)
    
    @staticmethod
    def from_iterable(values: List[T]) -> 'Flowable[T]':
        def source(subscriber: Subscriber[T]) -> None:
            try:
                for value in values:
                    subscriber.on_next(value)
                subscriber.on_complete()
            except Exception as e:
                subscriber.on_error(e)
        return Flowable(source)

class PrintSubscriber(Subscriber[Any]):
    def __init__(self, name: str = "PrintSubscriber"):
        self._name = name
    
    def on_next(self, value: Any) -> None:
        print(f"[{self._name}] {value}")
    
    def on_error(self, error: Exception) -> None:
        print(f"[{self._name}] Error: {error}")
    
    def on_complete(self) -> None:
        print(f"[{self._name}] Complete")

Flowable.from_iterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) \
    .filter(lambda x: x % 2 == 0) \
    .map(lambda x: x * x) \
    .take(3) \
    .subscribe(PrintSubscriber("FilteredSquared"))

19.5 企业级应用示例

19.5.1 实时数据监控系统

python
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from decimal import Decimal
import threading
import time
import json

class AlertLevel(Enum):
    INFO = auto()
    WARNING = auto()
    ERROR = auto()
    CRITICAL = auto()

@dataclass
class Metric:
    name: str
    value: Decimal
    unit: str
    timestamp: datetime = field(default_factory=datetime.now)
    tags: Dict[str, str] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "value": str(self.value),
            "unit": self.unit,
            "timestamp": self.timestamp.isoformat(),
            "tags": self.tags
        }

@dataclass
class Alert:
    metric_name: str
    level: AlertLevel
    message: str
    value: Decimal
    threshold: Decimal
    timestamp: datetime = field(default_factory=datetime.now)

class MetricObserver(ABC):
    @abstractmethod
    def on_metric(self, metric: Metric) -> None:
        pass
    
    @abstractmethod
    def on_alert(self, alert: Alert) -> None:
        pass

class AlertRule:
    def __init__(
        self,
        metric_name: str,
        threshold: Decimal,
        comparison: str = ">",
        level: AlertLevel = AlertLevel.WARNING,
        message_template: str = None
    ):
        self._metric_name = metric_name
        self._threshold = threshold
        self._comparison = comparison
        self._level = level
        self._message_template = message_template or f"{metric_name} {{comparison}} {threshold}"
        self._triggered = False
    
    def evaluate(self, metric: Metric) -> Optional[Alert]:
        if metric.name != self._metric_name:
            return None
        
        triggered = self._compare(metric.value)
        
        if triggered and not self._triggered:
            self._triggered = True
            return Alert(
                metric_name=metric.name,
                level=self._level,
                message=self._message_template.format(
                    comparison=self._comparison,
                    value=metric.value
                ),
                value=metric.value,
                threshold=self._threshold
            )
        elif not triggered:
            self._triggered = False
        
        return None
    
    def _compare(self, value: Decimal) -> bool:
        comparisons = {
            ">": lambda v, t: v > t,
            ">=": lambda v, t: v >= t,
            "<": lambda v, t: v < t,
            "<=": lambda v, t: v <= t,
            "==": lambda v, t: v == t,
            "!=": lambda v, t: v != t
        }
        return comparisons.get(self._comparison, lambda v, t: False)(value, self._threshold)

class MetricSubject:
    def __init__(self, name: str):
        self._name = name
        self._observers: List[MetricObserver] = []
        self._rules: List[AlertRule] = []
        self._metrics: Dict[str, List[Metric]] = {}
        self._lock = threading.Lock()
    
    def attach(self, observer: MetricObserver) -> None:
        with self._lock:
            self._observers.append(observer)
    
    def detach(self, observer: MetricObserver) -> None:
        with self._lock:
            if observer in self._observers:
                self._observers.remove(observer)
    
    def add_rule(self, rule: AlertRule) -> None:
        with self._lock:
            self._rules.append(rule)
    
    def record(self, metric: Metric) -> None:
        with self._lock:
            if metric.name not in self._metrics:
                self._metrics[metric.name] = []
            self._metrics[metric.name].append(metric)
            
            observers = self._observers.copy()
            rules = self._rules.copy()
        
        for observer in observers:
            try:
                observer.on_metric(metric)
            except Exception as e:
                print(f"Observer error: {e}")
        
        for rule in rules:
            alert = rule.evaluate(metric)
            if alert:
                for observer in observers:
                    try:
                        observer.on_alert(alert)
                    except Exception as e:
                        print(f"Alert observer error: {e}")
    
    def get_metrics(self, name: str = None) -> List[Metric]:
        with self._lock:
            if name:
                return self._metrics.get(name, []).copy()
            return [m for ms in self._metrics.values() for m in ms]

class ConsoleMetricObserver(MetricObserver):
    def on_metric(self, metric: Metric) -> None:
        print(f"[Metric] {metric.name} = {metric.value} {metric.unit}")
    
    def on_alert(self, alert: Alert) -> None:
        level_name = alert.level.name
        print(f"[{level_name}] {alert.message} (value: {alert.value})")

class LogFileObserver(MetricObserver):
    def __init__(self, file_path: str):
        self._file_path = file_path
    
    def on_metric(self, metric: Metric) -> None:
        self._write(json.dumps(metric.to_dict()))
    
    def on_alert(self, alert: Alert) -> None:
        self._write(json.dumps({
            "type": "alert",
            "metric_name": alert.metric_name,
            "level": alert.level.name,
            "message": alert.message,
            "value": str(alert.value),
            "timestamp": alert.timestamp.isoformat()
        }))
    
    def _write(self, content: str) -> None:
        with open(self._file_path, 'a') as f:
            f.write(content + '\n')

class AggregationObserver(MetricObserver):
    def __init__(self, window_size: int = 60):
        self._window_size = window_size
        self._data: Dict[str, List[Decimal]] = {}
        self._aggregations: Dict[str, Dict[str, Decimal]] = {}
    
    def on_metric(self, metric: Metric) -> None:
        if metric.name not in self._data:
            self._data[metric.name] = []
        
        self._data[metric.name].append(metric.value)
        
        if len(self._data[metric.name]) > self._window_size:
            self._data[metric.name].pop(0)
        
        values = self._data[metric.name]
        self._aggregations[metric.name] = {
            "min": min(values),
            "max": max(values),
            "avg": sum(values) / len(values),
            "count": Decimal(len(values))
        }
    
    def on_alert(self, alert: Alert) -> None:
        pass
    
    def get_aggregation(self, metric_name: str) -> Optional[Dict[str, Decimal]]:
        return self._aggregations.get(metric_name)

monitor = MetricSubject("SystemMonitor")
monitor.attach(ConsoleMetricObserver())
monitor.attach(AggregationObserver(window_size=100))

monitor.add_rule(AlertRule("cpu_usage", Decimal("80"), ">", AlertLevel.WARNING, "CPU使用率过高"))
monitor.add_rule(AlertRule("cpu_usage", Decimal("95"), ">", AlertLevel.CRITICAL, "CPU使用率严重过高"))
monitor.add_rule(AlertRule("memory_usage", Decimal("90"), ">", AlertLevel.ERROR, "内存使用率过高"))

monitor.record(Metric("cpu_usage", Decimal("75.5"), "%", tags={"host": "server1"}))
monitor.record(Metric("cpu_usage", Decimal("85.2"), "%", tags={"host": "server1"}))
monitor.record(Metric("cpu_usage", Decimal("96.8"), "%", tags={"host": "server1"}))
monitor.record(Metric("memory_usage", Decimal("92.1"), "%", tags={"host": "server1"}))

19.5.2 订单状态追踪系统

python
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import uuid

class OrderStatus(Enum):
    PENDING = auto()
    CONFIRMED = auto()
    PAID = auto()
    PROCESSING = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()
    REFUNDED = auto()

@dataclass
class OrderEvent:
    order_id: str
    old_status: Optional[OrderStatus]
    new_status: OrderStatus
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class Order:
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    customer_id: str = ""
    items: List[Dict[str, Any]] = field(default_factory=list)
    total_amount: float = 0.0
    status: OrderStatus = OrderStatus.PENDING
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    
    def add_item(self, product_id: str, name: str, price: float, quantity: int = 1) -> None:
        self.items.append({
            "product_id": product_id,
            "name": name,
            "price": price,
            "quantity": quantity
        })
        self._recalculate_total()
    
    def _recalculate_total(self) -> None:
        self.total_amount = sum(
            item["price"] * item["quantity"]
            for item in self.items
        )

class OrderObserver(ABC):
    @abstractmethod
    def on_status_change(self, event: OrderEvent) -> None:
        pass

class OrderSubject:
    VALID_TRANSITIONS: Dict[OrderStatus, Set[OrderStatus]] = {
        OrderStatus.PENDING: {OrderStatus.CONFIRMED, OrderStatus.CANCELLED},
        OrderStatus.CONFIRMED: {OrderStatus.PAID, OrderStatus.CANCELLED},
        OrderStatus.PAID: {OrderStatus.PROCESSING, OrderStatus.CANCELLED, OrderStatus.REFUNDED},
        OrderStatus.PROCESSING: {OrderStatus.SHIPPED, OrderStatus.CANCELLED},
        OrderStatus.SHIPPED: {OrderStatus.DELIVERED, OrderStatus.REFUNDED},
        OrderStatus.DELIVERED: {OrderStatus.REFUNDED},
        OrderStatus.CANCELLED: set(),
        OrderStatus.REFUNDED: set()
    }
    
    def __init__(self):
        self._orders: Dict[str, Order] = {}
        self._observers: List[OrderObserver] = []
    
    def attach(self, observer: OrderObserver) -> None:
        self._observers.append(observer)
    
    def detach(self, observer: OrderObserver) -> None:
        if observer in self._observers:
            self._observers.remove(observer)
    
    def create_order(self, customer_id: str) -> Order:
        order = Order(customer_id=customer_id)
        self._orders[order.id] = order
        return order
    
    def update_status(self, order_id: str, new_status: OrderStatus, metadata: Dict[str, Any] = None) -> bool:
        if order_id not in self._orders:
            return False
        
        order = self._orders[order_id]
        old_status = order.status
        
        if new_status not in self.VALID_TRANSITIONS.get(old_status, set()):
            print(f"Invalid transition: {old_status.name} -> {new_status.name}")
            return False
        
        order.status = new_status
        order.updated_at = datetime.now()
        
        event = OrderEvent(
            order_id=order_id,
            old_status=old_status,
            new_status=new_status,
            metadata=metadata or {}
        )
        
        self._notify_observers(event)
        return True
    
    def _notify_observers(self, event: OrderEvent) -> None:
        for observer in self._observers:
            try:
                observer.on_status_change(event)
            except Exception as e:
                print(f"Observer error: {e}")
    
    def get_order(self, order_id: str) -> Optional[Order]:
        return self._orders.get(order_id)

class EmailNotificationObserver(OrderObserver):
    def __init__(self, email_service: Any = None):
        self._email_service = email_service
    
    def on_status_change(self, event: OrderEvent) -> None:
        status_messages = {
            OrderStatus.CONFIRMED: "您的订单已确认,请尽快完成支付",
            OrderStatus.PAID: "支付成功,我们正在为您准备商品",
            OrderStatus.SHIPPED: "您的订单已发货",
            OrderStatus.DELIVERED: "您的订单已送达",
            OrderStatus.CANCELLED: "您的订单已取消",
            OrderStatus.REFUNDED: "您的订单退款已处理"
        }
        
        message = status_messages.get(event.new_status)
        if message:
            print(f"[Email] 订单 {event.order_id}: {message}")

class InventoryObserver(OrderObserver):
    def __init__(self):
        self._reserved_items: Dict[str, List[str]] = {}
    
    def on_status_change(self, event: OrderEvent) -> None:
        if event.new_status == OrderStatus.CONFIRMED:
            print(f"[Inventory] 预留库存 - 订单 {event.order_id}")
        elif event.new_status == OrderStatus.CANCELLED:
            print(f"[Inventory] 释放库存 - 订单 {event.order_id}")
        elif event.new_status == OrderStatus.REFUNDED:
            print(f"[Inventory] 退回库存 - 订单 {event.order_id}")

class LogisticsObserver(OrderObserver):
    def on_status_change(self, event: OrderEvent) -> None:
        if event.new_status == OrderStatus.PAID:
            tracking_number = f"TRK{event.order_id.upper()}"
            print(f"[Logistics] 创建运单: {tracking_number}")
        elif event.new_status == OrderStatus.SHIPPED:
            tracking_number = event.metadata.get("tracking_number", "N/A")
            print(f"[Logistics] 更新物流状态 - 运单号: {tracking_number}")

class AnalyticsObserver(OrderObserver):
    def __init__(self):
        self._status_counts: Dict[OrderStatus, int] = {}
        self._transition_times: List[float] = []
    
    def on_status_change(self, event: OrderEvent) -> None:
        if event.new_status not in self._status_counts:
            self._status_counts[event.new_status] = 0
        self._status_counts[event.new_status] += 1
        
        print(f"[Analytics] 状态统计: {dict((s.name, c) for s, c in self._status_counts.items())}")
    
    def get_report(self) -> Dict[str, Any]:
        return {
            "status_counts": dict((s.name, c) for s, c in self._status_counts.items()),
            "total_transitions": sum(self._status_counts.values())
        }

order_system = OrderSubject()
order_system.attach(EmailNotificationObserver())
order_system.attach(InventoryObserver())
order_system.attach(LogisticsObserver())
order_system.attach(AnalyticsObserver())

order = order_system.create_order("customer_001")
order.add_item("prod_001", "Python高级编程", 89.0, 2)
order.add_item("prod_002", "设计模式精解", 69.0, 1)

print(f"\n创建订单: {order.id}, 总金额: {order.total_amount}")

order_system.update_status(order.id, OrderStatus.CONFIRMED)
order_system.update_status(order.id, OrderStatus.PAID)
order_system.update_status(order.id, OrderStatus.PROCESSING)
order_system.update_status(order.id, OrderStatus.SHIPPED, {"tracking_number": "SF1234567890"})
order_system.update_status(order.id, OrderStatus.DELIVERED)

19.5.3 配置中心热更新

python
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
import json
import hashlib
import threading

@dataclass
class ConfigChangeEvent:
    key: str
    old_value: Any
    new_value: Any
    namespace: str = "default"
    timestamp: datetime = field(default_factory=datetime.now)

class ConfigObserver(ABC):
    @abstractmethod
    def on_config_change(self, event: ConfigChangeEvent) -> None:
        pass

class ConfigCenter:
    def __init__(self):
        self._configs: Dict[str, Dict[str, Any]] = {"default": {}}
        self._observers: List[ConfigObserver] = []
        self._key_observers: Dict[str, List[ConfigObserver]] = {}
        self._version: int = 0
        self._lock = threading.RLock()
        self._checksums: Dict[str, str] = {}
    
    def register_observer(self, observer: ConfigObserver, keys: List[str] = None) -> None:
        with self._lock:
            if keys is None:
                self._observers.append(observer)
            else:
                for key in keys:
                    if key not in self._key_observers:
                        self._key_observers[key] = []
                    self._key_observers[key].append(observer)
    
    def unregister_observer(self, observer: ConfigObserver) -> None:
        with self._lock:
            if observer in self._observers:
                self._observers.remove(observer)
            for key in self._key_observers:
                if observer in self._key_observers[key]:
                    self._key_observers[key].remove(observer)
    
    def set(self, key: str, value: Any, namespace: str = "default") -> None:
        with self._lock:
            if namespace not in self._configs:
                self._configs[namespace] = {}
            
            old_value = self._configs[namespace].get(key)
            self._configs[namespace][key] = value
            self._version += 1
            
            self._update_checksum(namespace)
            
            event = ConfigChangeEvent(
                key=key,
                old_value=old_value,
                new_value=value,
                namespace=namespace
            )
            
            observers = self._observers.copy()
            key_observers = self._key_observers.get(key, []).copy()
        
        for observer in observers:
            try:
                observer.on_config_change(event)
            except Exception as e:
                print(f"Config observer error: {e}")
        
        for observer in key_observers:
            try:
                observer.on_config_change(event)
            except Exception as e:
                print(f"Key config observer error: {e}")
    
    def get(self, key: str, default: Any = None, namespace: str = "default") -> Any:
        with self._lock:
            return self._configs.get(namespace, {}).get(key, default)
    
    def get_all(self, namespace: str = "default") -> Dict[str, Any]:
        with self._lock:
            return self._configs.get(namespace, {}).copy()
    
    def delete(self, key: str, namespace: str = "default") -> bool:
        with self._lock:
            if namespace not in self._configs:
                return False
            if key not in self._configs[namespace]:
                return False
            
            old_value = self._configs[namespace].pop(key)
            self._version += 1
            
            event = ConfigChangeEvent(
                key=key,
                old_value=old_value,
                new_value=None,
                namespace=namespace
            )
            
            observers = self._observers.copy()
        
        for observer in observers:
            try:
                observer.on_config_change(event)
            except Exception as e:
                print(f"Config observer error: {e}")
        
        return True
    
    def _update_checksum(self, namespace: str) -> None:
        config_str = json.dumps(self._configs.get(namespace, {}), sort_keys=True)
        self._checksums[namespace] = hashlib.md5(config_str.encode()).hexdigest()
    
    def get_checksum(self, namespace: str = "default") -> str:
        return self._checksums.get(namespace, "")
    
    @property
    def version(self) -> int:
        return self._version

class DatabaseConfigObserver(ConfigObserver):
    def __init__(self):
        self._pool_size = 10
        self._timeout = 30
    
    def on_config_change(self, event: ConfigChangeEvent) -> None:
        if event.key == "db.pool_size":
            old_pool = self._pool_size
            self._pool_size = event.new_value
            print(f"[Database] 连接池大小: {old_pool} -> {self._pool_size}")
        elif event.key == "db.timeout":
            self._timeout = event.new_value
            print(f"[Database] 超时时间更新为: {self._timeout}s")

class CacheConfigObserver(ConfigObserver):
    def __init__(self):
        self._ttl = 300
        self._max_size = 1000
    
    def on_config_change(self, event: ConfigChangeEvent) -> None:
        if event.key == "cache.ttl":
            self._ttl = event.new_value
            print(f"[Cache] TTL更新为: {self._ttl}s")
        elif event.key == "cache.max_size":
            self._max_size = event.new_value
            print(f"[Cache] 最大容量更新为: {self._max_size}")

class FeatureFlagObserver(ConfigObserver):
    def __init__(self):
        self._flags: Dict[str, bool] = {}
    
    def on_config_change(self, event: ConfigChangeEvent) -> None:
        if event.key.startswith("feature."):
            flag_name = event.key[8:]
            self._flags[flag_name] = event.new_value
            status = "启用" if event.new_value else "禁用"
            print(f"[FeatureFlag] {flag_name}: {status}")
    
    def is_enabled(self, flag_name: str) -> bool:
        return self._flags.get(flag_name, False)

class LoggingConfigObserver(ConfigObserver):
    def __init__(self):
        self._level = "INFO"
        self._format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    
    def on_config_change(self, event: ConfigChangeEvent) -> None:
        if event.key == "logging.level":
            self._level = event.new_value
            print(f"[Logging] 日志级别更新为: {self._level}")
        elif event.key == "logging.format":
            self._format = event.new_value
            print(f"[Logging] 日志格式已更新")

config_center = ConfigCenter()

db_observer = DatabaseConfigObserver()
cache_observer = CacheConfigObserver()
feature_observer = FeatureFlagObserver()
logging_observer = LoggingConfigObserver()

config_center.register_observer(db_observer, ["db.pool_size", "db.timeout"])
config_center.register_observer(cache_observer, ["cache.ttl", "cache.max_size"])
config_center.register_observer(feature_observer)
config_center.register_observer(logging_observer, ["logging.level", "logging.format"])

print("=== 初始化配置 ===")
config_center.set("db.pool_size", 10)
config_center.set("db.timeout", 30)
config_center.set("cache.ttl", 300)
config_center.set("cache.max_size", 1000)
config_center.set("logging.level", "INFO")

print("\n=== 热更新配置 ===")
config_center.set("db.pool_size", 20)
config_center.set("cache.ttl", 600)
config_center.set("feature.new_ui", True)
config_center.set("feature.beta_api", True)
config_center.set("logging.level", "DEBUG")

print(f"\nFeature 'new_ui' enabled: {feature_observer.is_enabled('new_ui')}")
print(f"Config version: {config_center.version}")

19.6 模式变体

19.6.1 观察者模式变体对比

变体特点适用场景复杂度
经典观察者主题直接通知观察者简单事件通知★☆☆
事件总线解耦发布者和订阅者模块间通信★★☆
发布-订阅通过消息代理通信分布式系统★★★
响应式流支持操作符链式处理数据流处理★★★
属性绑定双向数据绑定UI框架★★☆
信号-槽类型安全的观察者Qt等框架★★☆

19.6.2 属性绑定模式

python
from typing import TypeVar, Generic, Callable, Optional, List, Any
from dataclasses import dataclass, field
from weakref import WeakSet

T = TypeVar('T')

@dataclass
class Binding:
    source: 'BindableProperty'
    target: 'BindableProperty'
    converter: Optional[Callable] = None
    mode: str = "one_way"

class BindableProperty(Generic[T]):
    def __init__(self, value: T = None):
        self._value: T = value
        self._observers: WeakSet = WeakSet()
        self._bindings: List[Binding] = []
    
    @property
    def value(self) -> T:
        return self._value
    
    @value.setter
    def value(self, new_value: T) -> None:
        if self._value != new_value:
            old_value = self._value
            self._value = new_value
            self._notify(old_value, new_value)
    
    def bind_to(self, other: 'BindableProperty[T]', converter: Callable = None) -> None:
        binding = Binding(
            source=self,
            target=other,
            converter=converter,
            mode="one_way"
        )
        self._bindings.append(binding)
        
        if converter:
            other.value = converter(self._value)
        else:
            other.value = self._value
    
    def bind_two_way(self, other: 'BindableProperty[T]', converter: Callable = None) -> None:
        self.bind_to(other, converter)
        other._bindings.append(Binding(
            source=other,
            target=self,
            converter=converter,
            mode="two_way"
        ))
    
    def observe(self, callback: Callable[[T, T], None]) -> None:
        self._observers.add(callback)
    
    def _notify(self, old_value: T, new_value: T) -> None:
        for observer in self._observers:
            try:
                observer(old_value, new_value)
            except Exception as e:
                print(f"Observer error: {e}")
        
        for binding in self._bindings:
            try:
                target_value = binding.converter(new_value) if binding.converter else new_value
                if binding.target.value != target_value:
                    binding.target.value = target_value
            except Exception as e:
                print(f"Binding error: {e}")

class ViewModel:
    def __init__(self):
        self.username = BindableProperty[str]("")
        self.email = BindableProperty[str]("")
        self.age = BindableProperty[int](0)
        self.is_adult = BindableProperty[bool](False)
        
        self.age.bind_to(self.is_adult, converter=lambda x: x >= 18)

vm = ViewModel()

def on_username_change(old: str, new: str) -> None:
    print(f"Username changed: '{old}' -> '{new}'")

vm.username.observe(on_username_change)

vm.username.value = "张三"
vm.email.value = "zhangsan@example.com"
vm.age.value = 16
print(f"Is adult: {vm.is_adult.value}")

vm.age.value = 20
print(f"Is adult: {vm.is_adult.value}")

19.6.3 信号-槽模式

python
from typing import Callable, List, Any, Tuple, get_type_hints
from dataclasses import dataclass
from functools import wraps
import inspect

@dataclass
class Connection:
    signal: 'Signal'
    slot: Callable
    disconnected: bool = False

class Signal:
    def __init__(self, *param_types: type):
        self._param_types = param_types
        self._connections: List[Connection] = []
    
    def connect(self, slot: Callable) -> Connection:
        self._validate_slot(slot)
        connection = Connection(signal=self, slot=slot)
        self._connections.append(connection)
        return connection
    
    def disconnect(self, connection: Connection) -> bool:
        if connection in self._connections:
            connection.disconnected = True
            self._connections.remove(connection)
            return True
        return False
    
    def emit(self, *args: Any) -> None:
        self._validate_args(args)
        
        for connection in self._connections[:]:
            if not connection.disconnected:
                try:
                    connection.slot(*args)
                except Exception as e:
                    print(f"Slot execution error: {e}")
    
    def _validate_slot(self, slot: Callable) -> None:
        sig = inspect.signature(slot)
        params = list(sig.parameters.values())
        
        for i, expected_type in enumerate(self._param_types):
            if i < len(params):
                param = params[i]
                if param.annotation != inspect.Parameter.empty:
                    if param.annotation != expected_type:
                        raise TypeError(
                            f"Slot parameter {i} type mismatch: "
                            f"expected {expected_type}, got {param.annotation}"
                        )
    
    def _validate_args(self, args: Tuple) -> None:
        if len(args) != len(self._param_types):
            raise ValueError(
                f"Expected {len(self._param_types)} arguments, got {len(args)}"
            )
        
        for i, (arg, expected_type) in enumerate(zip(args, self._param_types)):
            if not isinstance(arg, expected_type):
                raise TypeError(
                    f"Argument {i} type mismatch: expected {expected_type}, got {type(arg)}"
                )

class Button:
    def __init__(self, name: str):
        self._name = name
        self.clicked = Signal()
        self.pressed = Signal()
        self.released = Signal()
    
    def click(self) -> None:
        print(f"[{self._name}] Button clicked")
        self.clicked.emit()
    
    def press(self) -> None:
        self.pressed.emit()
    
    def release(self) -> None:
        self.released.emit()

class Slider:
    def __init__(self, name: str):
        self._name = name
        self._value = 0
        self.value_changed = Signal(int)
    
    @property
    def value(self) -> int:
        return self._value
    
    @value.setter
    def value(self, val: int) -> None:
        if self._value != val:
            self._value = val
            self.value_changed.emit(val)

class TextBox:
    def __init__(self, name: str):
        self._name = name
        self._text = ""
        self.text_changed = Signal(str)
    
    @property
    def text(self) -> str:
        return self._text
    
    @text.setter
    def text(self, value: str) -> None:
        if self._text != value:
            self._text = value
            self.text_changed.emit(value)

class Controller:
    def __init__(self):
        self._click_count = 0
    
    def on_button_clicked(self) -> None:
        self._click_count += 1
        print(f"[Controller] Button clicked {self._click_count} times")
    
    def on_slider_changed(self, value: int) -> None:
        print(f"[Controller] Slider value: {value}")
    
    def on_text_changed(self, text: str) -> None:
        print(f"[Controller] Text: {text}")

button = Button("SubmitButton")
slider = Slider("VolumeSlider")
textbox = TextBox("SearchBox")
controller = Controller()

button.clicked.connect(controller.on_button_clicked)
slider.value_changed.connect(controller.on_slider_changed)
textbox.text_changed.connect(controller.on_text_changed)

button.click()
slider.value = 50
slider.value = 75
textbox.text = "Hello"
textbox.text = "Hello World"
button.click()

19.7 反模式与最佳实践

19.7.1 常见反模式

反模式1:观察者泄漏

python
class BadExample:
    def __init__(self):
        self._observers = []
    
    def attach(self, observer):
        self._observers.append(observer)

class GoodExample:
    def __init__(self):
        self._observers = WeakSet()
    
    def attach(self, observer):
        self._observers.add(observer)

反模式2:通知风暴

python
class BadExample:
    def __init__(self):
        self._value = 0
        self._observers = []
    
    def set_value(self, value):
        self._value = value
        for observer in self._observers:
            observer.update(self)

class GoodExample:
    def __init__(self):
        self._value = 0
        self._observers = []
        self._notify_pending = False
    
    def set_value(self, value):
        self._value = value
        self._schedule_notify()
    
    def _schedule_notify(self):
        if not self._notify_pending:
            self._notify_pending = True
            self._do_notify()
    
    def _do_notify(self):
        for observer in self._observers:
            observer.update(self)
        self._notify_pending = False

反模式3:循环依赖

python
class BadExample:
    def __init__(self):
        self._value = 0
        self._observers = []
        self._is_notifying = False
    
    def set_value(self, value):
        if self._is_notifying:
            return
        self._is_notifying = True
        self._value = value
        for observer in self._observers:
            observer.update(self)
        self._is_notifying = False

class GoodExample:
    def __init__(self):
        self._value = 0
        self._observers = []
        self._notification_stack = []
    
    def set_value(self, value):
        if id(self) in self._notification_stack:
            return
        
        self._notification_stack.append(id(self))
        try:
            self._value = value
            for observer in self._observers[:]:
                observer.update(self)
        finally:
            self._notification_stack.pop()

19.7.2 最佳实践清单

实践描述重要性
使用弱引用防止观察者无法被垃圾回收★★★
异常隔离观察者异常不应影响其他观察者★★★
线程安全多线程环境下保护观察者列表★★★
批量通知合并多次状态变化为单次通知★★☆
优先级支持允许设置观察者通知顺序★★☆
生命周期管理提供取消订阅机制★★★
事件过滤支持条件性通知★★☆
性能监控记录通知延迟和耗时★☆☆

19.8 决策指南

19.8.1 是否使用观察者模式

                    需要对象间通信?

                    ┌────┴────┐
                    │         │
                   否         是
                    │         │
                    ↓         ↓
              不需要模式    通信对象数量?

                    ┌─────────┼─────────┐
                    │         │         │
                   1对1      1对多     多对多
                    │         │         │
                    ↓         ↓         ↓
              直接调用    观察者模式   发布-订阅
                                       或事件总线

19.8.2 实现方式选择

                    选择实现方式

    ┌────────────────────┼────────────────────┐
    │                    │                    │
    ↓                    ↓                    ↓
简单场景              企业应用            分布式系统
    │                    │                    │
    ↓                    ↓                    ↓
ABC观察者            事件总线             消息队列
    │                    │                    │
    │                ┌───┴───┐               │
    │                │       │               │
    │              同步     异步             │
    │                │       │               │
    ↓                ↓       ↓               ↓
Subject/Observer  EventBus  AsyncEventBus  Kafka/RabbitMQ

19.8.3 技术选型对照表

场景推荐实现理由
UI事件处理信号-槽类型安全,支持连接管理
配置热更新事件总线解耦配置源和消费者
实时数据监控响应式流支持过滤、聚合操作
微服务通信消息队列可靠传递,支持持久化
状态同步属性绑定双向绑定,自动更新
日志收集异步观察者非阻塞,高性能

19.9 与其他模式的关系

19.9.1 模式组合

观察者模式 + 策略模式:
┌─────────────────────────────────────────────────────────────┐
│                    Subject                                   │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              NotificationStrategy                    │    │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │    │
│  │  │ SyncNotify  │  │AsyncNotify  │  │BatchNotify  │ │    │
│  │  └─────────────┘  └─────────────┘  └─────────────┘ │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

观察者模式 + 中介者模式:
┌─────────────────────────────────────────────────────────────┐
│                      Mediator                                │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐              │
│  │ Subject A │  │ Subject B │  │ Subject C │              │
│  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘              │
│        │              │              │                     │
│        └──────────────┼──────────────┘                     │
│                       ↓                                    │
│              ┌─────────────────┐                           │
│              │    Observer     │                           │
│              └─────────────────┘                           │
└─────────────────────────────────────────────────────────────┘

19.9.2 模式对比

模式关系区别
中介者模式都处理对象通信中介者集中控制,观察者分散通知
发布-订阅观察者的扩展发布-订阅完全解耦,通过代理通信
责任链模式都涉及对象链责任链传递请求,观察者广播通知
迭代器模式都遍历集合迭代器主动遍历,观察者被动接收

19.10 快速参考卡

19.10.1 核心概念速查

┌─────────────────────────────────────────────────────────────────┐
│                     观察者模式速查卡                             │
├─────────────────────────────────────────────────────────────────┤
│ 定义:一对多依赖,状态变化自动通知                                │
├─────────────────────────────────────────────────────────────────┤
│ 核心角色:                                                       │
│   • Subject(主题):维护观察者列表,状态变化时通知               │
│   • Observer(观察者):定义更新接口,接收通知                    │
│   • ConcreteSubject:具体主题,存储状态                          │
│   • ConcreteObserver:具体观察者,实现更新逻辑                   │
├─────────────────────────────────────────────────────────────────┤
│ 关键方法:                                                       │
│   • attach(observer):添加观察者                                 │
│   • detach(observer):移除观察者                                 │
│   • notify():通知所有观察者                                     │
│   • update(data):观察者更新方法                                 │
├─────────────────────────────────────────────────────────────────┤
│ 推模型 vs 拉模型:                                               │
│   • 推模型:主题主动发送数据给观察者                              │
│   • 拉模型:观察者主动从主题获取数据                              │
├─────────────────────────────────────────────────────────────────┤
│ 适用场景:                                                       │
│   ✓ 对象间存在一对多依赖关系                                     │
│   ✓ 一个对象变化需要通知多个对象                                  │
│   ✓ 需要实现事件驱动架构                                         │
│   ✓ 需要实现发布-订阅机制                                        │
├─────────────────────────────────────────────────────────────────┤
│ 注意事项:                                                       │
│   ✗ 避免观察者泄漏(使用弱引用)                                  │
│   ✗ 避免循环依赖(添加通知保护)                                  │
│   ✗ 避免性能问题(批量通知、异步处理)                            │
└─────────────────────────────────────────────────────────────────┘

19.10.2 Python实现模板

python
from abc import ABC, abstractmethod
from typing import List, Any
from weakref import WeakSet

class Observer(ABC):
    @abstractmethod
    def update(self, data: Any) -> None:
        pass

class Subject:
    def __init__(self):
        self._observers: WeakSet[Observer] = WeakSet()
        self._state: Any = None
    
    def attach(self, observer: Observer) -> None:
        self._observers.add(observer)
    
    def detach(self, observer: Observer) -> None:
        self._observers.discard(observer)
    
    def notify(self) -> None:
        for observer in self._observers:
            observer.update(self._state)
    
    @property
    def state(self) -> Any:
        return self._state
    
    @state.setter
    def state(self, value: Any) -> None:
        self._state = value
        self.notify()

19.11 小结

观察者模式是实现对象间松耦合通信的核心模式,是事件驱动架构的基础。本章从形式化定义出发,深入探讨了观察者模式的理论基础和多种实现方式。

关键要点:

  1. 理论基础:观察者模式建立在依赖倒置原则之上,通过抽象接口解耦主题和观察者

  2. 实现方式:从经典的ABC实现到现代的响应式流,Python提供了丰富的实现选择

  3. 企业应用:在实时监控、订单追踪、配置管理等场景中发挥重要作用

  4. 模式变体:事件总线、响应式流、属性绑定等变体扩展了观察者模式的应用范围

  5. 最佳实践:使用弱引用防止内存泄漏,异常隔离保证通知可靠性,线程安全支持并发场景

观察者模式是现代软件架构的基石之一,理解其原理和实现对于构建可扩展、可维护的系统至关重要。


思考题

  1. 观察者模式与发布-订阅模式的核心区别是什么?在什么情况下应该选择后者?

  2. 如何设计一个支持优先级和异步通知的观察者框架?

  3. 在微服务架构中,观察者模式如何与消息队列结合使用?

  4. 响应式编程(Rx)如何扩展了传统观察者模式的能力?

  5. 如何解决观察者模式中的循环依赖问题?请设计一个检测和预防机制。

Python技术丛书 - 江苏省宿城中等专业学校