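Chapter 28 Reactive Programming Patterns

Learning Objectives

After completing this chapter, you will be able to:

  • Explain the core concepts and theoretical foundations of reactive programming
  • State the formal definitions of Observable and Observer and implement both
  • Build complex data-processing pipelines with operators
  • Implement backpressure strategies and error-handling mechanisms
  • Apply reactive patterns to asynchronous programming problems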

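28.1 Theoretical Foundations and Formal Definitions

28.1.1 A Formal Definition of Reactive Programming

Definition 28.1 (Reactive system): A reactive system $\mathcal{R}$ is a 4-tuple:

$$\mathcal{R} = \langle \mathcal{S}, \mathcal{O}, \mathcal{T}, \mathcal{P} \rangle$$

where:

  • $\mathcal{S}$: the set of data streams, each a sequence of values that changes over time
  • $\mathcal{O}$: the set of operators, which transform streams
  • $\mathcal{T}$: the time model, which defines the order in which events occur
  • $\mathcal{P}$: the propagation rules, which define how a change in the data is propagated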

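Definition 28.2 (Observable): An Observable $O$ is an asynchronous sequence that produces values:

$$O: \mathcal{T} \rightarrow \mathcal{P}(\mathcal{V} \cup \{\epsilon\} \cup \{\bot\})$$

where:

  • $\mathcal{V}$: the value domain
  • $\epsilon$: the completion signal
  • $\bot$: the error signal
  • $\mathcal{P}$: the power set, since several values may be produced at one instant (in this definition $\mathcal{P}$ is the power set, not the propagation rules of Definition 28.1)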

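Definition 28.3 (Observer): An Observer is a triple:

$$\mathit{Observer} = \langle \mathit{on\_next}, \mathit{on\_error}, \mathit{on\_complete} \rangle$$

whose components satisfy (with $\mathbf{1}$ denoting the unit type):

  • $\mathit{on\_next}: \mathcal{V} \rightarrow \mathbf{1}$, which handles a regular value
  • $\mathit{on\_error}: \mathit{Exception} \rightarrow \mathbf{1}$, which handles an error
  • $\mathit{on\_complete}: \mathbf{1} \rightarrow \mathbf{1}$, which handles completion

Definition 28.4 (Subscription): Subscription is a function:

$$\mathit{subscribe}: \mathit{Observable} \times \mathit{Observer} \rightarrow \mathit{Disposable}$$

It connects an Observable to an Observer and returns a Disposable that can be used to cancel the subscription.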
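The definitions above imply a simple calling discipline: zero or more on_next calls followed by at most one terminal call, either on_error or on_complete. The sketch below illustrates that discipline under simplifying assumptions: the Observable is modelled as a plain Python iterable, delivery is synchronous, and the names `subscribe`, `recording_observer`, and `failing_source` are hypothetical helpers rather than part of any library.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class Observer:
    """The triple (on_next, on_error, on_complete) from Definition 28.3."""
    on_next: Callable[[Any], None]
    on_error: Callable[[Exception], None]
    on_complete: Callable[[], None]


class Disposable:
    """Handle returned by subscribe(); dispose() ends the subscription."""

    def __init__(self) -> None:
        self.is_disposed = False

    def dispose(self) -> None:
        self.is_disposed = True


def subscribe(values: Iterable[Any], observer: Observer) -> Disposable:
    """Push every value, then exactly one terminal signal."""
    handle = Disposable()
    try:
        for value in values:
            observer.on_next(value)
    except Exception as exc:
        observer.on_error(exc)      # error path: on_complete is never called
    else:
        observer.on_complete()      # normal path: on_error is never called
    handle.dispose()                # a terminated stream releases its subscription
    return handle


def recording_observer(log: List[Tuple]) -> Observer:
    """Hypothetical helper: an observer that records every signal it receives."""
    return Observer(
        on_next=lambda v: log.append(("next", v)),
        on_error=lambda e: log.append(("error", str(e))),
        on_complete=lambda: log.append(("complete",)),
    )


def failing_source():
    """Hypothetical source that emits one value and then fails."""
    yield 1
    raise ValueError("boom")


ok_log: List[Tuple] = []
ok_handle = subscribe([1, 2], recording_observer(ok_log))

err_log: List[Tuple] = []
subscribe(failing_source(), recording_observer(err_log))

print(ok_log)   # [('next', 1), ('next', 2), ('complete',)]
print(err_log)  # [('next', 1), ('error', 'boom')]
```

Because this source is synchronous, the returned handle is already disposed when `subscribe` returns; with an asynchronous source the caller would use it to cancel delivery early.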

28.1.2 The Reactive Manifesto

┌─────────────────────────────────────────────────────────────────────────┐
│                  Characteristics of a Reactive System                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│                       ┌─────────────────┐                               │
│                       │   Responsive    │                               │
│                       │ (timely reply)  │                               │
│                       └────────┬────────┘                               │
│                                │                                        │
│            ┌───────────────────┼───────────────────┐                    │
│            │                   │                   │                    │
│            ▼                   ▼                   ▼                    │
│   ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐           │
│   │    Resilient    │ │     Elastic     │ │ Message Driven  │           │
│   │ (fault recovery)│ │ (load adaptive) │ │(async messaging)│           │
│   └─────────────────┘ └─────────────────┘ └─────────────────┘           │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

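28.1.3 Historical Background and Evolution

| Period | Milestone | Key people / projects | Core contribution |
|--------|-----------|-----------------------|-------------------|
| 1997 | Reactive programming concept | Conal Elliott | FRP concept introduced |
| 2000 | FRP implementations | Conal Elliott, Paul Hudak | Haskell FRP libraries |
| 2009 | Rx.NET | Erik Meijer | Reactive Extensions for .NET |
| 2012 | RxJS | Microsoft | Reactive Extensions for JavaScript |
| 2013 | RxJava | Netflix | Reactive Extensions for Java |
| 2014 | Reactive Manifesto | Reactive community | Defined the traits of reactive systems |
| 2015 | Project Reactor | Pivotal | Foundation for reactive Spring |
| 2015 | Reactive Streams | Community specification | Standardized backpressure handling |
| 2019 | Kotlin Flow | JetBrains | Coroutine-based reactive streams |
| 2019 | RxPY 3.x | Python community | Reactive Extensions for Python |
| 2020+ | Reactive microservices | Community practice | Cloud-native reactive architecture |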

28.2 The Observable and Observer Pattern

28.2.1 UML Class Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<interface>>                                         │
│                    Observable<T>                                         │
├─────────────────────────────────────────────────────────────────────────┤
│ + subscribe(observer: Observer<T>): Disposable                          │
│ + pipe(*operators): Observable                                          │
└─────────────────────────────────────────────────────────────────────────┘

                                    │ implements

        ┌───────────────────────────┼───────────────────────────┐
        │                           │                           │
        ▼                           ▼                           ▼
┌───────────────┐          ┌───────────────┐          ┌───────────────┐
│   Subject<T>  │          │  ColdObservable│          │ HotObservable │
├───────────────┤          ├───────────────┤          ├───────────────┤
│ - observers   │          │ - generator   │          │ - multicast   │
│ - disposed    │          │ - subscribed  │          │ - refcount    │
├───────────────┤          ├───────────────┤          ├───────────────┤
│ + on_next()   │          │ + subscribe() │          │ + connect()   │
│ + on_error()  │          └───────────────┘          └───────────────┘
│ + on_complete()│
└───────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<interface>>                                         │
│                    Observer<T>                                           │
├─────────────────────────────────────────────────────────────────────────┤
│ + on_next(value: T): void                                               │
│ + on_error(error: Exception): void                                      │
│ + on_complete(): void                                                   │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                    <<interface>>                                         │
│                    Disposable                                            │
├─────────────────────────────────────────────────────────────────────────┤
│ + dispose(): void                                                       │
│ + is_disposed: bool                                                     │
└─────────────────────────────────────────────────────────────────────────┘
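The diagram distinguishes ColdObservable (driven by a generator) from HotObservable (multicast). The difference shows up in what a second subscriber sees. The following is a minimal single-threaded sketch; `ColdCounter` and `HotChannel` are hypothetical stand-ins for the two boxes in the diagram, and listeners are plain callables rather than full Observer objects.

```python
from typing import Callable, List

Listener = Callable[[int], None]


class ColdCounter:
    """Hypothetical cold source: every subscriber re-runs the sequence from the start."""

    def __init__(self, count: int) -> None:
        self._count = count

    def subscribe(self, listener: Listener) -> None:
        for i in range(self._count):
            listener(i)


class HotChannel:
    """Hypothetical hot source: values are multicast to the current subscribers only."""

    def __init__(self) -> None:
        self._listeners: List[Listener] = []

    def subscribe(self, listener: Listener) -> None:
        self._listeners.append(listener)

    def emit(self, value: int) -> None:
        for listener in list(self._listeners):
            listener(value)


cold = ColdCounter(3)
first: List[int] = []
second: List[int] = []
cold.subscribe(first.append)
cold.subscribe(second.append)   # receives its own independent run: 0, 1, 2

hot = HotChannel()
early: List[int] = []
late: List[int] = []
hot.subscribe(early.append)
hot.emit(1)
hot.subscribe(late.append)      # subscribed too late to see 1
hot.emit(2)

print(first, second)  # [0, 1, 2] [0, 1, 2]
print(early, late)    # [1, 2] [2]
```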

28.2.2 Core Interface Implementation

python
from abc import ABC, abstractmethod
from typing import Callable, List, Any, Optional, Generic, TypeVar, Union
from dataclasses import dataclass, field
from enum import Enum, auto
from threading import Lock, RLock
from weakref import WeakSet
import time

T = TypeVar('T')
R = TypeVar('R')

class Disposable(ABC):
    @abstractmethod
    def dispose(self) -> None:
        pass
    
    @property
    @abstractmethod
    def is_disposed(self) -> bool:
        pass

class CompositeDisposable(Disposable):
    def __init__(self, *disposables: Disposable):
        self._disposables = list(disposables)
        self._disposed = False
        self._lock = RLock()
    
    def add(self, disposable: Disposable) -> None:
        with self._lock:
            if self._disposed:
                disposable.dispose()
            else:
                self._disposables.append(disposable)
    
    def dispose(self) -> None:
        with self._lock:
            if not self._disposed:
                self._disposed = True
                for d in self._disposables:
                    try:
                        d.dispose()
                    except Exception:
                        pass
                self._disposables.clear()
    
    @property
    def is_disposed(self) -> bool:
        return self._disposed

class SerialDisposable(Disposable):
    def __init__(self):
        self._disposable: Optional[Disposable] = None
        self._disposed = False
        self._lock = RLock()
    
    @property
    def disposable(self) -> Optional[Disposable]:
        return self._disposable
    
    @disposable.setter
    def disposable(self, value: Optional[Disposable]) -> None:
        with self._lock:
            if self._disposed:
                if value:
                    value.dispose()
            else:
                old = self._disposable
                self._disposable = value
                if old:
                    old.dispose()
    
    def dispose(self) -> None:
        with self._lock:
            if not self._disposed:
                self._disposed = True
                if self._disposable:
                    self._disposable.dispose()
                    self._disposable = None
    
    @property
    def is_disposed(self) -> bool:
        return self._disposed

class RefCountDisposable(Disposable):
    def __init__(self, disposable: Disposable):
        self._disposable = disposable
        self._count = 0
        self._disposed = False
        self._lock = RLock()
    
    def dispose(self) -> None:
        with self._lock:
            if not self._disposed:
                self._disposed = True
                if self._count == 0 and self._disposable:
                    self._disposable.dispose()
    
    @property
    def is_disposed(self) -> bool:
        return self._disposed
    
    def get_disposable(self) -> Disposable:
        with self._lock:
            if self._disposed:
                return EmptyDisposable()
            self._count += 1
            return _InnerDisposable(self)

class _InnerDisposable(Disposable):
    def __init__(self, ref: RefCountDisposable):
        self._ref = ref
        self._disposed = False
    
    def dispose(self) -> None:
        if not self._disposed:
            self._disposed = True
            with self._ref._lock:
                self._ref._count -= 1
                if self._ref._disposed and self._ref._count == 0:
                    if self._ref._disposable:
                        self._ref._disposable.dispose()
    
    @property
    def is_disposed(self) -> bool:
        return self._disposed

class EmptyDisposable(Disposable):
    def dispose(self) -> None:
        pass
    
    @property
    def is_disposed(self) -> bool:
        return True

class Observer(ABC, Generic[T]):
    @abstractmethod
    def on_next(self, value: T) -> None:
        pass
    
    @abstractmethod
    def on_error(self, error: Exception) -> None:
        pass
    
    @abstractmethod
    def on_complete(self) -> None:
        pass

class Observable(ABC, Generic[T]):
    @abstractmethod
    def subscribe(self, observer: Observer[T]) -> Disposable:
        pass
    
    def pipe(self, *operators: Callable[['Observable[T]'], 'Observable']) -> 'Observable':
        result = self
        for op in operators:
            result = op(result)
        return result

class AnonymousObserver(Observer[T]):
    def __init__(
        self, 
        on_next: Optional[Callable[[T], None]] = None,
        on_error: Optional[Callable[[Exception], None]] = None,
        on_complete: Optional[Callable[[], None]] = None
    ):
        self._on_next = on_next or (lambda x: None)
        self._on_error = on_error or (lambda e: print(f"Unhandled error: {e}"))
        self._on_complete = on_complete or (lambda: None)
        self._stopped = False
    
    def on_next(self, value: T) -> None:
        if not self._stopped:
            try:
                self._on_next(value)
            except Exception as e:
                self.on_error(e)
    
    def on_error(self, error: Exception) -> None:
        if not self._stopped:
            self._stopped = True
            self._on_error(error)
    
    def on_complete(self) -> None:
        if not self._stopped:
            self._stopped = True
            self._on_complete()

class SafeObserver(Observer[T]):
    def __init__(self, observer: Observer[T], disposable: Disposable):
        self._observer = observer
        self._disposable = disposable
        self._stopped = False
    
    def on_next(self, value: T) -> None:
        if not self._stopped and not self._disposable.is_disposed:
            try:
                self._observer.on_next(value)
            except Exception as e:
                self.on_error(e)
    
    def on_error(self, error: Exception) -> None:
        if not self._stopped and not self._disposable.is_disposed:
            self._stopped = True
            try:
                self._observer.on_error(error)
            finally:
                self._disposable.dispose()
    
    def on_complete(self) -> None:
        if not self._stopped and not self._disposable.is_disposed:
            self._stopped = True
            try:
                self._observer.on_complete()
            finally:
                self._disposable.dispose()
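Of the disposables in the listing above, RefCountDisposable has the least obvious rule: the underlying resource is released only after the primary handle has been disposed and every handle obtained from it has been given back. The condensed sketch below isolates that rule. The class `RefCounted` and its methods `acquire` and `_maybe_release` are hypothetical names chosen for illustration, and the release action is a plain callback instead of a Disposable.

```python
from threading import Lock
from typing import Callable, List


class RefCounted:
    """Condensed, hypothetical stand-in for the reference-counting rule."""

    def __init__(self, release: Callable[[], None]) -> None:
        self._release = release
        self._count = 0
        self._primary_disposed = False
        self._released = False
        self._lock = Lock()

    def acquire(self) -> Callable[[], None]:
        """Take a reference; the returned one-shot function gives it back."""
        with self._lock:
            self._count += 1
        done = [False]

        def give_back() -> None:
            with self._lock:
                if done[0]:          # giving back twice has no effect
                    return
                done[0] = True
                self._count -= 1
                self._maybe_release()

        return give_back

    def dispose(self) -> None:
        """Dispose the primary handle; release only if nothing is outstanding."""
        with self._lock:
            self._primary_disposed = True
            self._maybe_release()

    def _maybe_release(self) -> None:
        # The caller holds the lock.
        if self._primary_disposed and self._count == 0 and not self._released:
            self._released = True
            self._release()


events: List[str] = []
resource = RefCounted(lambda: events.append("released"))
give_back_a = resource.acquire()
give_back_b = resource.acquire()

resource.dispose()                       # two references are still outstanding
snapshot_after_dispose = list(events)    # nothing released yet

give_back_a()
give_back_a()                            # second call is ignored
snapshot_after_first = list(events)      # one reference is still outstanding

give_back_b()                            # last reference returned: release runs
print(events)  # ['released']
```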

28.2.3 Subject Implementation

python
from typing import List, Deque
from collections import deque
from threading import RLock

class Subject(Observable[T], Observer[T]):
    def __init__(self):
        self._observers: List[Observer[T]] = []
        self._lock = RLock()
        self._disposed = False
        self._stopped = False
        self._error: Optional[Exception] = None
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        with self._lock:
            if self._disposed:
                raise Exception("Subject has been disposed")
            
            if self._stopped:
                if self._error:
                    observer.on_error(self._error)
                else:
                    observer.on_complete()
                return EmptyDisposable()
            
            self._observers.append(observer)
            
            def dispose():
                with self._lock:
                    if observer in self._observers:
                        self._observers.remove(observer)
            
            return DisposableImpl(dispose)
    
    def on_next(self, value: T) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return
            observers = self._observers.copy()
        
        for observer in observers:
            try:
                observer.on_next(value)
            except Exception:
                pass
    
    def on_error(self, error: Exception) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return
            self._stopped = True
            self._error = error
            observers = self._observers.copy()
            self._observers.clear()
        
        for observer in observers:
            try:
                observer.on_error(error)
            except Exception:
                pass
    
    def on_complete(self) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return
            self._stopped = True
            observers = self._observers.copy()
            self._observers.clear()
        
        for observer in observers:
            try:
                observer.on_complete()
            except Exception:
                pass
    
    def dispose(self) -> None:
        with self._lock:
            self._disposed = True
            self._observers.clear()

class DisposableImpl(Disposable):
    def __init__(self, dispose_func: Callable[[], None]):
        self._dispose_func = dispose_func
        self._disposed = False
        self._lock = RLock()
    
    def dispose(self) -> None:
        with self._lock:
            if not self._disposed:
                self._disposed = True
                self._dispose_func()
    
    @property
    def is_disposed(self) -> bool:
        return self._disposed

class BehaviorSubject(Subject[T]):
    def __init__(self, initial_value: T):
        super().__init__()
        self._value = initial_value
    
    @property
    def value(self) -> T:
        with self._lock:
            if self._disposed:
                raise Exception("Subject has been disposed")
            return self._value
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Hold the re-entrant lock for the whole call so that the replayed
        # current value cannot be overtaken by a concurrent on_next.
        with self._lock:
            disposable = super().subscribe(observer)
            if not self._stopped and not self._disposed:
                try:
                    observer.on_next(self._value)
                except Exception:
                    pass
            return disposable
    
    def on_next(self, value: T) -> None:
        with self._lock:
            self._value = value
        super().on_next(value)

class ReplaySubject(Subject[T]):
    def __init__(self, buffer_size: Optional[int] = None, window_time: Optional[float] = None):
        super().__init__()
        self._buffer: Deque[tuple] = deque()
        self._buffer_size = buffer_size
        self._window_time = window_time
    
    def _trim_buffer(self) -> None:
        now = time.time()
        
        while self._buffer:
            if self._window_time is not None:
                if now - self._buffer[0][0] > self._window_time:
                    self._buffer.popleft()
                    continue
            
            if self._buffer_size is not None and len(self._buffer) > self._buffer_size:
                self._buffer.popleft()
                continue
            
            break
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        with self._lock:
            if self._disposed:
                raise Exception("Subject has been disposed")
            self._trim_buffer()
            # Replay first, so buffered values precede live values and any
            # terminal signal that super().subscribe() delivers.
            for _, value in list(self._buffer):
                try:
                    observer.on_next(value)
                except Exception:
                    pass
            return super().subscribe(observer)
    
    def on_next(self, value: T) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return  # do not buffer values after termination
            self._buffer.append((time.time(), value))
            self._trim_buffer()
        super().on_next(value)

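class AsyncSubject(Subject[T]):
    """Emits only the last value, and only once the source completes."""
    
    def __init__(self):
        super().__init__()
        self._last_value: Optional[T] = None
        self._has_value = False
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        with self._lock:
            # A subscriber arriving after completion still receives the last value.
            if (self._stopped and not self._disposed
                    and self._error is None and self._has_value):
                try:
                    observer.on_next(self._last_value)
                except Exception:
                    pass
            return super().subscribe(observer)
    
    def on_next(self, value: T) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return
            self._last_value = value
            self._has_value = True
    
    def on_complete(self) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return  # complete at most once
            self._stopped = True
            value = self._last_value
            has_value = self._has_value
            observers = self._observers.copy()
            self._observers.clear()
        
        if has_value:
            for observer in observers:
                try:
                    observer.on_next(value)
                except Exception:
                    pass
        
        for observer in observers:
            try:
                observer.on_complete()
            except Exception:
                pass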

# Usage example: a plain Subject multicasts each value to both observers.
subject = Subject[int]()

subject.subscribe(AnonymousObserver(on_next=lambda x: print(f"Observer 1: {x}")))
subject.subscribe(AnonymousObserver(on_next=lambda x: print(f"Observer 2: {x}")))

subject.on_next(1)
subject.on_next(2)
subject.on_next(3)
subject.on_complete()

# A BehaviorSubject replays its current value to every new subscriber.
print("\nBehaviorSubject example:")
behavior = BehaviorSubject[int](0)
behavior.subscribe(AnonymousObserver(on_next=lambda x: print(f"First subscriber: {x}")))
behavior.on_next(10)
behavior.subscribe(AnonymousObserver(on_next=lambda x: print(f"Later subscriber: {x}")))
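The subject variants differ mainly in what a late subscriber receives. As a rough illustration of the replay case, the sketch below keeps the last `buffer_size` values and hands them to each new subscriber before any live value. `MiniReplay` is a hypothetical, single-threaded simplification: it has no time window, no locking, and no terminal signals, all of which the ReplaySubject above does handle.

```python
from collections import deque
from typing import Callable, Deque, List


class MiniReplay:
    """Hypothetical single-threaded stand-in for a size-bounded replay subject."""

    def __init__(self, buffer_size: int) -> None:
        # deque(maxlen=n) silently drops the oldest entry once it is full.
        self._buffer: Deque[int] = deque(maxlen=buffer_size)
        self._listeners: List[Callable[[int], None]] = []

    def subscribe(self, listener: Callable[[int], None]) -> None:
        for value in list(self._buffer):   # replay history first
            listener(value)
        self._listeners.append(listener)   # then receive live values

    def on_next(self, value: int) -> None:
        self._buffer.append(value)
        for listener in list(self._listeners):
            listener(value)


replay = MiniReplay(buffer_size=2)
for n in (1, 2, 3):
    replay.on_next(n)          # nobody is subscribed yet; only the buffer sees these

late: List[int] = []
replay.subscribe(late.append)  # receives the two most recent values: 2, 3
replay.on_next(4)              # live value

print(late)  # [2, 3, 4]
```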

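28.3 Operators

28.3.1 Formal Definition of an Operator

Definition 28.5 (Operator): An operator $op$ is a function:

$$op: \mathit{Observable} \rightarrow \mathit{Observable}$$

Operators fall into the following groups:

  • Transformation operators: $\mathit{map}, \mathit{flat\_map}, \mathit{scan}$
  • Filtering operators: $\mathit{filter}, \mathit{distinct}, \mathit{take}$
  • Combining operators: $\mathit{merge}, \mathit{zip}, \mathit{combine\_latest}$
  • Error-handling operators: $\mathit{catch}, \mathit{retry}, \mathit{on\_error\_return}$
  • Utility operators: $\mathit{do}, \mathit{delay}, \mathit{timeout}$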
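Because every operator maps an Observable to an Observable, a pipeline is ordinary function composition applied left to right. The sketch below makes that concrete under a deliberately small model: an "observable" is just a function that pushes integers into a callback, with no error or completion signals. The alias names `Source` and `Operator` and the helper `from_iterable` are assumptions made for this illustration.

```python
from functools import reduce
from typing import Callable, Iterable, List

# In this sketch an "observable" is a function that pushes values into a callback.
Source = Callable[[Callable[[int], None]], None]
Operator = Callable[[Source], Source]


def from_iterable(values: Iterable[int]) -> Source:
    def subscribe(on_next: Callable[[int], None]) -> None:
        for value in values:
            on_next(value)
    return subscribe


def map_op(fn: Callable[[int], int]) -> Operator:
    def apply(source: Source) -> Source:
        # Subscribing to the result subscribes to the source with a wrapped callback.
        return lambda on_next: source(lambda value: on_next(fn(value)))
    return apply


def filter_op(predicate: Callable[[int], bool]) -> Operator:
    def apply(source: Source) -> Source:
        def subscribe(on_next: Callable[[int], None]) -> None:
            def forward(value: int) -> None:
                if predicate(value):
                    on_next(value)
            source(forward)
        return subscribe
    return apply


def pipe(source: Source, *operators: Operator) -> Source:
    """Apply the operators left to right: pipe(s, f, g) == g(f(s))."""
    return reduce(lambda acc, op: op(acc), operators, source)


result: List[int] = []
pipeline = pipe(
    from_iterable(range(1, 7)),
    filter_op(lambda x: x % 2 == 0),   # keeps 2, 4, 6
    map_op(lambda x: x * 10),          # then scales them
)
pipeline(result.append)                # nothing runs until someone subscribes

print(result)  # [20, 40, 60]
```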

28.3.2 Transformation Operators

python
from typing import Any, Callable, Dict, Generic, Iterable, Iterator, List, TypeVar

class MapObservable(Observable[R]):
    def __init__(self, source: Observable[T], mapper: Callable[[T], R]):
        self._source = source
        self._mapper = mapper
    
    def subscribe(self, observer: Observer[R]) -> Disposable:
        class MapObserver(Observer[T]):
            def __init__(self, mapper: Callable[[T], R], downstream: Observer[R]):
                self._mapper = mapper
                self._downstream = downstream
            
            def on_next(self, value: T) -> None:
                try:
                    mapped = self._mapper(value)
                    self._downstream.on_next(mapped)
                except Exception as e:
                    self._downstream.on_error(e)
            
            def on_error(self, error: Exception) -> None:
                self._downstream.on_error(error)
            
            def on_complete(self) -> None:
                self._downstream.on_complete()
        
        return self._source.subscribe(MapObserver(self._mapper, observer))

class FlatMapObservable(Observable[R]):
    def __init__(
        self, 
        source: Observable[T], 
        mapper: Callable[[T], Observable[R]],
        max_concurrent: int = None
    ):
        self._source = source
        self._mapper = mapper
        self._max_concurrent = max_concurrent
    
    def subscribe(self, observer: Observer[R]) -> Disposable:
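        composite = CompositeDisposable()
        lock = RLock()
        # Capture the mapper here: inside FlatMapObserver, `self` is the nested
        # observer, which has no `_mapper` attribute.
        mapper = self._mapper
        active_count = [1]  # 1 for the outer source, plus 1 per live inner stream
        completed = [False]
        error_occurred = [False]
        
        class FlatMapObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    if error_occurred[0]:
                        return
                    active_count[0] += 1
                
                try:
                    inner_observable = mapper(value)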
                except Exception as e:
                    with lock:
                        if not error_occurred[0]:
                            error_occurred[0] = True
                            observer.on_error(e)
                    return
                
                class InnerObserver(Observer[R]):
                    def on_next(self, inner_value: R) -> None:
                        with lock:
                            if not error_occurred[0]:
                                observer.on_next(inner_value)
                    
                    def on_error(self, error: Exception) -> None:
                        with lock:
                            if not error_occurred[0]:
                                error_occurred[0] = True
                                observer.on_error(error)
                    
                    def on_complete(self) -> None:
                        with lock:
                            active_count[0] -= 1
                            # Never signal completion after an error was delivered.
                            if completed[0] and active_count[0] == 0 and not error_occurred[0]:
                                observer.on_complete()
                
                inner_disposable = inner_observable.subscribe(InnerObserver())
                composite.add(inner_disposable)
            
            def on_error(self, error: Exception) -> None:
                with lock:
                    if not error_occurred[0]:
                        error_occurred[0] = True
                        observer.on_error(error)
            
            def on_complete(self) -> None:
                with lock:
                    completed[0] = True
                    active_count[0] -= 1
                    if active_count[0] == 0 and not error_occurred[0]:
                        observer.on_complete()
        
        outer_disposable = self._source.subscribe(FlatMapObserver())
        composite.add(outer_disposable)
        
        return composite

class ScanObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T], 
        accumulator: Callable[[T, T], T],
        seed: T = None
    ):
        self._source = source
        self._accumulator = accumulator
        self._seed = seed
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Capture state in locals: inside ScanObserver, `self` is the nested
        # observer, and rebinding a closed-over name would require `nonlocal`.
        accumulator = self._accumulator
        accumulated = [self._seed]
        has_value = [self._seed is not None]  # a seed of None means "no seed"
        
        class ScanObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    if has_value[0]:
                        accumulated[0] = accumulator(accumulated[0], value)
                    else:
                        # No seed: the first value starts the accumulation.
                        accumulated[0] = value
                        has_value[0] = True
                    observer.on_next(accumulated[0])
                except Exception as e:
                    observer.on_error(e)
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        return self._source.subscribe(ScanObserver())

class GroupByObservable(Observable[Observable[T]]):
    def __init__(
        self, 
        source: Observable[T], 
        key_selector: Callable[[T], Any]
    ):
        self._source = source
        self._key_selector = key_selector
    
    def subscribe(self, observer: Observer[Observable[T]]) -> Disposable:
        groups: Dict[Any, Subject[T]] = {}
        lock = RLock()
        key_selector = self._key_selector  # captured: GroupByObserver has no _key_selector
        
        class GroupByObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    key = key_selector(value)
                    
                    with lock:
                        if key not in groups:
                            group = Subject[T]()
                            groups[key] = group
                            observer.on_next(group)
                        else:
                            group = groups[key]
                    
                    group.on_next(value)
                except Exception as e:
                    observer.on_error(e)
            
            def on_error(self, error: Exception) -> None:
                with lock:
                    for group in groups.values():
                        group.on_error(error)
                observer.on_error(error)
            
            def on_complete(self) -> None:
                with lock:
                    for group in groups.values():
                        group.on_complete()
                observer.on_complete()
        
        return self._source.subscribe(GroupByObserver())

def map_op(mapper: Callable[[T], R]) -> Callable[[Observable[T]], Observable[R]]:
    def apply(source: Observable[T]) -> Observable[R]:
        return MapObservable(source, mapper)
    return apply

def flat_map_op(
    mapper: Callable[[T], Observable[R]]
) -> Callable[[Observable[T]], Observable[R]]:
    def apply(source: Observable[T]) -> Observable[R]:
        return FlatMapObservable(source, mapper)
    return apply

def scan_op(
    accumulator: Callable[[T, T], T],
    seed: T = None
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return ScanObservable(source, accumulator, seed)
    return apply
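The scan operator is a running fold: it emits the accumulator after every input value instead of only at the end. One subtlety is how "no seed" is detected. The listing above treats a seed of `None` as absent; the sketch below shows an alternative that uses a private sentinel so that `None` can be a real seed. It is written as a pull-based generator over an iterable purely to keep the example short, and `_NO_SEED` is a hypothetical name.

```python
from operator import add
from typing import Any, Callable, Iterable, Iterator

_NO_SEED = object()  # hypothetical sentinel meaning "no seed was supplied"


def scan(values: Iterable[Any],
         accumulator: Callable[[Any, Any], Any],
         seed: Any = _NO_SEED) -> Iterator[Any]:
    """Yield the running accumulation after each input value."""
    acc = seed
    for value in values:
        # Without a seed, the first value becomes the initial accumulator.
        acc = value if acc is _NO_SEED else accumulator(acc, value)
        yield acc


running_sum = list(scan([1, 2, 3, 4], add))
seeded_sum = list(scan([1, 2, 3, 4], add, seed=10))

# With the sentinel, an explicit seed of None is passed to the accumulator.
none_seed = list(scan(["a", "b"],
                      lambda acc, v: v if acc is None else acc + v,
                      seed=None))

print(running_sum)  # [1, 3, 6, 10]
print(seeded_sum)   # [11, 13, 16, 20]
print(none_seed)    # ['a', 'ab']
```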

28.3.3 Filtering Operators

python
class FilterObservable(Observable[T]):
    def __init__(self, source: Observable[T], predicate: Callable[[T], bool]):
        self._source = source
        self._predicate = predicate
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        predicate = self._predicate  # captured: FilterObserver has no _predicate
        
        class FilterObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    if predicate(value):
                        observer.on_next(value)
                except Exception as e:
                    observer.on_error(e)
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        return self._source.subscribe(FilterObserver())

class TakeObservable(Observable[T]):
    def __init__(self, source: Observable[T], count: int):
        self._source = source
        self._count = count
    
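    def subscribe(self, observer: Observer[T]) -> Disposable:
        if self._count <= 0:
            # take(0) completes immediately and never subscribes to the source.
            observer.on_complete()
            return EmptyDisposable()
        
        remaining = [self._count]
        
        class TakeObserver(Observer[T]):
            def __init__(self):
                self._disposable: Optional[Disposable] = None
                self._done = False  # guards against a second terminal signal
            
            def set_disposable(self, d: Disposable) -> None:
                self._disposable = d
                if self._done:
                    # A synchronous source filled the quota before subscribe() returned.
                    d.dispose()
            
            def on_next(self, value: T) -> None:
                if self._done:
                    return
                remaining[0] -= 1
                observer.on_next(value)
                if remaining[0] == 0:
                    self._done = True
                    observer.on_complete()
                    if self._disposable:
                        self._disposable.dispose()
            
            def on_error(self, error: Exception) -> None:
                if not self._done:
                    self._done = True
                    observer.on_error(error)
            
            def on_complete(self) -> None:
                if not self._done:
                    self._done = True
                    observer.on_complete()
        
        take_observer = TakeObserver()
        disposable = self._source.subscribe(take_observer)
        take_observer.set_disposable(disposable)
        return disposable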

class SkipObservable(Observable[T]):
    def __init__(self, source: Observable[T], count: int):
        self._source = source
        self._count = count
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        remaining = [self._count]
        
        class SkipObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                if remaining[0] > 0:
                    remaining[0] -= 1
                else:
                    observer.on_next(value)
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        return self._source.subscribe(SkipObserver())

class DistinctObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T], 
        key_selector: Callable[[T], Any] = None
    ):
        self._source = source
        self._key_selector = key_selector or (lambda x: x)
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        seen: set = set()
        key_selector = self._key_selector  # captured: DistinctObserver has no _key_selector
        
        class DistinctObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    key = key_selector(value)
                    if key not in seen:
                        seen.add(key)
                        observer.on_next(value)
                except Exception as e:
                    observer.on_error(e)
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        return self._source.subscribe(DistinctObserver())

class DebounceObservable(Observable[T]):
    def __init__(self, source: Observable[T], due_time: float):
        self._source = source
        self._due_time = due_time
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        import threading
        due_time = self._due_time  # captured: DebounceObserver has no _due_time
        last_value = [None]
        has_value = [False]
        timer = [None]
        lock = RLock()
        
        def emit():
            with lock:
                if has_value[0]:
                    observer.on_next(last_value[0])
                    has_value[0] = False
        
        class DebounceObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    last_value[0] = value
                    has_value[0] = True
                    
                    # Each new value restarts the quiet-period timer.
                    if timer[0]:
                        timer[0].cancel()
                    
                    timer[0] = threading.Timer(due_time, emit)
                    timer[0].daemon = True  # a pending emit must not keep the process alive
                    timer[0].start()
            
            def on_error(self, error: Exception) -> None:
                with lock:
                    if timer[0]:
                        timer[0].cancel()
                observer.on_error(error)
            
            def on_complete(self) -> None:
                with lock:
                    if timer[0]:
                        timer[0].cancel()
                    if has_value[0]:
                        # Clear the flag so a timer already waiting on the lock emits nothing.
                        has_value[0] = False
                        observer.on_next(last_value[0])
                observer.on_complete()
        
        disposable = self._source.subscribe(DebounceObserver())
        
        def dispose():
            with lock:
                if timer[0]:
                    timer[0].cancel()
            disposable.dispose()
        
        return DisposableImpl(dispose)

class ThrottleObservable(Observable[T]):
    def __init__(self, source: Observable[T], duration: float):
        self._source = source
        self._duration = duration
    
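    def subscribe(self, observer: Observer[T]) -> Disposable:
        import time
        duration = self._duration  # captured: ThrottleObserver has no _duration
        last_emit_time = [float("-inf")]  # so the first value always passes
        
        class ThrottleObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                now = time.monotonic()  # monotonic clock: unaffected by wall-clock changes
                if now - last_emit_time[0] >= duration:
                    last_emit_time[0] = now
                    observer.on_next(value)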
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        return self._source.subscribe(ThrottleObserver())

def filter_op(predicate: Callable[[T], bool]) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return FilterObservable(source, predicate)
    return apply

def take_op(count: int) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return TakeObservable(source, count)
    return apply

def skip_op(count: int) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return SkipObservable(source, count)
    return apply

def distinct_op(
    key_selector: Callable[[T], Any] = None
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return DistinctObservable(source, key_selector)
    return apply

def debounce_op(due_time: float) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return DebounceObservable(source, due_time)
    return apply

def throttle_op(duration: float) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return ThrottleObservable(source, duration)
    return apply
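
To make the difference between `debounce_op` and `throttle_op` concrete, the following sketch replays a list of timestamped events through simplified, single-threaded versions of both policies. The helpers `simulate_throttle` and `simulate_debounce` are hypothetical names introduced only for illustration; they use explicit timestamps instead of real timers and are not part of the chapter's operator classes.

```python
from typing import List, Tuple, TypeVar

T = TypeVar("T")
Event = Tuple[float, T]  # (timestamp in seconds, value)


def simulate_throttle(events: List[Event], duration: float) -> List[T]:
    """Emit a value only if at least `duration` seconds passed since the last emission."""
    emitted: List[T] = []
    last_emit = float("-inf")
    for timestamp, value in events:
        if timestamp - last_emit >= duration:
            last_emit = timestamp
            emitted.append(value)
    return emitted


def simulate_debounce(events: List[Event], due_time: float) -> List[T]:
    """Emit a value only if no newer value arrives within `due_time` seconds of it."""
    emitted: List[T] = []
    for i, (timestamp, value) in enumerate(events):
        is_last = i == len(events) - 1
        # The final value is flushed on completion, as DebounceObservable.on_complete does.
        if is_last or events[i + 1][0] - timestamp >= due_time:
            emitted.append(value)
    return emitted


# Hypothetical search-box input: three quick keystrokes, a pause, then two more.
keystrokes = [(0.0, "r"), (0.1, "re"), (0.2, "rea"), (1.0, "reac"), (1.1, "react")]
throttled = simulate_throttle(keystrokes, duration=0.5)
debounced = simulate_debounce(keystrokes, due_time=0.5)
print(throttled)  # ['r', 'reac']
print(debounced)  # ['rea', 'react']
```

Throttling keeps the first value of each window, whereas debouncing keeps the last value before each pause, which is why debouncing is usually the better fit for search-as-you-type input.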

28.3.4 Combination Operators

python
class MergeObservable(Observable[T]):
    def __init__(self, sources: List[Observable[T]]):
        self._sources = sources
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        composite = CompositeDisposable()
        lock = RLock()
        completed_count = [0]
        error_occurred = [False]
        
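        # Capture the source count here: inside the nested class, `self` is the
        # MergeObserver instance, which has no `_sources` attribute.
        source_count = len(self._sources)

        class MergeObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    if not error_occurred[0]:
                        observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                with lock:
                    if not error_occurred[0]:
                        error_occurred[0] = True
                        observer.on_error(error)

            def on_complete(self) -> None:
                with lock:
                    completed_count[0] += 1
                    # Complete only after every merged source has completed.
                    if completed_count[0] == source_count and not error_occurred[0]:
                        observer.on_complete()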
        
        for source in self._sources:
            disposable = source.subscribe(MergeObserver())
            composite.add(disposable)
        
        return composite

class ZipObservable(Observable[tuple]):
    def __init__(self, sources: List[Observable]):
        self._sources = sources
    
    def subscribe(self, observer: Observer[tuple]) -> Disposable:
        composite = CompositeDisposable()
        lock = RLock()
        buffers: List[List] = [[] for _ in self._sources]
        completed = [False] * len(self._sources)
        
        class ZipObserver(Observer):
            def __init__(self, index: int):
                self._index = index
            
            def on_next(self, value) -> None:
                with lock:
                    buffers[self._index].append(value)
                    
                    while all(len(b) > 0 for b in buffers):
                        values = tuple(b.pop(0) for b in buffers)
                        observer.on_next(values)
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                with lock:
                    completed[self._index] = True
                    if all(completed):
                        observer.on_complete()
        
        for i, source in enumerate(self._sources):
            disposable = source.subscribe(ZipObserver(i))
            composite.add(disposable)
        
        return composite

class CombineLatestObservable(Observable[tuple]):
    def __init__(self, sources: List[Observable]):
        self._sources = sources
    
    def subscribe(self, observer: Observer[tuple]) -> Disposable:
        composite = CompositeDisposable()
        lock = RLock()
        latest: List[Optional[Any]] = [None] * len(self._sources)
        has_value = [False] * len(self._sources)
        completed = [False] * len(self._sources)
        
        def try_emit():
            if all(has_value):
                observer.on_next(tuple(latest))
        
        class CombineObserver(Observer):
            def __init__(self, index: int):
                self._index = index
            
            def on_next(self, value) -> None:
                with lock:
                    latest[self._index] = value
                    has_value[self._index] = True
                    try_emit()
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                with lock:
                    completed[self._index] = True
                    if all(completed):
                        observer.on_complete()
        
        for i, source in enumerate(self._sources):
            disposable = source.subscribe(CombineObserver(i))
            composite.add(disposable)
        
        return composite

class ConcatObservable(Observable[T]):
    def __init__(self, sources: List[Observable[T]]):
        self._sources = sources
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        serial = SerialDisposable()
        lock = RLock()
        index = [0]
        disposed = [False]
        
        def subscribe_next():
            if disposed[0] or index[0] >= len(self._sources):
                if not disposed[0]:
                    observer.on_complete()
                return
            
            current_index = index[0]
            index[0] += 1
            
            class ConcatObserver(Observer[T]):
                def on_next(self, value: T) -> None:
                    observer.on_next(value)
                
                def on_error(self, error: Exception) -> None:
                    observer.on_error(error)
                
                def on_complete(self) -> None:
                    subscribe_next()
            
            serial.disposable = self._sources[current_index].subscribe(ConcatObserver())
        
        subscribe_next()
        
        def dispose():
            disposed[0] = True
            serial.dispose()
        
        return DisposableImpl(dispose)

class RaceObservable(Observable[T]):
    def __init__(self, sources: List[Observable[T]]):
        self._sources = sources
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        composite = CompositeDisposable()
        lock = RLock()
        winner = [None]
        
        class RaceObserver(Observer[T]):
            def __init__(self, index: int):
                self._index = index
            
            def on_next(self, value: T) -> None:
                with lock:
                    if winner[0] is None:
                        winner[0] = self._index
                    elif winner[0] != self._index:
                        return
                observer.on_next(value)
            
            def on_error(self, error: Exception) -> None:
                with lock:
                    if winner[0] is not None and winner[0] != self._index:
                        return
                observer.on_error(error)
            
            def on_complete(self) -> None:
                with lock:
                    if winner[0] is not None and winner[0] != self._index:
                        return
                observer.on_complete()
        
        for i, source in enumerate(self._sources):
            disposable = source.subscribe(RaceObserver(i))
            composite.add(disposable)
        
        return composite

def merge_op(*sources: Observable[T]) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return MergeObservable([source] + list(sources))
    return apply

def zip_op(*sources: Observable) -> Callable[[Observable], Observable[tuple]]:
    def apply(source: Observable) -> Observable[tuple]:
        return ZipObservable([source] + list(sources))
    return apply

def combine_latest_op(*sources: Observable) -> Callable[[Observable], Observable[tuple]]:
    def apply(source: Observable) -> Observable[tuple]:
        return CombineLatestObservable([source] + list(sources))
    return apply

def concat_op(*sources: Observable[T]) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return ConcatObservable([source] + list(sources))
    return apply
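
The semantic difference between `zip_op` and `combine_latest_op` is easiest to see on a fixed timeline. The sketch below is a simplified, single-threaded model; `simulate_zip` and `simulate_combine_latest` are hypothetical helpers that replay `(timestamp, source index, value)` events rather than subscribing to real observables.

```python
from typing import Any, List, Tuple

Event = Tuple[float, int, Any]  # (timestamp, source index, value)


def simulate_zip(events: List[Event], source_count: int) -> List[tuple]:
    """Pair values by position: the n-th value of every source forms the n-th tuple."""
    buffers: List[List[Any]] = [[] for _ in range(source_count)]
    output: List[tuple] = []
    for _, index, value in sorted(events, key=lambda e: e[0]):
        buffers[index].append(value)
        # Emit while every source has at least one buffered value.
        while all(buffers):
            output.append(tuple(b.pop(0) for b in buffers))
    return output


def simulate_combine_latest(events: List[Event], source_count: int) -> List[tuple]:
    """Emit the latest value of every source whenever any source emits."""
    latest: List[Any] = [None] * source_count
    seen = [False] * source_count
    output: List[tuple] = []
    for _, index, value in sorted(events, key=lambda e: e[0]):
        latest[index] = value
        seen[index] = True
        # Nothing is emitted until every source has produced at least once.
        if all(seen):
            output.append(tuple(latest))
    return output


timeline = [(0.1, 0, "a"), (0.2, 0, "b"), (0.3, 1, 1), (0.4, 0, "c"), (0.5, 1, 2)]
zipped = simulate_zip(timeline, source_count=2)
combined = simulate_combine_latest(timeline, source_count=2)
print(zipped)    # [('a', 1), ('b', 2)]
print(combined)  # [('b', 1), ('c', 1), ('c', 2)]
```

In this model, zip buffers `"c"` indefinitely because the second source never produces a third value, while combine-latest discards `"a"` because it was overwritten before the second source emitted anything.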

28.4 Backpressure Handling

28.4.1 Formal Definition of Backpressure Strategies

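Definition 28.6 (Backpressure): Backpressure is a flow-control mechanism that defines how a consumer signals its processing capacity back to the producer: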

$$Backpressure: Consumer \rightarrow Producer$$

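Definition 28.7 (Backpressure strategies):

| Strategy | Formal description | Behavior |
|----------|--------------------|----------|
| BUFFER | $buffer(v) \rightarrow queue.push(v)$ | Buffers every value until the buffer overflows |
| DROP | $drop(v) \rightarrow \emptyset$ | Discards values that exceed the processing capacity |
| LATEST | $latest(v) \rightarrow current = v$ | Keeps only the most recent value |
| ERROR | $error(v) \rightarrow raise$ | Raises an exception on overflow |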
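
The four strategies can be compared on a producer burst that arrives while the consumer is not draining anything. The following is a minimal sketch under stated assumptions: `Strategy`, `offer`, and `run` are hypothetical names, BUFFER is modelled as an unbounded queue, and LATEST is modelled by overwriting the newest pending slot. It is independent of the `BackpressureObservable` class in Section 28.4.2.

```python
from collections import deque
from enum import Enum, auto
from typing import Deque, Iterable, List


class Strategy(Enum):
    BUFFER = auto()
    DROP = auto()
    LATEST = auto()
    ERROR = auto()


def offer(queue: Deque[int], value: int, capacity: int, strategy: Strategy) -> None:
    """Handle one produced value while the consumer is not draining the queue."""
    if len(queue) < capacity:
        queue.append(value)      # room left: every strategy simply enqueues
    elif strategy is Strategy.BUFFER:
        queue.append(value)      # assumption: unbounded buffer, memory is the only limit
    elif strategy is Strategy.DROP:
        pass                     # discard the new value
    elif strategy is Strategy.LATEST:
        queue[-1] = value        # overwrite the newest pending slot with the latest value
    else:
        raise OverflowError(f"queue full at capacity={capacity}")


def run(strategy: Strategy, values: Iterable[int], capacity: int = 2) -> List[int]:
    """Feed a burst of values through one strategy and return what is left pending."""
    queue: Deque[int] = deque()
    for value in values:
        offer(queue, value, capacity, strategy)
    return list(queue)


burst = [1, 2, 3, 4]
buffered = run(Strategy.BUFFER, burst)
dropped = run(Strategy.DROP, burst)
latest = run(Strategy.LATEST, burst)
print(buffered, dropped, latest)  # [1, 2, 3, 4] [1, 2] [1, 4]
```

With `Strategy.ERROR`, the same burst raises `OverflowError` as soon as the third value arrives, which makes the overload visible instead of silently losing data.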

28.4.2 Backpressure Implementation

python
from enum import Enum, auto
from typing import Deque, Optional
from collections import deque

class BackpressureStrategy(Enum):
    BUFFER = auto()
    DROP = auto()
    LATEST = auto()
    ERROR = auto()

class BackpressureObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T], 
        strategy: BackpressureStrategy = BackpressureStrategy.BUFFER,
        buffer_size: int = 128
    ):
        self._source = source
        self._strategy = strategy
        self._buffer_size = buffer_size
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        buffer: Deque[T] = deque()
        latest: List[Optional[T]] = [None]
        has_latest = [False]
        disposed = [False]
        paused = [False]
        overflow = [False]
        lock = RLock()
        
        # Capture configuration here: inside the nested class, `self` is the
        # BackpressureObserver instance, not the enclosing BackpressureObservable.
        strategy = self._strategy
        buffer_size = self._buffer_size

        class BackpressureObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                if disposed[0]:
                    return

                with lock:
                    if strategy == BackpressureStrategy.BUFFER:
                        # Values are queued here and flushed in on_complete.
                        if len(buffer) >= buffer_size:
                            overflow[0] = True
                            observer.on_error(Exception(
                                f"Buffer overflow: size={buffer_size}"
                            ))
                            return
                        buffer.append(value)

                    elif strategy == BackpressureStrategy.DROP:
                        # `paused` is never set in this listing, so nothing is dropped yet.
                        if not paused[0]:
                            observer.on_next(value)

                    elif strategy == BackpressureStrategy.LATEST:
                        latest[0] = value
                        has_latest[0] = True
                        if not paused[0]:
                            observer.on_next(value)

                    elif strategy == BackpressureStrategy.ERROR:
                        if paused[0]:
                            observer.on_error(Exception(
                                "Downstream cannot keep up"
                            ))
                            return
                        observer.on_next(value)
            
            def on_error(self, error: Exception) -> None:
                if not disposed[0]:
                    observer.on_error(error)
            
            def on_complete(self) -> None:
                if not disposed[0]:
                    with lock:
                        while buffer and not disposed[0]:
                            observer.on_next(buffer.popleft())
                    observer.on_complete()
        
        disposable = self._source.subscribe(BackpressureObserver())
        
        def dispose():
            disposed[0] = True
            disposable.dispose()
        
        return DisposableImpl(dispose)

class Flowable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T],
        initial_request: int = 128
    ):
        self._source = source
        self._initial_request = initial_request
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        requested = [self._initial_request]
        produced = [0]
        lock = RLock()
        
        class FlowableObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    produced[0] += 1
                
                observer.on_next(value)
                
                with lock:
                    if produced[0] >= requested[0]:
                        self._request_more(requested[0])
                        produced[0] = 0
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                observer.on_complete()
            
            def _request_more(self, n: int) -> None:
                pass
        
        return self._source.subscribe(FlowableObserver())
    
    def request(self, n: int) -> None:
        pass

class ThrottledObservable(Observable[T]):
    def __init__(
        self,
        source: Observable[T],
        max_rate: int,
        window_ms: float = 1000.0
    ):
        self._source = source
        self._max_rate = max_rate
        self._window_ms = window_ms
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        import time
        timestamps: List[float] = []
        lock = RLock()
        
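        # Capture limits here: inside the nested class, `self` is the
        # ThrottledObserver instance, not the enclosing ThrottledObservable.
        max_rate = self._max_rate
        window_seconds = self._window_ms / 1000.0

        class ThrottledObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                now = time.time()

                with lock:
                    # Drop timestamps that have left the sliding window.
                    cutoff = now - window_seconds
                    while timestamps and timestamps[0] < cutoff:
                        timestamps.pop(0)

                    # Forward the value only while the window has capacity left.
                    if len(timestamps) < max_rate:
                        timestamps.append(now)
                        observer.on_next(value)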
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        return self._source.subscribe(ThrottledObserver())

class BatchObservable(Observable[List[T]]):
    def __init__(
        self,
        source: Observable[T],
        batch_size: int = 100,
        max_wait_ms: float = 1000.0
    ):
        self._source = source
        self._batch_size = batch_size
        self._max_wait_ms = max_wait_ms
    
    def subscribe(self, observer: Observer[List[T]]) -> Disposable:
        import time
        import threading
        batch: List[T] = []
        last_emit = [time.time()]
        lock = RLock()
        timer = [None]
        
        def emit_batch():
            with lock:
                if batch:
                    observer.on_next(batch.copy())
                    batch.clear()
                    last_emit[0] = time.time()
        
        def schedule_emit():
            with lock:
                if timer[0]:
                    timer[0].cancel()
                timer[0] = threading.Timer(
                    self._max_wait_ms / 1000.0,
                    emit_batch
                )
                timer[0].start()
        
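        # Capture the batch size here: inside the nested class, `self` is the
        # BatchObserver instance, which has no `_batch_size` attribute.
        batch_size = self._batch_size

        class BatchObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    batch.append(value)

                    # Flush immediately when full; otherwise (re)arm the wait timer.
                    if len(batch) >= batch_size:
                        emit_batch()
                    else:
                        schedule_emit()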
            
            def on_error(self, error: Exception) -> None:
                with lock:
                    if timer[0]:
                        timer[0].cancel()
                    if batch:
                        observer.on_next(batch)
                observer.on_error(error)
            
            def on_complete(self) -> None:
                with lock:
                    if timer[0]:
                        timer[0].cancel()
                    if batch:
                        observer.on_next(batch)
                observer.on_complete()
        
        disposable = self._source.subscribe(BatchObserver())
        
        def dispose():
            with lock:
                if timer[0]:
                    timer[0].cancel()
            disposable.dispose()
        
        return DisposableImpl(dispose)
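
In the listing above, `Flowable.request` and `_request_more` are left as stubs. The sketch below illustrates one way demand-driven delivery could look: the consumer calls `request(n)` and receives at most `n` items per call. `PullSubscription` is a hypothetical class introduced here for illustration only; it pulls from a plain iterable and is not a drop-in replacement for `Flowable`.

```python
from typing import Callable, Iterable, Iterator, List


class PullSubscription:
    """Delivers items from `source` only in response to request(n) calls."""

    def __init__(
        self,
        source: Iterable[int],
        on_next: Callable[[int], None],
        on_complete: Callable[[], None],
    ) -> None:
        self._iterator: Iterator[int] = iter(source)
        self._on_next = on_next
        self._on_complete = on_complete
        self._done = False

    def request(self, n: int) -> None:
        """Deliver up to `n` further items, signalling completion when exhausted."""
        if n <= 0:
            raise ValueError("n must be positive")
        for _ in range(n):
            if self._done:
                return
            try:
                item = next(self._iterator)
            except StopIteration:
                self._done = True
                self._on_complete()
                return
            self._on_next(item)


received: List[int] = []
signals: List[str] = []
subscription = PullSubscription(
    range(5), received.append, lambda: signals.append("complete")
)

subscription.request(2)
after_first_request = list(received)   # only two items were delivered
subscription.request(10)               # delivers the rest, then completes
print(after_first_request, received, signals)
```

Because the producer never runs ahead of the requested amount, no buffer can overflow; the cost is that the consumer must keep issuing requests to make progress.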

28.5 Error Handling

28.5.1 Error-Handling Operators

python
class CatchObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T],
        handler: Callable[[Exception], Observable[T]]
    ):
        self._source = source
        self._handler = handler
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        serial = SerialDisposable()
        
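        # Capture the handler here: inside the nested class, `self` is the
        # CatchObserver instance, which has no `_handler` attribute.
        handler = self._handler

        class CatchObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                try:
                    # Switch to the fallback sequence produced by the handler.
                    fallback = handler(error)
                    serial.disposable = fallback.subscribe(observer)
                except Exception as e:
                    observer.on_error(e)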
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        serial.disposable = self._source.subscribe(CatchObserver())
        return serial

class RetryObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T],
        count: Optional[int] = None,
        delay: float = 0.0
    ):
        self._source = source
        self._count = count
        self._delay = delay
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        import time
        serial = SerialDisposable()
        attempts = [0]
        disposed = [False]
        
        def subscribe_source():
            if disposed[0]:
                return
            
            if self._count is not None and attempts[0] >= self._count:
                return
            
            attempts[0] += 1
            
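            # Capture settings here: inside the nested class, `self` is the
            # RetryObserver instance, not the enclosing RetryObservable.
            count = self._count
            delay = self._delay

            class RetryObserver(Observer[T]):
                def on_next(self, value: T) -> None:
                    observer.on_next(value)

                def on_error(self, error: Exception) -> None:
                    if count is not None and attempts[0] >= count:
                        # Attempt budget exhausted: surface the last error.
                        observer.on_error(error)
                    else:
                        if delay > 0:
                            time.sleep(delay)  # note: blocks the emitting thread
                        subscribe_source()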
                
                def on_complete(self) -> None:
                    observer.on_complete()
            
            serial.disposable = self._source.subscribe(RetryObserver())
        
        subscribe_source()
        
        def dispose():
            disposed[0] = True
            serial.dispose()
        
        return DisposableImpl(dispose)

class OnErrorReturnObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T],
        value_supplier: Callable[[Exception], T]
    ):
        self._source = source
        self._value_supplier = value_supplier
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
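        # Capture the supplier here: inside the nested class, `self` is the
        # OnErrorReturnObserver instance, which has no `_value_supplier` attribute.
        value_supplier = self._value_supplier

        class OnErrorReturnObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                try:
                    # Replace the error with a single fallback value, then complete.
                    fallback_value = value_supplier(error)
                    observer.on_next(fallback_value)
                    observer.on_complete()
                except Exception as e:
                    observer.on_error(e)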
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        return self._source.subscribe(OnErrorReturnObserver())

class OnErrorResumeNextObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T],
        fallback: Observable[T]
    ):
        self._source = source
        self._fallback = fallback
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        serial = SerialDisposable()
        
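        # Capture the fallback here: inside the nested class, `self` is the
        # ResumeObserver instance, which has no `_fallback` attribute.
        fallback = self._fallback

        class ResumeObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                # Ignore the error and continue with the fallback sequence.
                serial.disposable = fallback.subscribe(observer)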
            
            def on_complete(self) -> None:
                observer.on_complete()
        
        serial.disposable = self._source.subscribe(ResumeObserver())
        return serial

class RetryWhenObservable(Observable[T]):
    def __init__(
        self, 
        source: Observable[T],
        handler: Callable[[Observable[Exception]], Observable[Any]]
    ):
        self._source = source
        self._handler = handler
    
    def subscribe(self, observer: Observer[T]) -> Disposable:
        errors = Subject[Exception]()
        serial = SerialDisposable()
        disposed = [False]
        
        def subscribe_source():
            if disposed[0]:
                return
            
            class RetryWhenObserver(Observer[T]):
                def on_next(self, value: T) -> None:
                    observer.on_next(value)
                
                def on_error(self, error: Exception) -> None:
                    errors.on_next(error)
                
                def on_complete(self) -> None:
                    observer.on_complete()
            
            serial.disposable = self._source.subscribe(RetryWhenObserver())
        
        class HandlerObserver(Observer[Any]):
            def on_next(self, value: Any) -> None:
                subscribe_source()
            
            def on_error(self, error: Exception) -> None:
                observer.on_error(error)
            
            def on_complete(self) -> None:
                pass
        
        retry_observable = self._handler(errors)
        retry_disposable = retry_observable.subscribe(HandlerObserver())
        
        subscribe_source()
        
        def dispose():
            disposed[0] = True
            serial.dispose()
            retry_disposable.dispose()
        
        return DisposableImpl(dispose)

def catch_op(
    handler: Callable[[Exception], Observable[T]]
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return CatchObservable(source, handler)
    return apply

def retry_op(
    count: Optional[int] = None,
    delay: float = 0.0
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return RetryObservable(source, count, delay)
    return apply

def on_error_return_op(
    value_supplier: Callable[[Exception], T]
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return OnErrorReturnObservable(source, value_supplier)
    return apply

def on_error_resume_next_op(
    fallback: Observable[T]
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return OnErrorResumeNextObservable(source, fallback)
    return apply
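
The retry semantics used by `RetryObservable` (where `count` is the total number of attempts, and `delay` is applied between attempts) can be shown on a plain callable. The sketch below is illustrative only: `retry_call` and `flaky` are hypothetical names, and the injectable `sleep` parameter is an assumption added so the example runs without actually waiting.

```python
import time
from typing import Callable, Dict, List, TypeVar

T = TypeVar("T")


def retry_call(
    action: Callable[[], T],
    count: int,
    delay: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `action` up to `count` times in total and re-raise the last error."""
    if count < 1:
        raise ValueError("count must be at least 1")
    for attempt in range(1, count + 1):
        try:
            return action()
        except Exception:
            if attempt == count:
                raise            # attempt budget exhausted: surface the last error
            if delay > 0:
                sleep(delay)     # wait before the next attempt
    raise AssertionError("unreachable")


calls: Dict[str, int] = {"n": 0}
waits: List[float] = []


def flaky() -> str:
    """Fails on the first two calls and succeeds on the third."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError(f"attempt {calls['n']} failed")
    return "ok"


# Record the requested delays instead of sleeping, so the demo finishes instantly.
result = retry_call(flaky, count=3, delay=0.1, sleep=waits.append)
print(result, calls["n"], waits)  # ok 3 [0.1, 0.1]
```

With `count=2` the same callable would raise `ConnectionError`, because the second failure exhausts the budget before the successful third call is reached.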

28.6 Asynchronous Reactive Programming

28.6.1 Asynchronous Observable

python
import asyncio
from typing import AsyncIterator, AsyncGenerator, Awaitable

class AsyncObservable(Generic[T]):
    def __init__(self, generator: AsyncGenerator[T, None]):
        self._generator = generator
    
    async def subscribe(
        self, 
        on_next: Callable[[T], Awaitable[None]] = None,
        on_error: Callable[[Exception], Awaitable[None]] = None,
        on_complete: Callable[[], Awaitable[None]] = None
    ) -> None:
        try:
            async for value in self._generator:
                if on_next:
                    await on_next(value)
            if on_complete:
                await on_complete()
        except Exception as e:
            if on_error:
                await on_error(e)
    
    def map(self, mapper: Callable[[T], R]) -> 'AsyncObservable[R]':
        async def mapped_generator():
            async for value in self._generator:
                result = mapper(value)
                if asyncio.iscoroutine(result):
                    yield await result
                else:
                    yield result
        return AsyncObservable(mapped_generator())
    
    def filter(self, predicate: Callable[[T], bool]) -> 'AsyncObservable[T]':
        async def filtered_generator():
            async for value in self._generator:
                if predicate(value):
                    yield value
        return AsyncObservable(filtered_generator())
    
    async def to_list(self) -> List[T]:
        result = []
        async for value in self._generator:
            result.append(value)
        return result
    
    async def first(self) -> Optional[T]:
        async for value in self._generator:
            return value
        return None
    
    async def reduce(
        self, 
        accumulator: Callable[[T, T], T],
        initial: T = None
    ) -> T:
        result = initial
        async for value in self._generator:
            if result is None:
                result = value
            else:
                result = accumulator(result, value)
        return result

async def interval(period: float, count: int = None) -> AsyncGenerator[int, None]:
    i = 0
    while count is None or i < count:
        await asyncio.sleep(period)
        yield i
        i += 1

async def from_iterable(iterable: Iterable[T]) -> AsyncGenerator[T, None]:
    for item in iterable:
        yield item

async def from_future(future: Awaitable[T]) -> AsyncGenerator[T, None]:
    yield await future

class AsyncSubject(Generic[T]):
    def __init__(self):
        self._observers: List[Callable[[T], Awaitable[None]]] = []
        self._lock = asyncio.Lock()
        self._value: Optional[T] = None
        self._has_value = False
        self._completed = False
        self._error: Optional[Exception] = None
    
    async def subscribe(
        self, 
        on_next: Callable[[T], Awaitable[None]]
    ) -> Callable[[], None]:
        async with self._lock:
            self._observers.append(on_next)
            if self._has_value and not self._completed:
                await on_next(self._value)
        
        def unsubscribe():
            if on_next in self._observers:
                self._observers.remove(on_next)
        
        return unsubscribe
    
    async def on_next(self, value: T) -> None:
        async with self._lock:
            self._value = value
            self._has_value = True
            observers = self._observers.copy()
        
        for observer in observers:
            await observer(value)
    
    async def on_error(self, error: Exception) -> None:
        async with self._lock:
            # Subscribers register only an on_next callback, so the error is
            # recorded and the subject is closed; there is no error callback to invoke.
            self._error = error
            self._completed = True
            self._observers.clear()
    
    async def on_complete(self) -> None:
        async with self._lock:
            self._completed = True
            self._observers.clear()

class AsyncEventBus:
    def __init__(self):
        self._subjects: Dict[str, AsyncSubject] = {}
        self._lock = asyncio.Lock()
    
    async def get_subject(self, name: str) -> AsyncSubject:
        async with self._lock:
            if name not in self._subjects:
                self._subjects[name] = AsyncSubject()
            return self._subjects[name]
    
    async def publish(self, name: str, value: Any) -> None:
        subject = await self.get_subject(name)
        await subject.on_next(value)
    
    async def subscribe(
        self, 
        name: str, 
        handler: Callable[[Any], Awaitable[None]]
    ) -> Callable[[], None]:
        subject = await self.get_subject(name)
        return await subject.subscribe(handler)

async def async_demo():
    observable = AsyncObservable(interval(0.1, 10))
    
    result = await (observable
        .filter(lambda x: x % 2 == 0)
        .map(lambda x: x ** 2)
        .to_list())
    
    print(f"Async processing result: {result}")
    
    bus = AsyncEventBus()
    
    async def handle_order(order):
        print(f"Processing order: {order}")
    
    await bus.subscribe("order_created", handle_order)
    await bus.publish("order_created", {"id": "ORD-001"})
    await bus.publish("order_created", {"id": "ORD-002"})

asyncio.run(async_demo())
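
One property of `AsyncObservable` as written is worth noting: it stores a generator object, and an async generator can be consumed only once, so a second terminal call such as `to_list()` on the same source would see an empty sequence. The sketch below contrasts this with a factory-based variant. `ColdAsyncObservable` and `numbers` are hypothetical names used for illustration; this is one possible design, not the chapter's implementation.

```python
import asyncio
from typing import AsyncGenerator, Callable, Generic, List, Tuple, TypeVar

T = TypeVar("T")


class ColdAsyncObservable(Generic[T]):
    """Holds a generator factory, so every consumer gets a fresh sequence."""

    def __init__(self, factory: Callable[[], AsyncGenerator[T, None]]) -> None:
        self._factory = factory

    async def to_list(self) -> List[T]:
        # Each call creates a new generator instead of reusing an exhausted one.
        return [value async for value in self._factory()]


async def numbers() -> AsyncGenerator[int, None]:
    for i in range(3):
        await asyncio.sleep(0)  # yield control to the event loop
        yield i


async def main() -> Tuple[List[int], List[int], List[int], List[int]]:
    shared = numbers()  # a single generator object, as AsyncObservable stores it
    first = [v async for v in shared]
    second = [v async for v in shared]  # the generator is already exhausted

    cold = ColdAsyncObservable(numbers)
    third = await cold.to_list()
    fourth = await cold.to_list()
    return first, second, third, fourth


first, second, third, fourth = asyncio.run(main())
print(first, second, third, fourth)  # [0, 1, 2] [] [0, 1, 2] [0, 1, 2]
```

Passing the generator function rather than a generator object gives each subscriber an independent replay of the sequence, at the cost of re-running any side effects inside the generator.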

28.7 Practical Application Examples

28.7.1 Real-Time Data Stream Processing

python
import asyncio
import random
from typing import AsyncGenerator, Awaitable, Callable, Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class StockPrice:
    symbol: str
    price: float
    change: float
    volume: int
    timestamp: datetime

async def stock_price_stream(
    symbol: str, 
    interval: float = 0.5
) -> AsyncGenerator[StockPrice, None]:
    price = 100.0 + random.uniform(-10, 10)
    
    while True:
        await asyncio.sleep(interval)
        change = random.uniform(-2, 2)
        price += change
        volume = random.randint(1000, 10000)
        
        yield StockPrice(
            symbol=symbol,
            price=round(price, 2),
            change=round(change, 2),
            volume=volume,
            timestamp=datetime.now()
        )

class StockMonitor:
    def __init__(self):
        self._subscriptions: Dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()
    
    async def subscribe(
        self, 
        symbol: str, 
        callback: Callable[[StockPrice], Awaitable[None]]
    ) -> Callable[[], None]:
        async def consume():
            async for price in stock_price_stream(symbol):
                await callback(price)
        
        task = asyncio.create_task(consume())
        
        async with self._lock:
            self._subscriptions[symbol] = task
        
        def unsubscribe():
            task.cancel()
            if symbol in self._subscriptions:
                del self._subscriptions[symbol]
        
        return unsubscribe
    
    async def stop_all(self) -> None:
        async with self._lock:
            for task in self._subscriptions.values():
                task.cancel()
            self._subscriptions.clear()

class PriceAlertSystem:
    def __init__(self, monitor: StockMonitor):
        self._monitor = monitor
        self._alerts: Dict[str, List[tuple]] = {}
        self._subjects: Dict[str, AsyncSubject] = {}
    
    def add_alert(
        self, 
        symbol: str, 
        threshold: float, 
        direction: str = "above"
    ) -> AsyncSubject:
        if symbol not in self._alerts:
            self._alerts[symbol] = []
        self._alerts[symbol].append((threshold, direction))
        
        if symbol not in self._subjects:
            self._subjects[symbol] = AsyncSubject()
        
        # AsyncSubject has no `_generator` attribute, so it cannot be wrapped in
        # AsyncObservable; return the subject so callers can subscribe to it directly.
        return self._subjects[symbol]
    
    async def start(self, symbol: str) -> None:
        async def on_price(price: StockPrice):
            if symbol in self._alerts:
                for threshold, direction in self._alerts[symbol]:
                    triggered = (
                        (direction == "above" and price.price > threshold) or
                        (direction == "below" and price.price < threshold)
                    )
                    if triggered and symbol in self._subjects:
                        await self._subjects[symbol].on_next(price)
        
        await self._monitor.subscribe(symbol, on_price)

async def stock_monitoring_demo():
    monitor = StockMonitor()
    alerts = PriceAlertSystem(monitor)
    
    async def on_price_update(price: StockPrice):
        print(f"[{price.symbol}] Price: {price.price:.2f}, Change: {price.change:+.2f}")
    
    unsub_aapl = await monitor.subscribe("AAPL", on_price_update)
    unsub_googl = await monitor.subscribe("GOOGL", on_price_update)
    
    await asyncio.sleep(3)
    
    unsub_aapl()
    print("Unsubscribed from AAPL")
    
    await asyncio.sleep(2)
    await monitor.stop_all()

asyncio.run(stock_monitoring_demo())
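
The monitor above stops a subscription by calling `task.cancel()`. As a side note, cancellation in asyncio is only a request: the task finishes unwinding the next time it runs. The following minimal sketch, which uses a hypothetical `ticker` coroutine in place of the price stream, shows one way to wait for that cleanup before moving on.

```python
import asyncio
from typing import List


async def ticker(log: List[str]) -> None:
    """Hypothetical long-running consumer, standing in for a price subscription."""
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    finally:
        # Runs while the CancelledError propagates, so cleanup is observable.
        log.append("cleanup")


async def main() -> List[str]:
    log: List[str] = []
    task = asyncio.create_task(ticker(log))
    await asyncio.sleep(0.05)

    task.cancel()  # only requests cancellation
    # Awaiting lets the task unwind; return_exceptions=True turns the
    # CancelledError into a result instead of re-raising it here.
    await asyncio.gather(task, return_exceptions=True)

    assert task.cancelled()
    return log


log = asyncio.run(main())
print(log[0], log[-1])  # tick cleanup
```

Awaiting the cancelled task is a design choice rather than a requirement; it mainly matters when the consumer holds resources that must be released before shutdown continues.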

28.7.2 Event-Driven Architecture

python
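from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from threading import RLock
from typing import Any, Callable, Dict, Optional
import uuid

# Subject, Observable, Disposable, CompositeDisposable and AnonymousObserver
# are assumed to come from the definitions earlier in this chapter.

class EventType(Enum):
    ORDER_CREATED = auto()
    ORDER_CONFIRMED = auto()
    PAYMENT_RECEIVED = auto()
    ORDER_SHIPPED = auto()
    ORDER_DELIVERED = auto()

@dataclass
class Event:
    type: EventType
    data: Dict[str, Any]
    timestamp: datetime
    correlation_id: str
    causation_id: Optional[str] = None
    # Unique identifier, so that follow-up events can name this event as their cause.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))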

class ReactiveEventBus:
    def __init__(self):
        self._subjects: Dict[EventType, Subject[Event]] = {}
        self._lock = RLock()
    
    def _get_subject(self, event_type: EventType) -> Subject[Event]:
        with self._lock:
            if event_type not in self._subjects:
                self._subjects[event_type] = Subject[Event]()
            return self._subjects[event_type]
    
    def publish(self, event: Event) -> None:
        subject = self._get_subject(event.type)
        subject.on_next(event)
    
    def subscribe(
        self, 
        event_type: EventType,
        handler: Callable[[Event], None]
    ) -> Disposable:
        subject = self._get_subject(event_type)
        return subject.subscribe(AnonymousObserver(on_next=handler))
    
    def subscribe_all(self, handler: Callable[[Event], None]) -> CompositeDisposable:
        composite = CompositeDisposable()
        for event_type in EventType:
            composite.add(self.subscribe(event_type, handler))
        return composite
    
    def get_observable(self, event_type: EventType) -> Observable[Event]:
        return self._get_subject(event_type)

class OrderSaga:
    def __init__(self, event_bus: ReactiveEventBus):
        self._event_bus = event_bus
        self._orders: Dict[str, Dict] = {}
        self._setup_handlers()
    
    def _setup_handlers(self) -> None:
        self._event_bus.subscribe(
            EventType.ORDER_CREATED,
            self._handle_order_created
        )
        
        self._event_bus.subscribe(
            EventType.PAYMENT_RECEIVED,
            self._handle_payment_received
        )
        
        self._event_bus.subscribe(
            EventType.ORDER_SHIPPED,
            self._handle_order_shipped
        )
    
    def _handle_order_created(self, event: Event) -> None:
        order_id = event.data["order_id"]
        self._orders[order_id] = {
            "status": "created",
            "created_at": event.timestamp
        }
        print(f"[Saga] Order created: {order_id}")
    
    def _handle_payment_received(self, event: Event) -> None:
        order_id = event.data["order_id"]
        if order_id in self._orders:
            self._orders[order_id]["status"] = "paid"
            self._orders[order_id]["paid_at"] = event.timestamp
            print(f"[Saga] Order paid: {order_id}")
            
            self._event_bus.publish(Event(
                type=EventType.ORDER_CONFIRMED,
                data={"order_id": order_id},
                timestamp=datetime.now(),
                correlation_id=event.correlation_id,
                # Link the follow-up event to its cause when the source event carries an ID.
                causation_id=getattr(event, "event_id", None)
            ))
    
    def _handle_order_shipped(self, event: Event) -> None:
        order_id = event.data["order_id"]
        if order_id in self._orders:
            self._orders[order_id]["status"] = "shipped"
            self._orders[order_id]["shipped_at"] = event.timestamp
            print(f"[Saga] Order shipped: {order_id}")

class EventLogger:
    def __init__(self, event_bus: ReactiveEventBus):
        self._disposable = event_bus.subscribe_all(self._log_event)
    
    def _log_event(self, event: Event) -> None:
        print(f"[LOG] {event.type.name}: {event.data}")
    
    def stop(self) -> None:
        self._disposable.dispose()

def event_driven_demo():
    event_bus = ReactiveEventBus()
    saga = OrderSaga(event_bus)
    logger = EventLogger(event_bus)
    
    import uuid
    
    order_id = str(uuid.uuid4())[:8]
    correlation_id = str(uuid.uuid4())
    
    event_bus.publish(Event(
        type=EventType.ORDER_CREATED,
        data={"order_id": order_id, "customer_id": "CUST-001", "total": 199.99},
        timestamp=datetime.now(),
        correlation_id=correlation_id
    ))
    
    event_bus.publish(Event(
        type=EventType.PAYMENT_RECEIVED,
        data={"order_id": order_id, "amount": 199.99, "method": "credit_card"},
        timestamp=datetime.now(),
        correlation_id=correlation_id
    ))
    
    event_bus.publish(Event(
        type=EventType.ORDER_SHIPPED,
        data={"order_id": order_id, "tracking_number": "SF123456789"},
        timestamp=datetime.now(),
        correlation_id=correlation_id
    ))
    
    logger.stop()

event_driven_demo()

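28.8 Anti-Patterns and Best Practices

28.8.1 Common Anti-Patterns

| Anti-pattern | Description | Consequence | Solution |
| --- | --- | --- | --- |
| Subscription leak | Subscriptions are never cancelled | Memory leak | Use CompositeDisposable |
| Blocking operation | Blocking inside an Observer | Degraded performance | Use asynchronous operators |
| Excessive nesting | Several nested levels of flat_map | Code is hard to understand | Use compose/pipe |
| Ignored errors | on_error is not handled | Exceptions are lost | Add error-handling operators |
| Unbounded stream | The data stream is not limited | Resource exhaustion | Use take, timeout |
| Cold/hot confusion | Cold and hot Observables are mixed up | Unexpected behavior | Distinguish them explicitly and use each correctly |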
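
To make the subscription-leak row concrete, here is a minimal sketch of a leak and its fix. `MiniSubject` and `Widget` are hypothetical names invented for this illustration; they are not the chapter's `Subject` or `CompositeDisposable`, only a small stand-in that exposes how many observers remain registered.

```python
from typing import Callable, List


class MiniSubject:
    """Hypothetical stand-in for a Subject: keeps a list of callbacks."""

    def __init__(self) -> None:
        self._handlers: List[Callable[[int], None]] = []

    def subscribe(self, handler: Callable[[int], None]) -> Callable[[], None]:
        self._handlers.append(handler)

        def dispose() -> None:
            # Idempotent: disposing twice must not raise.
            if handler in self._handlers:
                self._handlers.remove(handler)

        return dispose

    def on_next(self, value: int) -> None:
        # Iterate over a copy so handlers may dispose themselves while being called.
        for handler in list(self._handlers):
            handler(value)

    @property
    def observer_count(self) -> int:
        return len(self._handlers)


class Widget:
    """A short-lived consumer that collects its disposables and releases them in close()."""

    def __init__(self, subject: MiniSubject) -> None:
        self.seen: List[int] = []
        self._disposables = [subject.subscribe(self._on_value)]

    def _on_value(self, value: int) -> None:
        self.seen.append(value)

    def close(self) -> None:
        for dispose in self._disposables:
            dispose()
        self._disposables.clear()


subject = MiniSubject()
leaky = Widget(subject)  # never closed: the subject keeps a reference to it
tidy = Widget(subject)
subject.on_next(1)
tidy.close()             # releases its subscription
subject.on_next(2)

print(leaky.seen)              # [1, 2]
print(tidy.seen)               # [1]
print(subject.observer_count)  # 1
```

The widget that is never closed keeps receiving values and stays reachable through the subject, which is the leak the table warns about.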

28.8.2 Best Practices

python
class ReactiveBestPractices:
    @staticmethod
    def use_composite_disposable():
        composite = CompositeDisposable()
        
        subject = Subject[int]()
        
        composite.add(subject.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Subscriber 1: {x}")
        )))
        
        composite.add(subject.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Subscriber 2: {x}")
        )))
        
        subject.on_next(1)
        subject.on_next(2)
        
        composite.dispose()
        print("All subscriptions disposed")
    
    @staticmethod
    def handle_errors_properly():
        subject = Subject[int]()
        
        safe_observable = subject.pipe(
            catch_op(lambda e: Observable.from_iterable([0])),
            retry_op(count=3),
            on_error_return_op(lambda e: -1)
        )
        
        safe_observable.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Safe value: {x}"),
            on_error=lambda e: print(f"Final error: {e}")
        ))
    
    @staticmethod
    def limit_infinite_streams():
        from itertools import islice
        
        def create_infinite():
            i = 0
            while True:
                yield i
                i += 1
        
        # A generator cannot be sliced with [:10]; islice lazily takes the first 10 values.
        limited = ObservableFactory.from_iterable(islice(create_infinite(), 10))
        
        limited.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Limited value: {x}")
        ))
        print("Infinite stream limited to its first 10 values")

class ObservableFactory:
    @staticmethod
    def from_iterable(iterable: Iterable[T]) -> Observable[T]:
        class IterableObservable(Observable[T]):
            def subscribe(self, observer: Observer[T]) -> Disposable:
                try:
                    for value in iterable:
                        observer.on_next(value)
                    observer.on_complete()
                except Exception as e:
                    observer.on_error(e)
                return EmptyDisposable()
        
        return IterableObservable()
    
    @staticmethod
    def range(start: int, count: int) -> Observable[int]:
        return ObservableFactory.from_iterable(range(start, start + count))
    
    @staticmethod
    def just(value: T) -> Observable[T]:
        return ObservableFactory.from_iterable([value])
    
    @staticmethod
    def empty() -> Observable[T]:
        return ObservableFactory.from_iterable([])
    
    @staticmethod
    def error(error: Exception) -> Observable[T]:
        class ErrorObservable(Observable[T]):
            def subscribe(self, observer: Observer[T]) -> Disposable:
                observer.on_error(error)
                return EmptyDisposable()
        
        return ErrorObservable()
    
    @staticmethod
    def never() -> Observable[T]:
        class NeverObservable(Observable[T]):
            def subscribe(self, observer: Observer[T]) -> Disposable:
                return EmptyDisposable()
        
        return NeverObservable()
    
    @staticmethod
    def interval(period: float, count: Optional[int] = None) -> Observable[int]:
        import time
        import threading
        
        class IntervalObservable(Observable[int]):
            def subscribe(self, observer: Observer[int]) -> Disposable:
                disposed = [False]
                
                def run():
                    i = 0
                    while not disposed[0] and (count is None or i < count):
                        time.sleep(period)
                        if not disposed[0]:
                            observer.on_next(i)
                            i += 1
                    if not disposed[0]:
                        observer.on_complete()
                
                thread = threading.Thread(target=run, daemon=True)
                thread.start()
                
                def dispose():
                    disposed[0] = True
                
                return DisposableImpl(dispose)
        
        return IntervalObservable()

Observable.from_iterable = staticmethod(ObservableFactory.from_iterable)
Observable.range = staticmethod(ObservableFactory.range)
Observable.just = staticmethod(ObservableFactory.just)
Observable.empty = staticmethod(ObservableFactory.empty)
Observable.error = staticmethod(ObservableFactory.error)
Observable.never = staticmethod(ObservableFactory.never)
Observable.interval = staticmethod(ObservableFactory.interval)

28.9 Decision Guide

28.9.1 Choosing an Observable Type

                 ┌─────────────────────────────────────┐
                 │       Is multicasting needed?       │
                 └──────────────────┬──────────────────┘
                                    │
                    ┌───────────────┴───────────────┐
                    │                               │
                    ▼                               ▼
            ┌───────────────┐               ┌───────────────┐
            │      No       │               │      Yes      │
            └───────────────┘               └───────────────┘
                    │                               │
                    ▼                               ▼
         ┌─────────────────────┐         ┌─────────────────────┐
         │   Cold Observable   │         │   Hot Observable    │
         │ (per-subscription)  │         │ (multicast to all)  │
         └─────────────────────┘         └──────────┬──────────┘
                                                    │
                                    ┌───────────────┴───────────────┐
                                    │                               │
                                    ▼                               ▼
                          ┌───────────────────┐           ┌───────────────────┐
                          │      Subject      │           │     multicast     │
                          │ (manual control)  │           │ (auto multicast)  │
                          └───────────────────┘           └───────────────────┘
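
The difference between the two branches of the decision tree can be sketched in a few lines. `ColdSource` and `HotSource` are hypothetical, simplified stand-ins written for this example (plain callbacks, no completion or error signals), not the chapter's `Observable` classes.

```python
from typing import Callable, Dict, List

Handler = Callable[[int], None]


class ColdSource:
    """Cold: every subscribe() call runs the producer again for that subscriber only."""

    def __init__(self, producer: Callable[[], List[int]]) -> None:
        self._producer = producer

    def subscribe(self, handler: Handler) -> None:
        for value in self._producer():
            handler(value)


class HotSource:
    """Hot: one shared execution; subscribers only see values emitted after they join."""

    def __init__(self) -> None:
        self._handlers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def emit(self, value: int) -> None:
        for handler in list(self._handlers):
            handler(value)


stats: Dict[str, int] = {"runs": 0}


def produce() -> List[int]:
    stats["runs"] += 1  # counts how often the producer is executed
    return [1, 2, 3]


cold = ColdSource(produce)
cold_a: List[int] = []
cold_b: List[int] = []
cold.subscribe(cold_a.append)
cold.subscribe(cold_b.append)

hot = HotSource()
hot_a: List[int] = []
hot_b: List[int] = []
hot.subscribe(hot_a.append)
hot.emit(1)
hot.subscribe(hot_b.append)  # late subscriber misses the value 1
hot.emit(2)
hot.emit(3)

print(stats["runs"])   # 2
print(cold_a, cold_b)  # [1, 2, 3] [1, 2, 3]
print(hot_a, hot_b)    # [1, 2, 3] [2, 3]
```

The producer runs once per cold subscription, whereas the hot source has a single execution whose late subscriber misses earlier values.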

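28.9.2 Choosing a Subject Type

| Subject type | Characteristics | Use case |
| --- | --- | --- |
| Subject | No initial value, no buffering | Simple event broadcast |
| BehaviorSubject | Has an initial value; new subscribers receive the current value | State management |
| ReplaySubject | Buffers past values | Replaying history |
| AsyncSubject | Emits only the last value | Asynchronous results |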
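
The four rows of the table differ mainly in what a late subscriber receives. The sketch below models that behavior with deliberately tiny classes whose names (`PlainSubject`, `BehaviorLikeSubject`, `ReplayLikeSubject`, `LastValueSubject`) are hypothetical; error handling, disposal and thread safety are left out, so this illustrates the semantics rather than the chapter's implementation.

```python
from collections import deque
from typing import Callable, Deque, List, Optional, Tuple

Handler = Callable[[int], None]


class PlainSubject:
    """No initial value, no buffer: late subscribers see only future values."""

    def __init__(self) -> None:
        self._handlers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def on_next(self, value: int) -> None:
        self._emit(value)

    def on_complete(self) -> None:
        pass  # completion bookkeeping is omitted in this sketch

    def _emit(self, value: int) -> None:
        for handler in list(self._handlers):
            handler(value)


class BehaviorLikeSubject(PlainSubject):
    """Holds a current value and hands it to each new subscriber."""

    def __init__(self, initial: int) -> None:
        super().__init__()
        self._current = initial

    def subscribe(self, handler: Handler) -> None:
        handler(self._current)
        super().subscribe(handler)

    def on_next(self, value: int) -> None:
        self._current = value
        self._emit(value)


class ReplayLikeSubject(PlainSubject):
    """Buffers the last `size` values and replays them to each new subscriber."""

    def __init__(self, size: int) -> None:
        super().__init__()
        self._buffer: Deque[int] = deque(maxlen=size)

    def subscribe(self, handler: Handler) -> None:
        for value in list(self._buffer):
            handler(value)
        super().subscribe(handler)

    def on_next(self, value: int) -> None:
        self._buffer.append(value)
        self._emit(value)


class LastValueSubject(PlainSubject):
    """AsyncSubject-style: emits only the final value, and only on completion."""

    def __init__(self) -> None:
        super().__init__()
        self._last: Optional[int] = None

    def on_next(self, value: int) -> None:
        self._last = value

    def on_complete(self) -> None:
        if self._last is not None:
            self._emit(self._last)


def observe(subject: PlainSubject) -> Tuple[List[int], List[int]]:
    """Return what an early and a late subscriber receive for the values 1, 2, 3."""
    early: List[int] = []
    late: List[int] = []
    subject.subscribe(early.append)
    subject.on_next(1)
    subject.on_next(2)
    subject.subscribe(late.append)
    subject.on_next(3)
    subject.on_complete()
    return early, late


print(observe(PlainSubject()))          # ([1, 2, 3], [3])
print(observe(BehaviorLikeSubject(0)))  # ([0, 1, 2, 3], [2, 3])
print(observe(ReplayLikeSubject(2)))    # ([1, 2, 3], [1, 2, 3])
print(observe(LastValueSubject()))      # ([3], [3])
```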

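28.9.3 Operator Selection Guide

| Requirement | Recommended operator | Notes |
| --- | --- | --- |
| Transform values | map | One-to-one mapping |
| Flatten nesting | flat_map / switch_map | One-to-many mapping |
| Filter values | filter | Conditional filtering |
| Take the first N | take | Limit the count |
| Deduplicate | distinct | Remove duplicates |
| Debounce | debounce | User input |
| Rate-limit | throttle | Rate limiting |
| Combine several streams | merge / zip / combine_latest | Parallel processing |
| Error recovery | catch / retry | Fault tolerance |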
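
Debounce and throttle are easy to confuse, so a small offline simulation may help. The functions below are hypothetical helpers that operate on a recorded list of `(timestamp_ms, value)` pairs rather than on a live stream. They assume a leading-edge throttle and a debounce that flushes the pending value when the stream ends; real operator implementations differ in these details.

```python
from typing import List, Optional, Tuple

Event = Tuple[int, str]  # (timestamp in milliseconds, value)


def debounce(events: List[Event], quiet_ms: int) -> List[str]:
    """Keep a value only if no newer value arrives within quiet_ms after it."""
    out: List[str] = []
    for i, (t, value) in enumerate(events):
        is_last = i == len(events) - 1
        # Assumption: the final pending value is flushed when the stream ends.
        if is_last or events[i + 1][0] - t >= quiet_ms:
            out.append(value)
    return out


def throttle(events: List[Event], interval_ms: int) -> List[str]:
    """Emit a value, then ignore later values until interval_ms has elapsed."""
    out: List[str] = []
    last_emit: Optional[int] = None
    for t, value in events:
        if last_emit is None or t - last_emit >= interval_ms:
            out.append(value)
            last_emit = t
    return out


# Simulated keystrokes in a search box: two bursts separated by a pause.
keystrokes: List[Event] = [
    (0, "h"), (100, "he"), (200, "hel"), (1000, "hell"), (1100, "hello"),
]

print(debounce(keystrokes, 500))  # ['hel', 'hello']
print(throttle(keystrokes, 500))  # ['h', 'hell']
```

Debounce reports the value at the end of each burst, which suits user input; throttle reports the first value of each window, which suits rate limiting.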

28.10 Quick Reference Card

28.10.1 Core Concepts Quick Reference

┌──────────────────────────────────────────────────────────────────────┐
│            Reactive Programming Core Concepts Cheat Sheet            │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Observable lifecycle                                                │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ subscribe → on_next* → (on_error | on_complete) → unsubscribe  │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  Observer contract                                                   │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ 1. on_next may be called zero or more times                    │  │
│  │ 2. on_error or on_complete is called at most once              │  │
│  │ 3. No method is called after on_error/on_complete              │  │
│  │ 4. The Disposable must be handled correctly                    │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  Cold vs. hot Observable                                             │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ Cold: each subscription runs independently; no shared data     │  │
│  │ Hot:  multicast to all subscribers; execution is shared        │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  Backpressure strategies                                             │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │ BUFFER: buffer all values until overflow                       │  │
│  │ DROP:   drop values beyond processing capacity                 │  │
│  │ LATEST: keep only the latest value                             │  │
│  │ ERROR:  raise an exception on overflow                         │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
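
The four backpressure strategies listed in the card can be compared with a small bounded buffer. Everything in this sketch (`Overflow`, `BoundedBuffer`, `BufferOverflow`, `fill`) is a hypothetical name chosen for illustration. It also makes two assumptions: BUFFER is modelled as unbounded growth, and LATEST is interpreted as evicting the oldest buffered value so that the newest always survives (with capacity 1 this reduces to keeping only the latest value).

```python
from collections import deque
from enum import Enum, auto
from typing import Deque, Iterable, List


class Overflow(Enum):
    BUFFER = auto()  # assumption: grows without bound
    DROP = auto()    # discard the incoming value
    LATEST = auto()  # assumption: evict the oldest, keep the newest
    ERROR = auto()   # raise when full


class BufferOverflow(Exception):
    """Raised by the ERROR strategy when the buffer is full."""


class BoundedBuffer:
    def __init__(self, capacity: int, strategy: Overflow) -> None:
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self.strategy = strategy
        self.items: Deque[int] = deque()
        self.dropped = 0

    def offer(self, value: int) -> None:
        has_room = len(self.items) < self.capacity
        if has_room or self.strategy is Overflow.BUFFER:
            self.items.append(value)
        elif self.strategy is Overflow.DROP:
            self.dropped += 1
        elif self.strategy is Overflow.LATEST:
            self.items.popleft()
            self.items.append(value)
            self.dropped += 1
        else:
            raise BufferOverflow(f"buffer full, cannot accept {value}")


def fill(strategy: Overflow, values: Iterable[int], capacity: int = 2) -> List[int]:
    """Offer every value to a fresh buffer without consuming, then return its content."""
    buffer = BoundedBuffer(capacity, strategy)
    for value in values:
        buffer.offer(value)
    return list(buffer.items)


print(fill(Overflow.BUFFER, range(1, 6)))  # [1, 2, 3, 4, 5]
print(fill(Overflow.DROP, range(1, 6)))    # [1, 2]
print(fill(Overflow.LATEST, range(1, 6)))  # [4, 5]
try:
    fill(Overflow.ERROR, range(1, 6))
except BufferOverflow as exc:
    print(exc)                             # buffer full, cannot accept 3
```

Which strategy fits depends on whether losing old data, losing new data, or failing fast is the least harmful outcome for the consumer.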

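28.10.2 Common Operators Quick Reference

python
OBSERVABLE_CREATION = {
    "just": "Create an Observable that emits a single value",
    "range": "Create an Observable that emits a sequence of integers",
    "interval": "Create an Observable that emits at a fixed interval",
    "from_iterable": "Create an Observable from an iterable",
    "empty": "Create an Observable that completes immediately",
    "error": "Create an Observable that errors immediately",
    "never": "Create an Observable that never emits",
}

TRANSFORMING_OPERATORS = {
    "map": "One-to-one transformation",
    "flat_map": "One-to-many flattening",
    "switch_map": "Switch to the latest inner Observable",
    "scan": "Running accumulation",
    "group_by": "Grouping",
}

FILTERING_OPERATORS = {
    "filter": "Conditional filtering",
    "take": "Take the first N values",
    "skip": "Skip the first N values",
    "distinct": "Remove duplicates",
    "debounce": "Debouncing",
    "throttle": "Rate limiting",
    "first": "Take the first value",
    "last": "Take the last value",
}

COMBINING_OPERATORS = {
    "merge": "Merge several Observables",
    "zip": "Pairwise combination",
    "combine_latest": "Combine the latest values",
    "concat": "Sequential concatenation",
    "race": "First source to emit wins",
}

ERROR_HANDLING_OPERATORS = {
    "catch": "Catch an error and switch to a fallback",
    "retry": "Retry",
    "on_error_return": "Return a default value on error",
    "retry_when": "Conditional retry",
}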

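28.11 Summary

28.11.1 Key Points

  1. Observable pattern: defines the contract for producing and consuming data streams
  2. Observer contract: governs value delivery, error handling, and completion notification
  3. Operators: transform data streams declaratively
  4. Subject: implements multicasting and state sharing
  5. Backpressure: controls the flow rate to keep consumers from being overloaded
  6. Error handling: handles and recovers from errors gracefully

28.11.2 Applicable Scenarios

| Suitable | Not suitable |
| --- | --- |
| Real-time data processing | Simple synchronous operations |
| User-interface events | Batch jobs |
| IoT data streams | Low-latency requirements |
| Microservice communication | Resource-constrained environments |
| Event-driven architecture | Simple CRUD applications |

28.11.3 Implementation Recommendations

  1. Understand the contract: follow the Observer contract strictly
  2. Manage subscriptions: use CompositeDisposable to prevent leaks
  3. Handle errors: always add error-handling operators
  4. Choose a strategy: pick the backpressure strategy that fits the scenario
  5. Keep it simple: avoid excessive nesting and compose operators with pipe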

Python Technical Book Series - Jiangsu Sucheng Secondary Vocational School (江苏省宿城中等专业学校)