Chapter 18: The Memento Pattern

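Learning Objectives

After completing this chapter, readers will be able to:

  • Understand the mathematical foundations and formal definition of the Memento pattern
  • Master the complete mechanics of saving and restoring state
  • Design and implement several kinds of memento systems (snapshot, incremental, version control, and others)
  • Apply the Memento pattern to implement undo/redo, transaction rollback, checkpoints, and similar features
  • Identify the scenarios where the Memento pattern applies, and its anti-patterns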

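18.1 Formal Definition

18.1.1 Mathematical Definition

The Memento pattern captures an object's internal state without violating encapsulation and stores that state outside the object, so that the object can later be restored to the saved state.

From a mathematical standpoint, the Memento pattern can be defined as a state transition system:

$$\mathcal{M} = \langle S, s_0, \Sigma, \delta, \Omega \rangle$$

where:

  • $S$: the set of object states (state space)
  • $s_0 \in S$: the initial state
  • $\Sigma$: the alphabet of operations
  • $\delta: S \times \Sigma \rightarrow S$: the state transition function
  • $\Omega \subseteq S$: the memento store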

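Formal definition of the memento operations

Saving adds the current state to the store, so it maps a state and a store to a new store ($\mathcal{P}(S)$ denotes the set of all subsets of $S$):

$$\text{save}: S \times \mathcal{P}(S) \rightarrow \mathcal{P}(S)$$

$$\text{save}(s, \Omega) = \Omega \cup \{s\}$$

Restoring returns a stored memento unchanged, because every element of $\Omega$ is itself a state:

$$\text{restore}: \Omega \rightarrow S$$

$$\text{restore}(\omega) = \omega$$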
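
The two operations can be sketched in Python as pure functions over an immutable store. This is only an illustration of the definition above: the names `State`, `Store`, `save`, and `restore` are hypothetical, and states are modelled as hashable tuples so that they can live in a `frozenset`.

```python
from typing import FrozenSet, Tuple

State = Tuple[int, ...]      # assumption: a state is any hashable value
Store = FrozenSet[State]     # the memento store Ω, a subset of S


def save(s: State, store: Store) -> Store:
    """Return Ω ∪ {s}: a new store that also contains s."""
    return store | {s}


def restore(omega: State) -> State:
    """A stored memento already is a state, so it is returned as-is."""
    return omega


store: Store = frozenset()
store = save((1, 2), store)
store = save((3, 4), store)
store = save((1, 2), store)  # saving an already stored state changes nothing
print(len(store))            # 2
```

Because the store is a set, saving the same state twice is idempotent; an ordered history, as used later in the chapter, needs a list instead.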

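State sequences and the timeline

For a state sequence $s_0, s_1, s_2, \ldots, s_n$, the memento store is:

$$\Omega = \{s_{t_1}, s_{t_2}, \ldots, s_{t_k}\} \subseteq \{s_0, s_1, \ldots, s_n\}$$

where $t_1 < t_2 < \ldots < t_k$ are the points in time at which a save occurred.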

18.1.2 A Type-Theoretic View

In type theory, a memento can be represented as a record type:

$$\text{Memento}\langle T \rangle = \{ \text{state}: T,\ \text{timestamp}: \text{Time},\ \text{metadata}: \text{Map}\langle \text{String}, \text{Any} \rangle \}$$

Memento interfaces

$$\text{Originator} = \{ \text{save}: () \rightarrow \text{Memento}\langle T \rangle,\ \text{restore}: \text{Memento}\langle T \rangle \rightarrow \text{Unit} \}$$

$$\text{Caretaker} = \{ \text{store}: \text{Memento}\langle T \rangle \rightarrow \text{Unit},\ \text{retrieve}: \text{ID} \rightarrow \text{Memento}\langle T \rangle \}$$
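
These record types map fairly directly onto Python's typing tools. The sketch below is one possible rendering rather than the chapter's framework: `MementoRecord`, `OriginatorLike`, `CaretakerLike`, and `Counter` are hypothetical names, and `typing.Protocol` is used so that any class with matching methods satisfies the interface structurally.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Generic, Protocol, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class MementoRecord(Generic[T]):
    """Memento<T> = {state, timestamp, metadata}."""
    state: T
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)


class OriginatorLike(Protocol[T]):
    def save(self) -> MementoRecord[T]: ...
    def restore(self, memento: MementoRecord[T]) -> None: ...


class CaretakerLike(Protocol[T]):
    def store(self, memento: MementoRecord[T]) -> None: ...
    def retrieve(self, memento_id: str) -> MementoRecord[T]: ...


class Counter:
    """Tiny originator that matches OriginatorLike[int] structurally."""

    def __init__(self) -> None:
        self.value = 0

    def save(self) -> MementoRecord[int]:
        return MementoRecord(self.value)

    def restore(self, memento: MementoRecord[int]) -> None:
        self.value = memento.state


counter = Counter()
snapshot = counter.save()
counter.value = 42
counter.restore(snapshot)
print(counter.value)  # 0
```

Marking the record `frozen=True` prevents reassigning its fields after creation, which matches the idea that a memento is a fixed snapshot.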

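18.1.3 State Space Complexity

| Storage strategy | Space complexity | Restore complexity | Typical use |
|---|---|---|---|
| Full snapshot | $O(n \cdot k)$ | $O(n)$ | Small objects, fast restore |
| Incremental storage | $O(\Delta n \cdot k)$ | $O(n)$ | Large objects, frequent saves |
| Differential storage | $O(\Delta n \cdot k)$ | $O(\Delta n)$ | Medium-sized objects, efficient restore |
| Compressed storage | $O(n \cdot k / c)$ | $O(n \cdot c)$ | Storage-constrained environments |

Here $n$ is the state size, $k$ is the number of stored versions, $\Delta n$ is the size of the change between versions, and $c$ is the compression ratio.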
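
To make the space formulas concrete, the sketch below plugs in illustrative numbers. The figures (a 1 MB state, 100 stored versions, 10 KB changed per save, a 4:1 compression ratio) are assumptions chosen for the arithmetic, not measurements.

```python
n = 1_000_000      # state size in bytes (assumed)
k = 100            # number of stored versions (assumed)
delta_n = 10_000   # bytes changed between consecutive saves (assumed)
c = 4              # compression ratio (assumed)

full = n * k                 # full snapshots: n * k
incremental = delta_n * k    # incremental or differential: Δn * k (base snapshot ignored)
compressed = n * k // c      # compressed full snapshots: n * k / c

print(full)         # 100000000
print(incremental)  # 1000000
print(compressed)   # 25000000
```

With these numbers, storing only the changes needs about 1% of the space of full snapshots, which is why incremental schemes pay off when $\Delta n \ll n$.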


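18.2 Historical Background and Evolution

18.2.1 Timeline

| Period | Milestone | Description |
|---|---|---|
| 1970s | Transaction processing systems | Database systems introduce checkpoint mechanisms |
| 1980s | Smalltalk-80 | Introduces Model-View-Controller, with support for saving state |
| 1987 | Apple HyperCard | Ships with undo functionality |
| 1994 | GoF design patterns | *Design Patterns* formally catalogs the Memento pattern |
| 1997 | Java (JDK 1.1) | Adds object serialization, which supports state persistence |
| 1999 | J2EE | The JTA transaction manager applies the memento idea |
| 2000 | Version control systems | CVS and SVN bring state snapshots into widespread use |
| 2005 | Git | Stores content-addressed snapshots efficiently, with delta compression in packfiles |
| 2010 | Browser History API | HTML5 introduces history state management |
| 2015 | Redux | State snapshots in a unidirectional data flow |
| 2020 | Time-travel debugging | Modern frameworks support stepping back through state |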

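18.2.2 Design Motivation

The core design motivations behind the Memento pattern are:

  1. Encapsulation: save state without exposing the object's internal structure
  2. State restoration: provide a reliable mechanism for restoring state
  3. History management: support storing and retrieving multiple versions of state
  4. Undo/redo: let users undo and redo their operations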

18.3 UML Structure Diagram

18.3.1 Standard Structure

┌─────────────────────────────────────────────────────────────────────┐
│                       Memento Pattern Structure                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────────┐         ┌─────────────────┐                   │
│  │    Originator   │         │    Memento      │                   │
│  ├─────────────────┤         ├─────────────────┤                   │
│  │ - _state        │────────>│ - _state        │                   │
│  ├─────────────────┤ creates ├─────────────────┤                   │
│  │ + set_state()   │         │ + get_state()   │                   │
│  │ + get_state()   │         │ (narrow iface)  │                   │
│  │ + save()        │         └─────────────────┘                   │
│  │ + restore()     │                   ▲                           │
│  └─────────────────┘                   │ stores                    │
│                                        │                            │
│  ┌─────────────────┐                   │                            │
│  │    Caretaker    │───────────────────┘                            │
│  ├─────────────────┤                                                │
│  │ - _mementos     │                                                │
│  ├─────────────────┤                                                │
│  │ + backup()      │                                                │
│  │ + undo()        │                                                │
│  │ + redo()        │                                                │
│  │ + get_history() │                                                │
│  └─────────────────┘                                                │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

18.3.2 Wide and Narrow Interfaces

┌─────────────────────────────────────────────────────────────────────┐
│                    Wide vs Narrow Interface                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  [Wide interface]                                                   │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  Originator can access all of Memento's methods              │   │
│  │                                                              │   │
│  │  class Memento:                                              │   │
│  │      def get_state(self) -> State:     # for Originator      │   │
│  │      def set_state(self, state):       # for Originator      │   │
│  │      def get_metadata(self) -> Dict:   # extra information   │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  [Narrow interface]                                                 │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  Caretaker can access only a limited set of metadata methods │   │
│  │                                                              │   │
│  │  class Memento:                                              │   │
│  │      def get_timestamp(self) -> datetime:  # for Caretaker   │   │
│  │      def get_id(self) -> str:              # for Caretaker   │   │
│  │      # get_state() is not visible to Caretaker               │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  Ways to implement this in Python:                                  │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  1. Use the private-attribute naming convention (_state)     │   │
│  │  2. Use a __ prefix to trigger name mangling                 │   │
│  │  3. Use property-based access control (@property)            │   │
│  │  4. Use a nested (inner) class                               │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
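
Two of the Python options listed in the diagram, a nested class and `__` name mangling, can be combined as in the sketch below. `Editor` and `_Snapshot` are hypothetical names. Python does not enforce privacy, so this only makes accidental access by a caretaker unlikely; it does not make it impossible.

```python
from datetime import datetime
from typing import List


class Editor:
    """Originator whose memento type is nested inside it."""

    class _Snapshot:
        def __init__(self, text: str) -> None:
            self.__text = text              # stored as _Snapshot__text
            self._created = datetime.now()

        @property
        def timestamp(self) -> datetime:    # narrow interface: metadata only
            return self._created

    def __init__(self) -> None:
        self.text = ""

    def save(self) -> "Editor._Snapshot":
        return Editor._Snapshot(self.text)

    def restore(self, snapshot: "Editor._Snapshot") -> None:
        # Only the originator is expected to use the mangled attribute name.
        self.text = snapshot._Snapshot__text


editor = Editor()
editor.text = "draft"
history: List[Editor._Snapshot] = [editor.save()]   # caretaker role
editor.text = "final"
editor.restore(history[-1])
print(editor.text)                    # draft
print(hasattr(history[-1], "text"))   # False
```

The caretaker code here only holds the snapshot and could read its `timestamp`; the saved text is reachable only through the mangled name that `Editor.restore` uses.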

18.3.3 State Timeline

┌─────────────────────────────────────────────────────────────────────┐
│                       State Timeline                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Time:     t0 ─── t1 ─── t2 ─── t3 ─── t4 ─── t5 ─── t6             │
│            │      │      │      │      │      │      │              │
│            ▼      ▼      ▼      ▼      ▼      ▼      ▼              │
│  State:    S0 ──> S1 ──> S2 ──> S3 ──> S4 ──> S5 ──> S6             │
│            │             │             │             │              │
│           save          save          save          save            │
│            │             │             │             │              │
│            ▼             ▼             ▼             ▼              │
│  Memento: [S0]          [S2]          [S4]          [S6]            │
│            │             │             │             │              │
│            └─────────────┴─────────────┴─────────────┘              │
│                          │                                          │
│                          ▼                                          │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  Caretaker store: [S0, S2, S4, S6]                           │   │
│  │                                                              │   │
│  │  undo(1) -> restore to S4                                    │   │
│  │  undo(2) -> restore to S2                                    │   │
│  │  undo(3) -> restore to S0                                    │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
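
The timeline can be reproduced with a few lines of Python. `TimelineCaretaker` is a hypothetical helper written for this illustration: it saves every second state and treats `undo(k)` as "look back k saves from the latest one" without removing anything from the store.

```python
from typing import List


class TimelineCaretaker:
    """Keeps snapshots in the order in which they were saved."""

    def __init__(self) -> None:
        self._saved: List[str] = []

    def save(self, state: str) -> None:
        self._saved.append(state)

    def undo(self, steps: int) -> str:
        """Return the snapshot taken `steps` saves before the latest one."""
        if not 0 < steps < len(self._saved):
            raise IndexError("not enough history")
        return self._saved[-1 - steps]


caretaker = TimelineCaretaker()
for i in range(7):           # states S0 .. S6
    state = f"S{i}"
    if i % 2 == 0:           # save at t0, t2, t4, t6
        caretaker.save(state)

print(caretaker.undo(1), caretaker.undo(2), caretaker.undo(3))  # S4 S2 S0
```

States S1, S3, and S5 were never saved, so they cannot be restored; the granularity of undo is set by how often a save happens.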

18.4 Standard Implementation

18.4.1 An ABC-Based Memento Framework

python
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import copy
import hashlib
import json

T = TypeVar('T')

class MementoType(Enum):
    """Kinds of memento."""
    FULL = auto()
    INCREMENTAL = auto()
    DIFF = auto()
    COMPRESSED = auto()

@dataclass
class MementoMetadata:
    """Metadata attached to a memento."""
    id: str
    type: MementoType
    timestamp: datetime = field(default_factory=datetime.now)
    description: str = ""
    tags: List[str] = field(default_factory=list)
    size: int = 0
    checksum: str = ""
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'id': self.id,
            'type': self.type.name,
            'timestamp': self.timestamp.isoformat(),
            'description': self.description,
            'tags': self.tags,
            'size': self.size,
            'checksum': self.checksum
        }

class Memento(ABC, Generic[T]):
    """Base class for mementos."""
    
    def __init__(self, state: T, metadata: Optional[MementoMetadata] = None):
        # Deep-copy so that later mutation of the caller's object cannot alter the snapshot.
        self._state = copy.deepcopy(state)
        self._metadata = metadata or self._create_metadata(state)
    
    @property
    def metadata(self) -> MementoMetadata:
        return self._metadata
    
    def get_state(self, requester: Any = None) -> T:
        """Return the saved state (wide interface)."""
        return self._state
    
    def _create_metadata(self, state: T) -> MementoMetadata:
        """Build metadata from the state's string representation."""
        state_str = str(state)
        return MementoMetadata(
            id=hashlib.md5(state_str.encode()).hexdigest()[:8],
            type=MementoType.FULL,
            size=len(state_str),
            checksum=hashlib.sha256(state_str.encode()).hexdigest()[:16]
        )

class Originator(ABC, Generic[T]):
    """Abstract base class for originators."""
    
    @abstractmethod
    def save(self) -> Memento[T]:
        """Save the current state."""
        pass
    
    @abstractmethod
    def restore(self, memento: Memento[T]) -> None:
        """Restore the state held by the given memento."""
        pass
    
    @abstractmethod
    def get_current_state(self) -> T:
        """Return the current state."""
        pass

class Caretaker(Generic[T]):
    """Caretaker: manages the memento history."""
    
    def __init__(self, max_history: int = 100):
        self._history: List[Memento[T]] = []
        self._redo_stack: List[Memento[T]] = []
        self._max_history = max_history
    
    def save(self, memento: Memento[T]) -> str:
        """Store a memento; this clears the redo stack."""
        self._history.append(memento)
        self._redo_stack.clear()
        
        if len(self._history) > self._max_history:
            self._history.pop(0)
        
        return memento.metadata.id
    
    def undo(self) -> Optional[Memento[T]]:
        """Undo: drop the latest memento and return the one before it."""
        if len(self._history) <= 1:
            return None
        
        current = self._history.pop()
        self._redo_stack.append(current)
        return self._history[-1] if self._history else None
    
    def redo(self) -> Optional[Memento[T]]:
        """Redo: reinstate the most recently undone memento."""
        if not self._redo_stack:
            return None
        
        memento = self._redo_stack.pop()
        self._history.append(memento)
        return memento
    
    def get_by_id(self, memento_id: str) -> Optional[Memento[T]]:
        """Look up a memento by ID (returns the earliest match)."""
        for m in self._history:
            if m.metadata.id == memento_id:
                return m
        return None
    
    def get_history(self) -> List[MementoMetadata]:
        """Return the metadata of every memento in the history."""
        return [m.metadata for m in self._history]
    
    def clear(self) -> None:
        """Clear the history and the redo stack."""
        self._history.clear()
        self._redo_stack.clear()
    
    def can_undo(self) -> bool:
        return len(self._history) > 1
    
    def can_redo(self) -> bool:
        return len(self._redo_stack) > 0

@dataclass
class DocumentState:
    """Document state."""
    content: str
    cursor_position: int
    selection: tuple = (0, 0)
    
    def __str__(self) -> str:
        return f"Document(content='{self.content[:20]}...', cursor={self.cursor_position})"

class Document(Originator[DocumentState]):
    """Document originator."""
    
    def __init__(self, content: str = ""):
        self._content = content
        self._cursor_position = 0
        self._selection = (0, 0)
    
    def type(self, text: str) -> None:
        self._content = (
            self._content[:self._cursor_position] +
            text +
            self._content[self._cursor_position:]
        )
        self._cursor_position += len(text)
    
    def delete(self) -> None:
        if self._cursor_position > 0:
            self._content = (
                self._content[:self._cursor_position - 1] +
                self._content[self._cursor_position:]
            )
            self._cursor_position -= 1
    
    def set_cursor(self, position: int) -> None:
        self._cursor_position = max(0, min(position, len(self._content)))
    
    def get_content(self) -> str:
        return self._content
    
    def get_current_state(self) -> DocumentState:
        return DocumentState(
            content=self._content,
            cursor_position=self._cursor_position,
            selection=self._selection
        )
    
    def save(self) -> Memento[DocumentState]:
        state = self.get_current_state()
        metadata = MementoMetadata(
            id=hashlib.md5(state.content.encode()).hexdigest()[:8],
            type=MementoType.FULL,
            description=f"Content length: {len(state.content)}"
        )
        return Memento(state, metadata)
    
    def restore(self, memento: Memento[DocumentState]) -> None:
        state = memento.get_state(self)
        self._content = state.content
        self._cursor_position = state.cursor_position
        self._selection = state.selection

print("Standard Memento pattern implementation:")

document = Document()
caretaker = Caretaker[DocumentState]()

caretaker.save(document.save())
document.type("Hello")
caretaker.save(document.save())
document.type(" World")
caretaker.save(document.save())
document.type("!")
# Save the latest edit as well; otherwise undo would skip "Hello World"
# and the "!" could never be restored by redo.
caretaker.save(document.save())

print(f"  Current content: {document.get_content()}")
print(f"  History: {len(caretaker.get_history())} entries")

print("\n  Undo:")
memento = caretaker.undo()
if memento:
    document.restore(memento)
    print(f"  Content: {document.get_content()}")

print("\n  Redo:")
memento = caretaker.redo()
if memento:
    document.restore(memento)
    print(f"  Content: {document.get_content()}")
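
The `max_history` cap in `Caretaker.save` evicts the oldest entry with `list.pop(0)`, which takes time proportional to the history length. One alternative, offered here as a sketch and not as part of the framework above, is `collections.deque` with `maxlen`, which drops the oldest item automatically when a new one is appended.

```python
from collections import deque
from typing import Deque

history: Deque[str] = deque(maxlen=3)   # keep at most three snapshots
for snapshot in ["v1", "v2", "v3", "v4", "v5"]:
    history.append(snapshot)            # the oldest entry is evicted when full

print(list(history))                    # ['v3', 'v4', 'v5']

latest = history.pop()                  # undo-style removal from the right end
print(latest, list(history))            # v5 ['v3', 'v4']
```

A deque does not support the slicing that a list does, so this trade-off fits caretakers that only push, pop, and iterate.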

18.4.2 Incremental Mementos

python
from typing import TypeVar, Generic, Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import copy

T = TypeVar('T')

@dataclass
class StateDiff:
    """Difference between two states."""
    added: Dict[str, Any] = field(default_factory=dict)
    removed: List[str] = field(default_factory=list)
    modified: Dict[str, Any] = field(default_factory=dict)
    
    def is_empty(self) -> bool:
        return not (self.added or self.removed or self.modified)
    
    def apply(self, base: Dict[str, Any]) -> Dict[str, Any]:
        """Apply this diff to a base state and return the new state."""
        result = copy.deepcopy(base)
        
        for key in self.removed:
            result.pop(key, None)
        
        for key, value in self.added.items():
            result[key] = copy.deepcopy(value)
        
        for key, value in self.modified.items():
            result[key] = copy.deepcopy(value)
        
        return result

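class IncrementalMemento(Memento[Dict[str, Any]]):
    """Incremental memento.

    Note: this teaching version still keeps the full state alongside the diff;
    a space-saving variant would keep only the diff for non-snapshot entries.
    """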
    
    def __init__(
        self,
        state: Dict[str, Any],
        base_id: Optional[str] = None,
        diff: Optional[StateDiff] = None
    ):
        self._base_id = base_id
        self._diff = diff
        super().__init__(state)
    
    @property
    def base_id(self) -> Optional[str]:
        return self._base_id
    
    @property
    def diff(self) -> Optional[StateDiff]:
        return self._diff

class IncrementalCaretaker(Caretaker[Dict[str, Any]]):
    """Caretaker for incremental mementos."""
    
    def __init__(self, max_history: int = 100):
        super().__init__(max_history)
        self._full_snapshots: Dict[str, IncrementalMemento] = {}
        self._snapshot_interval = 5
    
    def save(self, memento: IncrementalMemento) -> str:
        """Store a memento: a full snapshot every `_snapshot_interval` saves, a diff otherwise."""
        memento_id = memento.metadata.id
        
        if len(self._history) % self._snapshot_interval == 0:
            self._full_snapshots[memento_id] = memento
            memento._base_id = None
            memento._diff = None
        else:
            if self._history:
                last = self._history[-1]
                if isinstance(last, IncrementalMemento):
                    diff = self._compute_diff(
                        last.get_state(),
                        memento.get_state()
                    )
                    memento._base_id = last.metadata.id
                    memento._diff = diff
        
        self._history.append(memento)
        self._redo_stack.clear()
        
        if len(self._history) > self._max_history:
            removed = self._history.pop(0)
            if removed.metadata.id in self._full_snapshots:
                del self._full_snapshots[removed.metadata.id]
        
        return memento_id
    
    def restore_to(self, memento_id: str) -> Optional[Dict[str, Any]]:
        """Return the state for the given ID, rebuilding it from diffs if needed."""
        memento = self.get_by_id(memento_id)
        if not memento:
            return None
        
        if isinstance(memento, IncrementalMemento):
            if memento._base_id is None or memento._diff is None:
                return memento.get_state()
            
            return self._reconstruct_state(memento)
        
        return memento.get_state()
    
    def _compute_diff(
        self,
        old_state: Dict[str, Any],
        new_state: Dict[str, Any]
    ) -> StateDiff:
        """Compute the added, removed, and modified keys between two states."""
        diff = StateDiff()
        
        old_keys = set(old_state.keys())
        new_keys = set(new_state.keys())
        
        for key in new_keys - old_keys:
            diff.added[key] = new_state[key]
        
        for key in old_keys - new_keys:
            diff.removed.append(key)
        
        for key in old_keys & new_keys:
            if old_state[key] != new_state[key]:
                diff.modified[key] = new_state[key]
        
        return diff
    
    def _reconstruct_state(
        self,
        target_memento: IncrementalMemento
    ) -> Dict[str, Any]:
        """Rebuild a state by replaying diffs on top of the nearest full snapshot."""
        chain: List[IncrementalMemento] = [target_memento]
        
        current = target_memento
        while current._base_id:
            base = self.get_by_id(current._base_id)
            if base and isinstance(base, IncrementalMemento):
                chain.append(base)
                if base._base_id is None:
                    break
                current = base
            else:
                break
        
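        chain.reverse()
        
        # If the chain does not start at a full snapshot (for example because the
        # base was trimmed from the history), fall back to the stored full state
        # instead of replaying diffs onto an empty dict.
        if chain[0]._diff is not None:
            return copy.deepcopy(target_memento.get_state())
        
        state: Dict[str, Any] = {}
        for memento in chain:
            if memento._diff:
                state = memento._diff.apply(state)
            else:
                state = copy.deepcopy(memento.get_state())
        
        return state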
    
    def get_storage_stats(self) -> Dict[str, Any]:
        """Return storage statistics."""
        full_count = len(self._full_snapshots)
        incremental_count = len(self._history) - full_count
        
        return {
            'total_mementos': len(self._history),
            'full_snapshots': full_count,
            'incremental_mementos': incremental_count,
            'snapshot_interval': self._snapshot_interval
        }

print("\nIncremental memento example:")

caretaker = IncrementalCaretaker()

states = [
    {'name': 'Alice', 'age': 25, 'city': 'Beijing'},
    {'name': 'Alice', 'age': 26, 'city': 'Beijing'},
    {'name': 'Alice', 'age': 26, 'city': 'Shanghai'},
    {'name': 'Bob', 'age': 26, 'city': 'Shanghai'},
    {'name': 'Bob', 'age': 30, 'city': 'Shanghai'},
    {'name': 'Bob', 'age': 30, 'city': 'Guangzhou'},
]

for i, state in enumerate(states):
    memento = IncrementalMemento(state)
    caretaker.save(memento)
    print(f"  Saved state {i+1}: {state}")

print(f"\n  Storage stats: {caretaker.get_storage_stats()}")
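
The diff-and-replay idea behind `_compute_diff` and `_reconstruct_state` can be checked in isolation. The sketch below is a simplified, standalone variant and not the classes above: `compute_diff` and `apply_diff` are hypothetical helpers that merge "added" and "modified" keys into one dictionary.

```python
from typing import Any, Dict, List, Tuple

Diff = Tuple[Dict[str, Any], List[str]]   # (keys to set, keys to remove)


def compute_diff(old: Dict[str, Any], new: Dict[str, Any]) -> Diff:
    changed = {k: v for k, v in new.items() if k not in old or old[k] != v}
    removed = [k for k in old if k not in new]
    return changed, removed


def apply_diff(base: Dict[str, Any], diff: Diff) -> Dict[str, Any]:
    changed, removed = diff
    result = {k: v for k, v in base.items() if k not in removed}
    result.update(changed)
    return result


states = [
    {"name": "Alice", "age": 25, "city": "Beijing"},
    {"name": "Alice", "age": 26, "city": "Beijing"},
    {"name": "Alice", "age": 26},
]
diffs = [compute_diff(a, b) for a, b in zip(states, states[1:])]

# Replay every diff on top of the first (full) snapshot.
rebuilt = states[0]
for d in diffs:
    rebuilt = apply_diff(rebuilt, d)

print(diffs[0])               # ({'age': 26}, [])
print(rebuilt == states[-1])  # True
```

Only the first state is kept in full; every later state is recoverable from it plus the chain of diffs, which is the property an incremental caretaker relies on.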

18.4.3 Version-Control Mementos

python
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import hashlib

class VersionStatus(Enum):
    """Version status."""
    ACTIVE = auto()
    ARCHIVED = auto()
    DELETED = auto()

@dataclass
class Version:
    """Version information."""
    id: str
    parent_id: Optional[str]
    message: str
    author: str
    timestamp: datetime = field(default_factory=datetime.now)
    status: VersionStatus = VersionStatus.ACTIVE
    tags: List[str] = field(default_factory=list)
    
    def __hash__(self) -> int:
        return hash(self.id)

@dataclass
class VersionedState:
    """A state together with its version record."""
    version: Version
    state: Dict[str, Any]
    checksum: str

class VersionControlCaretaker:
    """Caretaker that manages mementos like a version control system."""
    
    def __init__(self):
        self._versions: Dict[str, VersionedState] = {}
        self._branches: Dict[str, Optional[str]] = {'main': None}
        self._current_branch = 'main'
        self._head: Optional[str] = None
        self._tags: Dict[str, str] = {}
    
    def commit(
        self,
        state: Dict[str, Any],
        message: str,
        author: str = "system"
    ) -> str:
        """Create a new version on the current branch."""
        version_id = self._generate_id(state)
        
        version = Version(
            id=version_id,
            parent_id=self._head,
            message=message,
            author=author
        )
        
        state_str = str(sorted(state.items()))
        checksum = hashlib.sha256(state_str.encode()).hexdigest()
        
        self._versions[version_id] = VersionedState(
            version=version,
            # Shallow copy so that later edits to the caller's dict do not alter this version.
            state=dict(state),
            checksum=checksum
        )
        
        self._head = version_id
        self._branches[self._current_branch] = version_id
        
        return version_id
    
    def checkout(self, version_id: str) -> Optional[Dict[str, Any]]:
        """Check out the given version (this also moves the current branch pointer to it)."""
        if version_id not in self._versions:
            return None
        
        self._head = version_id
        self._branches[self._current_branch] = version_id
        
        return self._versions[version_id].state.copy()
    
    def create_branch(self, name: str) -> bool:
        """Create a branch at the current head."""
        if name in self._branches:
            return False
        
        self._branches[name] = self._head
        return True
    
    def switch_branch(self, name: str) -> bool:
        """Switch to another branch."""
        if name not in self._branches:
            return False
        
        self._current_branch = name
        self._head = self._branches[name]
        return True
    
    def merge(self, source_branch: str) -> Optional[Dict[str, Any]]:
        """Merge a branch: source keys win on conflict; the result is returned, not committed."""
        if source_branch not in self._branches:
            return None
        
        source_head = self._branches[source_branch]
        if not source_head:
            return None
        
        source_state = self._versions[source_head].state
        current_state = self.get_current_state() or {}
        
        merged = {**current_state, **source_state}
        
        return merged
    
    def tag(self, name: str, version_id: Optional[str] = None) -> bool:
        """Tag a version (the current head by default)."""
        target_id = version_id or self._head
        if target_id not in self._versions:
            return False
        
        self._tags[name] = target_id
        self._versions[target_id].version.tags.append(name)
        return True
    
    def get_version(self, version_id: str) -> Optional[VersionedState]:
        """Return the version with the given ID."""
        return self._versions.get(version_id)
    
    def get_current_state(self) -> Optional[Dict[str, Any]]:
        """Return a copy of the state at the current head."""
        if not self._head:
            return None
        return self._versions[self._head].state.copy()
    
    def get_history(self, limit: int = 10) -> List[Version]:
        """Return up to `limit` versions, newest first, following parent links from the head."""
        history = []
        current_id = self._head
        
        while current_id and len(history) < limit:
            if current_id in self._versions:
                history.append(self._versions[current_id].version)
                current_id = self._versions[current_id].version.parent_id
            else:
                break
        
        return history
    
    def get_branches(self) -> Dict[str, Optional[str]]:
        """Return all branches."""
        return self._branches.copy()
    
    def get_tags(self) -> Dict[str, str]:
        """Return all tags."""
        return self._tags.copy()
    
    def _generate_id(self, state: Dict[str, Any]) -> str:
        """Generate a version ID from the current time and the state."""
        content = f"{datetime.now().isoformat()}{str(state)}"
        return hashlib.sha256(content.encode()).hexdigest()[:8]

print("\nVersion-control memento example:")

vc = VersionControlCaretaker()

vc.commit({'version': '1.0', 'feature': 'basic'}, "Initial version")
vc.tag("v1.0")  # tag the 1.0 commit before the bug fix is committed
vc.commit({'version': '1.1', 'feature': 'basic', 'bugfix': 'true'}, "Fix bug")

vc.create_branch("develop")
vc.switch_branch("develop")

vc.commit({'version': '2.0-dev', 'feature': 'advanced'}, "Develop new feature")
vc.commit({'version': '2.0-dev', 'feature': 'advanced', 'test': 'added'}, "Add tests")

print("  Current branch:", vc._current_branch)
print("  Current state:", vc.get_current_state())

print("\n  Switching to the main branch:")
vc.switch_branch("main")
print("  Current state:", vc.get_current_state())

print("\n  History:")
for v in vc.get_history():
    print(f"    {v.id[:6]}: {v.message}")

print("\n  Tags:", vc.get_tags())
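
`_generate_id` mixes the current time into the hash, so committing the same state twice yields two different IDs. When identical states should share an ID, a content-addressed scheme is one option. The sketch below assumes the state is JSON-serializable; `content_id` is a hypothetical helper, not a method of the class above.

```python
import hashlib
import json
from typing import Any, Dict


def content_id(state: Dict[str, Any], length: int = 8) -> str:
    """Hash a canonical JSON form so that key order does not affect the ID."""
    canonical = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]


a = content_id({"version": "1.0", "feature": "basic"})
b = content_id({"feature": "basic", "version": "1.0"})   # same content, other key order
c = content_id({"version": "1.1", "feature": "basic"})   # different content

print(a == b)   # True
print(a == c)   # False
```

Truncating the digest to eight hex characters keeps IDs short but raises the chance of collisions in large histories; a longer prefix is safer there.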

18.5 Enterprise Application Examples

18.5.1 Undo/Redo System for a Text Editor

python
from typing import List, Optional, Dict, Any, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import copy

class OperationType(Enum):
    """Operation types."""
    INSERT = auto()
    DELETE = auto()
    REPLACE = auto()
    MOVE_CURSOR = auto()

@dataclass
class TextOperation:
    """A single text operation."""
    type: OperationType
    position: int
    content: str = ""
    deleted_content: str = ""
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class EditorSnapshot:
    """Snapshot of the editor."""
    content: str
    cursor_position: int
    selection_start: int
    selection_end: int
    operations: List[TextOperation] = field(default_factory=list)
    
    def copy(self) -> 'EditorSnapshot':
        return EditorSnapshot(
            content=self.content,
            cursor_position=self.cursor_position,
            selection_start=self.selection_start,
            selection_end=self.selection_end,
            operations=self.operations.copy()
        )

class TextEditorMemento:
    """Memento for the text editor."""
    
    def __init__(self, snapshot: EditorSnapshot, operation: Optional[TextOperation] = None):
        self._snapshot = snapshot
        self._operation = operation
        self._timestamp = datetime.now()
    
    @property
    def snapshot(self) -> EditorSnapshot:
        return self._snapshot
    
    @property
    def operation(self) -> Optional[TextOperation]:
        return self._operation
    
    @property
    def timestamp(self) -> datetime:
        return self._timestamp

class TextEditor:
    """Text editor (the originator)."""
    
    def __init__(self):
        self._content = ""
        self._cursor_position = 0
        self._selection_start = 0
        self._selection_end = 0
        self._last_operation: Optional[TextOperation] = None
    
    def type(self, text: str) -> TextOperation:
        """Insert text at the cursor, replacing any selection."""
        
        if self._selection_start != self._selection_end:
            self._delete_selection()
        
        self._content = (
            self._content[:self._cursor_position] +
            text +
            self._content[self._cursor_position:]
        )
        
        operation = TextOperation(
            type=OperationType.INSERT,
            position=self._cursor_position,
            content=text
        )
        
        self._cursor_position += len(text)
        self._last_operation = operation
        
        return operation
    
    def delete(self) -> Optional[TextOperation]:
        """Delete the selection, or the character before the cursor."""
        if self._selection_start != self._selection_end:
            deleted = self._delete_selection()
            operation = TextOperation(
                type=OperationType.DELETE,
                position=self._selection_start,
                deleted_content=deleted
            )
        elif self._cursor_position > 0:
            deleted = self._content[self._cursor_position - 1]
            self._content = (
                self._content[:self._cursor_position - 1] +
                self._content[self._cursor_position:]
            )
            self._cursor_position -= 1
            operation = TextOperation(
                type=OperationType.DELETE,
                position=self._cursor_position,
                deleted_content=deleted
            )
        else:
            return None
        
        self._last_operation = operation
        return operation
    
    def backspace(self) -> Optional[TextOperation]:
        """Backspace (alias of delete())."""
        return self.delete()
    
    def set_cursor(self, position: int) -> TextOperation:
        """Move the cursor and clear the selection."""
        self._cursor_position = max(0, min(position, len(self._content)))
        self._selection_start = self._cursor_position
        self._selection_end = self._cursor_position
        
        operation = TextOperation(
            type=OperationType.MOVE_CURSOR,
            position=self._cursor_position
        )
        self._last_operation = operation
        return operation
    
    def select(self, start: int, end: int) -> None:
        """Select a range of text."""
        self._selection_start = max(0, min(start, len(self._content)))
        self._selection_end = max(0, min(end, len(self._content)))
    
    def get_content(self) -> str:
        return self._content
    
    def get_cursor_position(self) -> int:
        return self._cursor_position
    
    def get_selection(self) -> tuple:
        return (self._selection_start, self._selection_end)
    
    def save(self) -> TextEditorMemento:
        """Save the current state."""
        snapshot = EditorSnapshot(
            content=self._content,
            cursor_position=self._cursor_position,
            selection_start=self._selection_start,
            selection_end=self._selection_end
        )
        return TextEditorMemento(snapshot, self._last_operation)
    
    def restore(self, memento: TextEditorMemento) -> None:
        """Restore a saved state."""
        snapshot = memento.snapshot
        self._content = snapshot.content
        self._cursor_position = snapshot.cursor_position
        self._selection_start = snapshot.selection_start
        self._selection_end = snapshot.selection_end
    
    def _delete_selection(self) -> str:
        """Delete the selected text and return it."""
        start = min(self._selection_start, self._selection_end)
        end = max(self._selection_start, self._selection_end)
        deleted = self._content[start:end]
        
        self._content = self._content[:start] + self._content[end:]
        self._cursor_position = start
        self._selection_start = start
        self._selection_end = start
        
        return deleted

class EditorHistory:
    """History manager for the editor (the caretaker)."""
    
    def __init__(self, editor: TextEditor, max_history: int = 100):
        self._editor = editor
        self._max_history = max_history
        self._undo_stack: List[TextEditorMemento] = []
        self._redo_stack: List[TextEditorMemento] = []
        self._save_threshold = 10
        self._operation_count = 0
    
    def record_operation(self, operation: Optional[TextOperation] = None) -> None:
        """Count an operation and take a snapshot once the threshold is reached."""
        self._operation_count += 1
        
        if self._operation_count >= self._save_threshold:
            self.save()
            self._operation_count = 0
    
    def save(self) -> None:
        """保存当前状态"""
        memento = self._editor.save()
        self._undo_stack.append(memento)
        self._redo_stack.clear()
        
        if len(self._undo_stack) > self._max_history:
            self._undo_stack.pop(0)
    
    def undo(self) -> bool:
        """Undo: step back to the previous saved snapshot"""
        # The bottom snapshot is the baseline, so at least two snapshots are
        # required. Checking first keeps the redo stack untouched on failure.
        if len(self._undo_stack) < 2:
            return False
        
        current = self._undo_stack.pop()
        self._redo_stack.append(current)
        self._editor.restore(self._undo_stack[-1])
        return True
    
    def redo(self) -> bool:
        """重做"""
        if not self._redo_stack:
            return False
        
        memento = self._redo_stack.pop()
        self._undo_stack.append(memento)
        self._editor.restore(memento)
        return True
    
    def can_undo(self) -> bool:
        return len(self._undo_stack) > 1
    
    def can_redo(self) -> bool:
        return len(self._redo_stack) > 0
    
    def get_history_size(self) -> tuple:
        return (len(self._undo_stack), len(self._redo_stack))

print("\n文本编辑器撤销/重做系统示例:")

editor = TextEditor()
history = EditorHistory(editor)

history.save()
editor.type("Hello")
history.save()
editor.type(" World")
history.save()
editor.type("!")
history.save()  # snapshot the latest edit so the first undo removes only "!"

print(f"  内容: '{editor.get_content()}'")
print(f"  历史大小: {history.get_history_size()}")

print("\n  撤销:")
history.undo()
print(f"  内容: '{editor.get_content()}'")

print("\n  再次撤销:")
history.undo()
print(f"  内容: '{editor.get_content()}'")

print("\n  重做:")
history.redo()
print(f"  内容: '{editor.get_content()}'")
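
`EditorHistory` restores previously saved snapshots, so any edit made after the last `save()` call cannot be reached by redo. A common alternative is to capture the live state at the moment of undo. The sketch below illustrates that idea under simplifying assumptions: states are immutable strings, and `SnapshotHistory` with its `commit` method is a hypothetical helper, not part of the chapter's editor.

```python
from typing import Generic, List, TypeVar

S = TypeVar("S")


class SnapshotHistory(Generic[S]):
    """Hypothetical two-stack undo/redo helper for immutable states."""

    def __init__(self, initial: S, max_history: int = 100):
        self._current = initial
        self._undo: List[S] = []
        self._redo: List[S] = []
        self._max_history = max_history

    @property
    def current(self) -> S:
        return self._current

    def commit(self, new_state: S) -> None:
        """Record a new state; any pending redo branch is discarded."""
        self._undo.append(self._current)
        if len(self._undo) > self._max_history:
            self._undo.pop(0)  # drop the oldest snapshot
        self._current = new_state
        self._redo.clear()

    def undo(self) -> bool:
        if not self._undo:
            return False
        self._redo.append(self._current)  # keep the live state for redo
        self._current = self._undo.pop()
        return True

    def redo(self) -> bool:
        if not self._redo:
            return False
        self._undo.append(self._current)
        self._current = self._redo.pop()
        return True


history = SnapshotHistory("")
history.commit("Hello")
history.commit("Hello World")
history.commit("Hello World!")

history.undo()
after_undo = history.current   # one step back
history.redo()
after_redo = history.current   # the undone state is reachable again
```

Because the current state is pushed onto the opposite stack on every move, undo followed by redo always returns to exactly where the user was. For mutable states, each push would need a copy.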

18.5.2 Transaction Rollback System

python
from typing import Dict, List, Optional, Any, Callable, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import copy

T = TypeVar('T')

class TransactionState(Enum):
    """事务状态"""
    ACTIVE = auto()
    COMMITTED = auto()
    ROLLED_BACK = auto()
    FAILED = auto()

@dataclass
class TransactionLog:
    """事务日志"""
    transaction_id: str
    operation: str
    before_state: Dict[str, Any]
    after_state: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class TransactionMemento:
    """事务备忘录"""
    transaction_id: str
    state: TransactionState
    logs: List[TransactionLog] = field(default_factory=list)
    checkpoint: Dict[str, Any] = field(default_factory=dict)
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None

class TransactionManager:
    """事务管理器"""
    
    def __init__(self):
        self._active_transactions: Dict[str, TransactionMemento] = {}
        self._completed_transactions: List[TransactionMemento] = []
        self._data: Dict[str, Any] = {}
        self._max_completed = 100
    
    def begin(self) -> str:
        """开始事务"""
        import uuid
        transaction_id = str(uuid.uuid4())[:8]
        
        memento = TransactionMemento(
            transaction_id=transaction_id,
            state=TransactionState.ACTIVE,
            checkpoint=copy.deepcopy(self._data)
        )
        
        self._active_transactions[transaction_id] = memento
        return transaction_id
    
    def execute(
        self,
        transaction_id: str,
        operation: Callable[[Dict[str, Any]], Dict[str, Any]]
    ) -> bool:
        """执行操作"""
        if transaction_id not in self._active_transactions:
            return False
        
        memento = self._active_transactions[transaction_id]
        if memento.state != TransactionState.ACTIVE:
            return False
        
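        before_state = copy.deepcopy(self._data)
        
        try:
            operation(self._data)
        except Exception:
            # Undo any partial changes made by the failed operation
            self._data = before_state
            memento.state = TransactionState.FAILED
            return False
        
        log = TransactionLog(
            transaction_id=transaction_id,
            # Fall back to repr() for callables without __name__ (e.g. functools.partial)
            operation=getattr(operation, '__name__', repr(operation)),
            before_state=before_state,
            after_state=copy.deepcopy(self._data)
        )
        memento.logs.append(log)
        return True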
    
    def commit(self, transaction_id: str) -> bool:
        """提交事务"""
        if transaction_id not in self._active_transactions:
            return False
        
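        memento = self._active_transactions[transaction_id]
        if memento.state != TransactionState.ACTIVE:
            # A failed transaction must not be committed; roll it back instead
            self.rollback(transaction_id)
            return False
        memento.state = TransactionState.COMMITTED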
        memento.end_time = datetime.now()
        
        self._completed_transactions.append(memento)
        del self._active_transactions[transaction_id]
        
        self._trim_completed()
        return True
    
    def rollback(self, transaction_id: str) -> bool:
        """回滚事务"""
        if transaction_id not in self._active_transactions:
            return False
        
        memento = self._active_transactions[transaction_id]
        
        self._data = copy.deepcopy(memento.checkpoint)
        
        memento.state = TransactionState.ROLLED_BACK
        memento.end_time = datetime.now()
        
        self._completed_transactions.append(memento)
        del self._active_transactions[transaction_id]
        
        self._trim_completed()
        return True
    
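    def get_data(self) -> Dict[str, Any]:
        # Deep copy so callers cannot mutate nested values (e.g. the
        # 'transactions' list) behind the manager's back
        return copy.deepcopy(self._data)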
    
    def set_data(self, key: str, value: Any) -> None:
        self._data[key] = value
    
    def get_active_transactions(self) -> List[str]:
        return list(self._active_transactions.keys())
    
    def get_transaction_status(self, transaction_id: str) -> Optional[TransactionState]:
        if transaction_id in self._active_transactions:
            return self._active_transactions[transaction_id].state
        
        for m in self._completed_transactions:
            if m.transaction_id == transaction_id:
                return m.state
        
        return None
    
    def _trim_completed(self) -> None:
        if len(self._completed_transactions) > self._max_completed:
            self._completed_transactions = self._completed_transactions[-self._max_completed:]

class TransactionalOperation:
    """事务性操作"""
    
    def __init__(self, manager: TransactionManager):
        self._manager = manager
        self._transaction_id: Optional[str] = None
    
    def __enter__(self) -> 'TransactionalOperation':
        self._transaction_id = self._manager.begin()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_type is not None:
            self._manager.rollback(self._transaction_id)
        else:
            self._manager.commit(self._transaction_id)
        return False
    
    @property
    def transaction_id(self) -> Optional[str]:
        return self._transaction_id

print("\n事务回滚系统示例:")

tm = TransactionManager()

tm.set_data('balance', 1000)
tm.set_data('transactions', [])

print(f"  初始数据: {tm.get_data()}")

print("\n  执行成功事务:")
with TransactionalOperation(tm) as tx:
    def deposit(data):
        data['balance'] += 100
        data['transactions'].append({'type': 'deposit', 'amount': 100})
    
    tm.execute(tx.transaction_id, deposit)
    print(f"  事务中: {tm.get_data()}")

print(f"  提交后: {tm.get_data()}")

print("\n  执行失败事务(自动回滚):")
try:
    with TransactionalOperation(tm) as tx:
        def withdraw(data):
            data['balance'] -= 500
            data['transactions'].append({'type': 'withdraw', 'amount': 500})
        
        tm.execute(tx.transaction_id, withdraw)
        print(f"  事务中: {tm.get_data()}")
        raise Exception("模拟错误")
except Exception:
    pass  # expected: __exit__ has already rolled the transaction back

print(f"  回滚后: {tm.get_data()}")
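
The core of the rollback mechanism is small: take a deep copy before the work starts and put it back if anything raises. The following sketch isolates that idea using `contextlib.contextmanager`. The `atomic` helper is hypothetical and, unlike `TransactionManager`, keeps no log and supports only one transaction at a time.

```python
import copy
from contextlib import contextmanager
from typing import Any, Dict, Iterator


@contextmanager
def atomic(data: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Hypothetical helper: roll `data` back in place if the block raises."""
    checkpoint = copy.deepcopy(data)  # the memento
    try:
        yield data
    except Exception:
        # Restore in place so existing references to `data` stay valid
        data.clear()
        data.update(checkpoint)
        raise


account = {"balance": 1000, "transactions": []}

# Successful block: changes are kept
with atomic(account) as acct:
    acct["balance"] += 100
    acct["transactions"].append({"type": "deposit", "amount": 100})

# Failing block: changes are discarded, then the error propagates
try:
    with atomic(account) as acct:
        acct["balance"] -= 500
        acct["transactions"].append({"type": "withdraw", "amount": 500})
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
```

The deep copy matters here: with a shallow copy the nested `transactions` list would be shared with the checkpoint, and the withdrawal record would survive the rollback.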

18.5.3 Game Save System

python
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
import json
import hashlib

class SaveType(Enum):
    """存档类型"""
    MANUAL = auto()
    AUTO = auto()
    QUICK = auto()
    CHECKPOINT = auto()

@dataclass
class PlayerState:
    """玩家状态"""
    name: str
    level: int = 1
    experience: int = 0
    health: int = 100
    max_health: int = 100
    mana: int = 100
    max_mana: int = 100
    gold: int = 0
    position: tuple = (0, 0, 0)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'name': self.name,
            'level': self.level,
            'experience': self.experience,
            'health': self.health,
            'max_health': self.max_health,
            'mana': self.mana,
            'max_mana': self.max_mana,
            'gold': self.gold,
            'position': list(self.position)
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'PlayerState':
        return cls(
            name=data['name'],
            level=data['level'],
            experience=data['experience'],
            health=data['health'],
            max_health=data['max_health'],
            mana=data['mana'],
            max_mana=data['max_mana'],
            gold=data['gold'],
            position=tuple(data['position'])
        )

@dataclass
class WorldState:
    """世界状态"""
    current_map: str = "start_area"
    completed_quests: List[str] = field(default_factory=list)
    unlocked_areas: Set[str] = field(default_factory=lambda: {"start_area"})
    npc_states: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'current_map': self.current_map,
            'completed_quests': self.completed_quests,
            'unlocked_areas': list(self.unlocked_areas),
            'npc_states': self.npc_states
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'WorldState':
        return cls(
            current_map=data['current_map'],
            completed_quests=data['completed_quests'],
            unlocked_areas=set(data['unlocked_areas']),
            npc_states=data['npc_states']
        )

@dataclass
class GameSnapshot:
    """游戏快照"""
    player: PlayerState
    world: WorldState
    inventory: List[str]
    playtime: float
    save_type: SaveType
    timestamp: datetime = field(default_factory=datetime.now)
    description: str = ""
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'player': self.player.to_dict(),
            'world': self.world.to_dict(),
            'inventory': self.inventory,
            'playtime': self.playtime,
            'save_type': self.save_type.name,
            'timestamp': self.timestamp.isoformat(),
            'description': self.description
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'GameSnapshot':
        return cls(
            player=PlayerState.from_dict(data['player']),
            world=WorldState.from_dict(data['world']),
            inventory=data['inventory'],
            playtime=data['playtime'],
            save_type=SaveType[data['save_type']],
            timestamp=datetime.fromisoformat(data['timestamp']),
            description=data.get('description', '')
        )

class GameMemento:
    """游戏备忘录"""
    
    def __init__(self, snapshot: GameSnapshot):
        self._snapshot = snapshot
        self._id = self._generate_id()
    
    @property
    def id(self) -> str:
        return self._id
    
    @property
    def snapshot(self) -> GameSnapshot:
        return self._snapshot
    
    def to_json(self) -> str:
        return json.dumps(self.to_dict(), indent=2)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'id': self._id,
            'snapshot': self._snapshot.to_dict()
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'GameMemento':
        memento = cls(GameSnapshot.from_dict(data['snapshot']))
        memento._id = data['id']
        return memento
    
    def _generate_id(self) -> str:
        content = f"{self._snapshot.timestamp.isoformat()}{self._snapshot.player.name}"
        return hashlib.md5(content.encode()).hexdigest()[:8]

class SaveManager:
    """存档管理器"""
    
    def __init__(self, max_saves: int = 10, auto_save_interval: int = 300):
        self._saves: Dict[str, GameMemento] = {}
        self._quick_save: Optional[GameMemento] = None
        self._auto_saves: List[GameMemento] = []
        self._max_saves = max_saves
        self._max_auto_saves = 3
        self._auto_save_interval = auto_save_interval
    
    def save(
        self,
        game_state: Any,
        save_type: SaveType = SaveType.MANUAL,
        description: str = ""
    ) -> str:
        """保存游戏"""
        snapshot = self._create_snapshot(game_state, save_type, description)
        memento = GameMemento(snapshot)
        
        if save_type == SaveType.QUICK:
            self._quick_save = memento
        elif save_type == SaveType.AUTO:
            self._auto_saves.append(memento)
            if len(self._auto_saves) > self._max_auto_saves:
                self._auto_saves.pop(0)
        else:
            self._saves[memento.id] = memento
            if len(self._saves) > self._max_saves:
                oldest = min(self._saves.values(), key=lambda m: m.snapshot.timestamp)
                del self._saves[oldest.id]
        
        return memento.id
    
    def load(self, save_id: str) -> Optional[GameSnapshot]:
        """Load a save; returns a copy so callers cannot alter the stored memento"""
        from copy import deepcopy
        
        if save_id == "quick" and self._quick_save:
            return deepcopy(self._quick_save.snapshot)
        
        for auto_save in self._auto_saves:
            if auto_save.id == save_id:
                return deepcopy(auto_save.snapshot)
        
        if save_id in self._saves:
            return deepcopy(self._saves[save_id].snapshot)
        
        return None
    
    def delete(self, save_id: str) -> bool:
        """删除存档"""
        if save_id in self._saves:
            del self._saves[save_id]
            return True
        return False
    
    def list_saves(self) -> List[Dict[str, Any]]:
        """列出所有存档"""
        saves = []
        
        for memento in self._saves.values():
            saves.append({
                'id': memento.id,
                'type': memento.snapshot.save_type.name,
                'timestamp': memento.snapshot.timestamp.isoformat(),
                'description': memento.snapshot.description,
                'player_name': memento.snapshot.player.name,
                'level': memento.snapshot.player.level,
                'playtime': memento.snapshot.playtime
            })
        
        return sorted(saves, key=lambda x: x['timestamp'], reverse=True)
    
    def get_quick_save(self) -> Optional[GameSnapshot]:
        """获取快速存档"""
        return self._quick_save.snapshot if self._quick_save else None
    
    def get_auto_saves(self) -> List[GameSnapshot]:
        """获取自动存档"""
        return [m.snapshot for m in self._auto_saves]
    
    def _create_snapshot(
        self,
        game_state: Any,
        save_type: SaveType,
        description: str
    ) -> GameSnapshot:
        """创建快照"""
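        from copy import deepcopy
        
        # Deep-copy the mutable state objects; storing references would let
        # later gameplay silently rewrite every snapshot that points at them
        return GameSnapshot(
            player=deepcopy(game_state.player),
            world=deepcopy(game_state.world),
            inventory=list(game_state.inventory),
            playtime=game_state.playtime,
            save_type=save_type,
            description=description
        )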

class Game:
    """游戏类"""
    
    def __init__(self, player_name: str):
        self.player = PlayerState(name=player_name)
        self.world = WorldState()
        self.inventory: List[str] = []
        self.playtime = 0.0
    
    def gain_experience(self, amount: int) -> None:
        self.player.experience += amount
        while self.player.experience >= self.player.level * 100:
            self.player.experience -= self.player.level * 100
            self.player.level += 1
            self.player.max_health += 10
            self.player.health = self.player.max_health
    
    def take_damage(self, damage: int) -> None:
        self.player.health = max(0, self.player.health - damage)
    
    def heal(self, amount: int) -> None:
        self.player.health = min(self.player.max_health, self.player.health + amount)
    
    def collect_item(self, item: str) -> None:
        self.inventory.append(item)
    
    def complete_quest(self, quest_id: str) -> None:
        if quest_id not in self.world.completed_quests:
            self.world.completed_quests.append(quest_id)
    
    def move_to(self, map_name: str) -> None:
        self.world.current_map = map_name
        if map_name not in self.world.unlocked_areas:
            self.world.unlocked_areas.add(map_name)
    
    def get_status(self) -> str:
        return (
            f"[{self.player.name}] Lv.{self.player.level} "
            f"HP:{self.player.health}/{self.player.max_health} "
            f"MP:{self.player.mana}/{self.player.max_mana} "
            f"Gold:{self.player.gold} "
            f"Map:{self.world.current_map}"
        )

print("\n游戏存档系统示例:")

game = Game("勇者")
save_manager = SaveManager()

print(f"  初始状态: {game.get_status()}")

save_id1 = save_manager.save(game, SaveType.MANUAL, "初始存档")
print(f"  保存存档: {save_id1}")

game.gain_experience(150)
game.collect_item("铁剑")
game.move_to("forest")
print(f"  升级后: {game.get_status()}")

save_id2 = save_manager.save(game, SaveType.MANUAL, "森林探索")
print(f"  保存存档: {save_id2}")

game.take_damage(50)
game.collect_item("药草")
print(f"  受伤后: {game.get_status()}")

print("\n  加载存档:")
snapshot = save_manager.load(save_id1)
if snapshot:
    game.player = snapshot.player
    game.world = snapshot.world
    game.inventory = snapshot.inventory
    game.playtime = snapshot.playtime
    print(f"  加载后: {game.get_status()}")

print("\n  存档列表:")
for save in save_manager.list_saves():
    print(f"    {save['id']}: {save['description']} - Lv.{save['level']}")
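
The `to_dict` / `from_dict` pairs exist so that a save can leave the process as JSON and come back later. The sketch below shows that round trip on a deliberately trimmed-down snapshot. `MiniSave`, `dump_save`, and `load_save` are hypothetical names used only for illustration.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class MiniSave:
    """Hypothetical, trimmed-down snapshot used only for this sketch."""
    player_name: str
    level: int
    inventory: List[str] = field(default_factory=list)


def dump_save(save: MiniSave) -> str:
    # asdict() copies nested containers, so the JSON text is detached
    # from the live object
    return json.dumps(asdict(save), ensure_ascii=False)


def load_save(text: str) -> MiniSave:
    return MiniSave(**json.loads(text))


live = MiniSave(player_name="Hero", level=1, inventory=["Iron Sword"])
stored = dump_save(live)          # the memento is now plain text

live.level = 5                    # keep playing after the save
live.inventory.append("Herb")

restored = load_save(stored)      # unaffected by the later changes
```

Serializing at save time gives isolation for free: once the state is text, later gameplay cannot reach it. The cost is that every field must map to a JSON type, which is why the chapter's classes convert sets, tuples, enums, and datetimes by hand.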

18.6 Pattern Variants

18.6.1 Serialized Memento

python
from typing import Dict, Any, Optional, TypeVar, Generic
from dataclasses import dataclass, asdict
from datetime import datetime
import json
import pickle
import base64

T = TypeVar('T')

class SerializationFormat:
    """序列化格式"""
    JSON = 'json'
    PICKLE = 'pickle'
    BASE64 = 'base64'

@dataclass
class SerializedMemento:
    """序列化备忘录"""
    data: str
    format: str
    checksum: str
    timestamp: datetime
    metadata: Dict[str, Any]
    
    @classmethod
    def create(
        cls,
        obj: Any,
        format: str = SerializationFormat.JSON,
        metadata: Optional[Dict[str, Any]] = None
    ) -> 'SerializedMemento':
        """创建序列化备忘录"""
        import hashlib
        
        if format == SerializationFormat.JSON:
            if hasattr(obj, 'to_dict'):
                data_dict = obj.to_dict()
            elif hasattr(obj, '__dict__'):
                data_dict = obj.__dict__
            else:
                data_dict = {'value': obj}
            
            data = json.dumps(data_dict, default=str, ensure_ascii=False)
        elif format == SerializationFormat.PICKLE:
            data = base64.b64encode(pickle.dumps(obj)).decode('utf-8')
        elif format == SerializationFormat.BASE64:
            data = base64.b64encode(str(obj).encode()).decode('utf-8')
        else:
            raise ValueError(f"Unsupported format: {format}")
        
        checksum = hashlib.sha256(data.encode()).hexdigest()[:16]
        
        return cls(
            data=data,
            format=format,
            checksum=checksum,
            timestamp=datetime.now(),
            metadata=metadata or {}
        )
    
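    def restore(self, target_class: Optional[type] = None) -> Any:
        """Restore the object after checking the payload's integrity"""
        if not self.verify():
            raise ValueError("Checksum mismatch: memento data is corrupted")
        
        if self.format == SerializationFormat.JSON:
            data_dict = json.loads(self.data)
            
            if target_class and hasattr(target_class, 'from_dict'):
                return target_class.from_dict(data_dict)
            elif target_class:
                # Bypasses __init__, so no constructor validation runs
                obj = target_class.__new__(target_class)
                obj.__dict__.update(data_dict)
                return obj
            return data_dict
        
        elif self.format == SerializationFormat.PICKLE:
            # pickle.loads can execute arbitrary code; only load trusted data
            return pickle.loads(base64.b64decode(self.data))
        
        elif self.format == SerializationFormat.BASE64:
            # This format stored str(obj), so only the string form is recoverable
            return base64.b64decode(self.data).decode()
        
        raise ValueError(f"Unsupported format: {self.format}")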
    
    def verify(self) -> bool:
        """验证完整性"""
        import hashlib
        expected = hashlib.sha256(self.data.encode()).hexdigest()[:16]
        return self.checksum == expected

print("\n序列化备忘录示例:")

@dataclass
class Config:
    theme: str
    language: str
    font_size: int
    
    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Config':
        return cls(**data)

config = Config(theme="dark", language="zh-CN", font_size=14)

memento = SerializedMemento.create(config, SerializationFormat.JSON, {'version': '1.0'})
print(f"  序列化数据: {memento.data[:50]}...")
print(f"  校验和: {memento.checksum}")
print(f"  验证: {memento.verify()}")

restored = memento.restore(Config)
print(f"  恢复对象: {restored}")
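
A checksum is only useful if restoring actually refuses a payload that no longer matches it. The sketch below shows that check in isolation, using the full SHA-256 digest and a constant-time comparison. The `seal` and `unseal` functions are hypothetical helpers, not part of `SerializedMemento`.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, Tuple


def seal(state: Dict[str, Any]) -> Tuple[str, str]:
    """Serialize `state` and return (payload, SHA-256 hex digest)."""
    # sort_keys makes the payload deterministic for equal dicts
    payload = json.dumps(state, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return payload, digest


def unseal(payload: str, digest: str) -> Dict[str, Any]:
    """Return the state, or raise ValueError if the payload was altered."""
    actual = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    if not hmac.compare_digest(actual, digest):
        raise ValueError("memento checksum mismatch")
    return json.loads(payload)


payload, digest = seal({"theme": "dark", "font_size": 14})
restored = unseal(payload, digest)

# Simulate corruption of the stored payload
tampered = payload.replace("dark", "light")
try:
    unseal(tampered, digest)
    tamper_detected = False
except ValueError:
    tamper_detected = True
```

A plain hash detects accidental corruption only. Anyone who can rewrite the payload can also recompute the hash, so protection against deliberate tampering would need a keyed construction such as `hmac.new` with a secret key.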

18.6.2 Checkpoint Memento

python
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto

class CheckpointType(Enum):
    """检查点类型"""
    MANUAL = auto()
    AUTO = auto()
    ERROR = auto()
    MILESTONE = auto()

@dataclass
class Checkpoint:
    """检查点"""
    id: str
    type: CheckpointType
    state: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    description: str = ""
    parent_id: Optional[str] = None

class CheckpointManager:
    """检查点管理器"""
    
    def __init__(self, max_checkpoints: int = 20):
        self._checkpoints: Dict[str, Checkpoint] = {}
        self._current_id: Optional[str] = None
        self._max_checkpoints = max_checkpoints
        self._auto_checkpoint_interval = 5
        self._operation_count = 0
    
    def create_checkpoint(
        self,
        state: Dict[str, Any],
        checkpoint_type: CheckpointType = CheckpointType.MANUAL,
        description: str = ""
    ) -> str:
        """创建检查点"""
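        import uuid
        from copy import deepcopy
        
        checkpoint_id = str(uuid.uuid4())[:8]
        
        checkpoint = Checkpoint(
            id=checkpoint_id,
            type=checkpoint_type,
            # Deep copy: a shallow copy would share nested lists and dicts with
            # the caller, so later mutations would leak into the checkpoint
            state=deepcopy(state),
            description=description,
            parent_id=self._current_id
        )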
        
        self._checkpoints[checkpoint_id] = checkpoint
        self._current_id = checkpoint_id
        
        self._trim_checkpoints()
        
        return checkpoint_id
    
    def auto_checkpoint(self, state: Dict[str, Any]) -> Optional[str]:
        """自动检查点"""
        self._operation_count += 1
        
        if self._operation_count >= self._auto_checkpoint_interval:
            self._operation_count = 0
            return self.create_checkpoint(
                state,
                CheckpointType.AUTO,
                f"Auto checkpoint at {datetime.now().isoformat()}"
            )
        
        return None
    
    def restore_to_checkpoint(self, checkpoint_id: str) -> Optional[Dict[str, Any]]:
        """Restore to the given checkpoint"""
        from copy import deepcopy
        
        if checkpoint_id not in self._checkpoints:
            return None
        
        checkpoint = self._checkpoints[checkpoint_id]
        self._current_id = checkpoint_id
        
        # Return a deep copy so the caller cannot alter the stored checkpoint
        return deepcopy(checkpoint.state)
    
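    def restore_to_last_milestone(self) -> Optional[Dict[str, Any]]:
        """Restore to the nearest milestone on the current chain"""
        from copy import deepcopy
        
        current = self._current_id
        
        while current:
            checkpoint = self._checkpoints.get(current)
            if not checkpoint:
                break
            
            if checkpoint.type == CheckpointType.MILESTONE:
                # Move the current pointer too, as restore_to_checkpoint does
                self._current_id = checkpoint.id
                return deepcopy(checkpoint.state)
            
            current = checkpoint.parent_id
        
        return None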
    
    def get_checkpoint_chain(self) -> List[Checkpoint]:
        """获取检查点链"""
        chain = []
        current = self._current_id
        
        while current:
            checkpoint = self._checkpoints.get(current)
            if not checkpoint:
                break
            chain.append(checkpoint)
            current = checkpoint.parent_id
        
        return chain
    
    def list_checkpoints(self) -> List[Dict[str, Any]]:
        """列出所有检查点"""
        return [
            {
                'id': cp.id,
                'type': cp.type.name,
                'timestamp': cp.timestamp.isoformat(),
                'description': cp.description
            }
            for cp in self._checkpoints.values()
        ]
    
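    def _trim_checkpoints(self) -> None:
        """Trim the older half of the AUTO checkpoints when over the limit"""
        if len(self._checkpoints) <= self._max_checkpoints:
            return
        
        # Never drop the current checkpoint; other types are always kept,
        # so the limit is best-effort rather than a hard bound
        auto_checkpoints = sorted(
            (
                cp for cp in self._checkpoints.values()
                if cp.type == CheckpointType.AUTO and cp.id != self._current_id
            ),
            key=lambda cp: cp.timestamp
        )
        
        for cp in auto_checkpoints[:len(auto_checkpoints) // 2]:
            # Re-link children to the removed checkpoint's parent so that
            # get_checkpoint_chain can still walk back to the root
            for other in self._checkpoints.values():
                if other.parent_id == cp.id:
                    other.parent_id = cp.parent_id
            del self._checkpoints[cp.id]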

print("\n检查点备忘录示例:")

cp_manager = CheckpointManager()

state = {'step': 0, 'data': []}

cp1 = cp_manager.create_checkpoint(state, CheckpointType.MILESTONE, "开始")
state['step'] = 1
state['data'].append('A')

cp_manager.auto_checkpoint(state)
state['step'] = 2
state['data'].append('B')

cp2 = cp_manager.create_checkpoint(state, CheckpointType.MANUAL, "中间状态")
state['step'] = 3
state['data'].append('C')

print("  当前状态:", state)
print("\n  检查点链:")
for cp in cp_manager.get_checkpoint_chain():
    print(f"    {cp.id}: {cp.type.name} - {cp.description}")

print("\n  恢复到上一个里程碑:")
restored = cp_manager.restore_to_last_milestone()
print("  恢复后状态:", restored)
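
Whether a checkpoint stays isolated from later changes depends on how deep the copy goes. The short sketch below contrasts `dict.copy()` with `copy.deepcopy` on a state that has the same shape as the one in the example, with a nested list under `'data'`.

```python
import copy

state = {"step": 0, "data": []}

shallow = state.copy()          # copies only the top-level dict
deep = copy.deepcopy(state)     # copies nested containers as well

state["step"] = 1               # rebinding a key: neither copy sees this
state["data"].append("A")       # mutating the nested list: the shallow copy sees it
```

After these two changes, `shallow["step"]` is still `0` but `shallow["data"]` now contains `"A"`, because both dicts point at the same list object. Only `deep` still holds the state as it was when copied. A deep copy costs more time and memory, which is the trade-off discussed in the complexity table of section 18.1.3.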

18.6.3 Time-Travel Memento

python
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum, auto
import heapq

class TimeDirection(Enum):
    """时间方向"""
    FORWARD = auto()
    BACKWARD = auto()

@dataclass
class TimePoint:
    """时间点"""
    timestamp: datetime
    state: Dict[str, Any]
    event: str = ""
    
    def __lt__(self, other: 'TimePoint') -> bool:
        return self.timestamp < other.timestamp

class TimeTravelMemento:
    """时间旅行备忘录"""
    
    def __init__(self, max_points: int = 100):
        self._timeline: List[TimePoint] = []
        self._current_index: int = -1
        self._max_points = max_points
        self._current_time: datetime = datetime.now()
    
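    def record(self, state: Dict[str, Any], event: str = "") -> None:
        """Record a state on the timeline"""
        from copy import deepcopy
        
        now = datetime.now()
        
        # Recording after an undo discards the redo branch
        if self._current_index < len(self._timeline) - 1:
            self._timeline = self._timeline[:self._current_index + 1]
        
        time_point = TimePoint(
            timestamp=now,
            state=deepcopy(state),  # detach nested data from the caller's dict
            event=event
        )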
        
        self._timeline.append(time_point)
        self._current_index = len(self._timeline) - 1
        
        if len(self._timeline) > self._max_points:
            self._timeline.pop(0)
            self._current_index -= 1
    
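    def travel_to(self, target_time: datetime) -> Optional[Dict[str, Any]]:
        """Travel to the state that was in effect at target_time"""
        from copy import deepcopy
        
        if not self._timeline:
            return None
        
        # The state at time T is the latest snapshot taken at or before T;
        # fall back to the earliest snapshot if T precedes the whole timeline
        index = 0
        for i, point in enumerate(self._timeline):
            if point.timestamp <= target_time:
                index = i
            else:
                break
        
        self._current_index = index
        return deepcopy(self._timeline[index].state)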
    
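    def travel_by(self, delta: timedelta, direction: TimeDirection) -> Optional[Dict[str, Any]]:
        """Travel by a time offset; a zero offset moves exactly one snapshot"""
        from copy import deepcopy
        
        if not (0 <= self._current_index < len(self._timeline)):
            return None
        
        backward = direction == TimeDirection.BACKWARD
        
        if delta == timedelta():
            # Zero offset: single step, used by undo() and redo()
            new_index = self._current_index + (-1 if backward else 1)
            if not (0 <= new_index < len(self._timeline)):
                return None
            self._current_index = new_index
            return deepcopy(self._timeline[new_index].state)
        
        # Non-zero offset: resolve the target time relative to the current snapshot
        current_time = self._timeline[self._current_index].timestamp
        target = current_time - delta if backward else current_time + delta
        return self.travel_to(target)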
    
    def undo(self) -> Optional[Dict[str, Any]]:
        """撤销(向后穿越)"""
        return self.travel_by(timedelta(), TimeDirection.BACKWARD)
    
    def redo(self) -> Optional[Dict[str, Any]]:
        """重做(向前穿越)"""
        return self.travel_by(timedelta(), TimeDirection.FORWARD)
    
    def get_current_state(self) -> Optional[Dict[str, Any]]:
        """Return a copy of the current state"""
        from copy import deepcopy
        
        if 0 <= self._current_index < len(self._timeline):
            return deepcopy(self._timeline[self._current_index].state)
        return None
    
    def get_timeline(self) -> List[Tuple[datetime, str]]:
        """获取时间线"""
        return [
            (point.timestamp, point.event)
            for point in self._timeline
        ]
    
    def can_undo(self) -> bool:
        return self._current_index > 0
    
    def can_redo(self) -> bool:
        return self._current_index < len(self._timeline) - 1
    
    def get_current_position(self) -> Tuple[int, int]:
        """获取当前位置 (当前索引, 总数)"""
        return (self._current_index + 1, len(self._timeline))

print("\n时间旅行备忘录示例:")

tt_memento = TimeTravelMemento()

state = {'value': 0}
tt_memento.record(state, "初始化")

state['value'] = 10
tt_memento.record(state, "设置为10")

state['value'] = 20
tt_memento.record(state, "设置为20")

state['value'] = 30
tt_memento.record(state, "设置为30")

print("  当前状态:", tt_memento.get_current_state())
print("  位置:", tt_memento.get_current_position())

print("\n  时间线:")
for ts, event in tt_memento.get_timeline():
    print(f"    {ts.strftime('%H:%M:%S')}: {event}")

print("\n  向后穿越:")
for _ in range(2):
    state = tt_memento.undo()
    print(f"    状态: {state}")

print("\n  向前穿越:")
state = tt_memento.redo()
print(f"    状态: {state}")
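
Looking up the state at an arbitrary time is a sorted search, so a timeline that is appended in time order can answer it with binary search instead of a linear scan. The sketch below uses the standard `bisect` module. `IndexedTimeline` and `state_at` are hypothetical names, and the class stores flat dictionaries only.

```python
import bisect
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


class IndexedTimeline:
    """Hypothetical timeline answering 'what was the state at time T?'."""

    def __init__(self) -> None:
        self._timestamps: List[datetime] = []   # kept sorted by construction
        self._states: List[Dict[str, Any]] = []

    def record(self, timestamp: datetime, state: Dict[str, Any]) -> None:
        if self._timestamps and timestamp < self._timestamps[-1]:
            raise ValueError("timestamps must be non-decreasing")
        self._timestamps.append(timestamp)
        self._states.append(dict(state))  # shallow copy; enough for flat states

    def state_at(self, target: datetime) -> Optional[Dict[str, Any]]:
        # bisect_right returns the insertion point after any equal timestamps,
        # so index - 1 is the latest snapshot taken at or before `target`
        index = bisect.bisect_right(self._timestamps, target) - 1
        if index < 0:
            return None  # target precedes the first snapshot
        return dict(self._states[index])


t0 = datetime(2024, 1, 1, 12, 0, 0)
timeline = IndexedTimeline()
for minute, value in enumerate([0, 10, 20, 30]):
    timeline.record(t0 + timedelta(minutes=minute), {"value": value})

mid_state = timeline.state_at(t0 + timedelta(seconds=90))   # between 2nd and 3rd snapshot
```

Keeping timestamps in a separate list avoids the `key=` argument of `bisect`, which is only available from Python 3.10 onward. Each lookup takes $O(\log k)$ comparisons for $k$ snapshots.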

18.7 Anti-Patterns and Best Practices

18.7.1 Common Anti-Patterns

python
from typing import Dict, Any, Optional
from dataclasses import dataclass

class MementoAntiPatterns:
    """备忘录反模式示例"""
    
    @staticmethod
    def anti_pattern_1_exposed_state():
        """反模式1:暴露内部状态"""
        
        class BadOriginator:
            def __init__(self):
                self._internal_data = {'sensitive': 'data'}
            
            def get_state(self) -> Dict[str, Any]:
                return self._internal_data
            
            def set_state(self, state: Dict[str, Any]) -> None:
                self._internal_data = state
        
        print("    问题: 直接暴露内部状态,破坏封装")
        print("    解决: 使用备忘录对象封装状态")
    
    @staticmethod
    def anti_pattern_2_deep_copy_abuse():
        """反模式2:滥用深拷贝"""
        
        import copy
        
        class LargeObject:
            def __init__(self):
                self.data = list(range(1000000))
        
        class BadCaretaker:
            def __init__(self):
                self._states = []
            
            def save(self, obj: LargeObject) -> None:
                self._states.append(copy.deepcopy(obj))
        
        print("    问题: 对大对象频繁深拷贝,内存消耗大")
        print("    解决: 使用增量存储或压缩")
    
    @staticmethod
    def anti_pattern_3_unlimited_history():
        """反模式3:无限历史记录"""
        
        class BadHistory:
            def __init__(self):
                self._history = []
            
            def save(self, state: Any) -> None:
                self._history.append(state)
        
        print("    问题: 历史记录无限增长,内存溢出")
        print("    解决: 设置最大历史数量,定期清理")
    
    @staticmethod
    def anti_pattern_4_caretaker_manipulation():
        """反模式4:管理者篡改状态"""
        
        class BadCaretaker:
            def __init__(self):
                self._memento = None
            
            def save(self, memento: Any) -> None:
                self._memento = memento
            
            def modify_and_restore(self) -> Any:
                if self._memento:
                    self._memento['modified'] = True
                return self._memento
        
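        print("    Problem: the caretaker modifies the memento's contents")
        print("    Fix: use an immutable memento or a narrow interface")

print("\nMemento anti-pattern examples:")
print("  Anti-pattern 1 - exposing internal state:")
MementoAntiPatterns.anti_pattern_1_exposed_state()

print("\n  Anti-pattern 2 - overusing deep copies:")
MementoAntiPatterns.anti_pattern_2_deep_copy_abuse()

print("\n  Anti-pattern 3 - unbounded history:")
MementoAntiPatterns.anti_pattern_3_unlimited_history()

print("\n  Anti-pattern 4 - caretaker tampers with state:")
MementoAntiPatterns.anti_pattern_4_caretaker_manipulation()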
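
The fix suggested for anti-pattern 3 is to cap the history size. The following is a minimal sketch of one way to do that, assuming the standard-library `collections.deque` with its `maxlen` argument. The `BoundedHistory` class and its `latest` method are illustrative names introduced here, not part of the chapter's code.

```python
from collections import deque
from typing import Any, Deque, Optional


class BoundedHistory:
    """Keeps at most `max_history` snapshots; the oldest one is dropped first."""

    def __init__(self, max_history: int = 3) -> None:
        # A deque with maxlen discards items from the opposite end on append.
        self._history: Deque[Any] = deque(maxlen=max_history)

    def save(self, state: Any) -> None:
        self._history.append(state)

    def latest(self) -> Optional[Any]:
        return self._history[-1] if self._history else None

    def __len__(self) -> int:
        return len(self._history)


history = BoundedHistory(max_history=3)
for version in range(5):
    history.save({"version": version})

print(len(history))      # 3
print(history.latest())  # {'version': 4}
```

Compared with `list.pop(0)`, which shifts every remaining element, a bounded deque drops the oldest entry in constant time.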

18.7.2 Best Practices

python
from typing import Dict, List, Any, Optional, TypeVar, Generic
from dataclasses import dataclass, field
from datetime import datetime
from copy import deepcopy
import json

T = TypeVar('T')

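class MementoBestPractices:
    """Memento best practices."""
    
    @staticmethod
    def practice_1_immutable_memento():
        """Practice 1: immutable memento."""
        
        @dataclass(frozen=True)
        class ImmutableMemento:
            state: Dict[str, Any]
            timestamp: datetime = field(default_factory=datetime.now)
            
            def __post_init__(self) -> None:
                # frozen=True only blocks attribute rebinding; copy the dict so
                # later changes by the caller cannot leak into the snapshot.
                object.__setattr__(self, 'state', deepcopy(self.state))
            
            def get_state(self) -> Dict[str, Any]:
                # Hand out a copy so callers cannot mutate the stored state.
                return deepcopy(self.state)
        
        print("    Use an immutable memento to prevent accidental modification")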
    
    @staticmethod
    def practice_2_narrow_interface():
        """Practice 2: narrow interface."""
        
        class Memento:
            def __init__(self, state: Dict[str, Any]):
                self._state = state
                self._timestamp = datetime.now()
                self._id = id(self)
            
            def get_state(self, originator: Any) -> Dict[str, Any]:
                if not isinstance(originator, Originator):
                    raise PermissionError("Only the originator may access the state")
                return self._state.copy()
            
            @property
            def timestamp(self) -> datetime:
                return self._timestamp
            
            @property
            def id(self) -> int:
                return self._id
        
        class Originator:
            pass
        
        print("    Use a narrow interface to limit what the caretaker can access")
    
    @staticmethod
    def practice_3_incremental_storage():
        """Practice 3: incremental storage."""
        
        class IncrementalMemento:
            def __init__(self):
                self._base_state: Optional[Dict[str, Any]] = None
                self._last_state: Optional[Dict[str, Any]] = None
                self._deltas: List[Dict[str, Any]] = []
            
            def save(self, state: Dict[str, Any]) -> None:
                if self._base_state is None:
                    # First save: keep a full, independent copy as the base.
                    self._base_state = deepcopy(state)
                else:
                    # Later saves: record only what changed since the previous save.
                    delta = self._compute_delta(self._last_state, state)
                    self._deltas.append(delta)
                self._last_state = deepcopy(state)
            
            def _compute_delta(
                self,
                old: Dict[str, Any],
                new: Dict[str, Any]
            ) -> Dict[str, Any]:
                delta = {}
                for key in set(old.keys()) | set(new.keys()):
                    if key not in old:
                        delta[key] = {'op': 'add', 'value': new[key]}
                    elif key not in new:
                        delta[key] = {'op': 'remove'}
                    elif old[key] != new[key]:
                        delta[key] = {'op': 'modify', 'value': new[key]}
                return delta
        
        print("    Use incremental storage to reduce memory consumption")
    
    @staticmethod
    def practice_4_serialization():
        """Practice 4: serialization support."""
        
        class SerializableMemento:
            def __init__(self, state: Dict[str, Any]):
                self._state = state
                self._timestamp = datetime.now().isoformat()
            
            def to_json(self) -> str:
                return json.dumps({
                    'state': self._state,
                    'timestamp': self._timestamp
                })
            
            @classmethod
            def from_json(cls, json_str: str) -> 'SerializableMemento':
                data = json.loads(json_str)
                memento = cls(data['state'])
                # Keep the saved timestamp rather than the time of deserialization.
                memento._timestamp = data['timestamp']
                return memento
        
        print("    Support serialization to enable persistent storage")

print("\nMemento best-practice examples:")
print("  Practice 1 - immutable memento:")
MementoBestPractices.practice_1_immutable_memento()

print("\n  Practice 2 - narrow interface:")
MementoBestPractices.practice_2_narrow_interface()

print("\n  Practice 3 - incremental storage:")
MementoBestPractices.practice_3_incremental_storage()

print("\n  Practice 4 - serialization support:")
MementoBestPractices.practice_4_serialization()
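
Practice 3 records deltas but does not show how a state is rebuilt from them. The sketch below illustrates one possible restore path, assuming each delta is computed against the previously saved state and uses the same `add` / `remove` / `modify` encoding. The free functions `compute_delta`, `apply_delta`, and `restore` are hypothetical helpers introduced for illustration.

```python
from copy import deepcopy
from typing import Any, Dict, List

Delta = Dict[str, Dict[str, Any]]


def compute_delta(old: Dict[str, Any], new: Dict[str, Any]) -> Delta:
    """Encode the change from `old` to `new` as add/remove/modify entries."""
    delta: Delta = {}
    for key in old.keys() | new.keys():
        if key not in old:
            delta[key] = {'op': 'add', 'value': deepcopy(new[key])}
        elif key not in new:
            delta[key] = {'op': 'remove'}
        elif old[key] != new[key]:
            delta[key] = {'op': 'modify', 'value': deepcopy(new[key])}
    return delta


def apply_delta(state: Dict[str, Any], delta: Delta) -> Dict[str, Any]:
    """Return a new state with one delta applied; the input is not mutated."""
    result = deepcopy(state)
    for key, change in delta.items():
        if change['op'] == 'remove':
            result.pop(key, None)
        else:  # 'add' and 'modify' both set the key
            result[key] = deepcopy(change['value'])
    return result


def restore(base: Dict[str, Any], deltas: List[Delta], version: int) -> Dict[str, Any]:
    """Version 0 is the base; version i replays the first i deltas in order."""
    state = deepcopy(base)
    for delta in deltas[:version]:
        state = apply_delta(state, delta)
    return state


versions = [
    {'title': 'draft', 'words': 10},
    {'title': 'draft', 'words': 25, 'author': 'li'},
    {'title': 'final', 'author': 'li'},
]
base = versions[0]
deltas = [compute_delta(a, b) for a, b in zip(versions, versions[1:])]

print(restore(base, deltas, 2))  # {'title': 'final', 'author': 'li'}
```

Restoring version i replays i deltas, so restore cost grows with the length of the chain. Storing a fresh full snapshot every few saves is a common way to bound that cost.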

18.8 Decision Guide

18.8.1 Storage Strategy Decision Tree

┌──────────────────────────────────────────────────────────────────────┐
│                    Storage Strategy Decision Tree                    │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│                      Need to save object state?                      │
│                                   │                                  │
│                       ┌───────────┴───────────┐                      │
│                       │                       │                      │
│                      Yes                     No                      │
│                       │                       │                      │
│                       ▼                       ▼                      │
│              ┌─────────────────┐      No memento needed              │
│              │   State size?   │                                     │
│              └────────┬────────┘                                     │
│                       │                                              │
│       ┌───────────────┼───────────────┐                              │
│       │               │               │                              │
│     Small          Medium           Large                            │
│       │               │               │                              │
│       ▼               ▼               ▼                              │
│      Full        Incremental      Compressed                         │
│    snapshot        storage         storage                           │
│                                                                      │
│          How much history is needed?                                 │
│                       │                                              │
│           ┌───────────┴───────────┐                                  │
│           │                       │                                  │
│          Few                    Many                                 │
│           │                       │                                  │
│           ▼                       ▼                                  │
│      Simple list            Paged storage                            │
│        storage                 + index                               │
│                                                                      │
│            Is persistence required?                                  │
│                       │                                              │
│           ┌───────────┴───────────┐                                  │
│           │                       │                                  │
│          Yes                     No                                  │
│           │                       │                                  │
│           ▼                       ▼                                  │
│  Serialized storage       In-memory storage                          │
│     (JSON/Pickle)                                                    │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

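18.8.2 Choosing an Implementation

| Scenario | Recommended implementation | Rationale |
| --- | --- | --- |
| Text editor undo | Full snapshot + operation log | Small state, fast restore |
| Game saves | Serialized snapshot | Needs persistence; supports multiple save slots |
| Database transactions | Checkpoint + log | Supports rollback; guarantees ACID |
| Version control | Incremental storage + compression | Large files, frequent saves |
| Configuration management | JSON serialization | Human-readable, easy to debug |
| Debugger state | Deep-copy snapshot | Needs the complete state |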
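
The decision tree in 18.8.1 points large states toward compressed storage, and the table above pairs compression with incremental storage for version control. As a rough sketch of the compression half only, the example below stores a JSON-serializable state as zlib-compressed bytes. The `CompressedMemento` class, its property names, and the choice of JSON plus zlib are assumptions made for illustration.

```python
import json
import zlib
from typing import Any, Dict


class CompressedMemento:
    """Stores a JSON-serializable state as zlib-compressed bytes."""

    def __init__(self, state: Dict[str, Any]) -> None:
        raw = json.dumps(state, sort_keys=True).encode('utf-8')
        self._raw_size = len(raw)
        # Level 6 is a middle ground between speed and compression ratio.
        self._blob = zlib.compress(raw, 6)

    def get_state(self) -> Dict[str, Any]:
        # Decompress and parse on every call; callers get a fresh object.
        return json.loads(zlib.decompress(self._blob).decode('utf-8'))

    @property
    def raw_size(self) -> int:
        return self._raw_size

    @property
    def stored_size(self) -> int:
        return len(self._blob)


state = {'cells': [0] * 10_000, 'name': 'grid'}
memento = CompressedMemento(state)

print(memento.get_state() == state)            # True
print(memento.stored_size < memento.raw_size)  # True
```

The trade-off matches the complexity table in 18.1.3: storage shrinks roughly by the compression ratio, while every save and restore pays for compression or decompression. How much is saved depends entirely on how repetitive the state is.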

18.9 Quick Reference Card

18.9.1 Core Concepts at a Glance

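┌──────────────────────────────────────────────────────────────────────┐
│                   Memento Pattern Quick Reference                    │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  [Definition]                                                        │
│  Capture an object's internal state and store it outside the         │
│  object without breaking encapsulation.                              │
│                                                                      │
│  [Core Components]                                                   │
│  • Originator: creates mementos and restores from them               │
│  • Memento: stores the originator's state                            │
│  • Caretaker: holds and manages mementos                             │
│                                                                      │
│  [Interface Design]                                                  │
│  ┌──────────────────────────────────────────────────────────────┐    │
│  │  Wide interface: Originator can access the full state        │    │
│  │  Narrow interface: Caretaker can access metadata only        │    │
│  └──────────────────────────────────────────────────────────────┘    │
│                                                                      │
│  [Storage Strategies]                                                │
│  ┌──────────────────────────────────────────────────────────────┐    │
│  │  Full snapshot: saves the complete state, fast restore       │    │
│  │  Incremental storage: saves only changes, saves space        │    │
│  │  Compressed storage: compresses first, suits large objects   │    │
│  │  Serialization: persistence and cross-process transfer       │    │
│  └──────────────────────────────────────────────────────────────┘    │
│                                                                      │
│  [When to Use]                                                       │
│  ✓ Object state must be saved and restored                           │
│  ✓ Undo/redo is required                                             │
│  ✓ Historical states of an object must be kept                       │
│  ✓ Transaction rollback is required                                  │
│                                                                      │
│  [Advantages]                                                        │
│  + Preserves encapsulation                                           │
│  + Simplifies state restoration                                      │
│  + Supports multi-version history                                    │
│  + Enables undo/redo                                                 │
│                                                                      │
│  [Disadvantages]                                                     │
│  - High memory consumption                                           │
│  - Higher management complexity                                      │
│  - May affect performance                                            │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘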

18.9.2 Code Templates at a Glance

python
from typing import TypeVar, Generic, Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from copy import deepcopy

T = TypeVar('T')

@dataclass
class MementoTemplate(Generic[T]):
    """Memento template."""
    state: T
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

class OriginatorTemplate(Generic[T]):
    """Originator template."""
    
    def __init__(self, initial_state: T):
        self._state = initial_state
    
    def get_state(self) -> T:
        return self._state
    
    def set_state(self, state: T) -> None:
        self._state = state
    
    def save(self) -> MementoTemplate[T]:
        return MementoTemplate(state=deepcopy(self._state))
    
    def restore(self, memento: MementoTemplate[T]) -> None:
        self._state = deepcopy(memento.state)

class CaretakerTemplate(Generic[T]):
    """Caretaker template."""
    
    def __init__(self, max_history: int = 100):
        self._history: List[MementoTemplate[T]] = []
        self._redo_stack: List[MementoTemplate[T]] = []
        self._max_history = max_history
    
    def save(self, memento: MementoTemplate[T]) -> None:
        self._history.append(memento)
        self._redo_stack.clear()
        if len(self._history) > self._max_history:
            self._history.pop(0)
    
    def undo(self) -> Optional[MementoTemplate[T]]:
        if len(self._history) <= 1:
            return None
        current = self._history.pop()
        self._redo_stack.append(current)
        return self._history[-1]
    
    def redo(self) -> Optional[MementoTemplate[T]]:
        if not self._redo_stack:
            return None
        memento = self._redo_stack.pop()
        self._history.append(memento)
        return memento

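18.10 Summary

The memento pattern provides a mechanism for saving and restoring an object's state without breaking encapsulation. Starting from a formal definition, this chapter examined the pattern's mathematical foundations, state-space complexity, and storage strategies.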

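Key Takeaways

  1. Formal view: a memento system can be formalized as the state transition system $\mathcal{M} = \langle S, s_0, \Sigma, \delta, \Omega \rangle$, which supports state save and restore operations.

  2. Interface design: the wide interface is for the originator and the narrow interface is for the caretaker, which preserves encapsulation.

  3. Storage strategies: full snapshots, incremental storage, differential storage, and compressed storage each suit different scenarios.

  4. Enterprise applications: practical uses include text editor undo/redo, transaction rollback systems, and game save systems.

  5. Pattern variants: extensions include serialized mementos, checkpoint mementos, and time-travel mementos.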

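The memento pattern is often combined with the command pattern: the command pattern executes operations, and the memento pattern saves state. The combination is especially effective for implementing undo/redo.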
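
The combination of the two patterns might look roughly like the sketch below. Each command asks the originator for a memento before it runs, and undo restores that memento. The `Document`, `TextMemento`, `AppendCommand`, and `CommandHistory` classes are hypothetical names chosen for this illustration and are not taken from the chapter's earlier examples.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class TextMemento:
    """Memento: an immutable snapshot of the document text."""
    text: str


class Document:
    """Originator: owns the text and creates or consumes mementos."""

    def __init__(self) -> None:
        self._text = ""

    @property
    def text(self) -> str:
        return self._text

    def append(self, chunk: str) -> None:
        self._text += chunk

    def save(self) -> TextMemento:
        return TextMemento(self._text)

    def restore(self, memento: TextMemento) -> None:
        self._text = memento.text


class AppendCommand:
    """Command: performs one edit and remembers the state before it."""

    def __init__(self, document: Document, chunk: str) -> None:
        self._document = document
        self._chunk = chunk
        self._before: Optional[TextMemento] = None

    def execute(self) -> None:
        self._before = self._document.save()  # snapshot before changing
        self._document.append(self._chunk)

    def undo(self) -> None:
        if self._before is not None:
            self._document.restore(self._before)


class CommandHistory:
    """Caretaker role: tracks executed and undone commands."""

    def __init__(self) -> None:
        self._done: List[AppendCommand] = []
        self._undone: List[AppendCommand] = []

    def run(self, command: AppendCommand) -> None:
        command.execute()
        self._done.append(command)
        self._undone.clear()  # a new edit invalidates the redo stack

    def undo(self) -> bool:
        if not self._done:
            return False
        command = self._done.pop()
        command.undo()
        self._undone.append(command)
        return True

    def redo(self) -> bool:
        if not self._undone:
            return False
        command = self._undone.pop()
        command.execute()
        self._done.append(command)
        return True


doc = Document()
history = CommandHistory()
history.run(AppendCommand(doc, "Hello"))
history.run(AppendCommand(doc, ", world"))
history.undo()
print(doc.text)  # Hello
history.redo()
print(doc.text)  # Hello, world
```

Here the history never inspects a memento's contents. It only decides which command to undo or redo, which keeps the caretaker on the narrow side of the interface.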

Python技术丛书 - 江苏省宿城中等专业学校