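Chapter 28: Reactive Programming Patterns
Learning Objectives
After completing this chapter, readers will be able to:
- Understand the core concepts and theoretical foundations of reactive programming
- Master the formal definitions and implementations of Observable and Observer
- Build complex data-processing pipelines with operators
- Implement backpressure strategies and error-handling mechanisms
- Apply reactive patterns to asynchronous programming problems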
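28.1 Theoretical Foundations and Formal Definitions
28.1.1 Formal Definition of Reactive Programming
Definition 28.1 (Reactive system): A reactive system $\mathcal{R}$ is a four-tuple:
$$\mathcal{R} = \langle \mathcal{S}, \mathcal{O}, \mathcal{T}, \mathcal{P} \rangle$$
where:
- $\mathcal{S}$: the set of data streams, each a sequence of values that changes over time
- $\mathcal{O}$: the set of operators, which transform streams
- $\mathcal{T}$: the time model, which defines the order in which events occur
- $\mathcal{P}$: the propagation rules, which define how changes in data propagate
Definition 28.2 (Observable): An Observable $O$ is an asynchronous sequence that produces values:
$$O: \mathcal{T} \rightarrow \mathcal{P}(\mathcal{V} \cup \{\epsilon\} \cup \{\bot\})$$
where:
- $\mathcal{V}$: the value domain
- $\epsilon$: the completion signal
- $\bot$: the error signal
- $\mathcal{P}$: in this formula, the power set (not the propagation rules of Definition 28.1), indicating that several values may be produced at one point in time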
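Read operationally, Definition 28.2 says that a single subscription yields zero or more values, optionally followed by exactly one terminal signal ($\epsilon$ or $\bot$). The following is a minimal sketch of a checker for that contract. The names `Next`, `Complete`, `Error`, and `is_valid_trace` are illustrative assumptions and are not used elsewhere in this chapter.

```python
from dataclasses import dataclass
from typing import Any, Sequence, Union


@dataclass(frozen=True)
class Next:
    """A regular value from the value domain V."""
    value: Any


@dataclass(frozen=True)
class Complete:
    """The completion signal (epsilon)."""


@dataclass(frozen=True)
class Error:
    """The error signal (bottom)."""
    error: Exception


Event = Union[Next, Complete, Error]


def is_valid_trace(trace: Sequence[Event]) -> bool:
    """Return True if the trace has the shape Next* (Complete | Error)?."""
    last_index = len(trace) - 1
    for i, event in enumerate(trace):
        is_terminal = isinstance(event, (Complete, Error))
        # A terminal signal is only allowed in the final position.
        if is_terminal and i != last_index:
            return False
    return True


ok_trace = [Next(1), Next(2), Complete()]
bad_trace = [Complete(), Next(1)]  # a value after completion violates the contract
```

Here `is_valid_trace(ok_trace)` holds while `is_valid_trace(bad_trace)` does not; an empty trace (a stream that has not emitted yet) is also accepted.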
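Definition 28.3 (Observer): An Observer is a triple:
$$\mathit{Observer} = \langle \mathit{on\_next}, \mathit{on\_error}, \mathit{on\_complete} \rangle$$
satisfying:
- $\mathit{on\_next}: \mathcal{V} \rightarrow \mathbf{1}$: handles a regular value
- $\mathit{on\_error}: \mathit{Exception} \rightarrow \mathbf{1}$: handles an error
- $\mathit{on\_complete}: \mathbf{1} \rightarrow \mathbf{1}$: handles completion
Definition 28.4 (Subscription): A subscription is a function:
$$\mathit{subscribe}: \mathit{Observable} \times \mathit{Observer} \rightarrow \mathit{Disposable}$$
It connects an Observable to an Observer and returns a Disposable that can be used to cancel the subscription.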
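Definitions 28.3 and 28.4 can be illustrated with plain callbacks before any class hierarchy is introduced. The sketch below is a simplified, single-threaded illustration: `TinySource` is a hypothetical name, and the returned `dispose` function stands in for the Disposable.

```python
from typing import Any, Callable, List, Optional, Tuple

OnNext = Callable[[Any], None]
OnError = Callable[[Exception], None]
OnComplete = Callable[[], None]


class TinySource:
    """A push source that keeps a list of (on_next, on_error, on_complete) triples."""

    def __init__(self) -> None:
        self._observers: List[Tuple[OnNext, OnError, OnComplete]] = []

    def subscribe(
        self,
        on_next: OnNext,
        on_error: Optional[OnError] = None,
        on_complete: Optional[OnComplete] = None,
    ) -> Callable[[], None]:
        entry = (on_next, on_error or (lambda e: None), on_complete or (lambda: None))
        self._observers.append(entry)

        def dispose() -> None:
            # Cancelling removes the observer; calling dispose twice is harmless.
            if entry in self._observers:
                self._observers.remove(entry)

        return dispose

    def emit(self, value: Any) -> None:
        for on_next, _, _ in list(self._observers):
            on_next(value)

    def complete(self) -> None:
        observers, self._observers = self._observers, []
        for _, _, on_complete in observers:
            on_complete()


received: List[Any] = []
source = TinySource()
dispose = source.subscribe(received.append, on_complete=lambda: received.append("done"))
source.emit(1)
source.emit(2)
dispose()          # cancel the subscription
source.emit(3)     # no longer delivered
source.complete()  # no longer delivered either
```

After `dispose()` is called, neither the value 3 nor the completion signal reaches the observer, so `received` holds only the first two values.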
28.1.2 The Reactive Manifesto
┌───────────────────────────────────────────────────────────────────┐
│                    Traits of a Reactive System                    │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│                        ┌─────────────────┐                        │
│                        │   Responsive    │                        │
│                        │ (timely reply)  │                        │
│                        └────────┬────────┘                        │
│                                 │                                 │
│            ┌────────────────────┼────────────────────┐            │
│            ▼                    ▼                    ▼            │
│   ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐   │
│   │    Resilient    │  │     Elastic     │  │ Message Driven  │   │
│   │(fault recovery) │  │(load adaptation)│  │(async messaging)│   │
│   └─────────────────┘  └─────────────────┘  └─────────────────┘   │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
28.1.3 Historical Background and Evolution
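| Period | Milestone | Key people/projects | Core contribution |
|---|---|---|---|
| 1997 | Reactive programming concept | Conal Elliott | FRP concept introduced |
| 2000 | FRP implementations | Conal Elliott, Paul Hudak | Haskell FRP libraries |
| 2009 | Rx.NET | Erik Meijer | Reactive Extensions for .NET |
| 2012 | RxJS | Microsoft | Reactive Extensions for JavaScript |
| 2013 | RxJava | Netflix | Reactive Extensions for Java |
| 2014 | Reactive Manifesto (version 2.0) | Reactive community | Defines the traits of reactive systems |
| 2015 | Project Reactor | Pivotal | Reactive foundation for Spring |
| 2015 | Reactive Streams 1.0 | Community specification | Standardized backpressure handling |
| 2019 | Kotlin Flow | JetBrains | Coroutine-based reactive streams |
| 2019 | RxPY 3.x | Python community | Reactive Extensions for Python |
| 2020+ | Reactive microservices | Community practice | Cloud-native reactive architecture |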
28.2 The Observable and Observer Pattern
28.2.1 UML Class Diagram
┌────────────────────────────────────────────────────────────────┐
│                         <<interface>>                          │
│                         Observable<T>                          │
├────────────────────────────────────────────────────────────────┤
│ + subscribe(observer: Observer<T>): Disposable                 │
│ + pipe(*operators): Observable                                 │
└────────────────────────────────┬───────────────────────────────┘
                                 │
                                 │ implements
                                 │
          ┌──────────────────────┼──────────────────────┐
          ▼                      ▼                      ▼
┌──────────────────┐   ┌──────────────────┐   ┌──────────────────┐
│    Subject<T>    │   │  ColdObservable  │   │  HotObservable   │
├──────────────────┤   ├──────────────────┤   ├──────────────────┤
│ - observers      │   │ - generator      │   │ - multicast      │
│ - disposed       │   │ - subscribed     │   │ - refcount       │
├──────────────────┤   ├──────────────────┤   ├──────────────────┤
│ + on_next()      │   │ + subscribe()    │   │ + connect()      │
│ + on_error()     │   └──────────────────┘   └──────────────────┘
│ + on_complete()  │
└──────────────────┘
┌────────────────────────────────────────────────────────────────┐
│                         <<interface>>                          │
│                          Observer<T>                           │
├────────────────────────────────────────────────────────────────┤
│ + on_next(value: T): void                                      │
│ + on_error(error: Exception): void                             │
│ + on_complete(): void                                          │
└────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────┐
│                         <<interface>>                          │
│                           Disposable                           │
├────────────────────────────────────────────────────────────────┤
│ + dispose(): void                                              │
│ + is_disposed: bool                                            │
└────────────────────────────────────────────────────────────────┘
28.2.2 Core Interface Implementation
python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
from dataclasses import dataclass, field
from enum import Enum, auto
from threading import Lock, RLock
from weakref import WeakSet
import time
T = TypeVar('T')
R = TypeVar('R')
class Disposable(ABC):
@abstractmethod
def dispose(self) -> None:
pass
@property
@abstractmethod
def is_disposed(self) -> bool:
pass
class CompositeDisposable(Disposable):
def __init__(self, *disposables: Disposable):
self._disposables = list(disposables)
self._disposed = False
self._lock = RLock()
def add(self, disposable: Disposable) -> None:
with self._lock:
if self._disposed:
disposable.dispose()
else:
self._disposables.append(disposable)
def dispose(self) -> None:
with self._lock:
if not self._disposed:
self._disposed = True
for d in self._disposables:
try:
d.dispose()
except Exception:
pass
self._disposables.clear()
@property
def is_disposed(self) -> bool:
return self._disposed
class SerialDisposable(Disposable):
def __init__(self):
self._disposable: Optional[Disposable] = None
self._disposed = False
self._lock = RLock()
@property
def disposable(self) -> Optional[Disposable]:
return self._disposable
@disposable.setter
def disposable(self, value: Optional[Disposable]) -> None:
with self._lock:
if self._disposed:
if value:
value.dispose()
else:
old = self._disposable
self._disposable = value
if old:
old.dispose()
def dispose(self) -> None:
with self._lock:
if not self._disposed:
self._disposed = True
if self._disposable:
self._disposable.dispose()
self._disposable = None
@property
def is_disposed(self) -> bool:
return self._disposed
class RefCountDisposable(Disposable):
def __init__(self, disposable: Disposable):
self._disposable = disposable
self._count = 0
self._disposed = False
self._lock = RLock()
def dispose(self) -> None:
with self._lock:
if not self._disposed:
self._disposed = True
if self._count == 0 and self._disposable:
self._disposable.dispose()
@property
def is_disposed(self) -> bool:
return self._disposed
def get_disposable(self) -> Disposable:
with self._lock:
if self._disposed:
return EmptyDisposable()
self._count += 1
return _InnerDisposable(self)
class _InnerDisposable(Disposable):
def __init__(self, ref: RefCountDisposable):
self._ref = ref
self._disposed = False
def dispose(self) -> None:
if not self._disposed:
self._disposed = True
with self._ref._lock:
self._ref._count -= 1
if self._ref._disposed and self._ref._count == 0:
if self._ref._disposable:
self._ref._disposable.dispose()
@property
def is_disposed(self) -> bool:
return self._disposed
class EmptyDisposable(Disposable):
def dispose(self) -> None:
pass
@property
def is_disposed(self) -> bool:
return True
class Observer(ABC, Generic[T]):
@abstractmethod
def on_next(self, value: T) -> None:
pass
@abstractmethod
def on_error(self, error: Exception) -> None:
pass
@abstractmethod
def on_complete(self) -> None:
pass
class Observable(ABC, Generic[T]):
@abstractmethod
def subscribe(self, observer: Observer[T]) -> Disposable:
pass
def pipe(self, *operators: Callable[['Observable[T]'], 'Observable']) -> 'Observable':
result = self
for op in operators:
result = op(result)
return result
class AnonymousObserver(Observer[T]):
def __init__(
self,
on_next: Callable[[T], None] = None,
on_error: Callable[[Exception], None] = None,
on_complete: Callable[[], None] = None
):
self._on_next = on_next or (lambda x: None)
self._on_error = on_error or (lambda e: print(f"Unhandled error: {e}"))
self._on_complete = on_complete or (lambda: None)
self._stopped = False
def on_next(self, value: T) -> None:
if not self._stopped:
try:
self._on_next(value)
except Exception as e:
self.on_error(e)
def on_error(self, error: Exception) -> None:
if not self._stopped:
self._stopped = True
self._on_error(error)
def on_complete(self) -> None:
if not self._stopped:
self._stopped = True
self._on_complete()
class SafeObserver(Observer[T]):
def __init__(self, observer: Observer[T], disposable: Disposable):
self._observer = observer
self._disposable = disposable
self._stopped = False
def on_next(self, value: T) -> None:
if not self._stopped and not self._disposable.is_disposed:
try:
self._observer.on_next(value)
except Exception as e:
self.on_error(e)
def on_error(self, error: Exception) -> None:
if not self._stopped and not self._disposable.is_disposed:
self._stopped = True
try:
self._observer.on_error(error)
finally:
self._disposable.dispose()
def on_complete(self) -> None:
if not self._stopped and not self._disposable.is_disposed:
self._stopped = True
try:
self._observer.on_complete()
finally:
self._disposable.dispose()
28.2.3 Subject Implementation
python
from typing import List, Deque
from collections import deque
from threading import RLock
class Subject(Observable[T], Observer[T]):
def __init__(self):
self._observers: List[Observer[T]] = []
self._lock = RLock()
self._disposed = False
self._stopped = False
self._error: Optional[Exception] = None
def subscribe(self, observer: Observer[T]) -> Disposable:
with self._lock:
if self._disposed:
raise Exception("Subject has been disposed")
if self._stopped:
if self._error:
observer.on_error(self._error)
else:
observer.on_complete()
return EmptyDisposable()
self._observers.append(observer)
def dispose():
with self._lock:
if observer in self._observers:
self._observers.remove(observer)
return DisposableImpl(dispose)
def on_next(self, value: T) -> None:
with self._lock:
if self._stopped or self._disposed:
return
observers = self._observers.copy()
for observer in observers:
try:
observer.on_next(value)
except Exception:
pass
def on_error(self, error: Exception) -> None:
with self._lock:
if self._stopped or self._disposed:
return
self._stopped = True
self._error = error
observers = self._observers.copy()
self._observers.clear()
for observer in observers:
try:
observer.on_error(error)
except Exception:
pass
def on_complete(self) -> None:
with self._lock:
if self._stopped or self._disposed:
return
self._stopped = True
observers = self._observers.copy()
self._observers.clear()
for observer in observers:
try:
observer.on_complete()
except Exception:
pass
def dispose(self) -> None:
with self._lock:
self._disposed = True
self._observers.clear()
class DisposableImpl(Disposable):
def __init__(self, dispose_func: Callable[[], None]):
self._dispose_func = dispose_func
self._disposed = False
self._lock = RLock()
def dispose(self) -> None:
with self._lock:
if not self._disposed:
self._disposed = True
self._dispose_func()
@property
def is_disposed(self) -> bool:
return self._disposed
class BehaviorSubject(Subject[T]):
def __init__(self, initial_value: T):
super().__init__()
self._value = initial_value
@property
def value(self) -> T:
with self._lock:
if self._disposed:
raise Exception("Subject has been disposed")
return self._value
def subscribe(self, observer: Observer[T]) -> Disposable:
disposable = super().subscribe(observer)
with self._lock:
if not self._stopped and not self._disposed:
current_value = self._value
if not self._stopped and not self._disposed:
try:
observer.on_next(current_value)
except Exception:
pass
return disposable
def on_next(self, value: T) -> None:
with self._lock:
self._value = value
super().on_next(value)
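class ReplaySubject(Subject[T]):
    """Subject that replays buffered values to late subscribers."""

    def __init__(self, buffer_size: Optional[int] = None, window_time: Optional[float] = None):
        super().__init__()
        self._buffer: Deque[tuple] = deque()  # (timestamp, value) pairs
        self._buffer_size = buffer_size
        self._window_time = window_time

    def _trim_buffer(self) -> None:
        # Drop entries older than the time window, then enforce the size cap.
        now = time.time()
        if self._window_time is not None:
            while self._buffer and now - self._buffer[0][0] > self._window_time:
                self._buffer.popleft()
        if self._buffer_size is not None:
            while len(self._buffer) > self._buffer_size:
                self._buffer.popleft()

    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Hold the re-entrant lock across replay and registration so that no
        # live value slips in between the two steps.
        with self._lock:
            if self._disposed:
                raise Exception("Subject has been disposed")
            self._trim_buffer()
            # Replay before delegating: a subscriber to an already terminated
            # subject must see the buffered values before the terminal signal.
            for _timestamp, value in list(self._buffer):
                try:
                    observer.on_next(value)
                except Exception:
                    pass
            return super().subscribe(observer)

    def on_next(self, value: T) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return  # do not record values after termination
            self._buffer.append((time.time(), value))
            self._trim_buffer()
        super().on_next(value)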
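class AsyncSubject(Subject[T]):
    """Subject that emits only the last value, and only on completion."""

    def __init__(self):
        super().__init__()
        self._last_value: Optional[T] = None
        self._has_value = False

    def subscribe(self, observer: Observer[T]) -> Disposable:
        with self._lock:
            # A late subscriber to a completed subject still receives the last value.
            replay = (self._stopped and self._error is None
                      and self._has_value and not self._disposed)
            value = self._last_value
        if replay:
            try:
                observer.on_next(value)
            except Exception:
                pass
        return super().subscribe(observer)

    def on_next(self, value: T) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return
            self._last_value = value
            self._has_value = True

    def on_complete(self) -> None:
        with self._lock:
            if self._stopped or self._disposed:
                return  # ignore repeated completion
            self._stopped = True
            value = self._last_value
            has_value = self._has_value
            observers = self._observers.copy()
            self._observers.clear()
        if has_value:
            for observer in observers:
                try:
                    observer.on_next(value)
                except Exception:
                    pass
        for observer in observers:
            try:
                observer.on_complete()
            except Exception:
                pass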
# Usage example: a plain Subject multicasts each value to every observer.
subject = Subject[int]()
subject.subscribe(AnonymousObserver(on_next=lambda x: print(f"Observer 1: {x}")))
subject.subscribe(AnonymousObserver(on_next=lambda x: print(f"Observer 2: {x}")))
subject.on_next(1)
subject.on_next(2)
subject.on_next(3)
subject.on_complete()
# A BehaviorSubject hands its current value to each new subscriber.
print("\nBehaviorSubject example:")
behavior = BehaviorSubject[int](0)
behavior.subscribe(AnonymousObserver(on_next=lambda x: print(f"Initial subscriber: {x}")))
behavior.on_next(10)
behavior.subscribe(AnonymousObserver(on_next=lambda x: print(f"Later subscriber: {x}")))
28.3 Operators
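28.3.1 Formal Definition of Operators
Definition 28.5 (Operator): An operator $\mathit{op}$ is a function:
$$\mathit{op}: \mathit{Observable} \rightarrow \mathit{Observable}$$
Operators fall into the following groups:
- Transformation operators: $\mathit{map}$, $\mathit{flat\_map}$, $\mathit{scan}$
- Filtering operators: $\mathit{filter}$, $\mathit{distinct}$, $\mathit{take}$
- Combination operators: $\mathit{merge}$, $\mathit{zip}$, $\mathit{combine\_latest}$
- Error-handling operators: $\mathit{catch}$, $\mathit{retry}$, $\mathit{on\_error\_return}$
- Utility operators: $\mathit{do}$, $\mathit{delay}$, $\mathit{timeout}$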
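Because an operator maps one Observable to another (Definition 28.5), a pipeline is ordinary function composition. The sketch below shows this with a deliberately reduced model in which a source is just a function that pushes values into a callback; completion and error signals are left out. The names `of`, `map_`, `filter_`, `scan_`, and `pipe` are illustrative assumptions, not this chapter's classes.

```python
from functools import reduce
from typing import Any, Callable, List

Emit = Callable[[Any], None]
Source = Callable[[Emit], None]      # calling a source pushes its values into `emit`
Operator = Callable[[Source], Source]


def of(*values: Any) -> Source:
    """A cold source that replays `values` on every subscription."""
    def source(emit: Emit) -> None:
        for v in values:
            emit(v)
    return source


def map_(fn: Callable[[Any], Any]) -> Operator:
    def op(upstream: Source) -> Source:
        return lambda emit: upstream(lambda v: emit(fn(v)))
    return op


def filter_(pred: Callable[[Any], bool]) -> Operator:
    def op(upstream: Source) -> Source:
        return lambda emit: upstream(lambda v: emit(v) if pred(v) else None)
    return op


def scan_(acc: Callable[[Any, Any], Any], seed: Any) -> Operator:
    def op(upstream: Source) -> Source:
        def source(emit: Emit) -> None:
            state = [seed]  # fresh accumulator for each subscription

            def step(v: Any) -> None:
                state[0] = acc(state[0], v)
                emit(state[0])

            upstream(step)
        return source
    return op


def pipe(source: Source, *operators: Operator) -> Source:
    """Apply the operators left to right: pipe(s, f, g) == g(f(s))."""
    return reduce(lambda src, op: op(src), operators, source)


out: List[int] = []
pipeline = pipe(
    of(1, 2, 3, 4, 5),
    filter_(lambda x: x % 2 == 1),   # keeps 1, 3, 5
    map_(lambda x: x * 10),          # 10, 30, 50
    scan_(lambda a, b: a + b, 0),    # running sum
)
pipeline(out.append)  # out is now [10, 40, 90]
```

Nothing runs until `pipeline` is called with a callback, which mirrors how subscription triggers a cold Observable.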
28.3.2 Transformation Operators
python
from typing import Callable, TypeVar, Generic, List, Iterator, Iterable
class MapObservable(Observable[R]):
def __init__(self, source: Observable[T], mapper: Callable[[T], R]):
self._source = source
self._mapper = mapper
def subscribe(self, observer: Observer[R]) -> Disposable:
class MapObserver(Observer[T]):
def __init__(self, mapper: Callable[[T], R], downstream: Observer[R]):
self._mapper = mapper
self._downstream = downstream
def on_next(self, value: T) -> None:
try:
mapped = self._mapper(value)
self._downstream.on_next(mapped)
except Exception as e:
self._downstream.on_error(e)
def on_error(self, error: Exception) -> None:
self._downstream.on_error(error)
def on_complete(self) -> None:
self._downstream.on_complete()
return self._source.subscribe(MapObserver(self._mapper, observer))
class FlatMapObservable(Observable[R]):
    """Map each value to an inner Observable and merge the inner emissions."""

    def __init__(
        self,
        source: Observable[T],
        mapper: Callable[[T], Observable[R]],
        max_concurrent: Optional[int] = None
    ):
        self._source = source
        self._mapper = mapper
        self._max_concurrent = max_concurrent  # stored but not enforced here

    def subscribe(self, observer: Observer[R]) -> Disposable:
        # Capture the mapper: inside the nested classes, `self` is the observer.
        mapper = self._mapper
        composite = CompositeDisposable()
        lock = RLock()
        active_count = [1]   # the initial 1 accounts for the outer source
        terminated = [False]

        def fail(error: Exception) -> None:
            with lock:
                if terminated[0]:
                    return
                terminated[0] = True
                observer.on_error(error)
            composite.dispose()  # cancel the outer and all inner subscriptions

        def finish_one() -> None:
            # Complete downstream once the outer source and every inner are done.
            with lock:
                active_count[0] -= 1
                if active_count[0] == 0 and not terminated[0]:
                    terminated[0] = True
                    observer.on_complete()

        class InnerObserver(Observer[R]):
            def on_next(self, inner_value: R) -> None:
                with lock:
                    if not terminated[0]:
                        observer.on_next(inner_value)

            def on_error(self, error: Exception) -> None:
                fail(error)

            def on_complete(self) -> None:
                finish_one()

        class FlatMapObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    if terminated[0]:
                        return
                    active_count[0] += 1
                try:
                    inner_observable = mapper(value)
                except Exception as e:
                    fail(e)
                    return
                composite.add(inner_observable.subscribe(InnerObserver()))

            def on_error(self, error: Exception) -> None:
                fail(error)

            def on_complete(self) -> None:
                finish_one()

        composite.add(self._source.subscribe(FlatMapObserver()))
        return composite
class ScanObservable(Observable[T]):
    """Emit the running accumulation of the source values."""

    def __init__(
        self,
        source: Observable[T],
        accumulator: Callable[[T, T], T],
        seed: T = None
    ):
        self._source = source
        self._accumulator = accumulator
        self._seed = seed

    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Capture the accumulator: inside ScanObserver, `self` is the observer.
        accumulator = self._accumulator
        accumulated = [self._seed]
        # A seed of None means "no seed": the first value starts the accumulation.
        # A one-element list is used so the nested method can update the flag.
        has_value = [self._seed is not None]

        class ScanObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    if has_value[0]:
                        accumulated[0] = accumulator(accumulated[0], value)
                    else:
                        accumulated[0] = value
                        has_value[0] = True
                except Exception as e:
                    observer.on_error(e)
                    return
                observer.on_next(accumulated[0])

            def on_error(self, error: Exception) -> None:
                observer.on_error(error)

            def on_complete(self) -> None:
                observer.on_complete()

        return self._source.subscribe(ScanObserver())
class GroupByObservable(Observable[Observable[T]]):
    """Split the source into one Subject per key."""

    def __init__(
        self,
        source: Observable[T],
        key_selector: Callable[[T], Any]
    ):
        self._source = source
        self._key_selector = key_selector

    def subscribe(self, observer: Observer[Observable[T]]) -> Disposable:
        # Capture the selector: inside GroupByObserver, `self` is the observer.
        key_selector = self._key_selector
        groups: Dict[Any, Subject[T]] = {}
        lock = RLock()

        class GroupByObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    key = key_selector(value)
                    with lock:
                        group = groups.get(key)
                        if group is None:
                            group = Subject()
                            groups[key] = group
                            # Hand out the new group before pushing its first
                            # value so that downstream can subscribe in time.
                            observer.on_next(group)
                    group.on_next(value)
                except Exception as e:
                    observer.on_error(e)

            def on_error(self, error: Exception) -> None:
                with lock:
                    for group in groups.values():
                        group.on_error(error)
                observer.on_error(error)

            def on_complete(self) -> None:
                with lock:
                    for group in groups.values():
                        group.on_complete()
                observer.on_complete()

        return self._source.subscribe(GroupByObserver())
def map_op(mapper: Callable[[T], R]) -> Callable[[Observable[T]], Observable[R]]:
def apply(source: Observable[T]) -> Observable[R]:
return MapObservable(source, mapper)
return apply
def flat_map_op(
mapper: Callable[[T], Observable[R]]
) -> Callable[[Observable[T]], Observable[R]]:
def apply(source: Observable[T]) -> Observable[R]:
return FlatMapObservable(source, mapper)
return apply
def scan_op(
accumulator: Callable[[T, T], T],
seed: T = None
) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return ScanObservable(source, accumulator, seed)
return apply
28.3.3 Filtering Operators
python
class FilterObservable(Observable[T]):
    """Forward only the values for which `predicate` returns True."""

    def __init__(self, source: Observable[T], predicate: Callable[[T], bool]):
        self._source = source
        self._predicate = predicate

    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Capture the predicate: inside FilterObserver, `self` is the observer.
        predicate = self._predicate

        class FilterObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    keep = predicate(value)
                except Exception as e:
                    observer.on_error(e)
                    return
                if keep:
                    observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                observer.on_error(error)

            def on_complete(self) -> None:
                observer.on_complete()

        return self._source.subscribe(FilterObserver())
class TakeObservable(Observable[T]):
    """Forward the first `count` values, then complete and unsubscribe."""

    def __init__(self, source: Observable[T], count: int):
        self._source = source
        self._count = count

    def subscribe(self, observer: Observer[T]) -> Disposable:
        if self._count <= 0:
            observer.on_complete()
            return EmptyDisposable()

        remaining = [self._count]
        done = [False]  # guards against a second terminal signal
        # SerialDisposable lets on_next cancel the upstream even when the
        # source emits synchronously, before subscribe() has returned.
        upstream = SerialDisposable()

        class TakeObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                if done[0]:
                    return
                remaining[0] -= 1
                observer.on_next(value)
                if remaining[0] == 0:
                    done[0] = True
                    observer.on_complete()
                    upstream.dispose()

            def on_error(self, error: Exception) -> None:
                if not done[0]:
                    done[0] = True
                    observer.on_error(error)

            def on_complete(self) -> None:
                if not done[0]:
                    done[0] = True
                    observer.on_complete()

        upstream.disposable = self._source.subscribe(TakeObserver())
        return upstream
class SkipObservable(Observable[T]):
def __init__(self, source: Observable[T], count: int):
self._source = source
self._count = count
def subscribe(self, observer: Observer[T]) -> Disposable:
remaining = [self._count]
class SkipObserver(Observer[T]):
def on_next(self, value: T) -> None:
if remaining[0] > 0:
remaining[0] -= 1
else:
observer.on_next(value)
def on_error(self, error: Exception) -> None:
observer.on_error(error)
def on_complete(self) -> None:
observer.on_complete()
return self._source.subscribe(SkipObserver())
class DistinctObservable(Observable[T]):
    """Forward a value only the first time its key is seen."""

    def __init__(
        self,
        source: Observable[T],
        key_selector: Optional[Callable[[T], Any]] = None
    ):
        self._source = source
        self._key_selector = key_selector or (lambda x: x)

    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Capture the selector: inside DistinctObserver, `self` is the observer.
        key_selector = self._key_selector
        seen: set = set()  # keys must be hashable

        class DistinctObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                try:
                    key = key_selector(value)
                    is_new = key not in seen
                    if is_new:
                        seen.add(key)
                except Exception as e:
                    observer.on_error(e)
                    return
                if is_new:
                    observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                observer.on_error(error)

            def on_complete(self) -> None:
                observer.on_complete()

        return self._source.subscribe(DistinctObserver())
class DebounceObservable(Observable[T]):
    """Emit a value only after `due_time` seconds pass without a newer one."""

    def __init__(self, source: Observable[T], due_time: float):
        self._source = source
        self._due_time = due_time

    def subscribe(self, observer: Observer[T]) -> Disposable:
        import threading
        # Capture the delay: inside DebounceObserver, `self` is the observer.
        due_time = self._due_time
        last_value = [None]
        has_value = [False]
        timer = [None]
        lock = RLock()

        def emit() -> None:
            # Runs on the timer thread once the quiet period has elapsed.
            with lock:
                if has_value[0]:
                    has_value[0] = False
                    observer.on_next(last_value[0])

        class DebounceObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    last_value[0] = value
                    has_value[0] = True
                    if timer[0]:
                        timer[0].cancel()  # a newer value restarts the quiet period
                    timer[0] = threading.Timer(due_time, emit)
                    timer[0].daemon = True
                    timer[0].start()

            def on_error(self, error: Exception) -> None:
                with lock:
                    if timer[0]:
                        timer[0].cancel()
                    has_value[0] = False
                observer.on_error(error)

            def on_complete(self) -> None:
                with lock:
                    if timer[0]:
                        timer[0].cancel()
                    if has_value[0]:
                        # Flush the pending value before completing.
                        has_value[0] = False
                        observer.on_next(last_value[0])
                observer.on_complete()

        disposable = self._source.subscribe(DebounceObserver())

        def dispose() -> None:
            with lock:
                if timer[0]:
                    timer[0].cancel()
                has_value[0] = False
            disposable.dispose()

        return DisposableImpl(dispose)
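class ThrottleObservable(Observable[T]):
    """Forward a value, then ignore further values for `duration` seconds."""

    def __init__(self, source: Observable[T], duration: float):
        self._source = source
        self._duration = duration

    def subscribe(self, observer: Observer[T]) -> Disposable:
        import time
        # Capture the duration: inside ThrottleObserver, `self` is the observer.
        duration = self._duration
        last_emit_time = [None]  # None until the first value has been forwarded

        class ThrottleObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                # The monotonic clock is unaffected by wall-clock adjustments.
                now = time.monotonic()
                if last_emit_time[0] is None or now - last_emit_time[0] >= duration:
                    last_emit_time[0] = now
                    observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                observer.on_error(error)

            def on_complete(self) -> None:
                observer.on_complete()

        return self._source.subscribe(ThrottleObserver())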
def filter_op(predicate: Callable[[T], bool]) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return FilterObservable(source, predicate)
return apply
def take_op(count: int) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return TakeObservable(source, count)
return apply
def skip_op(count: int) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return SkipObservable(source, count)
return apply
def distinct_op(
key_selector: Callable[[T], Any] = None
) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return DistinctObservable(source, key_selector)
return apply
def debounce_op(due_time: float) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return DebounceObservable(source, due_time)
return apply
def throttle_op(duration: float) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return ThrottleObservable(source, duration)
return apply
28.3.4 Combination Operators
python
class MergeObservable(Observable[T]):
    """Interleave the values of several sources into one stream."""

    def __init__(self, sources: List[Observable[T]]):
        self._sources = sources

    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Capture the source count: inside MergeObserver, `self` is the observer.
        total = len(self._sources)
        if total == 0:
            observer.on_complete()  # nothing to merge
            return EmptyDisposable()

        composite = CompositeDisposable()
        lock = RLock()
        completed_count = [0]
        error_occurred = [False]

        class MergeObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    if not error_occurred[0]:
                        observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                with lock:
                    if not error_occurred[0]:
                        error_occurred[0] = True
                        observer.on_error(error)

            def on_complete(self) -> None:
                # Downstream completes only after every source has completed.
                with lock:
                    completed_count[0] += 1
                    if completed_count[0] == total and not error_occurred[0]:
                        observer.on_complete()

        for source in self._sources:
            composite.add(source.subscribe(MergeObserver()))
        return composite
class ZipObservable(Observable[tuple]):
def __init__(self, sources: List[Observable]):
self._sources = sources
def subscribe(self, observer: Observer[tuple]) -> Disposable:
composite = CompositeDisposable()
lock = RLock()
buffers: List[List] = [[] for _ in self._sources]
completed = [False] * len(self._sources)
class ZipObserver(Observer):
def __init__(self, index: int):
self._index = index
def on_next(self, value) -> None:
with lock:
buffers[self._index].append(value)
while all(len(b) > 0 for b in buffers):
values = tuple(b.pop(0) for b in buffers)
observer.on_next(values)
def on_error(self, error: Exception) -> None:
observer.on_error(error)
def on_complete(self) -> None:
with lock:
completed[self._index] = True
if all(completed):
observer.on_complete()
for i, source in enumerate(self._sources):
disposable = source.subscribe(ZipObserver(i))
composite.add(disposable)
return composite
class CombineLatestObservable(Observable[tuple]):
def __init__(self, sources: List[Observable]):
self._sources = sources
def subscribe(self, observer: Observer[tuple]) -> Disposable:
composite = CompositeDisposable()
lock = RLock()
latest: List[Optional[Any]] = [None] * len(self._sources)
has_value = [False] * len(self._sources)
completed = [False] * len(self._sources)
def try_emit():
if all(has_value):
observer.on_next(tuple(latest))
class CombineObserver(Observer):
def __init__(self, index: int):
self._index = index
def on_next(self, value) -> None:
with lock:
latest[self._index] = value
has_value[self._index] = True
try_emit()
def on_error(self, error: Exception) -> None:
observer.on_error(error)
def on_complete(self) -> None:
with lock:
completed[self._index] = True
if all(completed):
observer.on_complete()
for i, source in enumerate(self._sources):
disposable = source.subscribe(CombineObserver(i))
composite.add(disposable)
return composite
class ConcatObservable(Observable[T]):
def __init__(self, sources: List[Observable[T]]):
self._sources = sources
def subscribe(self, observer: Observer[T]) -> Disposable:
serial = SerialDisposable()
lock = RLock()
index = [0]
disposed = [False]
def subscribe_next():
if disposed[0] or index[0] >= len(self._sources):
if not disposed[0]:
observer.on_complete()
return
current_index = index[0]
index[0] += 1
class ConcatObserver(Observer[T]):
def on_next(self, value: T) -> None:
observer.on_next(value)
def on_error(self, error: Exception) -> None:
observer.on_error(error)
def on_complete(self) -> None:
subscribe_next()
serial.disposable = self._sources[current_index].subscribe(ConcatObserver())
subscribe_next()
def dispose():
disposed[0] = True
serial.dispose()
return DisposableImpl(dispose)
class RaceObservable(Observable[T]):
def __init__(self, sources: List[Observable[T]]):
self._sources = sources
def subscribe(self, observer: Observer[T]) -> Disposable:
composite = CompositeDisposable()
lock = RLock()
winner = [None]
class RaceObserver(Observer[T]):
def __init__(self, index: int):
self._index = index
def on_next(self, value: T) -> None:
with lock:
if winner[0] is None:
winner[0] = self._index
elif winner[0] != self._index:
return
observer.on_next(value)
def on_error(self, error: Exception) -> None:
with lock:
if winner[0] is not None and winner[0] != self._index:
return
observer.on_error(error)
def on_complete(self) -> None:
with lock:
if winner[0] is not None and winner[0] != self._index:
return
observer.on_complete()
for i, source in enumerate(self._sources):
disposable = source.subscribe(RaceObserver(i))
composite.add(disposable)
return composite
def merge_op(*sources: Observable[T]) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return MergeObservable([source] + list(sources))
return apply
def zip_op(*sources: Observable) -> Callable[[Observable], Observable[tuple]]:
def apply(source: Observable) -> Observable[tuple]:
return ZipObservable([source] + list(sources))
return apply
def combine_latest_op(*sources: Observable) -> Callable[[Observable], Observable[tuple]]:
def apply(source: Observable) -> Observable[tuple]:
return CombineLatestObservable([source] + list(sources))
return apply
def concat_op(*sources: Observable[T]) -> Callable[[Observable[T]], Observable[T]]:
def apply(source: Observable[T]) -> Observable[T]:
return ConcatObservable([source] + list(sources))
return apply
28.4 Backpressure Handling
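28.4.1 Formal Definition of Backpressure Strategies
Definition 28.6 (Backpressure): Backpressure is a flow-control mechanism that defines how a consumer reports its processing capacity back to the producer:
$$Backpressure: Consumer \rightarrow Producer$$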
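The feedback direction in Definition 28.6 can be made concrete with a demand signal: the producer emits only as many items as the consumer has asked for. The sketch below is loosely modeled on the `request(n)` idea from Reactive Streams. `PullSubscription` is a hypothetical name, and the code is single-threaded and ignores re-entrant requests.

```python
from typing import Any, Callable, Iterator, List


class PullSubscription:
    """Hypothetical subscription handle: the consumer drives emission via request(n)."""

    def __init__(
        self,
        items: Iterator[Any],
        on_next: Callable[[Any], None],
        on_complete: Callable[[], None],
    ) -> None:
        self._items = items
        self._on_next = on_next
        self._on_complete = on_complete
        self._done = False

    def request(self, n: int) -> None:
        """Signal that the consumer can handle up to `n` more items."""
        if n <= 0:
            raise ValueError("n must be positive")
        for _ in range(n):
            if self._done:
                return
            try:
                item = next(self._items)
            except StopIteration:
                self._done = True
                self._on_complete()
                return
            self._on_next(item)


received: List[Any] = []
subscription = PullSubscription(
    iter(range(5)),
    received.append,
    lambda: received.append("complete"),
)
subscription.request(2)              # the producer may emit at most two items
after_first_request = list(received)
subscription.request(10)             # the remaining three items, then completion
```

No item is produced until the consumer asks for it, so a slow consumer can never be overrun; the price is an extra round of signalling for every batch of demand.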
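Definition 28.7 (Backpressure strategies):
| Strategy | Formal description | Behavior |
|---|---|---|
| BUFFER | $buffer(v) \rightarrow queue.push(v)$ | Buffers every value until the buffer overflows |
| DROP | $drop(v) \rightarrow \emptyset$ | Discards values that exceed the processing capacity |
| LATEST | $latest(v) \rightarrow current = v$ | Keeps only the most recent value |
| ERROR | $error(v) \rightarrow raise$ | Raises an exception on overflow |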
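The four strategies in the table differ only in what happens when a value arrives while earlier ones are still pending. A minimal sketch of that decision follows, assuming a hypothetical `BoundedInbox` that a consumer empties with `drain()`. The interpretation of each strategy (for instance, ERROR failing as soon as one value is pending) is an assumption made for illustration.

```python
from collections import deque
from enum import Enum, auto
from typing import Any, Deque, List


class Strategy(Enum):
    BUFFER = auto()
    DROP = auto()
    LATEST = auto()
    ERROR = auto()


class BoundedInbox:
    """Pending values between a fast producer and a slower consumer."""

    def __init__(self, capacity: int, strategy: Strategy) -> None:
        self._capacity = capacity
        self._strategy = strategy
        self._queue: Deque[Any] = deque()

    def offer(self, value: Any) -> None:
        strategy = self._strategy
        if strategy is Strategy.LATEST:
            # Keep a single slot that always holds the newest value.
            self._queue.clear()
            self._queue.append(value)
        elif strategy is Strategy.ERROR:
            # Fail as soon as the consumer is behind by one value.
            if self._queue:
                raise OverflowError("consumer has not drained the previous value")
            self._queue.append(value)
        elif len(self._queue) < self._capacity:
            self._queue.append(value)   # BUFFER and DROP both queue while there is room
        elif strategy is Strategy.BUFFER:
            raise OverflowError(f"buffer overflow: capacity={self._capacity}")
        # DROP with a full queue: the incoming value is silently discarded.

    def drain(self) -> List[Any]:
        """Hand all pending values to the consumer."""
        items = list(self._queue)
        self._queue.clear()
        return items


dropping = BoundedInbox(capacity=2, strategy=Strategy.DROP)
newest = BoundedInbox(capacity=2, strategy=Strategy.LATEST)
for v in (1, 2, 3):
    dropping.offer(v)
    newest.offer(v)
dropped_result = dropping.drain()  # the third value did not fit
latest_result = newest.drain()     # only the newest value survives
```

DROP and LATEST trade completeness for bounded memory, while BUFFER and ERROR surface the overload to the caller instead of hiding it.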
28.4.2 Backpressure Implementation
python
from enum import Enum, auto
from typing import Deque, Optional
from collections import deque
class BackpressureStrategy(Enum):
BUFFER = auto()
DROP = auto()
LATEST = auto()
ERROR = auto()
class BackpressureObservable(Observable[T]):
    """Apply a BackpressureStrategy between a source and its observer."""

    def __init__(
        self,
        source: Observable[T],
        strategy: BackpressureStrategy = BackpressureStrategy.BUFFER,
        buffer_size: int = 128
    ):
        self._source = source
        self._strategy = strategy
        self._buffer_size = buffer_size

    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Captured locally: inside BackpressureObserver, `self` is the observer.
        strategy = self._strategy
        buffer_size = self._buffer_size
        buffer: Deque[T] = deque()
        latest: List[Optional[T]] = [None]
        disposed = [False]
        failed = [False]
        # Placeholder for a "downstream is busy" signal. Nothing in this class
        # sets it, so DROP, LATEST and ERROR currently pass every value through.
        paused = [False]
        lock = RLock()

        def fail(message: str) -> None:
            failed[0] = True
            observer.on_error(Exception(message))

        class BackpressureObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    if disposed[0] or failed[0]:
                        return
                    if strategy == BackpressureStrategy.BUFFER:
                        # Values are held until on_complete drains the buffer.
                        if len(buffer) >= buffer_size:
                            fail(f"Buffer overflow: size={buffer_size}")
                        else:
                            buffer.append(value)
                    elif strategy == BackpressureStrategy.DROP:
                        if not paused[0]:
                            observer.on_next(value)
                    elif strategy == BackpressureStrategy.LATEST:
                        latest[0] = value  # remembered while downstream is paused
                        if not paused[0]:
                            observer.on_next(value)
                    elif strategy == BackpressureStrategy.ERROR:
                        if paused[0]:
                            fail("Downstream cannot keep up")
                        else:
                            observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                with lock:
                    if disposed[0] or failed[0]:
                        return
                    failed[0] = True
                observer.on_error(error)

            def on_complete(self) -> None:
                with lock:
                    if disposed[0] or failed[0]:
                        return
                    while buffer and not disposed[0]:
                        observer.on_next(buffer.popleft())
                observer.on_complete()

        disposable = self._source.subscribe(BackpressureObserver())

        def dispose() -> None:
            disposed[0] = True
            disposable.dispose()

        return DisposableImpl(dispose)
class Flowable(Observable[T]):
def __init__(
self,
source: Observable[T],
initial_request: int = 128
):
self._source = source
self._initial_request = initial_request
def subscribe(self, observer: Observer[T]) -> Disposable:
requested = [self._initial_request]
produced = [0]
lock = RLock()
class FlowableObserver(Observer[T]):
def on_next(self, value: T) -> None:
with lock:
produced[0] += 1
observer.on_next(value)
with lock:
if produced[0] >= requested[0]:
self._request_more(requested[0])
produced[0] = 0
def on_error(self, error: Exception) -> None:
observer.on_error(error)
def on_complete(self) -> None:
observer.on_complete()
def _request_more(self, n: int) -> None:
pass
return self._source.subscribe(FlowableObserver())
def request(self, n: int) -> None:
pass
class ThrottledObservable(Observable[T]):
    """Forward at most `max_rate` values per sliding window of `window_ms`."""

    def __init__(
        self,
        source: Observable[T],
        max_rate: int,
        window_ms: float = 1000.0
    ):
        self._source = source
        self._max_rate = max_rate
        self._window_ms = window_ms

    def subscribe(self, observer: Observer[T]) -> Disposable:
        import time
        # Captured locally: inside ThrottledObserver, `self` is the observer.
        max_rate = self._max_rate
        window_seconds = self._window_ms / 1000.0
        timestamps: List[float] = []  # emission times inside the current window
        lock = RLock()

        class ThrottledObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                now = time.monotonic()
                with lock:
                    cutoff = now - window_seconds
                    while timestamps and timestamps[0] < cutoff:
                        timestamps.pop(0)
                    if len(timestamps) >= max_rate:
                        return  # over the limit: the value is dropped
                    timestamps.append(now)
                    observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                observer.on_error(error)

            def on_complete(self) -> None:
                observer.on_complete()

        return self._source.subscribe(ThrottledObserver())
class BatchObservable(Observable[List[T]]):
    """Groups values into lists of batch_size, or flushes after max_wait_ms."""

    def __init__(
        self,
        source: Observable[T],
        batch_size: int = 100,
        max_wait_ms: float = 1000.0
    ):
        self._source = source
        self._batch_size = batch_size
        self._max_wait_ms = max_wait_ms

    def subscribe(self, observer: Observer[List[T]]) -> Disposable:
        import threading
        # Capture configuration in locals: inside BatchObserver, `self`
        # is the inner observer, not this BatchObservable.
        batch_size = self._batch_size
        max_wait_s = self._max_wait_ms / 1000.0
        batch: List[T] = []
        lock = RLock()
        timer = [None]
        done = [False]

        def cancel_timer():
            if timer[0]:
                timer[0].cancel()
                timer[0] = None

        def flush():
            # Emit whatever has accumulated and stop the pending timer.
            with lock:
                cancel_timer()
                if batch:
                    observer.on_next(batch.copy())
                    batch.clear()

        def on_timeout():
            with lock:
                if not done[0]:
                    flush()

        def schedule_emit():
            # Start the max-wait timer when the first value of a batch arrives.
            with lock:
                if timer[0] is None:
                    timer[0] = threading.Timer(max_wait_s, on_timeout)
                    timer[0].daemon = True
                    timer[0].start()

        class BatchObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                with lock:
                    batch.append(value)
                    if len(batch) >= batch_size:
                        flush()
                    else:
                        schedule_emit()

            def on_error(self, error: Exception) -> None:
                with lock:
                    done[0] = True
                    flush()  # deliver the partial batch before the error
                    observer.on_error(error)

            def on_complete(self) -> None:
                with lock:
                    done[0] = True
                    flush()  # deliver the partial batch before completing
                    observer.on_complete()

        disposable = self._source.subscribe(BatchObserver())

        def dispose():
            with lock:
                done[0] = True
                cancel_timer()
            disposable.dispose()

        return DisposableImpl(dispose)
28.5 Error Handling
28.5.1 Error-Handling Operators
python
class CatchObservable(Observable[T]):
    """On error, switches the downstream observer to a fallback Observable."""

    def __init__(
        self,
        source: Observable[T],
        handler: Callable[[Exception], Observable[T]]
    ):
        self._source = source
        self._handler = handler

    def subscribe(self, observer: Observer[T]) -> Disposable:
        serial = SerialDisposable()
        # Capture the handler in a local: inside CatchObserver, `self` is the
        # inner observer and has no `_handler` attribute.
        handler = self._handler

        class CatchObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                try:
                    fallback = handler(error)
                    serial.disposable = fallback.subscribe(observer)
                except Exception as e:
                    # The handler (or the fallback subscription) failed: propagate.
                    observer.on_error(e)

            def on_complete(self) -> None:
                observer.on_complete()

        serial.disposable = self._source.subscribe(CatchObserver())
        return serial
class RetryObservable(Observable[T]):
    """Resubscribes to the source on error.

    `count` is the maximum number of subscription attempts in total;
    None means retry indefinitely.
    """

    def __init__(
        self,
        source: Observable[T],
        count: Optional[int] = None,
        delay: float = 0.0
    ):
        self._source = source
        self._count = count
        self._delay = delay

    def subscribe(self, observer: Observer[T]) -> Disposable:
        import time
        serial = SerialDisposable()
        attempts = [0]
        disposed = [False]
        # Capture configuration in locals: inside RetryObserver, `self` is the
        # inner observer and has neither `_count` nor `_delay`.
        source = self._source
        max_attempts = self._count
        delay = self._delay

        def subscribe_source():
            if disposed[0]:
                return
            if max_attempts is not None and attempts[0] >= max_attempts:
                return
            attempts[0] += 1

            class RetryObserver(Observer[T]):
                def on_next(self, value: T) -> None:
                    observer.on_next(value)

                def on_error(self, error: Exception) -> None:
                    if max_attempts is not None and attempts[0] >= max_attempts:
                        # Attempts exhausted: surface the last error.
                        observer.on_error(error)
                    else:
                        if delay > 0:
                            time.sleep(delay)  # note: blocks the emitting thread
                        subscribe_source()

                def on_complete(self) -> None:
                    observer.on_complete()

            serial.disposable = source.subscribe(RetryObserver())

        subscribe_source()

        def dispose():
            disposed[0] = True
            serial.dispose()

        return DisposableImpl(dispose)
class OnErrorReturnObservable(Observable[T]):
    """On error, emits one fallback value and then completes."""

    def __init__(
        self,
        source: Observable[T],
        value_supplier: Callable[[Exception], T]
    ):
        self._source = source
        self._value_supplier = value_supplier

    def subscribe(self, observer: Observer[T]) -> Disposable:
        # Capture the supplier in a local: inside OnErrorReturnObserver, `self`
        # is the inner observer and has no `_value_supplier` attribute.
        value_supplier = self._value_supplier

        class OnErrorReturnObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                try:
                    fallback_value = value_supplier(error)
                except Exception as e:
                    # The supplier itself failed: propagate that failure.
                    observer.on_error(e)
                    return
                observer.on_next(fallback_value)
                observer.on_complete()

            def on_complete(self) -> None:
                observer.on_complete()

        return self._source.subscribe(OnErrorReturnObserver())
class OnErrorResumeNextObservable(Observable[T]):
    """On error, continues with a fixed fallback Observable."""

    def __init__(
        self,
        source: Observable[T],
        fallback: Observable[T]
    ):
        self._source = source
        self._fallback = fallback

    def subscribe(self, observer: Observer[T]) -> Disposable:
        serial = SerialDisposable()
        # Capture the fallback in a local: inside ResumeObserver, `self` is the
        # inner observer and has no `_fallback` attribute.
        fallback = self._fallback

        class ResumeObserver(Observer[T]):
            def on_next(self, value: T) -> None:
                observer.on_next(value)

            def on_error(self, error: Exception) -> None:
                # The original error is discarded; the fallback takes over.
                serial.disposable = fallback.subscribe(observer)

            def on_complete(self) -> None:
                observer.on_complete()

        serial.disposable = self._source.subscribe(ResumeObserver())
        return serial
class RetryWhenObservable(Observable[T]):
    """Resubscribes to the source whenever the handler's Observable emits."""

    def __init__(
        self,
        source: Observable[T],
        handler: Callable[[Observable[Exception]], Observable[Any]]
    ):
        self._source = source
        self._handler = handler

    def subscribe(self, observer: Observer[T]) -> Disposable:
        errors = Subject[Exception]()  # stream of errors fed to the handler
        serial = SerialDisposable()
        disposed = [False]

        def subscribe_source():
            if disposed[0]:
                return

            class RetryWhenObserver(Observer[T]):
                def on_next(self, value: T) -> None:
                    observer.on_next(value)

                def on_error(self, error: Exception) -> None:
                    # Do not fail downstream; let the handler decide what happens.
                    errors.on_next(error)

                def on_complete(self) -> None:
                    observer.on_complete()

            serial.disposable = self._source.subscribe(RetryWhenObserver())

        class HandlerObserver(Observer[Any]):
            def on_next(self, value: Any) -> None:
                # Any value from the handler triggers a new attempt.
                subscribe_source()

            def on_error(self, error: Exception) -> None:
                # An error from the handler ends the retries.
                observer.on_error(error)

            def on_complete(self) -> None:
                pass

        retry_observable = self._handler(errors)
        retry_disposable = retry_observable.subscribe(HandlerObserver())
        subscribe_source()

        def dispose():
            disposed[0] = True
            serial.dispose()
            retry_disposable.dispose()

        return DisposableImpl(dispose)
def catch_op(
    handler: Callable[[Exception], Observable[T]]
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return CatchObservable(source, handler)
    return apply

def retry_op(
    count: Optional[int] = None,
    delay: float = 0.0
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return RetryObservable(source, count, delay)
    return apply

def on_error_return_op(
    value_supplier: Callable[[Exception], T]
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return OnErrorReturnObservable(source, value_supplier)
    return apply

def on_error_resume_next_op(
    fallback: Observable[T]
) -> Callable[[Observable[T]], Observable[T]]:
    def apply(source: Observable[T]) -> Observable[T]:
        return OnErrorResumeNextObservable(source, fallback)
    return apply
28.6 Asynchronous Reactive Programming
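The asynchronous classes in this section wrap Python async generators, and one property of generators matters for how they behave: a generator object is single-pass. The following is a minimal sketch using only asyncio; the `numbers` source and the `collect` helper are hypothetical names introduced for illustration and are not part of the chapter's classes. It shows that a second consumer of the same generator object receives nothing, while calling the generator function again starts a fresh, independent run, much as subscribing again to a cold Observable does.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, Dict, List


async def numbers(n: int) -> AsyncGenerator[int, None]:
    """Hypothetical source: yields 0..n-1, handing control back between values."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def collect(stream: AsyncIterator[int]) -> List[int]:
    """Drain an async iterator into a list."""
    return [value async for value in stream]


async def single_pass_demo() -> Dict[str, List[int]]:
    shared = numbers(3)
    first = await collect(shared)       # consumes the generator object
    second = await collect(shared)      # same object: already exhausted
    fresh = await collect(numbers(3))   # new generator object: independent run
    return {"first": first, "second": second, "fresh": fresh}


if __name__ == "__main__":
    print(asyncio.run(single_pass_demo()))
```

A wrapper that stores a generator object therefore supports one consumer; a wrapper meant to be subscribed to repeatedly would need to store the generator function (a factory) instead.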
28.6.1 Asynchronous Observable
python
import asyncio
import inspect
from typing import AsyncIterator, AsyncGenerator, Awaitable, Callable, List, Optional

class AsyncObservable(Generic[T]):
    """Thin wrapper around an async generator.

    The wrapped generator is single-pass: it can be consumed only once, and
    map/filter return wrappers that read from the same underlying generator.
    """

    def __init__(self, generator: AsyncGenerator[T, None]):
        self._generator = generator

    async def subscribe(
        self,
        on_next: Optional[Callable[[T], Awaitable[None]]] = None,
        on_error: Optional[Callable[[Exception], Awaitable[None]]] = None,
        on_complete: Optional[Callable[[], Awaitable[None]]] = None
    ) -> None:
        try:
            async for value in self._generator:
                if on_next:
                    await on_next(value)
        except Exception as e:
            if on_error is None:
                raise  # no error callback: do not swallow the exception
            await on_error(e)
            return
        # Completion is signalled outside the try block so that a failing
        # on_complete callback is not reported through on_error.
        if on_complete:
            await on_complete()

    def map(self, mapper: Callable[[T], R]) -> 'AsyncObservable[R]':
        async def mapped_generator():
            async for value in self._generator:
                result = mapper(value)
                # Accept both plain functions and coroutine functions as mappers.
                if inspect.isawaitable(result):
                    yield await result
                else:
                    yield result
        return AsyncObservable(mapped_generator())

    def filter(self, predicate: Callable[[T], bool]) -> 'AsyncObservable[T]':
        async def filtered_generator():
            async for value in self._generator:
                if predicate(value):
                    yield value
        return AsyncObservable(filtered_generator())

    async def to_list(self) -> List[T]:
        result = []
        async for value in self._generator:
            result.append(value)
        return result

    async def first(self) -> Optional[T]:
        async for value in self._generator:
            return value
        return None

    async def reduce(
        self,
        accumulator: Callable[[T, T], T],
        initial: Optional[T] = None
    ) -> Optional[T]:
        # initial=None means "no seed": the first value becomes the seed.
        result = initial
        has_result = initial is not None
        async for value in self._generator:
            if not has_result:
                result, has_result = value, True
            else:
                result = accumulator(result, value)
        return result
async def interval(period: float, count: Optional[int] = None) -> AsyncGenerator[int, None]:
    # Emits 0, 1, 2, ... every `period` seconds; unbounded when count is None.
    i = 0
    while count is None or i < count:
        await asyncio.sleep(period)
        yield i
        i += 1

async def from_iterable(iterable: Iterable[T]) -> AsyncGenerator[T, None]:
    for item in iterable:
        yield item

async def from_future(future: Awaitable[T]) -> AsyncGenerator[T, None]:
    yield await future
class AsyncSubject(Generic[T]):
    """asyncio-based multicast subject that replays the latest value.

    "Async" here refers to asyncio. Unlike the AsyncSubject row in the Subject
    table of section 28.9.2, this class delivers every value as it arrives.
    """

    def __init__(self):
        self._observers: List[Callable[[T], Awaitable[None]]] = []
        self._lock = asyncio.Lock()
        self._value: Optional[T] = None
        self._has_value = False
        self._completed = False
        self._error: Optional[Exception] = None

    async def subscribe(
        self,
        on_next: Callable[[T], Awaitable[None]]
    ) -> Callable[[], None]:
        async with self._lock:
            if not self._completed:
                self._observers.append(on_next)
            replay = self._has_value and not self._completed
            value = self._value
        # Replay outside the lock: asyncio.Lock is not reentrant, so a callback
        # that publishes to this subject would otherwise deadlock.
        if replay:
            await on_next(value)

        def unsubscribe():
            if on_next in self._observers:
                self._observers.remove(on_next)

        return unsubscribe

    async def on_next(self, value: T) -> None:
        async with self._lock:
            if self._completed:
                return  # no emissions after a terminal signal
            self._value = value
            self._has_value = True
            observers = self._observers.copy()
        for observer in observers:
            await observer(value)

    async def on_error(self, error: Exception) -> None:
        # Subscribers register only an on_next callback, so the error is
        # recorded and the subject terminated; it is not delivered to them.
        async with self._lock:
            self._error = error
            self._completed = True
            self._observers.clear()

    async def on_complete(self) -> None:
        async with self._lock:
            self._completed = True
            self._observers.clear()
class AsyncEventBus:
    """Named channels, each backed by one AsyncSubject created on first use."""

    def __init__(self):
        self._subjects: Dict[str, AsyncSubject] = {}
        self._lock = asyncio.Lock()

    async def get_subject(self, name: str) -> AsyncSubject:
        async with self._lock:
            if name not in self._subjects:
                self._subjects[name] = AsyncSubject()
            return self._subjects[name]

    async def publish(self, name: str, value: Any) -> None:
        subject = await self.get_subject(name)
        await subject.on_next(value)

    async def subscribe(
        self,
        name: str,
        handler: Callable[[Any], Awaitable[None]]
    ) -> Callable[[], None]:
        subject = await self.get_subject(name)
        return await subject.subscribe(handler)

async def async_demo():
    # Pipeline: 0..9 -> keep even numbers -> square them -> collect.
    observable = AsyncObservable(interval(0.1, 10))
    result = await (observable
        .filter(lambda x: x % 2 == 0)
        .map(lambda x: x ** 2)
        .to_list())
    print(f"Async result: {result}")

    bus = AsyncEventBus()

    async def handle_order(order):
        print(f"Handling order: {order}")

    await bus.subscribe("order_created", handle_order)
    await bus.publish("order_created", {"id": "ORD-001"})
    await bus.publish("order_created", {"id": "ORD-002"})

asyncio.run(async_demo())
28.7 Practical Application Examples
28.7.1 Real-Time Data Stream Processing
python
import asyncio
import random
from typing import AsyncGenerator, Awaitable, Callable, Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class StockPrice:
    symbol: str
    price: float
    change: float
    volume: int
    timestamp: datetime

async def stock_price_stream(
    symbol: str,
    interval: float = 0.5
) -> AsyncGenerator[StockPrice, None]:
    # Simulated feed: a random walk starting near 100, one tick per interval.
    price = 100.0 + random.uniform(-10, 10)
    while True:
        await asyncio.sleep(interval)
        change = random.uniform(-2, 2)
        price += change
        volume = random.randint(1000, 10000)
        yield StockPrice(
            symbol=symbol,
            price=round(price, 2),
            change=round(change, 2),
            volume=volume,
            timestamp=datetime.now()
        )
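class StockMonitor:
    """Runs one consumer task per symbol and feeds prices to a callback."""

    def __init__(self):
        self._subscriptions: Dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()

    async def subscribe(
        self,
        symbol: str,
        callback: Callable[[StockPrice], Awaitable[None]]
    ) -> Callable[[], None]:
        async def consume():
            async for price in stock_price_stream(symbol):
                await callback(price)

        task = asyncio.create_task(consume())
        async with self._lock:
            # One subscription per symbol: cancel the task being replaced
            # so that it does not keep running unreferenced.
            previous = self._subscriptions.get(symbol)
            if previous is not None:
                previous.cancel()
            self._subscriptions[symbol] = task

        def unsubscribe():
            task.cancel()
            if self._subscriptions.get(symbol) is task:
                del self._subscriptions[symbol]

        return unsubscribe

    async def stop_all(self) -> None:
        async with self._lock:
            tasks = list(self._subscriptions.values())
            self._subscriptions.clear()
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to finish before returning.
        await asyncio.gather(*tasks, return_exceptions=True)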
class PriceAlertSystem:
    def __init__(self, monitor: StockMonitor):
        self._monitor = monitor
        self._alerts: Dict[str, List[tuple]] = {}
        self._subjects: Dict[str, AsyncSubject] = {}

    def add_alert(
        self,
        symbol: str,
        threshold: float,
        direction: str = "above"
    ) -> AsyncObservable[StockPrice]:
        if symbol not in self._alerts:
            self._alerts[symbol] = []
        self._alerts[symbol].append((threshold, direction))
        if symbol not in self._subjects:
            self._subjects[symbol] = AsyncSubject()
        subject = self._subjects[symbol]

        async def alert_stream() -> AsyncGenerator[StockPrice, None]:
            # AsyncSubject is push-based and has no generator to wrap, so a
            # queue bridges it to a pull-based async generator. The
            # subscription is made lazily, when iteration starts.
            queue: asyncio.Queue = asyncio.Queue()
            unsubscribe = await subject.subscribe(queue.put)
            try:
                while True:
                    yield await queue.get()
            finally:
                unsubscribe()

        return AsyncObservable(alert_stream())

    async def start(self, symbol: str) -> None:
        async def on_price(price: StockPrice):
            if symbol in self._alerts:
                for threshold, direction in self._alerts[symbol]:
                    triggered = (
                        (direction == "above" and price.price > threshold) or
                        (direction == "below" and price.price < threshold)
                    )
                    if triggered and symbol in self._subjects:
                        await self._subjects[symbol].on_next(price)

        await self._monitor.subscribe(symbol, on_price)
async def stock_monitoring_demo():
    monitor = StockMonitor()
    alerts = PriceAlertSystem(monitor)

    async def on_price_update(price: StockPrice):
        print(f"[{price.symbol}] price: {price.price:.2f}, change: {price.change:+.2f}")

    unsub_aapl = await monitor.subscribe("AAPL", on_price_update)
    unsub_googl = await monitor.subscribe("GOOGL", on_price_update)
    await asyncio.sleep(3)
    unsub_aapl()
    print("Unsubscribed from AAPL")
    await asyncio.sleep(2)
    await monitor.stop_all()

asyncio.run(stock_monitoring_demo())
28.7.2 Event-Driven Architecture
python
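import uuid
from dataclasses import dataclass, field
from datetime import datetime
from threading import RLock
from typing import Any, Callable, Dict, Optional
from enum import Enum, auto

class EventType(Enum):
    ORDER_CREATED = auto()
    ORDER_CONFIRMED = auto()
    PAYMENT_RECEIVED = auto()
    ORDER_SHIPPED = auto()
    ORDER_DELIVERED = auto()

@dataclass
class Event:
    type: EventType
    data: Dict[str, Any]
    timestamp: datetime
    correlation_id: str
    causation_id: Optional[str] = None
    # Unique id, so that follow-up events can name this event as their cause.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))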
class ReactiveEventBus:
    def __init__(self):
        self._subjects: Dict[EventType, Subject[Event]] = {}
        self._lock = RLock()

    def _get_subject(self, event_type: EventType) -> Subject[Event]:
        with self._lock:
            if event_type not in self._subjects:
                self._subjects[event_type] = Subject[Event]()
            return self._subjects[event_type]

    def publish(self, event: Event) -> None:
        subject = self._get_subject(event.type)
        subject.on_next(event)

    def subscribe(
        self,
        event_type: EventType,
        handler: Callable[[Event], None]
    ) -> Disposable:
        subject = self._get_subject(event_type)
        return subject.subscribe(AnonymousObserver(on_next=handler))

    def subscribe_all(self, handler: Callable[[Event], None]) -> CompositeDisposable:
        composite = CompositeDisposable()
        for event_type in EventType:
            composite.add(self.subscribe(event_type, handler))
        return composite

    def get_observable(self, event_type: EventType) -> Observable[Event]:
        return self._get_subject(event_type)
class OrderSaga:
    def __init__(self, event_bus: ReactiveEventBus):
        self._event_bus = event_bus
        self._orders: Dict[str, Dict] = {}
        # Keep the subscriptions so the saga can be shut down without leaking them.
        self._subscriptions = CompositeDisposable()
        self._setup_handlers()

    def _setup_handlers(self) -> None:
        self._subscriptions.add(self._event_bus.subscribe(
            EventType.ORDER_CREATED,
            self._handle_order_created
        ))
        self._subscriptions.add(self._event_bus.subscribe(
            EventType.PAYMENT_RECEIVED,
            self._handle_payment_received
        ))
        self._subscriptions.add(self._event_bus.subscribe(
            EventType.ORDER_SHIPPED,
            self._handle_order_shipped
        ))

    def dispose(self) -> None:
        self._subscriptions.dispose()

    def _handle_order_created(self, event: Event) -> None:
        order_id = event.data["order_id"]
        self._orders[order_id] = {
            "status": "created",
            "created_at": event.timestamp
        }
        print(f"[Saga] Order created: {order_id}")

    def _handle_payment_received(self, event: Event) -> None:
        order_id = event.data["order_id"]
        if order_id in self._orders:
            self._orders[order_id]["status"] = "paid"
            self._orders[order_id]["paid_at"] = event.timestamp
            print(f"[Saga] Order paid: {order_id}")
            # Payment confirms the order; link the new event to its cause
            # when the incoming event carries an id.
            self._event_bus.publish(Event(
                type=EventType.ORDER_CONFIRMED,
                data={"order_id": order_id},
                timestamp=datetime.now(),
                correlation_id=event.correlation_id,
                causation_id=getattr(event, "event_id", None)
            ))

    def _handle_order_shipped(self, event: Event) -> None:
        order_id = event.data["order_id"]
        if order_id in self._orders:
            self._orders[order_id]["status"] = "shipped"
            self._orders[order_id]["shipped_at"] = event.timestamp
            print(f"[Saga] Order shipped: {order_id}")
class EventLogger:
    def __init__(self, event_bus: ReactiveEventBus):
        self._disposable = event_bus.subscribe_all(self._log_event)

    def _log_event(self, event: Event) -> None:
        print(f"[LOG] {event.type.name}: {event.data}")

    def stop(self) -> None:
        self._disposable.dispose()

def event_driven_demo():
    event_bus = ReactiveEventBus()
    saga = OrderSaga(event_bus)
    logger = EventLogger(event_bus)
    import uuid
    order_id = str(uuid.uuid4())[:8]
    # All events of one order share a correlation id.
    correlation_id = str(uuid.uuid4())
    event_bus.publish(Event(
        type=EventType.ORDER_CREATED,
        data={"order_id": order_id, "customer_id": "CUST-001", "total": 199.99},
        timestamp=datetime.now(),
        correlation_id=correlation_id
    ))
    event_bus.publish(Event(
        type=EventType.PAYMENT_RECEIVED,
        data={"order_id": order_id, "amount": 199.99, "method": "credit_card"},
        timestamp=datetime.now(),
        correlation_id=correlation_id
    ))
    event_bus.publish(Event(
        type=EventType.ORDER_SHIPPED,
        data={"order_id": order_id, "tracking_number": "SF123456789"},
        timestamp=datetime.now(),
        correlation_id=correlation_id
    ))
    logger.stop()

event_driven_demo()
28.8 Anti-Patterns and Best Practices
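28.8.1 Common Anti-Patterns
| Anti-pattern | Description | Consequence | Remedy |
|---|---|---|---|
| Subscription leak | Subscriptions are never disposed | Memory leak | Track subscriptions in a CompositeDisposable |
| Blocking operation | Blocking inside an Observer callback | Degraded throughput | Use asynchronous operators |
| Excessive nesting | Multiple nested flat_map calls | Hard-to-read code | Use compose/pipe |
| Ignored errors | on_error is not handled | Lost exceptions | Add error-handling operators |
| Unbounded stream | The stream is never limited | Resource exhaustion | Use take, timeout |
| Cold/hot confusion | Cold and hot Observables are mixed up | Unexpected behavior | Distinguish them explicitly and use each appropriately |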
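The "excessive nesting" and "unbounded stream" rows can be illustrated together. The sketch below is a simplification that models operators as functions over plain Python iterables rather than over the chapter's Observable class; `pipe`, `map_op`, `filter_op`, `take_op`, and `naturals` are hypothetical names defined here for illustration. It contrasts a nested call chain with the same pipeline written through `pipe`, and uses a take-style operator to bound an infinite source.

```python
from functools import reduce
from itertools import islice
from typing import Any, Callable, Iterable, Iterator

Operator = Callable[[Iterable[Any]], Iterable[Any]]


def map_op(fn: Callable[[Any], Any]) -> Operator:
    """One-to-one transformation, evaluated lazily."""
    def apply(source: Iterable[Any]) -> Iterable[Any]:
        return (fn(x) for x in source)
    return apply


def filter_op(predicate: Callable[[Any], bool]) -> Operator:
    """Keep only the values that satisfy the predicate."""
    def apply(source: Iterable[Any]) -> Iterable[Any]:
        return (x for x in source if predicate(x))
    return apply


def take_op(n: int) -> Operator:
    """Stop after n values, which makes an infinite source safe to consume."""
    def apply(source: Iterable[Any]) -> Iterable[Any]:
        return islice(source, n)
    return apply


def pipe(source: Iterable[Any], *operators: Operator) -> Iterable[Any]:
    """Apply operators left to right: pipe(s, f, g) is g(f(s))."""
    return reduce(lambda acc, op: op(acc), operators, source)


def naturals() -> Iterator[int]:
    """Infinite source: 0, 1, 2, ..."""
    i = 0
    while True:
        yield i
        i += 1


# Nested form: the data flow has to be read from the inside out.
nested = list(
    take_op(3)(map_op(lambda x: x * x)(filter_op(lambda x: x % 2 == 0)(naturals())))
)

# Piped form: the same pipeline, read top to bottom.
piped = list(pipe(
    naturals(),
    filter_op(lambda x: x % 2 == 0),
    map_op(lambda x: x * x),
    take_op(3),
))
```

Both forms yield the squares of the first three even numbers; the piped form keeps the stage order visible and makes it easy to insert or remove a stage.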
28.8.2 Best Practices
python
class ReactiveBestPractices:
    @staticmethod
    def use_composite_disposable():
        # Collect every subscription so that all of them can be released at once.
        composite = CompositeDisposable()
        subject = Subject[int]()
        composite.add(subject.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Subscriber 1: {x}")
        )))
        composite.add(subject.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Subscriber 2: {x}")
        )))
        subject.on_next(1)
        subject.on_next(2)
        composite.dispose()
        print("All subscriptions disposed")

    @staticmethod
    def handle_errors_properly():
        subject = Subject[int]()
        # Order matters: retry must come first, because catch and
        # on_error_return swallow the error that retry would react to.
        safe_observable = subject.pipe(
            retry_op(count=3),
            catch_op(lambda e: Observable.from_iterable([0])),
            on_error_return_op(lambda e: -1)
        )
        safe_observable.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Safe value: {x}"),
            on_error=lambda e: print(f"Final error: {e}")
        ))

    @staticmethod
    def limit_infinite_streams():
        from itertools import islice

        def create_infinite():
            i = 0
            while True:
                yield i
                i += 1

        # A generator cannot be sliced with [:10]; islice bounds it lazily,
        # playing the role of a take operator at the source.
        limited = ObservableFactory.from_iterable(islice(create_infinite(), 10))
        limited.subscribe(AnonymousObserver(
            on_next=lambda x: print(f"Value: {x}")
        ))
        print("Infinite stream limited to 10 values")
class ObservableFactory:
    @staticmethod
    def from_iterable(iterable: Iterable[T]) -> Observable[T]:
        class IterableObservable(Observable[T]):
            def subscribe(self, observer: Observer[T]) -> Disposable:
                try:
                    for value in iterable:
                        observer.on_next(value)
                    observer.on_complete()
                except Exception as e:
                    observer.on_error(e)
                return EmptyDisposable()
        return IterableObservable()

    @staticmethod
    def range(start: int, count: int) -> Observable[int]:
        return ObservableFactory.from_iterable(range(start, start + count))

    @staticmethod
    def just(value: T) -> Observable[T]:
        return ObservableFactory.from_iterable([value])

    @staticmethod
    def empty() -> Observable[T]:
        return ObservableFactory.from_iterable([])

    @staticmethod
    def error(error: Exception) -> Observable[T]:
        class ErrorObservable(Observable[T]):
            def subscribe(self, observer: Observer[T]) -> Disposable:
                observer.on_error(error)
                return EmptyDisposable()
        return ErrorObservable()

    @staticmethod
    def never() -> Observable[T]:
        class NeverObservable(Observable[T]):
            def subscribe(self, observer: Observer[T]) -> Disposable:
                return EmptyDisposable()
        return NeverObservable()

    @staticmethod
    def interval(period: float, count: Optional[int] = None) -> Observable[int]:
        import threading

        class IntervalObservable(Observable[int]):
            def subscribe(self, observer: Observer[int]) -> Disposable:
                stop = threading.Event()

                def run():
                    i = 0
                    while count is None or i < count:
                        # wait() returns True as soon as dispose() sets the
                        # event, so cancellation does not wait out the period.
                        if stop.wait(period):
                            return
                        observer.on_next(i)
                        i += 1
                    if not stop.is_set():
                        observer.on_complete()

                thread = threading.Thread(target=run, daemon=True)
                thread.start()
                return DisposableImpl(stop.set)
        return IntervalObservable()

# Expose the factories as static constructors on Observable.
Observable.from_iterable = staticmethod(ObservableFactory.from_iterable)
Observable.range = staticmethod(ObservableFactory.range)
Observable.just = staticmethod(ObservableFactory.just)
Observable.empty = staticmethod(ObservableFactory.empty)
Observable.error = staticmethod(ObservableFactory.error)
Observable.never = staticmethod(ObservableFactory.never)
Observable.interval = staticmethod(ObservableFactory.interval)
28.9 Decision Guide
28.9.1 Choosing an Observable Type
          ┌─────────────────────────────────────┐
          │        Is multicast needed?         │
          └─────────────────────────────────────┘
                             │
             ┌───────────────┴───────────────┐
             │                               │
             ▼                               ▼
┌─────────────────────────┐     ┌─────────────────────────┐
│           No            │     │           Yes           │
└─────────────────────────┘     └─────────────────────────┘
             │                               │
             ▼                               ▼
┌─────────────────────────┐     ┌─────────────────────────┐
│     Cold Observable     │     │     Hot Observable      │
│   (independent runs)    │     │ (shared by subscribers) │
└─────────────────────────┘     └─────────────────────────┘
                                             │
                             ┌───────────────┴───────────────┐
                             │                               │
                             ▼                               ▼
                ┌─────────────────────────┐     ┌─────────────────────────┐
                │         Subject         │     │        multicast        │
                │    (manual control)     │     │  (automatic multicast)  │
                └─────────────────────────┘     └─────────────────────────┘
28.9.2 Choosing a Subject Type
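| Subject type | Characteristics | Typical use |
|---|---|---|
| Subject | No initial value, no buffering | Simple event broadcasting |
| BehaviorSubject | Has an initial value; new subscribers receive the current value | State management |
| ReplaySubject | Buffers past values | Replaying history to late subscribers |
| AsyncSubject | Emits only the last value, on completion | Asynchronous results |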
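The difference between the first three rows shows up most clearly for a subscriber that arrives late. The following is a minimal sketch with callback-only subjects; `MiniSubject`, `MiniBehaviorSubject`, `MiniReplaySubject`, and `late_subscriber_sees` are hypothetical names used for illustration, and they deliberately omit error handling, completion, and disposal, which the chapter's own Subject classes provide.

```python
from collections import deque
from typing import Callable, Deque, Generic, List, TypeVar

T = TypeVar("T")


class MiniSubject(Generic[T]):
    """No initial value, no buffering: subscribers see only future values."""

    def __init__(self) -> None:
        self._observers: List[Callable[[T], None]] = []

    def subscribe(self, on_next: Callable[[T], None]) -> None:
        self._observers.append(on_next)

    def on_next(self, value: T) -> None:
        for callback in list(self._observers):
            callback(value)


class MiniBehaviorSubject(MiniSubject[T]):
    """Holds a current value and replays it to every new subscriber."""

    def __init__(self, initial: T) -> None:
        super().__init__()
        self._current = initial

    def subscribe(self, on_next: Callable[[T], None]) -> None:
        on_next(self._current)
        super().subscribe(on_next)

    def on_next(self, value: T) -> None:
        self._current = value
        super().on_next(value)


class MiniReplaySubject(MiniSubject[T]):
    """Buffers the last buffer_size values and replays them on subscribe."""

    def __init__(self, buffer_size: int) -> None:
        super().__init__()
        self._history: Deque[T] = deque(maxlen=buffer_size)

    def subscribe(self, on_next: Callable[[T], None]) -> None:
        for past in list(self._history):
            on_next(past)
        super().subscribe(on_next)

    def on_next(self, value: T) -> None:
        self._history.append(value)
        super().on_next(value)


def late_subscriber_sees(subject: MiniSubject[int]) -> List[int]:
    """Emit 1, 2, 3, then subscribe, then emit 4; return what was received."""
    received: List[int] = []
    for value in (1, 2, 3):
        subject.on_next(value)
    subject.subscribe(received.append)
    subject.on_next(4)
    return received


if __name__ == "__main__":
    print(late_subscriber_sees(MiniSubject()))            # [4]
    print(late_subscriber_sees(MiniBehaviorSubject(0)))   # [3, 4]
    print(late_subscriber_sees(MiniReplaySubject(2)))     # [2, 3, 4]
```

The plain subject delivers only the value emitted after subscription, the behavior subject adds the current value, and the replay subject adds as much history as its buffer holds.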
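28.9.3 Operator Selection Guide
| Need | Recommended operator | Notes |
|---|---|---|
| Transform values | map | One-to-one mapping |
| Flatten nested streams | flat_map / switch_map | One-to-many mapping |
| Filter values | filter | Conditional filtering |
| Take the first N | take | Limits the count |
| Deduplicate | distinct | Removes duplicates |
| Debounce | debounce | User input |
| Rate-limit | throttle | Rate limiting |
| Combine multiple streams | merge / zip / combine_latest | Parallel processing |
| Recover from errors | catch / retry | Fault tolerance |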
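Debounce and throttle are easy to confuse, so a side-by-side comparison may help. The sketch below works on a recorded list of timestamped events instead of a live stream, which keeps it deterministic; the `throttle` and `debounce` functions are hypothetical stand-ins written for this comparison, with throttle modeled as leading-edge (emit, then ignore for a window) and debounce treating the end of the recording as a quiet period. Real operator implementations may differ in these details.

```python
from typing import List, Tuple

TimedEvent = Tuple[int, str]  # (timestamp in milliseconds, value)


def throttle(events: List[TimedEvent], window_ms: int) -> List[str]:
    """Leading-edge throttle: emit a value, then ignore values for window_ms."""
    emitted: List[str] = []
    last_emit = None
    for timestamp, value in events:
        if last_emit is None or timestamp - last_emit >= window_ms:
            emitted.append(value)
            last_emit = timestamp
    return emitted


def debounce(events: List[TimedEvent], quiet_ms: int) -> List[str]:
    """Emit a value only if no newer value follows it within quiet_ms."""
    emitted: List[str] = []
    for index, (timestamp, value) in enumerate(events):
        is_last = index == len(events) - 1
        if is_last or events[index + 1][0] - timestamp >= quiet_ms:
            emitted.append(value)
    return emitted


# A user types "rea", pauses, then finishes the word.
keystrokes: List[TimedEvent] = [
    (0, "r"),
    (100, "re"),
    (200, "rea"),
    (900, "reac"),
    (1000, "react"),
]

throttled = throttle(keystrokes, window_ms=300)
debounced = debounce(keystrokes, quiet_ms=300)
```

With this input, `throttled` is `["r", "reac"]` (the first value of each burst) and `debounced` is `["rea", "react"]` (the last value before each pause), which is why debounce suits search-as-you-type input while throttle suits rate limiting.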
28.10 Quick Reference Cards
28.10.1 Core Concepts Quick Reference
┌─────────────────────────────────────────────────────────────────────┐
│           Reactive Programming Core Concepts Cheat Sheet            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Observable lifecycle                                               │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ subscribe → on_next* → (on_error | on_complete) → unsubscribe │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Observer contract                                                  │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ 1. on_next may be called zero or more times                   │  │
│  │ 2. on_error or on_complete is called at most once             │  │
│  │ 3. No method is called after on_error or on_complete          │  │
│  │ 4. The Disposable must be handled correctly                   │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Cold vs. hot Observables                                           │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ Cold: each subscription runs independently; no shared data    │  │
│  │ Hot:  one shared execution multicast to all subscribers       │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Backpressure strategies                                            │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ BUFFER: buffer all values until the buffer overflows          │  │
│  │ DROP:   drop values the consumer cannot keep up with          │  │
│  │ LATEST: keep only the most recent value                       │  │
│  │ ERROR:  signal an error when the consumer falls behind        │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
28.10.2 Common Operators Quick Reference
python
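OBSERVABLE_CREATION = {
    "just": "Create an Observable that emits a single value",
    "range": "Create an Observable that emits a sequence of integers",
    "interval": "Create an Observable that emits on a timer",
    "from_iterable": "Create an Observable from an iterable",
    "empty": "Create an Observable that completes immediately",
    "error": "Create an Observable that errors immediately",
    "never": "Create an Observable that never emits",
}
TRANSFORMING_OPERATORS = {
    "map": "One-to-one transformation",
    "flat_map": "One-to-many flattening",
    "switch_map": "Switch to the latest inner Observable",
    "scan": "Running accumulation",
    "group_by": "Grouping",
}
FILTERING_OPERATORS = {
    "filter": "Conditional filtering",
    "take": "Take the first N values",
    "skip": "Skip the first N values",
    "distinct": "Remove duplicates",
    "debounce": "Debounce",
    "throttle": "Rate limiting",
    "first": "Take the first value",
    "last": "Take the last value",
}
COMBINING_OPERATORS = {
    "merge": "Merge multiple Observables",
    "zip": "Pair values by position",
    "combine_latest": "Combine the latest values",
    "concat": "Concatenate in sequence",
    "race": "Use whichever Observable emits first",
}
ERROR_HANDLING_OPERATORS = {
    "catch": "Catch an error and switch to a fallback",
    "retry": "Retry",
    "on_error_return": "Return a default value on error",
    "retry_when": "Conditional retry",
}
28.11 Summary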
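28.11.1 Key Points
- Observable pattern: defines the contract between producing and consuming a data stream
- Observer contract: governs value delivery, error handling, and completion notification
- Operators: transform data streams declaratively
- Subject: provides multicasting and shared state
- Backpressure: regulates the flow rate so that consumers are not overwhelmed
- Error handling: handles errors gracefully and recovers from them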
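The backpressure point can be made concrete with a bounded queue sitting between a fast producer and a slow consumer. The sketch below is a simplified model rather than the chapter's BackpressureObservable: `BoundedInbox`, `Strategy`, and `run` are hypothetical names, only three strategies are modeled, and LATEST is approximated as "evict the oldest value", which reduces to keeping only the most recent value when the capacity is 1.

```python
from collections import deque
from enum import Enum, auto
from typing import Deque, Generic, List, TypeVar

T = TypeVar("T")


class Strategy(Enum):
    DROP = auto()    # discard the incoming value when the inbox is full
    LATEST = auto()  # evict the oldest value to make room for the newest
    ERROR = auto()   # fail as soon as the consumer cannot keep up


class BoundedInbox(Generic[T]):
    """Hypothetical bounded queue between a fast producer and a slow consumer."""

    def __init__(self, capacity: int, strategy: Strategy) -> None:
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self._capacity = capacity
        self._strategy = strategy
        self._items: Deque[T] = deque()

    def offer(self, value: T) -> None:
        """Called by the producer for every new value."""
        if len(self._items) >= self._capacity:
            if self._strategy is Strategy.DROP:
                return
            if self._strategy is Strategy.ERROR:
                raise OverflowError("consumer cannot keep up")
            self._items.popleft()  # Strategy.LATEST
        self._items.append(value)

    def drain(self) -> List[T]:
        """Called by the consumer when it is ready to catch up."""
        items = list(self._items)
        self._items.clear()
        return items


def run(strategy: Strategy, capacity: int = 2) -> List[int]:
    """Produce 0..4 without consuming, then drain what the inbox kept."""
    inbox: BoundedInbox[int] = BoundedInbox(capacity, strategy)
    for value in range(5):
        inbox.offer(value)
    return inbox.drain()
```

With a capacity of 2, DROP keeps the two oldest values and LATEST keeps the two newest, while ERROR raises on the third value; which outcome is acceptable depends on whether stale data or lost data is the lesser problem for the consumer.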
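28.11.2 Applicability
| Good fit | Poor fit |
|---|---|
| Real-time data processing | Simple synchronous operations |
| User-interface events | Batch jobs |
| IoT data streams | Strict low-latency requirements |
| Microservice communication | Resource-constrained environments |
| Event-driven architecture | Simple CRUD applications |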
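28.11.3 Implementation Advice
- Understand the contract: adhere strictly to the Observer contract
- Manage subscriptions: use CompositeDisposable to prevent leaks
- Handle errors: always add error-handling operators
- Choose a strategy: pick the backpressure strategy that fits the scenario
- Keep it simple: avoid excessive nesting; compose operators with pipe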
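The first piece of advice, adhering to the Observer contract, can be enforced mechanically instead of by convention. The sketch below shows one way to do this with a wrapper that ignores every signal after the first terminal one; `SafeObserver` and `misbehaving_source` are hypothetical names introduced here, and the wrapper is not thread-safe.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class SafeObserver(Generic[T]):
    """Hypothetical wrapper: nothing is delivered after on_error or on_complete."""

    def __init__(
        self,
        on_next: Callable[[T], None],
        on_error: Optional[Callable[[Exception], None]] = None,
        on_complete: Optional[Callable[[], None]] = None,
    ) -> None:
        self._on_next = on_next
        self._on_error = on_error
        self._on_complete = on_complete
        self._stopped = False

    def on_next(self, value: T) -> None:
        if self._stopped:
            return
        self._on_next(value)

    def on_error(self, error: Exception) -> None:
        if self._stopped:
            return
        self._stopped = True  # terminal: later signals are ignored
        if self._on_error:
            self._on_error(error)

    def on_complete(self) -> None:
        if self._stopped:
            return
        self._stopped = True  # terminal: later signals are ignored
        if self._on_complete:
            self._on_complete()


def misbehaving_source(observer: SafeObserver[int]) -> None:
    """A producer that violates the contract by signalling after completion."""
    observer.on_next(1)
    observer.on_complete()
    observer.on_next(2)                      # too late: must be ignored
    observer.on_error(RuntimeError("late"))  # too late: must be ignored
    observer.on_complete()                   # duplicate: must be ignored


log: List[str] = []
misbehaving_source(SafeObserver(
    on_next=lambda value: log.append(f"next:{value}"),
    on_error=lambda error: log.append(f"error:{error}"),
    on_complete=lambda: log.append("complete"),
))
```

After the run, `log` contains only `"next:1"` and `"complete"`; wrapping user-supplied observers this way keeps a faulty producer from breaking downstream assumptions.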