第18章 备忘录模式
学习目标
完成本章学习后,读者将能够:
- 理解备忘录模式的数学基础与形式化定义
- 掌握状态保存与恢复的完整实现机制
- 设计并实现多种类型的备忘录系统(快照、增量、版本控制等)
- 应用备忘录模式实现撤销/重做、事务回滚、检查点等功能
- 识别备忘录模式的适用场景与反模式
18.1 形式化定义
18.1.1 数学定义
备忘录模式(Memento Pattern) 在不破坏封装性的前提下,捕获一个对象的内部状态,并在该对象之外保存这个状态。这样以后就可将该对象恢复到原先保存的状态。
从数学角度,备忘录模式可定义为状态转换系统:
$$\mathcal{M} = \langle S, s_0, \Sigma, \delta, \Omega \rangle$$
其中:
- $S$:对象状态集合(State Space)
- $s_0 \in S$:初始状态
- $\Sigma$:操作字母表(Operations)
- $\delta: S \times \Sigma \rightarrow S$:状态转换函数
- $\Omega \subseteq S$:备忘录存储(Memento Store)
备忘录操作的形式化定义:
$$\text{save}: S \rightarrow \Omega \times \Omega$$
$$\text{save}(s) = \Omega \cup {s}$$
$$\text{restore}: \Omega \rightarrow S$$
$$\text{restore}(\omega) = \omega$$
状态序列与时间线:
对于状态序列 $s_0, s_1, s_2, \ldots, s_n$,备忘录存储为:
$$\Omega = {s_{t_1}, s_{t_2}, \ldots, s_{t_k}} \subseteq {s_0, s_1, \ldots, s_n}$$
其中 $t_1 < t_2 < \ldots < t_k$ 为保存时间点。
18.1.2 类型理论视角
在类型理论中,备忘录可表示为:
$$\text{Memento}\langle T \rangle = { \text{state}: T, \text{timestamp}: \text{Time}, \text{metadata}: \text{Map}\langle \text{String}, \text{Any} \rangle }$$
备忘录接口:
$$\text{Originator} = { \text{save}: () \rightarrow \text{Memento}\langle T \rangle, \text{restore}: \text{Memento}\langle T \rangle \rightarrow \text{Unit} }$$
$$\text{Caretaker} = { \text{store}: \text{Memento}\langle T \rangle \rightarrow \text{Unit}, \text{retrieve}: \text{ID} \rightarrow \text{Memento}\langle T \rangle }$$
18.1.3 状态空间复杂度
| 存储策略 | 空间复杂度 | 恢复复杂度 | 适用场景 |
|---|---|---|---|
| 全量快照 | $O(n \cdot k)$ | $O(n)$ | 小对象,快速恢复 |
| 增量存储 | $O(\Delta n \cdot k)$ | $O(n)$ | 大对象,频繁保存 |
| 差异存储 | $O(\Delta n \cdot k)$ | $O(\Delta n)$ | 中等对象,高效恢复 |
| 压缩存储 | $O(n \cdot k / c)$ | $O(n \cdot c)$ | 存储受限场景 |
其中 $n$ 为状态大小,$k$ 为历史数量,$\Delta n$ 为状态变化量,$c$ 为压缩比。
18.2 历史背景与演进
18.2.1 发展历程
| 年代 | 里程碑 | 描述 |
|---|---|---|
| 1970年代 | 事务处理系统 | 数据库系统引入检查点机制 |
| 1980年代 | Smalltalk-80 | 引入Model-View-Controller,支持状态保存 |
| 1987年 | Apple HyperCard | 引入撤销功能的概念 |
| 1994年 | GoF设计模式 | 《设计模式》正式收录备忘录模式 |
| 1995年 | Java 1.0 | 引入序列化机制支持状态持久化 |
| 1999年 | J2EE | JTA事务管理器使用备忘录思想 |
| 2000年 | 版本控制系统 | CVS、SVN广泛应用状态快照 |
| 2005年 | Git | 引入高效增量存储算法 |
| 2010年 | 浏览器History API | HTML5引入历史状态管理 |
| 2015年 | Redux | 单向数据流中的状态快照 |
| 2020年 | 时间旅行调试 | 现代框架支持状态回溯 |
18.2.2 设计动机
备忘录模式的核心设计动机:
- 封装保护:在不暴露对象内部结构的情况下保存状态
- 状态恢复:提供可靠的状态恢复机制
- 历史管理:支持多版本状态存储与检索
- 撤销/重做:实现用户操作的撤销与重做功能
18.3 UML结构图
18.3.1 标准结构
┌─────────────────────────────────────────────────────────────────────┐
│ Memento Pattern Structure │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Originator │ │ Memento │ │
│ ├─────────────────┤ ├─────────────────┤ │
│ │ - _state │────────>│ - _state │ │
│ ├─────────────────┤ creates ├─────────────────┤ │
│ │ + set_state() │ │ + get_state() │ │
│ │ + get_state() │ │ (narrow iface) │ │
│ │ + save() │ └─────────────────┘ │
│ │ + restore() │ ▲ │
│ └─────────────────┘ │ stores │
│ │ │
│ ┌─────────────────┐ │ │
│ │ Caretaker │───────────────────┘ │
│ ├─────────────────┤ │
│ │ - _mementos │ │
│ ├─────────────────┤ │
│ │ + backup() │ │
│ │ + undo() │ │
│ │ + redo() │ │
│ │ + get_history() │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘18.3.2 宽接口与窄接口
┌─────────────────────────────────────────────────────────────────────┐
│ Wide vs Narrow Interface │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 【宽接口(Wide Interface)】 │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Originator 可以访问 Memento 的所有方法 │ │
│ │ │ │
│ │ class Memento: │ │
│ │ def get_state(self) -> State: # Originator使用 │ │
│ │ def set_state(self, state): # Originator使用 │ │
│ │ def get_metadata(self) -> Dict: # 附加信息 │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ 【窄接口(Narrow Interface)】 │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Caretaker 只能访问有限的元数据方法 │ │
│ │ │ │
│ │ class Memento: │ │
│ │ def get_timestamp(self) -> datetime: # Caretaker使用 │ │
│ │ def get_id(self) -> str: # Caretaker使用 │ │
│ │ # get_state() 对 Caretaker 不可见 │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ Python实现方式: │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 1. 使用私有属性命名约定 (_state) │ │
│ │ 2. 使用 __ 前缀实现名称修饰 │ │
│ │ 3. 使用属性访问控制 (@property) │ │
│ │ 4. 使用嵌套类(内部类) │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘18.3.3 状态时间线
┌─────────────────────────────────────────────────────────────────────┐
│ State Timeline │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 时间轴: t0 ─── t1 ─── t2 ─── t3 ─── t4 ─── t5 ─── t6 │
│ │ │ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ ▼ ▼ │
│ 状态: S0 ──> S1 ──> S2 ──> S3 ──> S4 ──> S5 ──> S6 │
│ │ │ │ │ │
│ │ save │ save │ save │ save │
│ │ │ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ ▼ ▼ │
│ 备忘录: [S0] [S2] [S4] [S6] │
│ │ │ │ │ │
│ └─────────────┴─────────────┴─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Caretaker 存储: [S0, S2, S4, S6] │ │
│ │ │ │
│ │ undo(1) -> 恢复到 S4 │ │
│ │ undo(2) -> 恢复到 S2 │ │
│ │ undo(3) -> 恢复到 S0 │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘18.4 标准实现
18.4.1 基于ABC的备忘录框架
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import copy
import hashlib
import json
T = TypeVar('T')
class MementoType(Enum):
"""备忘录类型"""
FULL = auto()
INCREMENTAL = auto()
DIFF = auto()
COMPRESSED = auto()
@dataclass
class MementoMetadata:
"""备忘录元数据"""
id: str
type: MementoType
timestamp: datetime = field(default_factory=datetime.now)
description: str = ""
tags: List[str] = field(default_factory=list)
size: int = 0
checksum: str = ""
def to_dict(self) -> Dict[str, Any]:
return {
'id': self.id,
'type': self.type.name,
'timestamp': self.timestamp.isoformat(),
'description': self.description,
'tags': self.tags,
'size': self.size,
'checksum': self.checksum
}
class Memento(ABC, Generic[T]):
"""备忘录抽象基类"""
def __init__(self, state: T, metadata: Optional[MementoMetadata] = None):
self._state = state
self._metadata = metadata or self._create_metadata(state)
@property
def metadata(self) -> MementoMetadata:
return self._metadata
def get_state(self, requester: Any = None) -> T:
"""获取状态(宽接口)"""
return self._state
def _create_metadata(self, state: T) -> MementoMetadata:
"""创建元数据"""
state_str = str(state)
return MementoMetadata(
id=hashlib.md5(state_str.encode()).hexdigest()[:8],
type=MementoType.FULL,
size=len(state_str),
checksum=hashlib.sha256(state_str.encode()).hexdigest()[:16]
)
class Originator(ABC, Generic[T]):
"""原发器抽象基类"""
@abstractmethod
def save(self) -> Memento[T]:
"""保存当前状态"""
pass
@abstractmethod
def restore(self, memento: Memento[T]) -> None:
"""恢复到指定状态"""
pass
@abstractmethod
def get_current_state(self) -> T:
"""获取当前状态"""
pass
class Caretaker(Generic[T]):
"""管理者 - 管理备忘录历史"""
def __init__(self, max_history: int = 100):
self._history: List[Memento[T]] = []
self._redo_stack: List[Memento[T]] = []
self._max_history = max_history
def save(self, memento: Memento[T]) -> str:
"""保存备忘录"""
self._history.append(memento)
self._redo_stack.clear()
if len(self._history) > self._max_history:
self._history.pop(0)
return memento.metadata.id
def undo(self) -> Optional[Memento[T]]:
"""撤销"""
if len(self._history) <= 1:
return None
current = self._history.pop()
self._redo_stack.append(current)
return self._history[-1] if self._history else None
def redo(self) -> Optional[Memento[T]]:
"""重做"""
if not self._redo_stack:
return None
memento = self._redo_stack.pop()
self._history.append(memento)
return memento
def get_by_id(self, memento_id: str) -> Optional[Memento[T]]:
"""根据ID获取备忘录"""
for m in self._history:
if m.metadata.id == memento_id:
return m
return None
def get_history(self) -> List[MementoMetadata]:
"""获取历史记录"""
return [m.metadata for m in self._history]
def clear(self) -> None:
"""清空历史"""
self._history.clear()
self._redo_stack.clear()
def can_undo(self) -> bool:
return len(self._history) > 1
def can_redo(self) -> bool:
return len(self._redo_stack) > 0
@dataclass
class DocumentState:
"""文档状态"""
content: str
cursor_position: int
selection: tuple = (0, 0)
def __str__(self) -> str:
return f"Document(content='{self.content[:20]}...', cursor={self.cursor_position})"
class Document(Originator[DocumentState]):
"""文档原发器"""
def __init__(self, content: str = ""):
self._content = content
self._cursor_position = 0
self._selection = (0, 0)
def type(self, text: str) -> None:
self._content = (
self._content[:self._cursor_position] +
text +
self._content[self._cursor_position:]
)
self._cursor_position += len(text)
def delete(self) -> None:
if self._cursor_position > 0:
self._content = (
self._content[:self._cursor_position - 1] +
self._content[self._cursor_position:]
)
self._cursor_position -= 1
def set_cursor(self, position: int) -> None:
self._cursor_position = max(0, min(position, len(self._content)))
def get_content(self) -> str:
return self._content
def get_current_state(self) -> DocumentState:
return DocumentState(
content=self._content,
cursor_position=self._cursor_position,
selection=self._selection
)
def save(self) -> Memento[DocumentState]:
state = self.get_current_state()
metadata = MementoMetadata(
id=hashlib.md5(state.content.encode()).hexdigest()[:8],
type=MementoType.FULL,
description=f"Content length: {len(state.content)}"
)
return Memento(state, metadata)
def restore(self, memento: Memento[DocumentState]) -> None:
state = memento.get_state(self)
self._content = state.content
self._cursor_position = state.cursor_position
self._selection = state.selection
print("备忘录模式标准实现:")
document = Document()
caretaker = Caretaker[DocumentState]()
caretaker.save(document.save())
document.type("Hello")
caretaker.save(document.save())
document.type(" World")
caretaker.save(document.save())
document.type("!")
print(f" 当前内容: {document.get_content()}")
print(f" 历史记录: {len(caretaker.get_history())} 条")
print("\n 撤销:")
memento = caretaker.undo()
if memento:
document.restore(memento)
print(f" 内容: {document.get_content()}")
print("\n 重做:")
memento = caretaker.redo()
if memento:
document.restore(memento)
print(f" 内容: {document.get_content()}")18.4.2 增量备忘录
from typing import TypeVar, Generic, Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import copy
T = TypeVar('T')
@dataclass
class StateDiff:
"""状态差异"""
added: Dict[str, Any] = field(default_factory=dict)
removed: List[str] = field(default_factory=list)
modified: Dict[str, Any] = field(default_factory=dict)
def is_empty(self) -> bool:
return not (self.added or self.removed or self.modified)
def apply(self, base: Dict[str, Any]) -> Dict[str, Any]:
"""应用差异到基础状态"""
result = copy.deepcopy(base)
for key in self.removed:
result.pop(key, None)
for key, value in self.added.items():
result[key] = copy.deepcopy(value)
for key, value in self.modified.items():
result[key] = copy.deepcopy(value)
return result
class IncrementalMemento(Memento[Dict[str, Any]]):
"""增量备忘录"""
def __init__(
self,
state: Dict[str, Any],
base_id: Optional[str] = None,
diff: Optional[StateDiff] = None
):
self._base_id = base_id
self._diff = diff
super().__init__(state)
@property
def base_id(self) -> Optional[str]:
return self._base_id
@property
def diff(self) -> Optional[StateDiff]:
return self._diff
class IncrementalCaretaker(Caretaker[Dict[str, Any]]):
"""增量备忘录管理者"""
def __init__(self, max_history: int = 100):
super().__init__(max_history)
self._full_snapshots: Dict[str, IncrementalMemento] = {}
self._snapshot_interval = 5
def save(self, memento: IncrementalMemento) -> str:
"""保存备忘录(支持增量)"""
memento_id = memento.metadata.id
if len(self._history) % self._snapshot_interval == 0:
self._full_snapshots[memento_id] = memento
memento._base_id = None
memento._diff = None
else:
if self._history:
last = self._history[-1]
if isinstance(last, IncrementalMemento):
diff = self._compute_diff(
last.get_state(),
memento.get_state()
)
memento._base_id = last.metadata.id
memento._diff = diff
self._history.append(memento)
self._redo_stack.clear()
if len(self._history) > self._max_history:
removed = self._history.pop(0)
if removed.metadata.id in self._full_snapshots:
del self._full_snapshots[removed.metadata.id]
return memento_id
def restore_to(self, memento_id: str) -> Optional[Dict[str, Any]]:
"""恢复到指定状态(支持增量重建)"""
memento = self.get_by_id(memento_id)
if not memento:
return None
if isinstance(memento, IncrementalMemento):
if memento._base_id is None or memento._diff is None:
return memento.get_state()
return self._reconstruct_state(memento)
return memento.get_state()
def _compute_diff(
self,
old_state: Dict[str, Any],
new_state: Dict[str, Any]
) -> StateDiff:
"""计算状态差异"""
diff = StateDiff()
old_keys = set(old_state.keys())
new_keys = set(new_state.keys())
for key in new_keys - old_keys:
diff.added[key] = new_state[key]
for key in old_keys - new_keys:
diff.removed.append(key)
for key in old_keys & new_keys:
if old_state[key] != new_state[key]:
diff.modified[key] = new_state[key]
return diff
def _reconstruct_state(
self,
target_memento: IncrementalMemento
) -> Dict[str, Any]:
"""重建状态"""
chain: List[IncrementalMemento] = [target_memento]
current = target_memento
while current._base_id:
base = self.get_by_id(current._base_id)
if base and isinstance(base, IncrementalMemento):
chain.append(base)
if base._base_id is None:
break
current = base
else:
break
chain.reverse()
state = {}
for memento in chain:
if memento._diff:
state = memento._diff.apply(state)
else:
state = copy.deepcopy(memento.get_state())
return state
def get_storage_stats(self) -> Dict[str, Any]:
"""获取存储统计"""
full_count = len(self._full_snapshots)
incremental_count = len(self._history) - full_count
return {
'total_mementos': len(self._history),
'full_snapshots': full_count,
'incremental_mementos': incremental_count,
'snapshot_interval': self._snapshot_interval
}
print("\n增量备忘录示例:")
caretaker = IncrementalCaretaker()
states = [
{'name': 'Alice', 'age': 25, 'city': 'Beijing'},
{'name': 'Alice', 'age': 26, 'city': 'Beijing'},
{'name': 'Alice', 'age': 26, 'city': 'Shanghai'},
{'name': 'Bob', 'age': 26, 'city': 'Shanghai'},
{'name': 'Bob', 'age': 30, 'city': 'Shanghai'},
{'name': 'Bob', 'age': 30, 'city': 'Guangzhou'},
]
for i, state in enumerate(states):
memento = IncrementalMemento(state)
caretaker.save(memento)
print(f" 保存状态 {i+1}: {state}")
print(f"\n 存储统计: {caretaker.get_storage_stats()}")18.4.3 版本控制备忘录
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import hashlib
class VersionStatus(Enum):
"""版本状态"""
ACTIVE = auto()
ARCHIVED = auto()
DELETED = auto()
@dataclass
class Version:
"""版本信息"""
id: str
parent_id: Optional[str]
message: str
author: str
timestamp: datetime = field(default_factory=datetime.now)
status: VersionStatus = VersionStatus.ACTIVE
tags: List[str] = field(default_factory=list)
def __hash__(self) -> int:
return hash(self.id)
@dataclass
class VersionedState:
"""版本化状态"""
version: Version
state: Dict[str, Any]
checksum: str
class VersionControlCaretaker:
"""版本控制备忘录管理者"""
def __init__(self):
self._versions: Dict[str, VersionedState] = {}
self._branches: Dict[str, str] = {'main': None}
self._current_branch = 'main'
self._head: Optional[str] = None
self._tags: Dict[str, str] = {}
def commit(
self,
state: Dict[str, Any],
message: str,
author: str = "system"
) -> str:
"""创建新版本"""
version_id = self._generate_id(state)
version = Version(
id=version_id,
parent_id=self._head,
message=message,
author=author
)
state_str = str(sorted(state.items()))
checksum = hashlib.sha256(state_str.encode()).hexdigest()
self._versions[version_id] = VersionedState(
version=version,
state=state,
checksum=checksum
)
self._head = version_id
self._branches[self._current_branch] = version_id
return version_id
def checkout(self, version_id: str) -> Optional[Dict[str, Any]]:
"""检出到指定版本"""
if version_id not in self._versions:
return None
self._head = version_id
self._branches[self._current_branch] = version_id
return self._versions[version_id].state.copy()
def create_branch(self, name: str) -> bool:
"""创建分支"""
if name in self._branches:
return False
self._branches[name] = self._head
return True
def switch_branch(self, name: str) -> bool:
"""切换分支"""
if name not in self._branches:
return False
self._current_branch = name
self._head = self._branches[name]
return True
def merge(self, source_branch: str) -> Optional[Dict[str, Any]]:
"""合并分支"""
if source_branch not in self._branches:
return None
source_head = self._branches[source_branch]
if not source_head:
return None
source_state = self._versions[source_head].state
current_state = self.get_current_state() or {}
merged = {**current_state, **source_state}
return merged
def tag(self, name: str, version_id: Optional[str] = None) -> bool:
"""创建标签"""
target_id = version_id or self._head
if target_id not in self._versions:
return False
self._tags[name] = target_id
self._versions[target_id].version.tags.append(name)
return True
def get_version(self, version_id: str) -> Optional[VersionedState]:
"""获取版本"""
return self._versions.get(version_id)
def get_current_state(self) -> Optional[Dict[str, Any]]:
"""获取当前状态"""
if not self._head:
return None
return self._versions[self._head].state.copy()
def get_history(self, limit: int = 10) -> List[Version]:
"""获取历史"""
history = []
current_id = self._head
while current_id and len(history) < limit:
if current_id in self._versions:
history.append(self._versions[current_id].version)
current_id = self._versions[current_id].version.parent_id
else:
break
return history
def get_branches(self) -> Dict[str, Optional[str]]:
"""获取所有分支"""
return self._branches.copy()
def get_tags(self) -> Dict[str, str]:
"""获取所有标签"""
return self._tags.copy()
def _generate_id(self, state: Dict[str, Any]) -> str:
"""生成版本ID"""
content = f"{datetime.now().isoformat()}{str(state)}"
return hashlib.sha256(content.encode()).hexdigest()[:8]
print("\n版本控制备忘录示例:")
vc = VersionControlCaretaker()
vc.commit({'version': '1.0', 'feature': 'basic'}, "初始版本")
vc.commit({'version': '1.1', 'feature': 'basic', 'bugfix': 'true'}, "修复bug")
vc.tag("v1.0")
vc.create_branch("develop")
vc.switch_branch("develop")
vc.commit({'version': '2.0-dev', 'feature': 'advanced'}, "开发新功能")
vc.commit({'version': '2.0-dev', 'feature': 'advanced', 'test': 'added'}, "添加测试")
print(" 当前分支:", vc._current_branch)
print(" 当前状态:", vc.get_current_state())
print("\n 切换到main分支:")
vc.switch_branch("main")
print(" 当前状态:", vc.get_current_state())
print("\n 历史记录:")
for v in vc.get_history():
print(f" {v.id[:6]}: {v.message}")
print("\n 标签:", vc.get_tags())18.5 企业级应用示例
18.5.1 文本编辑器撤销/重做系统
from typing import List, Optional, Dict, Any, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import copy
class OperationType(Enum):
"""操作类型"""
INSERT = auto()
DELETE = auto()
REPLACE = auto()
MOVE_CURSOR = auto()
@dataclass
class TextOperation:
"""文本操作"""
type: OperationType
position: int
content: str = ""
deleted_content: str = ""
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class EditorSnapshot:
"""编辑器快照"""
content: str
cursor_position: int
selection_start: int
selection_end: int
operations: List[TextOperation] = field(default_factory=list)
def copy(self) -> 'EditorSnapshot':
return EditorSnapshot(
content=self.content,
cursor_position=self.cursor_position,
selection_start=self.selection_start,
selection_end=self.selection_end,
operations=self.operations.copy()
)
class TextEditorMemento:
"""文本编辑器备忘录"""
def __init__(self, snapshot: EditorSnapshot, operation: Optional[TextOperation] = None):
self._snapshot = snapshot
self._operation = operation
self._timestamp = datetime.now()
@property
def snapshot(self) -> EditorSnapshot:
return self._snapshot
@property
def operation(self) -> Optional[TextOperation]:
return self._operation
@property
def timestamp(self) -> datetime:
return self._timestamp
class TextEditor:
"""文本编辑器"""
def __init__(self):
self._content = ""
self._cursor_position = 0
self._selection_start = 0
self._selection_end = 0
self._last_operation: Optional[TextOperation] = None
def type(self, text: str) -> TextOperation:
"""输入文本"""
old_content = self._content
if self._selection_start != self._selection_end:
self._delete_selection()
self._content = (
self._content[:self._cursor_position] +
text +
self._content[self._cursor_position:]
)
operation = TextOperation(
type=OperationType.INSERT,
position=self._cursor_position,
content=text
)
self._cursor_position += len(text)
self._last_operation = operation
return operation
def delete(self) -> Optional[TextOperation]:
"""删除字符"""
if self._selection_start != self._selection_end:
deleted = self._delete_selection()
operation = TextOperation(
type=OperationType.DELETE,
position=self._selection_start,
deleted_content=deleted
)
elif self._cursor_position > 0:
deleted = self._content[self._cursor_position - 1]
self._content = (
self._content[:self._cursor_position - 1] +
self._content[self._cursor_position:]
)
self._cursor_position -= 1
operation = TextOperation(
type=OperationType.DELETE,
position=self._cursor_position,
deleted_content=deleted
)
else:
return None
self._last_operation = operation
return operation
def backspace(self) -> Optional[TextOperation]:
"""退格"""
return self.delete()
def set_cursor(self, position: int) -> TextOperation:
"""设置光标位置"""
self._cursor_position = max(0, min(position, len(self._content)))
self._selection_start = self._cursor_position
self._selection_end = self._cursor_position
operation = TextOperation(
type=OperationType.MOVE_CURSOR,
position=self._cursor_position
)
self._last_operation = operation
return operation
def select(self, start: int, end: int) -> None:
"""选择文本"""
self._selection_start = max(0, min(start, len(self._content)))
self._selection_end = max(0, min(end, len(self._content)))
def get_content(self) -> str:
return self._content
def get_cursor_position(self) -> int:
return self._cursor_position
def get_selection(self) -> tuple:
return (self._selection_start, self._selection_end)
def save(self) -> TextEditorMemento:
"""保存状态"""
snapshot = EditorSnapshot(
content=self._content,
cursor_position=self._cursor_position,
selection_start=self._selection_start,
selection_end=self._selection_end
)
return TextEditorMemento(snapshot, self._last_operation)
def restore(self, memento: TextEditorMemento) -> None:
"""恢复状态"""
snapshot = memento.snapshot
self._content = snapshot.content
self._cursor_position = snapshot.cursor_position
self._selection_start = snapshot.selection_start
self._selection_end = snapshot.selection_end
def _delete_selection(self) -> str:
"""删除选中内容"""
start = min(self._selection_start, self._selection_end)
end = max(self._selection_start, self._selection_end)
deleted = self._content[start:end]
self._content = self._content[:start] + self._content[end:]
self._cursor_position = start
self._selection_start = start
self._selection_end = start
return deleted
class EditorHistory:
"""编辑器历史管理"""
def __init__(self, editor: TextEditor, max_history: int = 100):
self._editor = editor
self._max_history = max_history
self._undo_stack: List[TextEditorMemento] = []
self._redo_stack: List[TextEditorMemento] = []
self._save_threshold = 10
self._operation_count = 0
def record_operation(self, operation: Optional[TextOperation] = None) -> None:
"""记录操作"""
self._operation_count += 1
if self._operation_count >= self._save_threshold:
self.save()
self._operation_count = 0
def save(self) -> None:
"""保存当前状态"""
memento = self._editor.save()
self._undo_stack.append(memento)
self._redo_stack.clear()
if len(self._undo_stack) > self._max_history:
self._undo_stack.pop(0)
def undo(self) -> bool:
"""撤销"""
if not self._undo_stack:
return False
current = self._undo_stack.pop()
self._redo_stack.append(current)
if self._undo_stack:
self._editor.restore(self._undo_stack[-1])
return True
self._undo_stack.append(current)
return False
def redo(self) -> bool:
"""重做"""
if not self._redo_stack:
return False
memento = self._redo_stack.pop()
self._undo_stack.append(memento)
self._editor.restore(memento)
return True
def can_undo(self) -> bool:
return len(self._undo_stack) > 1
def can_redo(self) -> bool:
return len(self._redo_stack) > 0
def get_history_size(self) -> tuple:
return (len(self._undo_stack), len(self._redo_stack))
print("\n文本编辑器撤销/重做系统示例:")
editor = TextEditor()
history = EditorHistory(editor)
history.save()
editor.type("Hello")
history.save()
editor.type(" World")
history.save()
editor.type("!")
print(f" 内容: '{editor.get_content()}'")
print(f" 历史大小: {history.get_history_size()}")
print("\n 撤销:")
history.undo()
print(f" 内容: '{editor.get_content()}'")
print("\n 再次撤销:")
history.undo()
print(f" 内容: '{editor.get_content()}'")
print("\n 重做:")
history.redo()
print(f" 内容: '{editor.get_content()}'")18.5.2 事务回滚系统
from typing import Dict, List, Optional, Any, Callable, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import copy
T = TypeVar('T')
class TransactionState(Enum):
"""事务状态"""
ACTIVE = auto()
COMMITTED = auto()
ROLLED_BACK = auto()
FAILED = auto()
@dataclass
class TransactionLog:
"""事务日志"""
transaction_id: str
operation: str
before_state: Dict[str, Any]
after_state: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class TransactionMemento:
"""事务备忘录"""
transaction_id: str
state: TransactionState
logs: List[TransactionLog] = field(default_factory=list)
checkpoint: Dict[str, Any] = field(default_factory=dict)
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
class TransactionManager:
"""事务管理器"""
def __init__(self):
self._active_transactions: Dict[str, TransactionMemento] = {}
self._completed_transactions: List[TransactionMemento] = []
self._data: Dict[str, Any] = {}
self._max_completed = 100
def begin(self) -> str:
"""开始事务"""
import uuid
transaction_id = str(uuid.uuid4())[:8]
memento = TransactionMemento(
transaction_id=transaction_id,
state=TransactionState.ACTIVE,
checkpoint=copy.deepcopy(self._data)
)
self._active_transactions[transaction_id] = memento
return transaction_id
def execute(
self,
transaction_id: str,
operation: Callable[[Dict[str, Any]], Dict[str, Any]]
) -> bool:
"""执行操作"""
if transaction_id not in self._active_transactions:
return False
memento = self._active_transactions[transaction_id]
if memento.state != TransactionState.ACTIVE:
return False
before_state = copy.deepcopy(self._data)
try:
result = operation(self._data)
log = TransactionLog(
transaction_id=transaction_id,
operation=operation.__name__,
before_state=before_state,
after_state=copy.deepcopy(self._data)
)
memento.logs.append(log)
return True
except Exception as e:
memento.state = TransactionState.FAILED
return False
def commit(self, transaction_id: str) -> bool:
"""提交事务"""
if transaction_id not in self._active_transactions:
return False
memento = self._active_transactions[transaction_id]
memento.state = TransactionState.COMMITTED
memento.end_time = datetime.now()
self._completed_transactions.append(memento)
del self._active_transactions[transaction_id]
self._trim_completed()
return True
def rollback(self, transaction_id: str) -> bool:
"""回滚事务"""
if transaction_id not in self._active_transactions:
return False
memento = self._active_transactions[transaction_id]
self._data = copy.deepcopy(memento.checkpoint)
memento.state = TransactionState.ROLLED_BACK
memento.end_time = datetime.now()
self._completed_transactions.append(memento)
del self._active_transactions[transaction_id]
self._trim_completed()
return True
def get_data(self) -> Dict[str, Any]:
return self._data.copy()
def set_data(self, key: str, value: Any) -> None:
self._data[key] = value
def get_active_transactions(self) -> List[str]:
return list(self._active_transactions.keys())
def get_transaction_status(self, transaction_id: str) -> Optional[TransactionState]:
if transaction_id in self._active_transactions:
return self._active_transactions[transaction_id].state
for m in self._completed_transactions:
if m.transaction_id == transaction_id:
return m.state
return None
def _trim_completed(self) -> None:
if len(self._completed_transactions) > self._max_completed:
self._completed_transactions = self._completed_transactions[-self._max_completed:]
class TransactionalOperation:
"""事务性操作"""
def __init__(self, manager: TransactionManager):
self._manager = manager
self._transaction_id: Optional[str] = None
def __enter__(self) -> 'TransactionalOperation':
self._transaction_id = self._manager.begin()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
if exc_type is not None:
self._manager.rollback(self._transaction_id)
else:
self._manager.commit(self._transaction_id)
return False
@property
def transaction_id(self) -> Optional[str]:
return self._transaction_id
print("\n事务回滚系统示例:")
tm = TransactionManager()
tm.set_data('balance', 1000)
tm.set_data('transactions', [])
print(f" 初始数据: {tm.get_data()}")
print("\n 执行成功事务:")
with TransactionalOperation(tm) as tx:
def deposit(data):
data['balance'] += 100
data['transactions'].append({'type': 'deposit', 'amount': 100})
tm.execute(tx.transaction_id, deposit)
print(f" 事务中: {tm.get_data()}")
print(f" 提交后: {tm.get_data()}")
print("\n 执行失败事务(自动回滚):")
try:
with TransactionalOperation(tm) as tx:
def withdraw(data):
data['balance'] -= 500
data['transactions'].append({'type': 'withdraw', 'amount': 500})
tm.execute(tx.transaction_id, withdraw)
print(f" 事务中: {tm.get_data()}")
raise Exception("模拟错误")
except:
pass
print(f" 回滚后: {tm.get_data()}")18.5.3 游戏存档系统
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import json
import hashlib
class SaveType(Enum):
"""存档类型"""
MANUAL = auto()
AUTO = auto()
QUICK = auto()
CHECKPOINT = auto()
@dataclass
class PlayerState:
"""玩家状态"""
name: str
level: int = 1
experience: int = 0
health: int = 100
max_health: int = 100
mana: int = 100
max_mana: int = 100
gold: int = 0
position: tuple = (0, 0, 0)
def to_dict(self) -> Dict[str, Any]:
return {
'name': self.name,
'level': self.level,
'experience': self.experience,
'health': self.health,
'max_health': self.max_health,
'mana': self.mana,
'max_mana': self.max_mana,
'gold': self.gold,
'position': list(self.position)
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'PlayerState':
return cls(
name=data['name'],
level=data['level'],
experience=data['experience'],
health=data['health'],
max_health=data['max_health'],
mana=data['mana'],
max_mana=data['max_mana'],
gold=data['gold'],
position=tuple(data['position'])
)
@dataclass
class WorldState:
"""世界状态"""
current_map: str = "start_area"
completed_quests: List[str] = field(default_factory=list)
unlocked_areas: Set[str] = field(default_factory=lambda: {"start_area"})
npc_states: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
'current_map': self.current_map,
'completed_quests': self.completed_quests,
'unlocked_areas': list(self.unlocked_areas),
'npc_states': self.npc_states
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'WorldState':
return cls(
current_map=data['current_map'],
completed_quests=data['completed_quests'],
unlocked_areas=set(data['unlocked_areas']),
npc_states=data['npc_states']
)
@dataclass
class GameSnapshot:
"""游戏快照"""
player: PlayerState
world: WorldState
inventory: List[str]
playtime: float
save_type: SaveType
timestamp: datetime = field(default_factory=datetime.now)
description: str = ""
def to_dict(self) -> Dict[str, Any]:
return {
'player': self.player.to_dict(),
'world': self.world.to_dict(),
'inventory': self.inventory,
'playtime': self.playtime,
'save_type': self.save_type.name,
'timestamp': self.timestamp.isoformat(),
'description': self.description
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'GameSnapshot':
return cls(
player=PlayerState.from_dict(data['player']),
world=WorldState.from_dict(data['world']),
inventory=data['inventory'],
playtime=data['playtime'],
save_type=SaveType[data['save_type']],
timestamp=datetime.fromisoformat(data['timestamp']),
description=data.get('description', '')
)
class GameMemento:
"""游戏备忘录"""
def __init__(self, snapshot: GameSnapshot):
self._snapshot = snapshot
self._id = self._generate_id()
@property
def id(self) -> str:
return self._id
@property
def snapshot(self) -> GameSnapshot:
return self._snapshot
def to_json(self) -> str:
return json.dumps(self.to_dict(), indent=2)
def to_dict(self) -> Dict[str, Any]:
return {
'id': self._id,
'snapshot': self._snapshot.to_dict()
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'GameMemento':
memento = cls(GameSnapshot.from_dict(data['snapshot']))
memento._id = data['id']
return memento
def _generate_id(self) -> str:
content = f"{self._snapshot.timestamp.isoformat()}{self._snapshot.player.name}"
return hashlib.md5(content.encode()).hexdigest()[:8]
class SaveManager:
"""存档管理器"""
def __init__(self, max_saves: int = 10, auto_save_interval: int = 300):
self._saves: Dict[str, GameMemento] = {}
self._quick_save: Optional[GameMemento] = None
self._auto_saves: List[GameMemento] = []
self._max_saves = max_saves
self._max_auto_saves = 3
self._auto_save_interval = auto_save_interval
def save(
self,
game_state: Any,
save_type: SaveType = SaveType.MANUAL,
description: str = ""
) -> str:
"""保存游戏"""
snapshot = self._create_snapshot(game_state, save_type, description)
memento = GameMemento(snapshot)
if save_type == SaveType.QUICK:
self._quick_save = memento
elif save_type == SaveType.AUTO:
self._auto_saves.append(memento)
if len(self._auto_saves) > self._max_auto_saves:
self._auto_saves.pop(0)
else:
self._saves[memento.id] = memento
if len(self._saves) > self._max_saves:
oldest = min(self._saves.values(), key=lambda m: m.snapshot.timestamp)
del self._saves[oldest.id]
return memento.id
def load(self, save_id: str) -> Optional[GameSnapshot]:
"""加载存档"""
if save_id == "quick" and self._quick_save:
return self._quick_save.snapshot
for auto_save in self._auto_saves:
if auto_save.id == save_id:
return auto_save.snapshot
if save_id in self._saves:
return self._saves[save_id].snapshot
return None
def delete(self, save_id: str) -> bool:
"""删除存档"""
if save_id in self._saves:
del self._saves[save_id]
return True
return False
def list_saves(self) -> List[Dict[str, Any]]:
"""列出所有存档"""
saves = []
for memento in self._saves.values():
saves.append({
'id': memento.id,
'type': memento.snapshot.save_type.name,
'timestamp': memento.snapshot.timestamp.isoformat(),
'description': memento.snapshot.description,
'player_name': memento.snapshot.player.name,
'level': memento.snapshot.player.level,
'playtime': memento.snapshot.playtime
})
return sorted(saves, key=lambda x: x['timestamp'], reverse=True)
def get_quick_save(self) -> Optional[GameSnapshot]:
"""获取快速存档"""
return self._quick_save.snapshot if self._quick_save else None
def get_auto_saves(self) -> List[GameSnapshot]:
"""获取自动存档"""
return [m.snapshot for m in self._auto_saves]
def _create_snapshot(
self,
game_state: Any,
save_type: SaveType,
description: str
) -> GameSnapshot:
"""创建快照"""
return GameSnapshot(
player=game_state.player,
world=game_state.world,
inventory=game_state.inventory.copy(),
playtime=game_state.playtime,
save_type=save_type,
description=description
)
class Game:
"""游戏类"""
def __init__(self, player_name: str):
self.player = PlayerState(name=player_name)
self.world = WorldState()
self.inventory: List[str] = []
self.playtime = 0.0
def gain_experience(self, amount: int) -> None:
self.player.experience += amount
while self.player.experience >= self.player.level * 100:
self.player.experience -= self.player.level * 100
self.player.level += 1
self.player.max_health += 10
self.player.health = self.player.max_health
def take_damage(self, damage: int) -> None:
self.player.health = max(0, self.player.health - damage)
def heal(self, amount: int) -> None:
self.player.health = min(self.player.max_health, self.player.health + amount)
def collect_item(self, item: str) -> None:
self.inventory.append(item)
def complete_quest(self, quest_id: str) -> None:
if quest_id not in self.world.completed_quests:
self.world.completed_quests.append(quest_id)
def move_to(self, map_name: str) -> None:
self.world.current_map = map_name
if map_name not in self.world.unlocked_areas:
self.world.unlocked_areas.add(map_name)
def get_status(self) -> str:
return (
f"[{self.player.name}] Lv.{self.player.level} "
f"HP:{self.player.health}/{self.player.max_health} "
f"MP:{self.player.mana}/{self.player.max_mana} "
f"Gold:{self.player.gold} "
f"Map:{self.world.current_map}"
)
print("\n游戏存档系统示例:")
game = Game("勇者")
save_manager = SaveManager()
print(f" 初始状态: {game.get_status()}")
save_id1 = save_manager.save(game, SaveType.MANUAL, "初始存档")
print(f" 保存存档: {save_id1}")
game.gain_experience(150)
game.collect_item("铁剑")
game.move_to("forest")
print(f" 升级后: {game.get_status()}")
save_id2 = save_manager.save(game, SaveType.MANUAL, "森林探索")
print(f" 保存存档: {save_id2}")
game.take_damage(50)
game.collect_item("药草")
print(f" 受伤后: {game.get_status()}")
print("\n 加载存档:")
snapshot = save_manager.load(save_id1)
if snapshot:
game.player = snapshot.player
game.world = snapshot.world
game.inventory = snapshot.inventory
print(f" 加载后: {game.get_status()}")
print("\n 存档列表:")
for save in save_manager.list_saves():
print(f" {save['id']}: {save['description']} - Lv.{save['level']}")18.6 模式变体
18.6.1 序列化备忘录
from typing import Dict, Any, Optional, TypeVar, Generic
from dataclasses import dataclass, asdict
from datetime import datetime
import json
import pickle
import base64
T = TypeVar('T')
class SerializationFormat:
"""序列化格式"""
JSON = 'json'
PICKLE = 'pickle'
BASE64 = 'base64'
@dataclass
class SerializedMemento:
"""序列化备忘录"""
data: str
format: str
checksum: str
timestamp: datetime
metadata: Dict[str, Any]
@classmethod
def create(
cls,
obj: Any,
format: str = SerializationFormat.JSON,
metadata: Optional[Dict[str, Any]] = None
) -> 'SerializedMemento':
"""创建序列化备忘录"""
import hashlib
if format == SerializationFormat.JSON:
if hasattr(obj, 'to_dict'):
data_dict = obj.to_dict()
elif hasattr(obj, '__dict__'):
data_dict = obj.__dict__
else:
data_dict = {'value': obj}
data = json.dumps(data_dict, default=str, ensure_ascii=False)
elif format == SerializationFormat.PICKLE:
data = base64.b64encode(pickle.dumps(obj)).decode('utf-8')
elif format == SerializationFormat.BASE64:
data = base64.b64encode(str(obj).encode()).decode('utf-8')
else:
raise ValueError(f"Unsupported format: {format}")
checksum = hashlib.sha256(data.encode()).hexdigest()[:16]
return cls(
data=data,
format=format,
checksum=checksum,
timestamp=datetime.now(),
metadata=metadata or {}
)
def restore(self, target_class: Optional[type] = None) -> Any:
"""恢复对象"""
if self.format == SerializationFormat.JSON:
data_dict = json.loads(self.data)
if target_class and hasattr(target_class, 'from_dict'):
return target_class.from_dict(data_dict)
elif target_class:
obj = target_class.__new__(target_class)
obj.__dict__.update(data_dict)
return obj
return data_dict
elif self.format == SerializationFormat.PICKLE:
return pickle.loads(base64.b64decode(self.data))
elif self.format == SerializationFormat.BASE64:
return base64.b64decode(self.data).decode()
def verify(self) -> bool:
"""验证完整性"""
import hashlib
expected = hashlib.sha256(self.data.encode()).hexdigest()[:16]
return self.checksum == expected
print("\n序列化备忘录示例:")
@dataclass
class Config:
theme: str
language: str
font_size: int
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Config':
return cls(**data)
config = Config(theme="dark", language="zh-CN", font_size=14)
memento = SerializedMemento.create(config, SerializationFormat.JSON, {'version': '1.0'})
print(f" 序列化数据: {memento.data[:50]}...")
print(f" 校验和: {memento.checksum}")
print(f" 验证: {memento.verify()}")
restored = memento.restore(Config)
print(f" 恢复对象: {restored}")18.6.2 检查点备忘录
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
class CheckpointType(Enum):
"""检查点类型"""
MANUAL = auto()
AUTO = auto()
ERROR = auto()
MILESTONE = auto()
@dataclass
class Checkpoint:
"""检查点"""
id: str
type: CheckpointType
state: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
description: str = ""
parent_id: Optional[str] = None
class CheckpointManager:
"""检查点管理器"""
def __init__(self, max_checkpoints: int = 20):
self._checkpoints: Dict[str, Checkpoint] = {}
self._current_id: Optional[str] = None
self._max_checkpoints = max_checkpoints
self._auto_checkpoint_interval = 5
self._operation_count = 0
def create_checkpoint(
self,
state: Dict[str, Any],
checkpoint_type: CheckpointType = CheckpointType.MANUAL,
description: str = ""
) -> str:
"""创建检查点"""
import uuid
checkpoint_id = str(uuid.uuid4())[:8]
checkpoint = Checkpoint(
id=checkpoint_id,
type=checkpoint_type,
state=state.copy(),
description=description,
parent_id=self._current_id
)
self._checkpoints[checkpoint_id] = checkpoint
self._current_id = checkpoint_id
self._trim_checkpoints()
return checkpoint_id
def auto_checkpoint(self, state: Dict[str, Any]) -> Optional[str]:
"""自动检查点"""
self._operation_count += 1
if self._operation_count >= self._auto_checkpoint_interval:
self._operation_count = 0
return self.create_checkpoint(
state,
CheckpointType.AUTO,
f"Auto checkpoint at {datetime.now().isoformat()}"
)
return None
def restore_to_checkpoint(self, checkpoint_id: str) -> Optional[Dict[str, Any]]:
"""恢复到检查点"""
if checkpoint_id not in self._checkpoints:
return None
checkpoint = self._checkpoints[checkpoint_id]
self._current_id = checkpoint_id
return checkpoint.state.copy()
def restore_to_last_milestone(self) -> Optional[Dict[str, Any]]:
"""恢复到上一个里程碑"""
current = self._current_id
while current:
checkpoint = self._checkpoints.get(current)
if not checkpoint:
break
if checkpoint.type == CheckpointType.MILESTONE:
return checkpoint.state.copy()
current = checkpoint.parent_id
return None
def get_checkpoint_chain(self) -> List[Checkpoint]:
"""获取检查点链"""
chain = []
current = self._current_id
while current:
checkpoint = self._checkpoints.get(current)
if not checkpoint:
break
chain.append(checkpoint)
current = checkpoint.parent_id
return chain
def list_checkpoints(self) -> List[Dict[str, Any]]:
"""列出所有检查点"""
return [
{
'id': cp.id,
'type': cp.type.name,
'timestamp': cp.timestamp.isoformat(),
'description': cp.description
}
for cp in self._checkpoints.values()
]
def _trim_checkpoints(self) -> None:
"""修剪检查点"""
if len(self._checkpoints) <= self._max_checkpoints:
return
auto_checkpoints = [
cp for cp in self._checkpoints.values()
if cp.type == CheckpointType.AUTO
]
auto_checkpoints.sort(key=lambda cp: cp.timestamp)
for cp in auto_checkpoints[:len(auto_checkpoints) // 2]:
del self._checkpoints[cp.id]
print("\n检查点备忘录示例:")
cp_manager = CheckpointManager()
state = {'step': 0, 'data': []}
cp1 = cp_manager.create_checkpoint(state, CheckpointType.MILESTONE, "开始")
state['step'] = 1
state['data'].append('A')
cp_manager.auto_checkpoint(state)
state['step'] = 2
state['data'].append('B')
cp2 = cp_manager.create_checkpoint(state, CheckpointType.MANUAL, "中间状态")
state['step'] = 3
state['data'].append('C')
print(" 当前状态:", state)
print("\n 检查点链:")
for cp in cp_manager.get_checkpoint_chain():
print(f" {cp.id}: {cp.type.name} - {cp.description}")
print("\n 恢复到上一个里程碑:")
restored = cp_manager.restore_to_last_milestone()
print(" 恢复后状态:", restored)18.6.3 时间旅行备忘录
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum, auto
import heapq
class TimeDirection(Enum):
"""时间方向"""
FORWARD = auto()
BACKWARD = auto()
@dataclass
class TimePoint:
"""时间点"""
timestamp: datetime
state: Dict[str, Any]
event: str = ""
def __lt__(self, other: 'TimePoint') -> bool:
return self.timestamp < other.timestamp
class TimeTravelMemento:
"""时间旅行备忘录"""
def __init__(self, max_points: int = 100):
self._timeline: List[TimePoint] = []
self._current_index: int = -1
self._max_points = max_points
self._current_time: datetime = datetime.now()
def record(self, state: Dict[str, Any], event: str = "") -> None:
"""记录状态"""
now = datetime.now()
if self._current_index < len(self._timeline) - 1:
self._timeline = self._timeline[:self._current_index + 1]
time_point = TimePoint(
timestamp=now,
state=state.copy(),
event=event
)
self._timeline.append(time_point)
self._current_index = len(self._timeline) - 1
if len(self._timeline) > self._max_points:
self._timeline.pop(0)
self._current_index -= 1
def travel_to(self, target_time: datetime) -> Optional[Dict[str, Any]]:
"""穿越到指定时间"""
for i, point in enumerate(self._timeline):
if point.timestamp >= target_time:
self._current_index = i
return point.state.copy()
if self._timeline:
self._current_index = len(self._timeline) - 1
return self._timeline[-1].state.copy()
return None
def travel_by(self, delta: timedelta, direction: TimeDirection) -> Optional[Dict[str, Any]]:
"""按时间差穿越"""
if direction == TimeDirection.BACKWARD:
if self._current_index > 0:
self._current_index -= 1
return self._timeline[self._current_index].state.copy()
else:
if self._current_index < len(self._timeline) - 1:
self._current_index += 1
return self._timeline[self._current_index].state.copy()
return None
def undo(self) -> Optional[Dict[str, Any]]:
"""撤销(向后穿越)"""
return self.travel_by(timedelta(), TimeDirection.BACKWARD)
def redo(self) -> Optional[Dict[str, Any]]:
"""重做(向前穿越)"""
return self.travel_by(timedelta(), TimeDirection.FORWARD)
def get_current_state(self) -> Optional[Dict[str, Any]]:
"""获取当前状态"""
if 0 <= self._current_index < len(self._timeline):
return self._timeline[self._current_index].state.copy()
return None
def get_timeline(self) -> List[Tuple[datetime, str]]:
"""获取时间线"""
return [
(point.timestamp, point.event)
for point in self._timeline
]
def can_undo(self) -> bool:
return self._current_index > 0
def can_redo(self) -> bool:
return self._current_index < len(self._timeline) - 1
def get_current_position(self) -> Tuple[int, int]:
"""获取当前位置 (当前索引, 总数)"""
return (self._current_index + 1, len(self._timeline))
print("\n时间旅行备忘录示例:")
tt_memento = TimeTravelMemento()
state = {'value': 0}
tt_memento.record(state, "初始化")
state['value'] = 10
tt_memento.record(state, "设置为10")
state['value'] = 20
tt_memento.record(state, "设置为20")
state['value'] = 30
tt_memento.record(state, "设置为30")
print(" 当前状态:", tt_memento.get_current_state())
print(" 位置:", tt_memento.get_current_position())
print("\n 时间线:")
for ts, event in tt_memento.get_timeline():
print(f" {ts.strftime('%H:%M:%S')}: {event}")
print("\n 向后穿越:")
for _ in range(2):
state = tt_memento.undo()
print(f" 状态: {state}")
print("\n 向前穿越:")
state = tt_memento.redo()
print(f" 状态: {state}")18.7 反模式与最佳实践
18.7.1 常见反模式
from typing import Dict, Any, Optional
from dataclasses import dataclass
class MementoAntiPatterns:
"""备忘录反模式示例"""
@staticmethod
def anti_pattern_1_exposed_state():
"""反模式1:暴露内部状态"""
class BadOriginator:
def __init__(self):
self._internal_data = {'sensitive': 'data'}
def get_state(self) -> Dict[str, Any]:
return self._internal_data
def set_state(self, state: Dict[str, Any]) -> None:
self._internal_data = state
print(" 问题: 直接暴露内部状态,破坏封装")
print(" 解决: 使用备忘录对象封装状态")
@staticmethod
def anti_pattern_2_deep_copy_abuse():
"""反模式2:滥用深拷贝"""
import copy
class LargeObject:
def __init__(self):
self.data = list(range(1000000))
class BadCaretaker:
def __init__(self):
self._states = []
def save(self, obj: LargeObject) -> None:
self._states.append(copy.deepcopy(obj))
print(" 问题: 对大对象频繁深拷贝,内存消耗大")
print(" 解决: 使用增量存储或压缩")
@staticmethod
def anti_pattern_3_unlimited_history():
"""反模式3:无限历史记录"""
class BadHistory:
def __init__(self):
self._history = []
def save(self, state: Any) -> None:
self._history.append(state)
print(" 问题: 历史记录无限增长,内存溢出")
print(" 解决: 设置最大历史数量,定期清理")
@staticmethod
def anti_pattern_4_caretaker_manipulation():
"""反模式4:管理者篡改状态"""
class BadCaretaker:
def __init__(self):
self._memento = None
def save(self, memento: Any) -> None:
self._memento = memento
def modify_and_restore(self) -> Any:
if self._memento:
self._memento['modified'] = True
return self._memento
print(" 问题: 管理者篡改备忘录内容")
print(" 解决: 使用不可变备忘录或窄接口")
print("\n备忘录反模式示例:")
print(" 反模式1 - 暴露内部状态:")
MementoAntiPatterns.anti_pattern_1_exposed_state()
print("\n 反模式2 - 滥用深拷贝:")
MementoAntiPatterns.anti_pattern_2_deep_copy_abuse()
print("\n 反模式3 - 无限历史记录:")
MementoAntiPatterns.anti_pattern_3_unlimited_history()
print("\n 反模式4 - 管理者篡改状态:")
MementoAntiPatterns.anti_pattern_4_caretaker_manipulation()18.7.2 最佳实践
from typing import Dict, List, Any, Optional, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime
from copy import deepcopy
import json
T = TypeVar('T')
class MementoBestPractices:
"""备忘录最佳实践"""
@staticmethod
def practice_1_immutable_memento():
"""实践1:不可变备忘录"""
@dataclass(frozen=True)
class ImmutableMemento:
state: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
def get_state(self) -> Dict[str, Any]:
return deepcopy(self.state)
print(" 使用不可变备忘录防止意外修改")
@staticmethod
def practice_2_narrow_interface():
"""实践2:窄接口"""
class Memento:
def __init__(self, state: Dict[str, Any]):
self._state = state
self._timestamp = datetime.now()
self._id = id(self)
def get_state(self, originator: Any) -> Dict[str, Any]:
if not isinstance(originator, Originator):
raise PermissionError("只有原发器可以访问状态")
return self._state.copy()
@property
def timestamp(self) -> datetime:
return self._timestamp
@property
def id(self) -> int:
return self._id
class Originator:
pass
print(" 使用窄接口限制管理者访问")
@staticmethod
def practice_3_incremental_storage():
"""实践3:增量存储"""
class IncrementalMemento:
def __init__(self):
self._base_state: Optional[Dict[str, Any]] = None
self._deltas: List[Dict[str, Any]] = []
def save(self, state: Dict[str, Any]) -> None:
if self._base_state is None:
self._base_state = state.copy()
else:
delta = self._compute_delta(self._base_state, state)
self._deltas.append(delta)
def _compute_delta(
self,
old: Dict[str, Any],
new: Dict[str, Any]
) -> Dict[str, Any]:
delta = {}
for key in set(old.keys()) | set(new.keys()):
if key not in old:
delta[key] = {'op': 'add', 'value': new[key]}
elif key not in new:
delta[key] = {'op': 'remove'}
elif old[key] != new[key]:
delta[key] = {'op': 'modify', 'value': new[key]}
return delta
print(" 使用增量存储减少内存消耗")
@staticmethod
def practice_4_serialization():
"""实践4:序列化支持"""
class SerializableMemento:
def __init__(self, state: Dict[str, Any]):
self._state = state
self._timestamp = datetime.now().isoformat()
def to_json(self) -> str:
return json.dumps({
'state': self._state,
'timestamp': self._timestamp
})
@classmethod
def from_json(cls, json_str: str) -> 'SerializableMemento':
data = json.loads(json_str)
return cls(data['state'])
print(" 支持序列化实现持久化存储")
print("\n备忘录最佳实践示例:")
print(" 实践1 - 不可变备忘录:")
MementoBestPractices.practice_1_immutable_memento()
print("\n 实践2 - 窄接口:")
MementoBestPractices.practice_2_narrow_interface()
print("\n 实践3 - 增量存储:")
MementoBestPractices.practice_3_incremental_storage()
print("\n 实践4 - 序列化支持:")
MementoBestPractices.practice_4_serialization()18.8 决策指南
18.8.1 存储策略选择决策树
┌─────────────────────────────────────────────────────────────────────┐
│ Storage Strategy Decision Tree │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 需要保存对象状态? │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ │ │
│ 是 否 │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ 不需要备忘录 │
│ │ 状态大小? │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────┼───────┐ │
│ │ │ │ │
│ 小 中等 大 │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 全量 增量 压缩 │
│ 快照 存储 存储 │
│ │
│ 需要多少历史? │
│ │ │
│ ┌────────┴────────┐ │
│ │ │ │
│ 少量 大量 │
│ │ │ │
│ ▼ ▼ │
│ 简单列表 分页存储 │
│ 存储 + 索引 │
│ │
│ 是否需要持久化? │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ │ │
│ 是 否 │
│ │ │ │
│ ▼ ▼ │
│ 序列化存储 内存存储 │
│ (JSON/Pickle) │
│ │
└─────────────────────────────────────────────────────────────────────┘18.8.2 实现方式选择
| 场景 | 推荐实现 | 原因 |
|---|---|---|
| 文本编辑器撤销 | 全量快照 + 操作日志 | 状态较小,恢复快速 |
| 游戏存档 | 序列化快照 | 需要持久化,支持多存档 |
| 数据库事务 | 检查点 + 日志 | 支持回滚,保证ACID |
| 版本控制 | 增量存储 + 压缩 | 大文件,频繁保存 |
| 配置管理 | JSON序列化 | 可读性好,易于调试 |
| 调试器状态 | 深拷贝快照 | 需要完整状态 |
18.9 快速参考卡
18.9.1 核心概念速查
┌─────────────────────────────────────────────────────────────────────┐
│ Memento Pattern Quick Reference │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 【定义】 │
│ 在不破坏封装性的前提下,捕获对象内部状态并在对象外保存 │
│ │
│ 【核心组件】 │
│ • Originator: 原发器,创建和恢复备忘录 │
│ • Memento: 备忘录,存储原发器状态 │
│ • Caretaker: 管理者,负责保管备忘录 │
│ │
│ 【接口设计】 │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 宽接口: Originator 可访问完整状态 │ │
│ │ 窄接口: Caretaker 只能访问元数据 │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ 【存储策略】 │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 全量快照: 保存完整状态,恢复快 │ │
│ │ 增量存储: 只保存变化,节省空间 │ │
│ │ 压缩存储: 压缩后存储,适合大对象 │ │
│ │ 序列化: 支持持久化,跨进程传输 │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ 【适用场景】 │
│ ✓ 需要保存和恢复对象状态 │
│ ✓ 需要实现撤销/重做功能 │
│ ✓ 需要保存对象的历史状态 │
│ ✓ 需要事务回滚机制 │
│ │
│ 【优点】 │
│ + 保持封装性 │
│ + 简化状态恢复 │
│ + 支持多版本历史 │
│ + 实现撤销/重做 │
│ │
│ 【缺点】 │
│ - 内存消耗大 │
│ - 管理复杂度高 │
│ - 可能影响性能 │
│ │
└─────────────────────────────────────────────────────────────────────┘18.9.2 代码模板速查
from typing import TypeVar, Generic, Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from copy import deepcopy
T = TypeVar('T')
@dataclass
class MementoTemplate(Generic[T]):
"""备忘录模板"""
state: T
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
class OriginatorTemplate(Generic[T]):
"""原发器模板"""
def __init__(self, initial_state: T):
self._state = initial_state
def get_state(self) -> T:
return self._state
def set_state(self, state: T) -> None:
self._state = state
def save(self) -> MementoTemplate[T]:
return MementoTemplate(state=deepcopy(self._state))
def restore(self, memento: MementoTemplate[T]) -> None:
self._state = deepcopy(memento.state)
class CaretakerTemplate(Generic[T]):
"""管理者模板"""
def __init__(self, max_history: int = 100):
self._history: List[MementoTemplate[T]] = []
self._redo_stack: List[MementoTemplate[T]] = []
self._max_history = max_history
def save(self, memento: MementoTemplate[T]) -> None:
self._history.append(memento)
self._redo_stack.clear()
if len(self._history) > self._max_history:
self._history.pop(0)
def undo(self) -> Optional[MementoTemplate[T]]:
if len(self._history) <= 1:
return None
current = self._history.pop()
self._redo_stack.append(current)
return self._history[-1]
def redo(self) -> Optional[MementoTemplate[T]]:
if not self._redo_stack:
return None
memento = self._redo_stack.pop()
self._history.append(memento)
return memento18.10 小结
备忘录模式提供了一种在不破坏封装性的情况下保存和恢复对象状态的机制。本章从形式化定义出发,深入探讨了备忘录的数学基础、状态空间复杂度和存储策略。
关键要点:
形式化理解:备忘录可形式化为状态转换系统 $\mathcal{M} = \langle S, s_0, \Sigma, \delta, \Omega \rangle$,支持状态保存与恢复操作。
接口设计:宽接口供原发器使用,窄接口供管理者使用,确保封装性。
存储策略:全量快照、增量存储、差异存储、压缩存储等多种策略适应不同场景。
企业级应用:文本编辑器撤销/重做、事务回滚系统、游戏存档系统等实际应用。
模式变体:序列化备忘录、检查点备忘录、时间旅行备忘录等扩展实现。
备忘录模式与命令模式常结合使用,命令模式负责执行操作,备忘录模式负责保存状态。在实现撤销/重做功能时,这两种模式的组合尤为有效。