
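Chapter 16: The Iterator Pattern

Learning Objectives

After completing this chapter, you will be able to:

  • Understand the mathematical foundations and formal definition of the Iterator pattern
  • Explain how Python's iterator protocol works and implement it in full
  • Design and implement several kinds of iterators (external iterators, internal iterators, generators, and others)
  • Apply the Iterator pattern to large datasets, streaming data, and complex aggregate structures
  • Recognize where the Iterator pattern applies and which anti-patterns to avoid

16.1 Formal Definition

16.1.1 Mathematical Definition

The Iterator pattern provides a way to access the elements of an aggregate object sequentially without exposing the object's underlying representation.

Mathematically, an iterator can be modeled as a state transition system: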

$$\mathcal{I} = \langle S, s_0, \Sigma, \delta, F \rangle$$

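where:

  • $S$: the state set, containing every possible iteration state
  • $s_0 \in S$: the initial state
  • $\Sigma$: the element set, that is, the elements being iterated over
  • $\delta: S \rightarrow S \times (\Sigma \cup \{\epsilon\})$: the state transition function, which returns the successor state together with the emitted element ($\epsilon$ when nothing is emitted)
  • $F \subseteq S$: the set of final (exhausted) states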

Formal definition of the iteration operations

$$\text{next}: S \rightarrow (S \times \Sigma) \cup \{\text{StopIteration}\}$$

$$\text{hasNext}: S \rightarrow \mathbb{B}$$

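Algebraic properties of iterators

For an iterator $I$ that yields the element sequence $e_1, e_2, \ldots, e_n$:

$$\forall i, j \in \{1, \ldots, n\},\; i < j \Rightarrow \text{position}(e_i) < \text{position}(e_j)$$

This guarantees the sequentiality of the iterator: elements are always produced in the same fixed order.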
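As a rough illustration of the state transition view, the sketch below models $\delta$ for a list-backed iterator. Using the integer index as the state, and the function names `delta` and `run`, are assumptions made for this example; they are not part of the formal definition.

```python
from typing import List, Optional, Sequence, Tuple, TypeVar

T = TypeVar("T")

def delta(seq: Sequence[T], state: int) -> Tuple[int, Optional[T]]:
    """One transition: state -> (next state, emitted element or None for epsilon)."""
    if state >= len(seq):
        # The state is final: stay where we are and emit nothing.
        return state, None
    # Advance by one position and emit the element at the current position.
    return state + 1, seq[state]

def run(seq: Sequence[T]) -> List[T]:
    """Drive delta from the initial state s0 = 0 until a final state is reached."""
    state = 0
    emitted: List[T] = []
    while state < len(seq):
        state, element = delta(seq, state)
        emitted.append(element)
    return emitted

emitted = run(["a", "b", "c"])
print(emitted)
```

Here the state set is `{0, ..., len(seq)}` and the only final state is `len(seq)`; elements come out in index order, which is the sequentiality property stated above.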

16.1.2 A Type-Theoretic View

In type theory, an iterator can be expressed as a type constructor:

$$\text{Iterator} : \text{Type} \rightarrow \text{Type}$$

$$\text{Iterator}\langle T \rangle = \{ \text{next}: () \rightarrow \text{Option}\langle T \rangle \}$$

where $\text{Option}\langle T \rangle = \text{Some}(T) \mid \text{None}$.

The relationship between iterators and collections

$$\text{Iterable}\langle T \rangle = \{ \text{iter}: () \rightarrow \text{Iterator}\langle T \rangle \}$$
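Python signals the end of iteration with `StopIteration` rather than an `Option` value, so the type-theoretic signature can only be approximated. The following is a minimal sketch of an adapter; the `Some` wrapper and the `OptionIterator` class are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Generic, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")

@dataclass(frozen=True)
class Some(Generic[T]):
    """Marks a present value, so that a None element stays distinguishable."""
    value: T

class OptionIterator(Generic[T]):
    """Wraps any iterable and exposes next() -> Some(value) | None."""

    def __init__(self, source: Iterable[T]) -> None:
        self._it: Iterator[T] = iter(source)

    def next(self) -> Optional[Some[T]]:
        try:
            return Some(next(self._it))
        except StopIteration:
            # End of iteration is reported as a value, not as an exception.
            return None

it = OptionIterator([1, None])
results = [it.next(), it.next(), it.next()]
print(results)
```

Wrapping present values in `Some` is what lets the second element (`None`) be told apart from the end of the sequence.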

16.1.3 Iterator Algebra

Iterators support a number of algebraic operations:

| Operation | Definition | Type signature |
|-----------|------------|----------------|
| Map | Apply a function to each element | $\text{map}: (T \rightarrow U) \rightarrow \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle U \rangle$ |
| Filter | Keep the elements that satisfy a predicate | $\text{filter}: (T \rightarrow \mathbb{B}) \rightarrow \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle T \rangle$ |
| Fold | Reduce an iterator to a single value | $\text{fold}: (U, T \times U \rightarrow U) \rightarrow \text{Iter}\langle T \rangle \rightarrow U$ |
| Chain | Concatenate several iterators | $\text{chain}: \text{Iter}\langle T \rangle \times \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle T \rangle$ |
| Take | Take the first $n$ elements | $\text{take}: \mathbb{N} \rightarrow \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle T \rangle$ |
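These operations map fairly directly onto Python's built-ins and standard library. The sketch below is one possible correspondence (`map`, `filter`, `functools.reduce` for fold, `itertools.chain`, and `itertools.islice` for take); the variable names are illustrative only.

```python
from functools import reduce
from itertools import chain, islice

numbers = iter(range(1, 11))

# filter, then map: both are lazy, so nothing is computed yet.
even_squares = map(lambda x: x * x, filter(lambda x: x % 2 == 0, numbers))

# take 3: pulls only as many source elements as it needs.
first_three = list(islice(even_squares, 3))

# Whatever was not taken is still available from the same iterator.
rest = list(even_squares)

# chain + fold: concatenate two sequences and reduce them to one value.
total = reduce(lambda acc, x: acc + x, chain([1, 2], [3, 4]), 0)

print(first_three, rest, total)
```

Because every stage is lazy, `islice` stops the pipeline after three results and the remaining elements stay unconsumed until `rest` asks for them.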

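16.2 Historical Background and Evolution

16.2.1 Timeline

| Year | Milestone | Description |
|------|-----------|-------------|
| 1970s | CLU language | Barbara Liskov's group introduced the iterator concept in CLU |
| 1975 | CLU iterators | Introduced `yield` semantics, the forerunner of modern generators |
| 1994 | GoF design patterns | *Design Patterns* formally catalogued the Iterator pattern |
| 1996 | Java 1.0 | Introduced the `Enumeration` interface |
| 1998 | Java 1.2 | Introduced the `Iterator` interface as the replacement for `Enumeration` |
| 1998 | C++ STL | The C++98 standard library adopted iterators as its core abstraction |
| 2001 | Python 2.2 | Introduced the iterator protocol and generators |
| 2005 | C# 2.0 | Introduced the `yield return` syntax |
| 2008 | Python 3.0 | Renamed the `next()` method to `__next__()`; built-ins such as `map`, `filter`, and `zip` now return lazy iterators |
| 2011 | C++11 | Introduced the range-based `for` loop |
| 2015 | ES6 | JavaScript introduced the iterator protocol and generators |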

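16.2.2 Design Motivation

The core motivations behind the Iterator pattern are:

  1. Encapsulate internal structure: clients do not need to know how the aggregate is represented internally
  2. Provide a uniform traversal interface: different kinds of aggregates can be traversed in the same way
  3. Support multiple traversals: a single aggregate can offer several traversal strategies
  4. Enable lazy evaluation: elements can be produced on demand, which reduces memory usage

16.3 UML Structure Diagrams

16.3.1 Standard Structure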

┌─────────────────────────────────────────────────────────────────────┐
│                        Iterator Pattern Structure                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────────┐         ┌─────────────────┐                   │
│  │    <<interface>>│         │   <<interface>> │                   │
│  │    Iterable     │         │    Iterator     │                   │
│  ├─────────────────┤         ├─────────────────┤                   │
│  │ + __iter__()    │────────>│ + __next__()    │                   │
│  │                 │  creates│ + __iter__()    │                   │
│  └────────┬────────┘         └────────┬────────┘                   │
│           │                           │                             │
│           │ implements                │ implements                  │
│           ▼                           ▼                             │
│  ┌─────────────────┐         ┌─────────────────┐                   │
│  │  ConcreteIterable│        │ ConcreteIterator│                   │
│  ├─────────────────┤         ├─────────────────┤                   │
│  │ - _data: List   │         │ - _iterable     │                   │
│  │ - _iterator_cls │         │ - _index: int   │                   │
│  ├─────────────────┤         ├─────────────────┤                   │
│  │ + __iter__()    │         │ + __next__()    │                   │
│  │ + add(item)     │         │ + __iter__()    │                   │
│  └─────────────────┘         └─────────────────┘                   │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

16.3.2 The Python Iterator Protocol

┌─────────────────────────────────────────────────────────────────────┐
│                    Python Iterator Protocol                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                    Iterable Protocol                           │  │
│  │                                                                │  │
│  │   class Iterable:                                              │  │
│  │       def __iter__(self) -> Iterator:                          │  │
│  │           """Return a new iterator instance."""               │  │
│  │                                                                │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                              │                                       │
│                              │ returns                               │
│                              ▼                                       │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                    Iterator Protocol                           │  │
│  │                                                                │  │
│  │   class Iterator:                                              │  │
│  │       def __iter__(self) -> Iterator:                          │  │
│  │           """Return self; iterators are iterable."""          │  │
│  │                                                                │  │
│  │       def __next__(self) -> T:                                 │  │
│  │           """Next element, or raise StopIteration."""         │  │
│  │                                                                │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
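To make the protocol concrete, the following sketch spells out roughly what a `for` loop does with these two methods. The helper name `manual_for` is an assumption for this example; the real loop is implemented by the interpreter, not by a Python function.

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")

def manual_for(iterable: Iterable[T], body: Callable[[T], None]) -> None:
    """Approximate expansion of `for item in iterable: body(item)`."""
    iterator = iter(iterable)        # calls iterable.__iter__()
    while True:
        try:
            item = next(iterator)    # calls iterator.__next__()
        except StopIteration:
            break                    # normal end of the loop
        body(item)

seen = []
manual_for([10, 20, 30], seen.append)
print(seen)
```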

16.3.3 Iterator State Transition Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                    Iterator State Machine                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│                     ┌─────────────┐                                 │
│                     │   Initial   │                                 │
│                     │    State    │                                 │
│                     └──────┬──────┘                                 │
│                            │ __next__()                             │
│                            ▼                                        │
│                     ┌─────────────┐   has more elements:            │
│                     │  Iterating  │───┐ return element              │
│                     │    State    │<──┘ then wait for __next__()    │
│                     └──────┬──────┘                                 │
│                            │ no more elements:                      │
│                            │ raise StopIteration                    │
│                            ▼                                        │
│                     ┌─────────────┐                                 │
│                     │  Exhausted  │───┐ every later __next__()      │
│                     │    State    │<──┘ raises StopIteration        │
│                     └─────────────┘                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
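The Exhausted state is terminal: an iterator cannot be rewound, and only a fresh iterator obtained from the iterable starts over. A short sketch with a built-in list iterator (variable names are illustrative):

```python
numbers = [1, 2]                    # an iterable: it can hand out fresh iterators
it = iter(numbers)                  # a new iterator in its initial state

first_pass = list(it)               # drives the iterator to exhaustion
second_pass = list(it)              # still exhausted, so nothing is produced
after_end = next(it, "exhausted")   # the default is returned instead of raising

fresh_pass = list(iter(numbers))    # a new iterator starts from the beginning

print(first_pass, second_pass, after_end, fresh_pass)
```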

16.4 Standard Implementation

16.4.1 An ABC-Based Iterator Framework

python
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Iterator, Optional, List, Any
from dataclasses import dataclass
from contextlib import contextmanager

T = TypeVar('T')

class IteratorProtocol(ABC, Generic[T]):
    """Abstract base class for the iterator protocol."""
    
    @abstractmethod
    def __iter__(self) -> 'IteratorProtocol[T]':
        """Return the iterator itself."""
        pass
    
    @abstractmethod
    def __next__(self) -> T:
        """Return the next element, or raise StopIteration when exhausted."""
        pass
    
    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>"

class IterableProtocol(ABC, Generic[T]):
    """Abstract base class for iterable objects."""
    
    @abstractmethod
    def __iter__(self) -> IteratorProtocol[T]:
        """Return a new, independent iterator."""
        pass

class SequenceIterator(IteratorProtocol[T]):
    """Iterator over a list-backed sequence."""
    
    def __init__(self, sequence: List[T]):
        self._sequence = sequence
        self._index = 0
    
    def __iter__(self) -> 'SequenceIterator[T]':
        return self
    
    def __next__(self) -> T:
        if self._index >= len(self._sequence):
            raise StopIteration
        value = self._sequence[self._index]
        self._index += 1
        return value

class SequenceIterable(IterableProtocol[T]):
    """Iterable wrapper around a list."""
    
    def __init__(self, items: Optional[List[T]] = None):
        self._items: List[T] = items or []
    
    def add(self, item: T) -> 'SequenceIterable[T]':
        self._items.append(item)
        return self
    
    def __iter__(self) -> SequenceIterator[T]:
        return SequenceIterator(self._items.copy())
    
    def __len__(self) -> int:
        return len(self._items)

iterable = SequenceIterable[int]()
iterable.add(1).add(2).add(3).add(4).add(5)

print("Standard iterator pattern implementation:")
for item in iterable:
    print(f"  Item: {item}")
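The standard library already ships comparable base classes in `collections.abc`. As a hedged alternative to hand-written ABCs, the sketch below subclasses `collections.abc.Iterator`, which supplies `__iter__` as a mixin method so that only `__next__` has to be written. The `Countdown` class is a hypothetical example.

```python
from collections.abc import Iterator

class Countdown(Iterator):
    """Counts down from `start` to 1; only __next__ is written by hand."""

    def __init__(self, start: int) -> None:
        self._current = start

    def __next__(self) -> int:
        if self._current <= 0:
            raise StopIteration
        value = self._current
        self._current -= 1
        return value

countdown = Countdown(3)
same_object = iter(countdown) is countdown   # inherited __iter__ returns self
values = list(countdown)
print(same_object, values)
```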

16.4.2 External vs. Internal Iterators

python
from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Callable, List, Optional, Any
from enum import Enum, auto

# Note: ExternalIterator below reuses IteratorProtocol from Section 16.4.1.

T = TypeVar('T')
R = TypeVar('R')

class IteratorType(Enum):
    """Kinds of iterators."""
    EXTERNAL = auto()
    INTERNAL = auto()

class ExternalIterator(IteratorProtocol[T]):
    """External iterator: the client drives the iteration."""
    
    def __init__(self, data: List[T]):
        self._data = data
        self._index = 0
    
    def __iter__(self) -> 'ExternalIterator[T]':
        return self
    
    def __next__(self) -> T:
        if self._index >= len(self._data):
            raise StopIteration
        value = self._data[self._index]
        self._index += 1
        return value
    
    def has_next(self) -> bool:
        """Return True if at least one more element remains."""
        return self._index < len(self._data)
    
    def peek(self) -> Optional[T]:
        """Return the next element without advancing, or None if exhausted."""
        if self._index < len(self._data):
            return self._data[self._index]
        return None
    
    def reset(self) -> None:
        """Rewind the iterator to the first element."""
        self._index = 0

class InternalIterable(Generic[T]):
    """Internal iterator: the collection drives the iteration."""
    
    def __init__(self, data: List[T]):
        self._data = data
    
    def for_each(self, action: Callable[[T], None]) -> None:
        """Apply an action to every element."""
        for item in self._data:
            action(item)
    
    def map(self, transformer: Callable[[T], R]) -> 'InternalIterable[R]':
        """Transform every element."""
        return InternalIterable([transformer(item) for item in self._data])
    
    def filter(self, predicate: Callable[[T], bool]) -> 'InternalIterable[T]':
        """Keep only the elements that satisfy the predicate."""
        return InternalIterable([item for item in self._data if predicate(item)])
    
    def reduce(self, initial: R, accumulator: Callable[[R, T], R]) -> R:
        """Fold the elements into a single value, starting from `initial`."""
        result = initial
        for item in self._data:
            result = accumulator(result, item)
        return result
    
    def find_first(self, predicate: Callable[[T], bool]) -> Optional[T]:
        """Return the first element that satisfies the predicate, or None."""
        for item in self._data:
            if predicate(item):
                return item
        return None
    
    def take_while(self, predicate: Callable[[T], bool]) -> 'InternalIterable[T]':
        """Take leading elements for as long as the predicate holds."""
        result = []
        for item in self._data:
            if predicate(item):
                result.append(item)
            else:
                break
        return InternalIterable(result)

print("\nExternal iterator example:")
external_iter = ExternalIterator([1, 2, 3, 4, 5])
while external_iter.has_next():
    current = next(external_iter)
    peeked = external_iter.peek()
    print(f"  Current: {current}, next: {peeked}")

external_iter.reset()
print(f"  First element after reset: {next(external_iter)}")

print("\nInternal iterator example:")
internal_iter = InternalIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

result = (
    internal_iter
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * x)
    .reduce(0, lambda acc, x: acc + x)
)
print(f"  Sum of squares of even numbers: {result}")

internal_iter.for_each(lambda x: print(f"  Processing: {x}") if x <= 3 else None)
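The external iterator above can peek only because it holds a list and an index. For arbitrary iterators, such as generators, one possible approach is to cache a single look-ahead element. The `Peekable` class below is a hypothetical sketch of that idea, not part of the framework above.

```python
from typing import Any, Iterable, Iterator

_MISSING = object()  # sentinel: distinguishes "nothing cached" from a cached None

class Peekable:
    """Wraps any iterable and adds peek() by caching one look-ahead element."""

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._it: Iterator[Any] = iter(iterable)
        self._cache: Any = _MISSING

    def __iter__(self) -> "Peekable":
        return self

    def __next__(self) -> Any:
        if self._cache is not _MISSING:
            # Hand out the element that peek() already pulled from the source.
            value, self._cache = self._cache, _MISSING
            return value
        return next(self._it)

    def peek(self, default: Any = None) -> Any:
        """Return the next element without consuming it, or `default` at the end."""
        if self._cache is _MISSING:
            self._cache = next(self._it, _MISSING)
        return default if self._cache is _MISSING else self._cache

p = Peekable(x for x in [1, 2])
peeked_first = p.peek()
consumed = [next(p), next(p)]
peeked_at_end = p.peek("end")
print(peeked_first, consumed, peeked_at_end)
```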

16.4.3 Generator-Based Implementation

python
from typing import TypeVar, Generator, Iterator, List, Optional, Callable, Any
from contextlib import contextmanager
import itertools

T = TypeVar('T')

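class GeneratorIterator:
    """A collection of generator-based iterator utilities."""
    
    @staticmethod
    def count(start: int = 0, step: int = 1) -> Generator[int, None, None]:
        """Infinite counter: start, start + step, start + 2 * step, ..."""
        n = start
        while True:
            yield n
            n += step
    
    @staticmethod
    def cycle(iterable: List[T]) -> Generator[T, None, None]:
        """Repeat the items of a list forever."""
        if not iterable:
            # Without this guard an empty list would loop forever without yielding.
            return
        while True:
            for item in iterable:
                yield item
    
    @staticmethod
    def repeat(item: T, times: Optional[int] = None) -> Generator[T, None, None]:
        """Yield `item` `times` times, or forever when `times` is None."""
        if times is None:
            while True:
                yield item
        else:
            for _ in range(times):
                yield item
    
    @staticmethod
    def range_custom(start: int, stop: Optional[int] = None, step: int = 1) -> Generator[int, None, None]:
        """Ascending integer range, similar to range() with a positive step."""
        if step <= 0:
            # A zero or negative step would never reach `stop`.
            raise ValueError("step must be positive")
        if stop is None:
            start, stop = 0, start
        n = start
        while n < stop:
            yield n
            n += step
    
    @staticmethod
    def chunks(iterable: List[T], size: int) -> Generator[List[T], None, None]:
        """Split a list into consecutive chunks of at most `size` items."""
        if size <= 0:
            raise ValueError("size must be positive")
        for i in range(0, len(iterable), size):
            yield iterable[i:i + size]
    
    @staticmethod
    def pairwise(iterable: List[T]) -> Generator[tuple, None, None]:
        """Yield overlapping pairs of adjacent items."""
        it = iter(iterable)
        try:
            prev = next(it)
        except StopIteration:
            return
        for current in it:
            yield (prev, current)
            prev = current
    
    @staticmethod
    def sliding_window(iterable: List[T], size: int) -> Generator[tuple, None, None]:
        """Yield every window of `size` consecutive items as a tuple."""
        if size <= 0:
            raise ValueError("size must be positive")
        if size > len(iterable):
            return
        window = tuple(iterable[:size])
        yield window
        for i in range(size, len(iterable)):
            window = window[1:] + (iterable[i],)
            yield window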

print("\nGenerator iterator examples:")

print("  Infinite counter (first 5 values):")
for i, n in enumerate(GeneratorIterator.count(10, 2)):
    if i >= 5:
        break
    print(f"    {n}")

print("  Chunk generator:")
data = list(range(1, 11))
for chunk in GeneratorIterator.chunks(data, 3):
    print(f"    {chunk}")

print("  Adjacent pairs:")
for pair in GeneratorIterator.pairwise([1, 2, 3, 4, 5]):
    print(f"    {pair}")

print("  Sliding window:")
for window in GeneratorIterator.sliding_window([1, 2, 3, 4, 5], 3):
    print(f"    {window}")
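The `chunks` and `sliding_window` helpers above rely on `len()` and slicing, so they accept lists but not one-shot iterators. A possible variant for arbitrary iterables keeps the current window in a bounded `collections.deque`. This is a sketch under that assumption, defined as a standalone function rather than as a change to the class above.

```python
from collections import deque
from itertools import islice
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def sliding_window(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """Yield windows of `size` consecutive items from any iterable."""
    if size <= 0:
        raise ValueError("size must be positive")
    it = iter(iterable)
    # Fill the first window; maxlen makes later appends drop the oldest item.
    window = deque(islice(it, size), maxlen=size)
    if len(window) == size:
        yield tuple(window)
    for item in it:
        window.append(item)
        yield tuple(window)

windows = list(sliding_window(iter(range(5)), 3))
too_short = list(sliding_window([1], 3))
print(windows, too_short)
```

Only `size` items are held in memory at any time, which matters when the source is a large file or a stream.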

16.5 Enterprise Application Examples

16.5.1 Database Pagination Iterators

python
from typing import TypeVar, Generic, Iterator, List, Callable, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
import time

T = TypeVar('T')

@dataclass
class Page:
    """A single page of results together with paging metadata."""
    number: int
    size: int
    items: List[Any]
    total: int
    has_next: bool
    
    @property
    def has_previous(self) -> bool:
        return self.number > 1
    
    @property
    def total_pages(self) -> int:
        return (self.total + self.size - 1) // self.size

class DatabasePageIterator(Generic[T]):
    """Offset-based database pagination iterator."""
    
    def __init__(
        self,
        fetch_func: Callable[[int, int], List[T]],
        count_func: Callable[[], int],
        page_size: int = 100,
        prefetch: bool = True
    ):
        self._fetch_func = fetch_func
        self._count_func = count_func
        self._page_size = page_size
        self._prefetch = prefetch
        
        self._current_page = 0
        self._total_count: Optional[int] = None
        self._current_items: List[T] = []
        self._current_index = 0
        self._exhausted = False
    
    def __iter__(self) -> Iterator[T]:
        return self
    
    def __next__(self) -> T:
        if self._exhausted:
            raise StopIteration
        
        if self._current_index >= len(self._current_items):
            self._load_next_page()
            
            if not self._current_items:
                self._exhausted = True
                raise StopIteration
        
        item = self._current_items[self._current_index]
        self._current_index += 1
        return item
    
    def _load_next_page(self) -> None:
        self._current_page += 1
        self._current_items = self._fetch_func(self._current_page, self._page_size)
        self._current_index = 0
    
    def get_current_page_info(self) -> Page:
        """Return metadata for the page that is currently loaded."""
        if self._total_count is None:
            self._total_count = self._count_func()
        
        return Page(
            number=self._current_page,
            size=self._page_size,
            items=self._current_items.copy(),
            total=self._total_count,
            has_next=len(self._current_items) == self._page_size
        )
    
    def reset(self) -> None:
        """Reset the iterator so that iteration restarts from the first page."""
        self._current_page = 0
        self._current_items = []
        self._current_index = 0
        self._exhausted = False

class CursorBasedIterator(Generic[T]):
    """Cursor-based iterator (suited to large datasets)."""
    
    def __init__(
        self,
        fetch_func: Callable[[Optional[Any], int], List[T]],
        get_cursor: Callable[[T], Any],
        page_size: int = 100
    ):
        self._fetch_func = fetch_func
        self._get_cursor = get_cursor
        self._page_size = page_size
        
        self._cursor: Optional[Any] = None
        self._current_items: List[T] = []
        self._current_index = 0
        self._exhausted = False
    
    def __iter__(self) -> Iterator[T]:
        return self
    
    def __next__(self) -> T:
        if self._exhausted:
            raise StopIteration
        
        if self._current_index >= len(self._current_items):
            self._load_next_batch()
            
            if not self._current_items:
                self._exhausted = True
                raise StopIteration
        
        item = self._current_items[self._current_index]
        self._current_index += 1
        return item
    
    def _load_next_batch(self) -> None:
        self._current_items = self._fetch_func(self._cursor, self._page_size)
        self._current_index = 0
        
        if self._current_items:
            self._cursor = self._get_cursor(self._current_items[-1])

@dataclass
class User:
    id: int
    name: str
    email: str
    created_at: datetime = field(default_factory=datetime.now)

user_database = [
    User(id=i, name=f"User {i}", email=f"user{i}@example.com")
    for i in range(1, 251)
]

def fetch_users_page(page: int, size: int) -> List[User]:
    start = (page - 1) * size
    end = start + size
    print(f"  [DB query] Fetching page {page}, {size} rows per page")
    return user_database[start:end]

def count_users() -> int:
    return len(user_database)

def fetch_users_cursor(cursor: Optional[int], size: int) -> List[User]:
    # The cursor is the id of the last row already returned. Ids in this demo
    # are 1-based and contiguous, so that id is also the index of the next row.
    start = cursor if cursor else 0
    end = start + size
    print(f"  [DB query] Fetching {size} rows after cursor {cursor}")
    return user_database[start:end]

print("\nDatabase pagination iterator example:")
print("  Offset-based page iterator:")
page_iter = DatabasePageIterator(
    fetch_func=fetch_users_page,
    count_func=count_users,
    page_size=50
)

count = 0
for user in page_iter:
    count += 1
    if count <= 3:
        print(f"    User: {user.name}")
    elif count == 4:
        # Stop early: only the first page has been fetched at this point.
        print(f"    ... (stopping early; {len(user_database)} records in total)")
        break

print("\n  Cursor-based iterator:")
cursor_iter = CursorBasedIterator(
    fetch_func=fetch_users_cursor,
    get_cursor=lambda u: u.id,
    page_size=50
)

count = 0
for user in cursor_iter:
    count += 1
    if count <= 3:
        print(f"    User: {user.name}")
    elif count == 4:
        # Stop early: only the first batch has been fetched at this point.
        print(f"    ... (stopping early; {len(user_database)} records in total)")
        break
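When features such as `reset()` or page metadata are not needed, the same lazy paging can be written more compactly as a generator. The sketch below assumes a `fetch_page(page, size)` callable with the same shape as `fetch_users_page`; the names `iter_pages` and `fake_fetch` are hypothetical.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")

def iter_pages(fetch_page: Callable[[int, int], List[T]], page_size: int) -> Iterator[T]:
    """Lazily yield rows page by page, fetching a page only when it is needed."""
    page = 1
    while True:
        items = fetch_page(page, page_size)
        yield from items
        if len(items) < page_size:
            # A short (or empty) page means there is nothing left to fetch.
            return
        page += 1

rows = list(range(7))
queried_pages: List[int] = []

def fake_fetch(page: int, size: int) -> List[int]:
    """Stand-in for a database query; records which pages were requested."""
    queried_pages.append(page)
    start = (page - 1) * size
    return rows[start:start + size]

fetched = list(iter_pages(fake_fetch, 3))
print(fetched, queried_pages)
```

Stopping on a short page saves the final empty query that the class-based iterators issue before they detect exhaustion.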

16.5.2 Stream-Processing Iterators

python
from typing import TypeVar, Generic, Iterator, Callable, Optional, Any, List
from dataclasses import dataclass
from collections import deque
import time

T = TypeVar('T')
R = TypeVar('R')

@dataclass
class StreamConfig:
    """Stream-processing configuration."""
    buffer_size: int = 1000
    batch_timeout: float = 1.0
    max_retries: int = 3

class StreamIterator(Generic[T]):
    """Iterator over a streaming source; the source returns None to signal the end."""
    
    def __init__(
        self,
        source: Callable[[], Optional[T]],
        config: Optional[StreamConfig] = None
    ):
        self._source = source
        self._config = config or StreamConfig()
        self._buffer: deque = deque(maxlen=self._config.buffer_size)
        self._exhausted = False
    
    def __iter__(self) -> Iterator[T]:
        return self
    
    def __next__(self) -> T:
        if self._buffer:
            return self._buffer.popleft()
        
        if self._exhausted:
            raise StopIteration
        
        item = self._source()
        if item is None:
            self._exhausted = True
            raise StopIteration
        
        return item
    
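    def buffer_items(self, count: int) -> int:
        """Pre-buffer up to `count` items and return how many were buffered."""
        # The deque is bounded by maxlen, and appending to a full deque silently
        # discards the oldest item, so never buffer beyond the free capacity.
        capacity = self._config.buffer_size - len(self._buffer)
        count = min(count, capacity)
        buffered = 0
        while buffered < count and not self._exhausted:
            item = self._source()
            if item is None:
                self._exhausted = True
                break
            self._buffer.append(item)
            buffered += 1
        return buffered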

class BatchIterator(Generic[T]):
    """Groups items from a source iterator into batches."""
    
    def __init__(
        self,
        source: Iterator[T],
        batch_size: int = 100,
        timeout: Optional[float] = None
    ):
        self._source = source
        self._batch_size = batch_size
        self._timeout = timeout
        self._exhausted = False
    
    def __iter__(self) -> Iterator[List[T]]:
        return self
    
    def __next__(self) -> List[T]:
        if self._exhausted:
            raise StopIteration
        
        batch: List[T] = []
        # A monotonic clock is not affected by system clock adjustments.
        start_time = time.monotonic()
        
        while len(batch) < self._batch_size:
            try:
                item = next(self._source)
                batch.append(item)
            except StopIteration:
                self._exhausted = True
                break
            
            if self._timeout is not None and (time.monotonic() - start_time) >= self._timeout:
                break
        
        if not batch:
            raise StopIteration
        
        return batch

class TransformIterator(Generic[T, R]):
    """Applies a transformer to each item; failed items can be skipped or replaced."""
    
    def __init__(
        self,
        source: Iterator[T],
        transformer: Callable[[T], R],
        error_handler: Optional[Callable[[Exception, T], Optional[R]]] = None
    ):
        self._source = source
        self._transformer = transformer
        self._error_handler = error_handler
    
    def __iter__(self) -> Iterator[R]:
        return self
    
    def __next__(self) -> R:
        while True:
            item = next(self._source)
            try:
                return self._transformer(item)
            except Exception as e:
                if self._error_handler:
                    result = self._error_handler(e, item)
                    if result is not None:
                        return result
                else:
                    raise

class FilterIterator(Generic[T]):
    """Yields only the items that satisfy a predicate."""
    
    def __init__(
        self,
        source: Iterator[T],
        predicate: Callable[[T], bool]
    ):
        self._source = source
        self._predicate = predicate
    
    def __iter__(self) -> Iterator[T]:
        return self
    
    def __next__(self) -> T:
        while True:
            item = next(self._source)
            if self._predicate(item):
                return item

class ChainIterator(Generic[T]):
    """Yields from several iterators one after another."""
    
    def __init__(self, *iterators: Iterator[T]):
        self._iterators = list(iterators)
        self._current_index = 0
    
    def __iter__(self) -> Iterator[T]:
        return self
    
    def __next__(self) -> T:
        while self._current_index < len(self._iterators):
            try:
                return next(self._iterators[self._current_index])
            except StopIteration:
                self._current_index += 1
        raise StopIteration

print("\nStream-processing iterator examples:")

print("  Batch iterator:")
batch_iter = BatchIterator(iter(range(1, 101)), batch_size=25)
for i, batch in enumerate(batch_iter, 1):
    print(f"    Batch {i}: {batch[:3]}... ({len(batch)} items)")

print("\n  Transform iterator:")
transform_iter = TransformIterator(
    iter(range(1, 11)),
    lambda x: x ** 2,
    lambda e, x: None
)
print(f"    Squares: {list(transform_iter)}")

print("\n  Filter iterator:")
filter_iter = FilterIterator(
    iter(range(1, 21)),
    lambda x: x % 2 == 0
)
print(f"    Even numbers: {list(filter_iter)}")

print("\n  Chain iterator:")
chain_iter = ChainIterator(
    iter([1, 2, 3]),
    iter(['a', 'b', 'c']),
    iter([True, False])
)
print(f"    Chained result: {list(chain_iter)}")
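For simple pipelines, the filter, transform, and batch stages can also be composed from generator expressions and `itertools.islice`, without dedicated classes. The sketch below is one such composition; `in_batches` is a hypothetical helper name, and it omits the timeout and error-handling features of the classes above.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def in_batches(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Group items from any iterable into lists of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Filter (even numbers) and transform (square) in one lazy generator expression.
even_squares = (x * x for x in range(1, 11) if x % 2 == 0)

batches = list(in_batches(even_squares, 2))
print(batches)
```

The class-based iterators remain the better fit when a stage needs its own state or configuration, such as a timeout or an error handler.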

16.5.3 Tree-Traversal Iterators

python
from typing import TypeVar, Generic, Iterator, List, Optional, Callable, Any
from dataclasses import dataclass, field
from collections import deque
from enum import Enum, auto

T = TypeVar('T')

class TraversalOrder(Enum):
    """Supported traversal orders."""
    PREORDER = auto()
    INORDER = auto()
    POSTORDER = auto()
    LEVELORDER = auto()
    REVERSE_PREORDER = auto()
    REVERSE_INORDER = auto()
    REVERSE_POSTORDER = auto()
    REVERSE_LEVELORDER = auto()

@dataclass
class TreeNode(Generic[T]):
    """Tree node that supports binary links (left/right) and a general child list."""
    value: T
    left: Optional['TreeNode[T]'] = None
    right: Optional['TreeNode[T]'] = None
    children: List['TreeNode[T]'] = field(default_factory=list)
    
    def add_child(self, child: 'TreeNode[T]') -> 'TreeNode[T]':
        self.children.append(child)
        return self
    
    @property
    def is_leaf(self) -> bool:
        return not self.children and not self.left and not self.right

class TreeIterator(Generic[T]):
    """Iterable that traverses a tree in the requested order."""
    
    def __init__(
        self,
        root: Optional[TreeNode[T]],
        order: TraversalOrder = TraversalOrder.PREORDER
    ):
        self._root = root
        self._order = order
    
    def __iter__(self) -> Iterator[T]:
        if self._root is None:
            return
        
        if self._order == TraversalOrder.PREORDER:
            yield from self._preorder(self._root)
        elif self._order == TraversalOrder.INORDER:
            yield from self._inorder(self._root)
        elif self._order == TraversalOrder.POSTORDER:
            yield from self._postorder(self._root)
        elif self._order == TraversalOrder.LEVELORDER:
            yield from self._levelorder(self._root)
        elif self._order == TraversalOrder.REVERSE_PREORDER:
            yield from self._reverse_preorder(self._root)
        elif self._order == TraversalOrder.REVERSE_INORDER:
            yield from self._reverse_inorder(self._root)
        elif self._order == TraversalOrder.REVERSE_POSTORDER:
            yield from self._reverse_postorder(self._root)
        elif self._order == TraversalOrder.REVERSE_LEVELORDER:
            yield from self._reverse_levelorder(self._root)
    
    def _preorder(self, node: TreeNode[T]) -> Iterator[T]:
        yield node.value
        for child in node.children:
            yield from self._preorder(child)
        if node.left:
            yield from self._preorder(node.left)
        if node.right:
            yield from self._preorder(node.right)
    
    def _inorder(self, node: TreeNode[T]) -> Iterator[T]:
        if node.left:
            yield from self._inorder(node.left)
        yield node.value
        for child in node.children:
            yield from self._inorder(child)
        if node.right:
            yield from self._inorder(node.right)
    
    def _postorder(self, node: TreeNode[T]) -> Iterator[T]:
        for child in node.children:
            yield from self._postorder(child)
        if node.left:
            yield from self._postorder(node.left)
        if node.right:
            yield from self._postorder(node.right)
        yield node.value
    
    def _levelorder(self, node: TreeNode[T]) -> Iterator[T]:
        queue = deque([node])
        while queue:
            current = queue.popleft()
            yield current.value
            queue.extend(current.children)
            if current.left:
                queue.append(current.left)
            if current.right:
                queue.append(current.right)
    
    def _reverse_preorder(self, node: TreeNode[T]) -> Iterator[T]:
        if node.right:
            yield from self._reverse_preorder(node.right)
        if node.left:
            yield from self._reverse_preorder(node.left)
        for child in reversed(node.children):
            yield from self._reverse_preorder(child)
        yield node.value
    
    def _reverse_inorder(self, node: TreeNode[T]) -> Iterator[T]:
        # Exact mirror of _inorder (left, node, children, right):
        # right, children in reverse, node, left.
        if node.right:
            yield from self._reverse_inorder(node.right)
        for child in reversed(node.children):
            yield from self._reverse_inorder(child)
        yield node.value
        if node.left:
            yield from self._reverse_inorder(node.left)
    
    def _reverse_postorder(self, node: TreeNode[T]) -> Iterator[T]:
        yield node.value
        if node.right:
            yield from self._reverse_postorder(node.right)
        if node.left:
            yield from self._reverse_postorder(node.left)
        for child in reversed(node.children):
            yield from self._reverse_postorder(child)
    
    def _reverse_levelorder(self, node: TreeNode[T]) -> Iterator[T]:
        stack = []
        queue = deque([node])
        while queue:
            current = queue.popleft()
            stack.append(current.value)
            if current.right:
                queue.append(current.right)
            if current.left:
                queue.append(current.left)
            for child in reversed(current.children):
                queue.append(child)
        yield from reversed(stack)

class TreePathIterator(Generic[T]):
    """Iterable over every root-to-leaf path in a tree."""
    
    def __init__(self, root: Optional[TreeNode[T]]):
        self._root = root
    
    def __iter__(self) -> Iterator[List[T]]:
        if self._root is None:
            return
        yield from self._find_paths(self._root, [])
    
    def _find_paths(self, node: TreeNode[T], path: List[T]) -> Iterator[List[T]]:
        current_path = path + [node.value]
        
        if node.is_leaf:
            yield current_path
        
        for child in node.children:
            yield from self._find_paths(child, current_path)
        
        if node.left:
            yield from self._find_paths(node.left, current_path)
        if node.right:
            yield from self._find_paths(node.right, current_path)

class TreeSearchIterator(Generic[T]):
    """树搜索迭代器"""
    
    def __init__(
        self,
        root: Optional[TreeNode[T]],
        predicate: Callable[[T], bool]
    ):
        self._root = root
        self._predicate = predicate
    
    def __iter__(self) -> Iterator[TreeNode[T]]:
        if self._root is None:
            return
        yield from self._search(self._root)
    
    def _search(self, node: TreeNode[T]) -> Iterator[TreeNode[T]]:
        if self._predicate(node.value):
            yield node
        for child in node.children:
            yield from self._search(child)
        if node.left:
            yield from self._search(node.left)
        if node.right:
            yield from self._search(node.right)

root = TreeNode(1, children=[
    TreeNode(2, children=[
        TreeNode(4),
        TreeNode(5)
    ]),
    TreeNode(3, children=[
        TreeNode(6),
        TreeNode(7)
    ])
])

print("\n树形结构遍历迭代器示例:")

print("  前序遍历:")
print(f"    {list(TreeIterator(root, TraversalOrder.PREORDER))}")

print("  后序遍历:")
print(f"    {list(TreeIterator(root, TraversalOrder.POSTORDER))}")

print("  层序遍历:")
print(f"    {list(TreeIterator(root, TraversalOrder.LEVELORDER))}")

print("  路径迭代器:")
for path in TreePathIterator(root):
    print(f"    {path}")

print("  搜索迭代器(值>4):")
for node in TreeSearchIterator(root, lambda x: x > 4):
    print(f"    找到节点: {node.value}")
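
The recursive generators above create one nested generator per tree level, so a very deep tree can exceed Python's default recursion limit (1000). A minimal sketch of a preorder traversal that uses an explicit stack instead, assuming a hypothetical `Node` class with only `value` and `children` (it is not the chapter's `TreeNode`):

```python
from dataclasses import dataclass, field
from typing import Any, Iterator, List


@dataclass
class Node:
    """Hypothetical minimal node, used only for this sketch."""
    value: Any
    children: List["Node"] = field(default_factory=list)


def iter_preorder(root: Node) -> Iterator[Any]:
    """Yield values in preorder using an explicit stack instead of recursion."""
    stack = [root]
    while stack:
        node = stack.pop()
        yield node.value
        # Push children right-to-left so the leftmost child is popped first.
        stack.extend(reversed(node.children))


# A chain 5000 nodes deep, well past the default recursion limit.
deep_root = Node(0)
tail = deep_root
for i in range(1, 5000):
    child = Node(i)
    tail.children.append(child)
    tail = child

small = Node(1, [Node(2, [Node(4), Node(5)]), Node(3, [Node(6), Node(7)])])
preorder_small = list(iter_preorder(small))
deep_count = sum(1 for _ in iter_preorder(deep_root))
print(preorder_small)
print(deep_count)
```

The trade-off is that the stack holds node references explicitly, but the traversal depth is then bounded by available memory rather than by the interpreter's call stack.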

16.5.4 File-Processing Iterators

python
from typing import Iterator, List, Optional, Dict, Any, Callable
from dataclasses import dataclass
from pathlib import Path
from contextlib import contextmanager
import csv
import json
import re

@dataclass
class FileInfo:
    """文件信息"""
    path: Path
    size: int
    line_count: int
    encoding: str

class FileLineIterator:
    """文件行迭代器"""
    
    def __init__(
        self,
        file_path: str,
        encoding: str = 'utf-8',
        skip_empty: bool = False,
        strip_whitespace: bool = True
    ):
        self._file_path = file_path
        self._encoding = encoding
        self._skip_empty = skip_empty
        self._strip_whitespace = strip_whitespace
        self._file = None
        self._line_number = 0
    
    def __iter__(self) -> Iterator[str]:
        self._file = open(self._file_path, 'r', encoding=self._encoding)
        self._line_number = 0
        return self
    
    def __next__(self) -> str:
        if self._file is None:
            raise StopIteration
        
        while True:
            line = self._file.readline()
            if not line:
                self._file.close()
                self._file = None
                raise StopIteration
            
            self._line_number += 1
            
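            if self._strip_whitespace:
                # Only the trailing line terminator is removed; other whitespace is kept.
                line = line.rstrip('\n\r')
            
            # Test the stripped text so that blank lines are skipped even when
            # strip_whitespace is False and the line still ends with '\n'.
            if self._skip_empty and not line.strip():
                continue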
            
            return line
    
    @property
    def line_number(self) -> int:
        return self._line_number

class CSVIterator:
    """CSV文件迭代器"""
    
    def __init__(
        self,
        file_path: str,
        delimiter: str = ',',
        encoding: str = 'utf-8',
        has_header: bool = True
    ):
        self._file_path = file_path
        self._delimiter = delimiter
        self._encoding = encoding
        self._has_header = has_header
        self._file = None
        self._headers: Optional[List[str]] = None
        self._row_number = 0
    
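    def __iter__(self) -> Iterator[Dict[str, str]]:
        # Close a handle left over from an earlier, unfinished iteration.
        if self._file is not None:
            self._file.close()
        # newline='' lets the csv module handle line endings inside quoted fields.
        self._file = open(self._file_path, 'r', encoding=self._encoding, newline='')
        # csv.reader handles quoted fields that contain the delimiter,
        # which a plain str.split() would break apart.
        self._reader = csv.reader(self._file, delimiter=self._delimiter)
        self._row_number = 0
        
        if self._has_header:
            # An empty file has no header row; fall back to positional keys.
            self._headers = next(self._reader, None)
        
        return self
    
    def __next__(self) -> Dict[str, str]:
        if self._file is None:
            raise StopIteration
        
        try:
            values = next(self._reader)
        except StopIteration:
            self._file.close()
            self._file = None
            raise
        
        self._row_number += 1
        
        if self._headers:
            return dict(zip(self._headers, values))
        return {str(i): v for i, v in enumerate(values)}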
    
    @property
    def row_number(self) -> int:
        return self._row_number

class JSONLinesIterator:
    """JSON Lines文件迭代器"""
    
    def __init__(self, file_path: str, encoding: str = 'utf-8'):
        self._file_path = file_path
        self._encoding = encoding
        self._file = None
        self._line_number = 0
    
    def __iter__(self) -> Iterator[Dict[str, Any]]:
        self._file = open(self._file_path, 'r', encoding=self._encoding)
        self._line_number = 0
        return self
    
    def __next__(self) -> Dict[str, Any]:
        if self._file is None:
            raise StopIteration
        
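        while True:
            line = self._file.readline()
            if not line:
                self._file.close()
                self._file = None
                raise StopIteration
            
            self._line_number += 1
            # Skip blank lines (for example an empty line at the end of the file),
            # which json.loads would reject.
            if line.strip():
                return json.loads(line)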
    
    @property
    def line_number(self) -> int:
        return self._line_number

class LogFileIterator:
    """日志文件迭代器"""
    
    def __init__(
        self,
        file_path: str,
        pattern: Optional[str] = None,
        encoding: str = 'utf-8'
    ):
        self._file_path = file_path
        self._pattern = re.compile(pattern) if pattern else None
        self._encoding = encoding
        self._file = None
        self._line_number = 0
    
    def __iter__(self) -> Iterator[Dict[str, Any]]:
        self._file = open(self._file_path, 'r', encoding=self._encoding)
        self._line_number = 0
        return self
    
    def __next__(self) -> Dict[str, Any]:
        if self._file is None:
            raise StopIteration
        
        while True:
            line = self._file.readline()
            if not line:
                self._file.close()
                self._file = None
                raise StopIteration
            
            self._line_number += 1
            line = line.rstrip('\n\r')
            
            if self._pattern:
                match = self._pattern.match(line)
                if match:
                    return {
                        'line_number': self._line_number,
                        'raw': line,
                        'groups': match.groups(),
                        'named': match.groupdict()
                    }
            else:
                return {
                    'line_number': self._line_number,
                    'raw': line
                }
    
    @property
    def line_number(self) -> int:
        return self._line_number

class DirectoryIterator:
    """目录迭代器"""
    
    def __init__(
        self,
        directory: str,
        pattern: str = '*',
        recursive: bool = False,
        include_dirs: bool = False
    ):
        self._directory = Path(directory)
        self._pattern = pattern
        self._recursive = recursive
        self._include_dirs = include_dirs
        self._iterator = None
    
    def __iter__(self) -> Iterator[Path]:
        if self._recursive:
            self._iterator = self._directory.rglob(self._pattern)
        else:
            self._iterator = self._directory.glob(self._pattern)
        return self
    
    def __next__(self) -> Path:
        if self._iterator is None:
            raise StopIteration
        
        while True:
            path = next(self._iterator)
            if self._include_dirs or path.is_file():
                return path

print("\n文件处理迭代器示例:")

test_content = """姓名,年龄,城市
张三,25,北京
李四,30,上海
王五,28,广州
"""

import tempfile
import os

with tempfile.TemporaryDirectory() as tmpdir:
    csv_file = os.path.join(tmpdir, 'test.csv')
    with open(csv_file, 'w', encoding='utf-8') as f:
        f.write(test_content)
    
    print("  CSV迭代器:")
    csv_iter = CSVIterator(csv_file)
    for row in csv_iter:
        print(f"    第{csv_iter.row_number}行: {row}")
    
    print("\n  目录迭代器:")
    dir_iter = DirectoryIterator(tmpdir, '*.csv')
    for path in dir_iter:
        print(f"    文件: {path.name}")
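
The class-based iterators above close their file only when iteration runs to the end; if the caller leaves the loop early, the handle stays open until it is garbage collected. A minimal sketch of one alternative: a generator function whose `with` block closes the file when the generator is exhausted or explicitly closed. The name `iter_lines` is hypothetical.

```python
import os
import tempfile
from typing import Iterator


def iter_lines(file_path: str, encoding: str = "utf-8") -> Iterator[str]:
    """Yield lines without their line terminator; the file closes with the generator."""
    with open(file_path, "r", encoding=encoding) as f:
        for line in f:
            yield line.rstrip("\r\n")


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "demo.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write("alpha\nbeta\ngamma\n")

    all_lines = list(iter_lines(path))

    # Stop early, then close the generator so the with-block exits right away.
    gen = iter_lines(path)
    first = next(gen)
    gen.close()

print(all_lines)
print(first)
```

Calling `gen.close()` raises `GeneratorExit` inside the generator at the paused `yield`, which unwinds the `with` block and closes the file deterministically.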

16.6 Pattern Variants

16.6.1 Lazy Iterators

python
from typing import TypeVar, Generic, Iterator, Callable, Optional, List, Any
from functools import wraps

T = TypeVar('T')
R = TypeVar('R')

class LazyIterator(Generic[T]):
    """惰性迭代器 - 延迟计算"""
    
    def __init__(self, source: Callable[[], Iterator[T]]):
        self._source_factory = source
        self._iterator: Optional[Iterator[T]] = None
    
    def __iter__(self) -> Iterator[T]:
        if self._iterator is None:
            self._iterator = self._source_factory()
        return self._iterator
    
    def __next__(self) -> T:
        if self._iterator is None:
            self._iterator = self._source_factory()
        return next(self._iterator)
    
    def map(self, func: Callable[[T], R]) -> 'LazyIterator[R]':
        """惰性映射"""
        def new_source():
            return (func(x) for x in self._source_factory())
        return LazyIterator(new_source)
    
    def filter(self, predicate: Callable[[T], bool]) -> 'LazyIterator[T]':
        """惰性过滤"""
        def new_source():
            return (x for x in self._source_factory() if predicate(x))
        return LazyIterator(new_source)
    
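    def take(self, n: int) -> 'LazyIterator[T]':
        """Take the first n elements."""
        def new_source():
            if n <= 0:
                return
            # Stop right after yielding the n-th element, so the source is
            # never asked to compute an (n+1)-th one.
            for i, x in enumerate(self._source_factory(), start=1):
                yield x
                if i >= n:
                    break
        return LazyIterator(new_source)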
    
    def drop(self, n: int) -> 'LazyIterator[T]':
        """跳过前n个元素"""
        def new_source():
            for i, x in enumerate(self._source_factory()):
                if i >= n:
                    yield x
        return LazyIterator(new_source)
    
    def to_list(self) -> List[T]:
        """转换为列表(触发计算)"""
        return list(iter(self))

def lazy(func: Callable[..., Iterator[T]]) -> Callable[..., LazyIterator[T]]:
    """惰性迭代器装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return LazyIterator(lambda: func(*args, **kwargs))
    return wrapper

print("\n惰性迭代器示例:")

@lazy
def generate_numbers(n: int) -> Iterator[int]:
    print(f"  [计算] 生成{n}个数字")
    for i in range(n):
        print(f"    [计算] 生成第{i+1}个数字")
        yield i

lazy_nums = generate_numbers(10)
print("  创建惰性迭代器(未计算)")

transformed = (
    lazy_nums
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * x)
    .take(3)
)
print("  链式转换(未计算)")

print("  触发计算:")
result = transformed.to_list()
print(f"  结果: {result}")
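
The same filter, map, take pipeline can also be sketched with generator expressions and `itertools.islice`. The hypothetical `counting_source` helper records how many items were actually pulled from the source, which makes the laziness observable:

```python
from itertools import islice
from typing import Iterator, List

pulled: List[int] = []


def counting_source(n: int) -> Iterator[int]:
    """Hypothetical source that records every item it produces."""
    for i in range(n):
        pulled.append(i)
        yield i


evens = (x for x in counting_source(10) if x % 2 == 0)
squares = (x * x for x in evens)
first_three = islice(squares, 3)

# Building the pipeline pulls no items from the source.
pulled_before = list(pulled)

result = list(first_three)
print(result)   # [0, 4, 16]
print(pulled)   # [0, 1, 2, 3, 4]
```

Only five of the ten source items are ever produced: `islice` stops requesting values as soon as it has delivered three results.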

16.6.2 Resettable Iterators

python
from typing import TypeVar, Generic, Iterator, Callable, List, Optional
from copy import deepcopy

T = TypeVar('T')

class ResettableIterator(Generic[T]):
    """可重置迭代器"""
    
    def __init__(self, iterable: List[T]):
        self._data = iterable
        self._index = 0
        self._history: List[T] = []
        self._checkpoint_index = 0
    
    def __iter__(self) -> 'ResettableIterator[T]':
        return self
    
    def __next__(self) -> T:
        if self._index >= len(self._data):
            raise StopIteration
        value = self._data[self._index]
        self._history.append(value)
        self._index += 1
        return value
    
    def reset(self) -> None:
        """重置到初始状态"""
        self._index = 0
        self._history.clear()
        self._checkpoint_index = 0
    
    def checkpoint(self) -> int:
        """创建检查点,返回检查点ID"""
        self._checkpoint_index = self._index
        return self._checkpoint_index
    
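    def rollback(self, checkpoint_id: Optional[int] = None) -> None:
        """Roll back to a checkpoint (defaults to the most recent one)."""
        target = self._checkpoint_index if checkpoint_id is None else checkpoint_id
        if not 0 <= target <= len(self._data):
            raise ValueError(f"invalid checkpoint: {target}")
        self._index = target
        # Drop history recorded after the checkpoint so that elements
        # replayed after the rollback are not stored twice.
        del self._history[target:]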
    
    def get_history(self) -> List[T]:
        """获取已迭代的元素历史"""
        return self._history.copy()
    
    @property
    def position(self) -> int:
        """当前位置"""
        return self._index

class PeekableIterator(Generic[T]):
    """可预览迭代器"""
    
    def __init__(self, iterator: Iterator[T]):
        self._iterator = iterator
        self._buffer: List[T] = []
        self._exhausted = False
    
    def __iter__(self) -> 'PeekableIterator[T]':
        return self
    
    def __next__(self) -> T:
        if self._buffer:
            return self._buffer.pop(0)
        return next(self._iterator)
    
    def peek(self, n: int = 1) -> List[T]:
        """预览接下来的n个元素"""
        while len(self._buffer) < n and not self._exhausted:
            try:
                self._buffer.append(next(self._iterator))
            except StopIteration:
                self._exhausted = True
                break
        return self._buffer[:n]
    
    def has_next(self) -> bool:
        """检查是否还有下一个元素"""
        if self._buffer:
            return True
        try:
            self._buffer.append(next(self._iterator))
            return True
        except StopIteration:
            self._exhausted = True
            return False

print("\n可重置迭代器示例:")

resettable = ResettableIterator([1, 2, 3, 4, 5])
print("  第一次迭代:")
for item in resettable:
    print(f"    {item}")
    if item == 2:
        checkpoint = resettable.checkpoint()
        print(f"    创建检查点: {checkpoint}")

print(f"  当前位置: {resettable.position}")
print(f"  历史: {resettable.get_history()}")

resettable.rollback()
print("  回滚后继续迭代:")
for item in resettable:
    print(f"    {item}")

print("\n可预览迭代器示例:")
peekable = PeekableIterator(iter([1, 2, 3, 4, 5]))
print(f"  预览前3个: {peekable.peek(3)}")
print(f"  下一个元素: {next(peekable)}")
print(f"  再次预览: {peekable.peek(2)}")
print(f"  是否还有下一个: {peekable.has_next()}")
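
Lookahead is most useful when the decision to consume an element depends on what that element is. A minimal sketch, assuming a simplified one-item `Lookahead` wrapper (hypothetical, not the chapter's `PeekableIterator`) and a toy tokenizer that groups consecutive digits:

```python
from typing import List

_SENTINEL = object()


class Lookahead:
    """Hypothetical one-item lookahead wrapper around any iterable."""

    def __init__(self, iterable):
        self._it = iter(iterable)
        # Note: the first item is fetched eagerly, at construction time.
        self._next = next(self._it, _SENTINEL)

    def __iter__(self) -> "Lookahead":
        return self

    def __next__(self):
        if self._next is _SENTINEL:
            raise StopIteration
        value, self._next = self._next, next(self._it, _SENTINEL)
        return value

    def peek(self, default=None):
        """Return the upcoming item without consuming it."""
        return default if self._next is _SENTINEL else self._next


def tokenize(text: str) -> List[str]:
    """Split text into runs of digits and single non-digit characters."""
    chars = Lookahead(text)
    tokens: List[str] = []
    for ch in chars:
        if ch.isdigit():
            # Keep consuming only while the next character is also a digit.
            while chars.peek("").isdigit():
                ch += next(chars)
        tokens.append(ch)
    return tokens


tokens = tokenize("12+345*6")
print(tokens)
```

Without `peek`, the tokenizer would have to consume the operator character to discover that the number had ended, and then push it back somehow.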

16.6.3 Parallel Iterators

python
from typing import TypeVar, Generic, Iterator, List, Tuple, Optional, Callable
from itertools import zip_longest
from concurrent.futures import ThreadPoolExecutor, as_completed

T = TypeVar('T')
R = TypeVar('R')

class ParallelIterator(Generic[T]):
    """并行迭代器"""
    
    def __init__(
        self,
        iterable: List[T],
        max_workers: int = 4
    ):
        self._data = iterable
        self._max_workers = max_workers
    
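    def map_parallel(
        self,
        func: Callable[[T], R],
        chunk_size: int = 1
    ) -> List[R]:
        """Parallel map; results are returned in input order.

        chunk_size is currently unused.
        """
        with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
            futures = [
                executor.submit(func, item)
                for item in self._data
            ]
            # Collect in submission order. as_completed() would yield results in
            # completion order and lose the correspondence with the inputs.
            return [future.result() for future in futures]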
    
    def filter_parallel(
        self,
        predicate: Callable[[T], bool]
    ) -> List[T]:
        """并行过滤"""
        with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
            futures = {
                executor.submit(predicate, item): item
                for item in self._data
            }
            return [
                item for future, item in futures.items()
                if future.result()
            ]

class ZipIterator:
    """压缩迭代器"""
    
    @staticmethod
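    def zip_strict(*iterables: Iterator) -> Iterator[Tuple]:
        """Strict zip: all iterables must have the same length."""
        iterators = [iter(it) for it in iterables]
        if not iterators:
            # Mirror zip() with no arguments: produce nothing rather than
            # yielding empty tuples forever.
            return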
        while True:
            items = []
            exhausted = []
            for i, it in enumerate(iterators):
                try:
                    items.append(next(it))
                except StopIteration:
                    exhausted.append(i)
            
            if exhausted:
                if items:
                    raise ValueError(
                        f"迭代器长度不一致,索引{exhausted}已耗尽"
                    )
                return
            
            yield tuple(items)
    
    @staticmethod
    def zip_longest_custom(
        *iterables: Iterator,
        fillvalue: Optional[T] = None
    ) -> Iterator[Tuple]:
        """最长压缩 - 用填充值补齐"""
        return zip_longest(*iterables, fillvalue=fillvalue)
    
    @staticmethod
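    def zip_dict(**iterables: Iterator) -> Iterator[dict]:
        """Zip keyword iterables into dictionaries."""
        keys = list(iterables.keys())
        iterators = [iter(it) for it in iterables.values()]
        if not iterators:
            # Without this guard the loop below would yield {} forever.
            return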
        
        while True:
            try:
                values = [next(it) for it in iterators]
                yield dict(zip(keys, values))
            except StopIteration:
                return

class ProductIterator:
    """笛卡尔积迭代器"""
    
    @staticmethod
    def product(*iterables: List, repeat: int = 1) -> Iterator[Tuple]:
        """笛卡尔积"""
        pools = [tuple(pool) for pool in iterables] * repeat
        
        if not pools:
            yield ()
            return
        
        def backtrack(index: int, current: List):
            if index == len(pools):
                yield tuple(current)
                return
            
            for item in pools[index]:
                yield from backtrack(index + 1, current + [item])
        
        yield from backtrack(0, [])

print("\n并行迭代器示例:")

parallel_iter = ParallelIterator(list(range(1, 11)), max_workers=4)
squares = parallel_iter.map_parallel(lambda x: x ** 2)
print(f"  并行平方: {sorted(squares)}")

evens = parallel_iter.filter_parallel(lambda x: x % 2 == 0)
print(f"  并行过滤偶数: {sorted(evens)}")

print("\n压缩迭代器示例:")
names = ['张三', '李四', '王五']
ages = [25, 30, 28]
cities = ['北京', '上海', '广州']

print("  字典压缩:")
for person in ZipIterator.zip_dict(name=names, age=ages, city=cities):
    print(f"    {person}")

print("\n笛卡尔积迭代器示例:")
colors = ['红', '蓝']
sizes = ['S', 'M', 'L']
print("  颜色×尺寸:")
for combo in ProductIterator.product(colors, sizes):
    print(f"    {combo}")
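
When the order of results matters, `ThreadPoolExecutor.map` is a convenient alternative to collecting futures by hand: it returns results in input order regardless of which task finishes first. A minimal sketch; the `slow_square` function and its artificial delays are assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def slow_square(x: int) -> int:
    """Hypothetical task: larger inputs finish sooner, to shuffle completion order."""
    time.sleep(0.01 * (5 - x))
    return x * x


def ordered_parallel_map(items: List[int], max_workers: int = 4) -> List[int]:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(slow_square, items))


ordered = ordered_parallel_map([1, 2, 3, 4])
print(ordered)
```

In CPython, threads mainly help with I/O-bound tasks; for CPU-bound functions a process pool is usually the more suitable executor.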

16.7 Anti-Patterns and Best Practices

16.7.1 Common Anti-Patterns

python
from typing import List, Any, Iterator, Optional

class IteratorAntiPatterns:
    """迭代器反模式示例"""
    
    @staticmethod
    def anti_pattern_1_list_conversion():
        """反模式1:不必要的列表转换"""
        data = range(1000000)
        
        bad_result = list(x * 2 for x in data)
        
        good_result = (x * 2 for x in data)
        
        return bad_result, good_result
    
    @staticmethod
    def anti_pattern_2_multiple_iteration():
        """反模式2:多次迭代同一迭代器"""
        data = iter([1, 2, 3, 4, 5])
        
        print("    第一次迭代:", list(data))
        print("    第二次迭代:", list(data))
        
        good_data = [1, 2, 3, 4, 5]
        print("    正确做法第一次:", list(good_data))
        print("    正确做法第二次:", list(good_data))
    
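    @staticmethod
    def anti_pattern_3_modify_during_iteration():
        """Anti-pattern 3: modifying a collection while iterating over it."""
        data = [1, 2, 3, 4, 5]
        
        # Bad: removing from a list during iteration raises no exception.
        # The list iterator's index simply moves on, so the element that
        # slides into the removed slot (4) is silently skipped.
        bad_data = data.copy()
        visited = []
        for item in bad_data:
            visited.append(item)
            if item == 3:
                bad_data.remove(item)
        print(f"    Bad approach visited only: {visited}")
        
        # Good: collect first, mutate after the loop.
        good_data = data.copy()
        to_remove = []
        for item in good_data:
            if item == 3:
                to_remove.append(item)
        for item in to_remove:
            good_data.remove(item)
        print(f"    Correct result: {good_data}")
        
        # Better: build a new list with a comprehension.
        better_data = [x for x in data if x != 3]
        print(f"    Better approach: {better_data}")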
    
    @staticmethod
    def anti_pattern_4_infinite_iterator():
        """反模式4:无限迭代器未限制"""
        def infinite_counter():
            n = 0
            while True:
                yield n
                n += 1
        
        counter = infinite_counter()
        
        print("    限制迭代次数:")
        for i, n in enumerate(counter):
            if i >= 5:
                break
            print(f"      {n}")
    
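    @staticmethod
    def anti_pattern_5_exception_in_iterator():
        """Anti-pattern 5: poor exception handling inside an iterator."""
        def bad_generator():
            data = [1, 2, 'three', 4, 5]
            for item in data:
                # 'three' / 2 raises TypeError, which terminates the generator,
                # so 4 and 5 are never produced.
                # Note: 'three' * 2 would not fail (it is string repetition).
                yield item / 2
        
        def good_generator():
            data = [1, 2, 'three', 4, 5]
            for item in data:
                # Guard only the computation, not the yield itself.
                try:
                    result = item / 2
                except TypeError:
                    result = None
                yield result
        
        print("    Poor handling:")
        try:
            for item in bad_generator():
                print(f"      {item}")
        except TypeError as e:
            print(f"      Exception: {e}")
        
        print("    Proper handling:")
        for item in good_generator():
            print(f"      {item}")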

print("\n迭代器反模式示例:")
print("  反模式2 - 多次迭代:")
IteratorAntiPatterns.anti_pattern_2_multiple_iteration()

print("\n  反模式3 - 迭代时修改:")
IteratorAntiPatterns.anti_pattern_3_modify_during_iteration()

print("\n  反模式4 - 无限迭代器:")
IteratorAntiPatterns.anti_pattern_4_infinite_iterator()

print("\n  反模式5 - 异常处理:")
IteratorAntiPatterns.anti_pattern_5_exception_in_iterator()
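
Whether mutating a collection during iteration raises an error depends on the container. In CPython, changing the size of a `dict` while iterating over it raises `RuntimeError`, whereas removing from a `list` raises nothing and silently skips an element. A minimal sketch of both behaviours, plus the snapshot idiom that avoids the problem:

```python
# Dict: changing its size during iteration raises RuntimeError.
scores = {"a": 1, "b": 2, "c": 3}
dict_error = None
try:
    for key in scores:
        if key == "a":
            del scores[key]
except RuntimeError as exc:
    dict_error = exc

# List: no exception, but the element after the removed one is never visited.
numbers = [1, 2, 3, 4, 5]
visited = []
for n in numbers:
    visited.append(n)
    if n == 3:
        numbers.remove(n)

# Safe alternative: iterate over a snapshot and mutate the original.
safe = {"a": 1, "b": 2, "c": 3}
for key in list(safe):
    if safe[key] % 2 == 1:
        del safe[key]

print(type(dict_error).__name__)
print(visited)
print(safe)
```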

16.7.2 Best Practices

python
from typing import TypeVar, Generic, Iterator, List, Callable, Optional, Any
from contextlib import contextmanager
from functools import wraps
import itertools

T = TypeVar('T')

class IteratorBestPractices:
    """迭代器最佳实践"""
    
    @staticmethod
    def practice_1_use_generators():
        """实践1:优先使用生成器"""
        def bad_range(n: int) -> List[int]:
            result = []
            for i in range(n):
                result.append(i)
            return result
        
        def good_range(n: int) -> Iterator[int]:
            for i in range(n):
                yield i
        
        print("    列表方式占用更多内存")
        print("    生成器方式惰性计算,内存友好")
    
    @staticmethod
    def practice_2_use_itertools():
        """实践2:善用itertools模块"""
        data = [1, 2, 3, 4, 5]
        
        print("    chain - 链接多个迭代器:")
        print(f"      {list(itertools.chain([1, 2], [3, 4], [5]))}")
        
        print("    islice - 切片迭代器:")
        print(f"      {list(itertools.islice(range(100), 5, 10))}")
        
        print("    takewhile - 条件取值:")
        print(f"      {list(itertools.takewhile(lambda x: x < 4, data))}")
        
        print("    dropwhile - 条件跳过:")
        print(f"      {list(itertools.dropwhile(lambda x: x < 4, data))}")
        
        print("    groupby - 分组:")
        data_sorted = sorted(['apple', 'banana', 'apricot', 'blueberry', 'cherry'])
        for key, group in itertools.groupby(data_sorted, key=lambda x: x[0]):
            print(f"      {key}: {list(group)}")
    
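    @staticmethod
    def practice_3_context_manager():
        """Practice 3: manage resources with a context manager."""
        class SafeFileIterator:
            def __init__(self, file_path: str, encoding: str = 'utf-8'):
                self._file_path = file_path
                self._encoding = encoding
                self._file = None
            
            def __enter__(self):
                self._file = open(self._file_path, 'r', encoding=self._encoding)
                return self
            
            def __exit__(self, exc_type, exc_val, exc_tb):
                if self._file:
                    self._file.close()
                    self._file = None
                return False
            
            def __iter__(self):
                if self._file is None:
                    raise RuntimeError("use SafeFileIterator inside a 'with' block")
                return self._file
            
        print("    A context manager ensures the resource is released")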
    
    @staticmethod
    def practice_4_type_hints():
        """实践4:使用类型提示"""
        from typing import Iterator, Generator
        
        def typed_generator(n: int) -> Generator[int, None, None]:
            for i in range(n):
                yield i
        
        def typed_iterator(data: List[str]) -> Iterator[str]:
            return iter(data)
        
        print("    类型提示提高代码可读性和IDE支持")
    
    @staticmethod
    def practice_5_documentation():
        """实践5:添加文档说明"""
        def documented_generator(n: int) -> Iterator[int]:
            """
            生成从0到n-1的整数序列。
            
            Args:
                n: 生成整数的数量
                
            Yields:
                int: 序列中的下一个整数
                
            Examples:
                >>> list(documented_generator(3))
                [0, 1, 2]
            """
            for i in range(n):
                yield i
        
        print("    文档说明提高代码可维护性")

print("\n迭代器最佳实践示例:")
print("  实践2 - 善用itertools:")
IteratorBestPractices.practice_2_use_itertools()
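
Practice 1 states that generators are more memory friendly than building a list. A minimal sketch that makes the difference measurable with `sys.getsizeof`, which reports the size of the container object itself and not of the elements it references:

```python
import sys

n = 100_000
as_list = [i for i in range(n)]
as_generator = (i for i in range(n))

list_bytes = sys.getsizeof(as_list)
generator_bytes = sys.getsizeof(as_generator)

print(f"list:      {list_bytes} bytes")
print(f"generator: {generator_bytes} bytes")

# Both produce the same values; the generator produces them on demand.
same_total = sum(as_list) == sum(as_generator)
```

The exact byte counts vary between Python versions and platforms, but the generator's size stays constant while the list grows with `n`.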

16.8 Decision Guide

16.8.1 Decision Tree for Choosing an Iterator Type

┌────────────────────────────────────────────────────────────────────┐
│                    Iterator Type Decision Tree                     │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│               Need to traverse a collection?                       │
│                              │                                     │
│                 ┌────────────┴────────────┐                        │
│                 │                         │                        │
│                Yes                        No                       │
│                 │                         │                        │
│                 ▼                         ▼                        │
│        ┌─────────────────┐       No iterator needed                │
│        │ How much data?  │                                         │
│        └────────┬────────┘                                         │
│                 │                                                  │
│        ┌────────┼────────┐                                         │
│        │        │        │                                         │
│      Small    Medium   Large                                       │
│        │        │        │                                         │
│        ▼        ▼        ▼                                         │
│      List   Generator  Streaming iterator                          │
│    iterator            + pagination                                │
│                                                                    │
│                   Need multiple passes?                            │
│                             │                                      │
│              ┌──────────────┴──────────────┐                       │
│              │                             │                       │
│             Yes                            No                      │
│              │                             │                       │
│              ▼                             ▼                       │
│     Resettable iterator            Standard iterator               │
│     or a plain list                                                │
│                                                                    │
│                    Need to peek ahead?                             │
│                             │                                      │
│              ┌──────────────┴──────────────┐                       │
│              │                             │                       │
│             Yes                            No                      │
│              │                             │                       │
│              ▼                             ▼                       │
│      Peekable iterator             Standard iterator               │
│                                                                    │
│                 Need parallel processing?                          │
│                             │                                      │
│              ┌──────────────┴──────────────┐                       │
│              │                             │                       │
│             Yes                            No                      │
│              │                             │                       │
│              ▼                             ▼                       │
│      Parallel iterator             Standard iterator               │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘

16.8.2 Choosing an Implementation

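| Scenario | Recommended implementation | Reason |
| --- | --- | --- |
| Simple sequence traversal | Generator function | Concise code, memory efficient |
| State management required | Iterator class | Can maintain complex state |
| Large datasets | Streaming iterator + pagination | Avoids exhausting memory |
| Multiple passes required | List or resettable iterator | Supports repeated access |
| Tree structures | Recursive generator | Expresses traversal logic naturally |
| File processing | Context manager + iterator | Ensures resources are released |
| Parallel processing | ThreadPoolExecutor + iterator | Speeds up processing (mainly I/O-bound work in CPython) |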
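
For the large-dataset scenario, a streaming iterator with pagination might look like the sketch below. The `fetch_page` callable and its `(offset, limit)` signature are assumptions standing in for a database query or HTTP call; here an in-memory list plays that role.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_pages(
    fetch_page: Callable[[int, int], List[T]],
    page_size: int = 100,
) -> Iterator[T]:
    """Yield items one at a time while holding only one page in memory."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        if not page:
            return
        yield from page
        if len(page) < page_size:
            # A short page means the source is exhausted; skip the extra call.
            return
        offset += len(page)


# Hypothetical backend: an in-memory list standing in for a remote source.
records = list(range(250))
calls: List[int] = []


def fake_fetch(offset: int, limit: int) -> List[int]:
    calls.append(offset)
    return records[offset:offset + limit]


streamed = list(iter_pages(fake_fetch, page_size=100))
print(len(streamed), calls)
```

The caller sees a flat stream of items, while the page boundaries and the number of backend calls stay an internal detail of the iterator.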

16.9 Quick Reference Card

16.9.1 Core Concepts at a Glance

┌─────────────────────────────────────────────────────────────────────┐
│                    Iterator Pattern Quick Reference                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  【定义】                                                            │
│  提供一种方法顺序访问聚合对象元素,而不暴露其内部表示               │
│                                                                      │
│  【Python迭代器协议】                                                │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  class Iterator:                                              │   │
│  │      def __iter__(self) -> Iterator   # 返回self             │   │
│  │      def __next__(self) -> T          # 返回下一元素或Stop    │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                      │
│  【核心组件】                                                        │
│  • Iterator: 迭代器接口,定义遍历方法                               │
│  • ConcreteIterator: 具体迭代器实现                                 │
│  • Aggregate: 聚合接口,定义创建迭代器的方法                        │
│  • ConcreteAggregate: 具体聚合实现                                  │
│                                                                      │
│  【常用itertools函数】                                               │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  count(start, step)    # 无限计数器                           │   │
│  │  cycle(iterable)       # 无限循环                             │   │
│  │  repeat(item, times)   # 重复元素                             │   │
│  │  chain(*iterables)     # 链接迭代器                           │   │
│  │  islice(iterable, ...) # 切片迭代器                           │   │
│  │  takewhile(pred, iter) # 条件取值                             │   │
│  │  dropwhile(pred, iter) # 条件跳过                             │   │
│  │  groupby(iter, key)    # 分组                                 │   │
│  │  permutations(iter, r) # 排列                                 │   │
│  │  combinations(iter, r) # 组合                                 │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                      │
│  【生成器语法】                                                      │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │  def generator():                                             │   │
│  │      yield value              # 产出值                        │   │
│  │      yield from iterable      # 委托生成器                    │   │
│  │                                                               │   │
│  │  gen = (x for x in iterable)  # 生成器表达式                  │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                      │
│  【适用场景】                                                        │
│  ✓ 遍历聚合对象而不暴露内部表示                                     │
│  ✓ 支持多种遍历方式                                                 │
│  ✓ 处理大数据集或流式数据                                           │
│  ✓ 需要延迟计算                                                     │
│                                                                      │
│  [Advantages]                                                        │
│  + Uniform traversal interface                                       │
│  + Decouples traversal logic from the collection                     │
│  + Supports lazy evaluation                                          │
│  + Memory-efficient                                                  │
│                                                                      │
│  [Disadvantages]                                                     │
│  - Usually forward-only traversal                                    │
│  - Iterators usually cannot be reset                                 │
│  - May add complexity to the system                                  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘
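A few of the itertools functions listed above can be tried out in a short session. The following is a minimal sketch, not an exhaustive tour; the sample data and variable names are made up for illustration.

```python
from itertools import chain, count, groupby, islice, takewhile

# count() never terminates on its own, so islice bounds it lazily.
first_evens = list(islice(count(0, 2), 5))

# chain concatenates iterables without building an intermediate list.
chained = list(chain("ab", [1, 2]))

# takewhile stops at the first element that fails the predicate.
small = list(takewhile(lambda x: x < 3, [1, 2, 5, 1]))

# groupby only groups *consecutive* items with equal keys,
# so the input is sorted by the same key first.
words = sorted(["kiwi", "fig", "plum", "pear", "yam"], key=len)
by_length = {k: list(g) for k, g in groupby(words, key=len)}

# An iterator is single-pass: once exhausted, it yields nothing more.
it = iter([1, 2, 3])
first_pass = list(it)
second_pass = list(it)

print(first_evens)   # [0, 2, 4, 6, 8]
print(by_length)     # {3: ['fig', 'yam'], 4: ['kiwi', 'plum', 'pear']}
print(second_pass)   # []
```

The last three lines illustrate the "cannot be reset" drawback from the card: to traverse again, a fresh iterator has to be requested from the underlying collection.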

16.9.2 Code Template Quick Reference

python
from abc import ABC, abstractmethod
from typing import Generator, Generic, List, TypeVar

T = TypeVar('T')

class IteratorTemplate(ABC, Generic[T]):
    """Iterator template."""
    
    @abstractmethod
    def __iter__(self) -> 'IteratorTemplate[T]':
        # By convention an iterator returns itself from __iter__.
        return self
    
    @abstractmethod
    def __next__(self) -> T:
        # Return the next element, or raise StopIteration when exhausted.
        raise StopIteration

class SequenceIteratorTemplate(IteratorTemplate[T]):
    """Sequence iterator template."""
    
    def __init__(self, sequence: List[T]):
        self._sequence = sequence
        self._index = 0  # position of the next element to return
    
    def __iter__(self) -> 'SequenceIteratorTemplate[T]':
        return self
    
    def __next__(self) -> T:
        if self._index >= len(self._sequence):
            raise StopIteration
        value = self._sequence[self._index]
        self._index += 1
        return value

def generator_template(start: int, end: int) -> Generator[int, None, None]:
    """Generator template: yields start, start + 1, ..., end - 1."""
    current = start
    while current < end:
        yield current
        current += 1

def recursive_generator_template(data: List[T]) -> Generator[T, None, None]:
    """Recursive generator template.

    For illustration only: each data[1:] slice copies the list, and the
    recursion depth grows with len(data), so long lists hit the recursion limit.
    """
    if not data:
        return
    yield data[0]
    yield from recursive_generator_template(data[1:])
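
Recursive generators pay off mainly on nested data, where the recursion depth follows the nesting depth rather than the number of elements. The sketch below is one possible illustration; the name `flatten` and the choice to treat only lists and tuples as containers are assumptions made for this example, not part of the templates above.

```python
from typing import Any, Iterable, Iterator


def flatten(nested: Iterable[Any]) -> Iterator[Any]:
    """Yield the leaves of arbitrarily nested lists/tuples, depth-first."""
    for item in nested:
        if isinstance(item, (list, tuple)):
            # Delegate to the recursive call; its values pass straight through.
            yield from flatten(item)
        else:
            # Anything else (numbers, strings, ...) is treated as a leaf.
            yield item


flat = list(flatten([1, [2, (3, 4)], [], [[5]]]))
print(flat)  # [1, 2, 3, 4, 5]
```

Because `yield from` forwards each value lazily, the flattened sequence is never materialized until the caller asks for it.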

16.10 Summary

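The iterator pattern is one of the most fundamental design patterns in Python, and the language supports the iterator protocol natively. Starting from a formal definition, this chapter examined the mathematical foundations of iterators, the type-theoretic view, and their algebraic properties.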

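Key points

  1. Formal understanding: an iterator can be formalized as a state transition system $\mathcal{I} = \langle S, s_0, \Sigma, \delta, F \rangle$ that supports algebraic operations such as map, filter, and fold.

  2. Native Python support: the iterator protocol is implemented through the __iter__ and __next__ methods, and generators offer a more concise way to implement it.

  3. Multiple implementation styles: external iterators (controlled by the client), internal iterators (controlled by the collection), generators, lazy iterators, resettable iterators, and others.

  4. Enterprise applications: paginated database iterators, stream processing, tree traversal, file processing, and similar scenarios.

  5. Best practices: prefer generators, make good use of the itertools module, manage resources with context managers, and add type hints and documentation.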
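
The protocol mentioned in point 2 can be made concrete by spelling out roughly what a for loop does. The sketch below is illustrative only; `Countdown`, `manual_for`, and `countdown_gen` are hypothetical names introduced for this example.

```python
class Countdown:
    """Hypothetical class-based iterator that counts down from n to 1."""

    def __init__(self, n: int) -> None:
        self._current = n

    def __iter__(self) -> "Countdown":
        return self

    def __next__(self) -> int:
        if self._current <= 0:
            raise StopIteration
        value = self._current
        self._current -= 1
        return value


def manual_for(iterable):
    """Collect items the way a for loop would, using iter() and next()."""
    result = []
    iterator = iter(iterable)      # calls iterable.__iter__()
    while True:
        try:
            item = next(iterator)  # calls iterator.__next__()
        except StopIteration:
            break                  # a for loop ends silently at this point
        result.append(item)
    return result


def countdown_gen(n: int):
    """Generator version: Python supplies __iter__ and __next__ itself."""
    while n > 0:
        yield n
        n -= 1


print(manual_for(Countdown(3)))   # [3, 2, 1]
print(list(countdown_gen(3)))     # [3, 2, 1]
```

The generator version is shorter because the state (`n`) lives in the suspended function frame instead of in instance attributes, which is the main reason generators are usually preferred.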

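The iterator pattern is closely tied to generators and lazy evaluation, and it underpins functional programming and stream processing. Mastering it is essential for handling large datasets and building efficient data-processing pipelines.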
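
As a rough illustration of such a pipeline, the sketch below chains three generator stages so that each element flows through all stages before the next one is read. The stage names and the in-memory `raw_lines` sample are assumptions for the example; in practice the source could be an open file or a network stream.

```python
from typing import Iterable, Iterator


def parse_ints(lines: Iterable[str]) -> Iterator[int]:
    """Map stage: convert each non-blank line to an int."""
    for line in lines:
        line = line.strip()
        if line:
            yield int(line)


def only_even(numbers: Iterable[int]) -> Iterator[int]:
    """Filter stage: keep even numbers only."""
    return (n for n in numbers if n % 2 == 0)


def running_total(numbers: Iterable[int]) -> Iterator[int]:
    """Fold-like stage: yield the cumulative sum after each element."""
    total = 0
    for n in numbers:
        total += n
        yield total


# Sample input standing in for a large file or stream.
raw_lines = ["1\n", "2\n", "\n", "4\n", "7\n", "10\n"]

# Nothing is computed until the pipeline is consumed.
pipeline = running_total(only_even(parse_ints(raw_lines)))
totals = list(pipeline)
print(totals)  # [2, 6, 16]
```

Only one element is held per stage at any time, so memory use stays constant regardless of how long the input is.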

Python Technical Series - Jiangsu Sucheng Secondary Vocational School (江苏省宿城中等专业学校)