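Chapter 16 The Iterator Pattern
Learning Objectives
After completing this chapter, readers will be able to:
- Understand the mathematical foundations and formal definition of the Iterator pattern
- Explain how Python's iterator protocol is implemented in full
- Design and implement several kinds of iterators (external iterators, internal iterators, generators, and others)
- Apply the Iterator pattern to large datasets, streaming data, and complex aggregate structures
- Recognize where the Iterator pattern fits and which anti-patterns to avoid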
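16.1 Formal Definition
16.1.1 Mathematical Definition
The Iterator pattern provides a way to access the elements of an aggregate object sequentially without exposing the object's internal representation.
Mathematically, an iterator can be modeled as a state-transition system: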
$$\mathcal{I} = \langle S, s_0, \Sigma, \delta, F \rangle$$
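where:
- $S$: the state set, containing every possible iteration state
- $s_0 \in S$: the initial state
- $\Sigma$: the element set, i.e. the elements being iterated over
- $\delta: S \rightarrow S \times (\Sigma \cup \{\epsilon\})$: the state-transition function, with $\epsilon$ standing for "no element"
- $F \subseteq S$: the set of final states
The iteration operations are defined formally as:
$$\text{next}: S \rightarrow (S \times \Sigma) \cup \{\text{StopIteration}\}$$
$$\text{hasNext}: S \rightarrow \mathbb{B}$$
Algebraic property of iterators:
For an iterator $I$ and the element sequence $e_1, e_2, \ldots, e_n$:
$$\forall i, j \in \{1, \ldots, n\},\; i < j \Rightarrow \text{position}(e_i) < \text{position}(e_j)$$
This guarantees the sequentiality of the iterator.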
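The state-transition view above can be sketched directly in Python. This is a minimal illustration, not a canonical encoding: it assumes a state is a `(sequence, index)` pair, and the names `next_step`, `has_next`, and `run` are hypothetical helpers introduced only for this example.

```python
from typing import Any, List, Tuple

# Assumption for this sketch: a state is (sequence, index).
State = Tuple[tuple, int]


def has_next(state: State) -> bool:
    """hasNext: S -> B. True while the state is not a final state."""
    seq, index = state
    return index < len(seq)


def next_step(state: State) -> Tuple[State, Any]:
    """next: return (new_state, element), or raise StopIteration in a final state."""
    seq, index = state
    if index >= len(seq):
        raise StopIteration
    return (seq, index + 1), seq[index]


def run(state: State) -> List[Any]:
    """Apply next_step repeatedly from the given state until a final state is reached."""
    produced = []
    while has_next(state):
        state, element = next_step(state)
        produced.append(element)
    return produced


initial_state: State = (("a", "b", "c"), 0)
elements = run(initial_state)
```

Because each call returns a new state instead of mutating the old one, the same initial state can be replayed any number of times, which makes the sequentiality property easy to check.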
16.1.2 A Type-Theoretic View
In type theory, an iterator can be expressed as:
$$\text{Iterator} : \text{Type} \rightarrow \text{Type}$$
$$\text{Iterator}\langle T \rangle = \{ \text{next}: () \rightarrow \text{Option}\langle T \rangle \}$$
where $\text{Option}\langle T \rangle = \text{Some}(T) \mid \text{None}$.
The relationship between iterators and collections:
$$\text{Iterable}\langle T \rangle = \{ \text{iter}: () \rightarrow \text{Iterator}\langle T \rangle \}$$
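Python signals exhaustion with an exception rather than an `Option` value, but the type-theoretic form can be approximated. The sketch below assumes one possible encoding: `Some(x)` is a one-element tuple `(x,)` and `None` is Python's `None`. The helper name `next_option` and the sentinel `_MISSING` are hypothetical and exist only for this illustration.

```python
from typing import Iterator, Optional, Tuple, TypeVar

T = TypeVar("T")

# Private sentinel so that a legitimate None element is not mistaken for exhaustion.
_MISSING = object()


def next_option(iterator: Iterator[T]) -> Optional[Tuple[T]]:
    """Return (value,) for Some(value), or None once the iterator is exhausted."""
    value = next(iterator, _MISSING)
    if value is _MISSING:
        return None
    return (value,)


source = iter([1, None])
first = next_option(source)   # Some(1)
second = next_option(source)  # Some(None): the element itself is None
third = next_option(source)   # None: the iterator is exhausted
```

Wrapping the value in a tuple keeps "an element that happens to be `None`" distinct from "no element", which is the distinction `Option` makes explicit.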
16.1.3 Iterator Algebra
Iterators support a number of algebraic operations:
| Operation | Definition | Type signature |
|---|---|---|
| Map | Apply a function to every element | $\text{map}: (T \rightarrow U) \rightarrow \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle U \rangle$ |
| Filter | Keep the elements that satisfy a predicate | $\text{filter}: (T \rightarrow \mathbb{B}) \rightarrow \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle T \rangle$ |
| Fold | Reduce the iterator to a single value | $\text{fold}: (U, T \times U \rightarrow U) \rightarrow \text{Iter}\langle T \rangle \rightarrow U$ |
| Chain | Concatenate iterators | $\text{chain}: \text{Iter}\langle T \rangle \times \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle T \rangle$ |
| Take | Take the first $n$ elements | $\text{take}: \mathbb{N} \rightarrow \text{Iter}\langle T \rangle \rightarrow \text{Iter}\langle T \rangle$ |
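Each operation in the table has a close counterpart in the Python standard library. The sketch below pairs them up; the variable names are illustrative only. Note that `functools.reduce` passes the accumulator as the first argument, so its function has the shape $U \times T \rightarrow U$ rather than $T \times U \rightarrow U$.

```python
from functools import reduce
from itertools import chain, islice

# map: apply a function to every element (lazy)
squares = map(lambda x: x * x, range(1, 6))

# filter: keep the elements that satisfy a predicate (lazy)
evens = filter(lambda x: x % 2 == 0, range(1, 11))

# fold: reduce to a single value, starting from the initial value 0
total = reduce(lambda acc, x: acc + x, range(1, 6), 0)

# chain: concatenate two iterables into one iterator (lazy)
joined = chain([1, 2], [3, 4])

# take: the first n elements of a possibly very long iterator (lazy)
first_three = islice(iter(range(1_000_000)), 3)

# Materialize the lazy results once so they can be inspected.
results = {
    "map": list(squares),
    "filter": list(evens),
    "fold": total,
    "chain": list(joined),
    "take": list(first_three),
}
```

All of these except `reduce` return iterators, so nothing is computed until the results are consumed.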
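16.2 Historical Background and Evolution
16.2.1 Timeline
| Period | Milestone | Description |
|---|---|---|
| 1970s | CLU language | Barbara Liskov first introduces the concept of iterators in CLU |
| 1975 | CLU iterators | Introduces yield semantics, the forerunner of modern generators |
| 1994 | GoF design patterns | *Design Patterns* formally catalogs the Iterator pattern |
| 1996 | Java 1.0 | Introduces the Enumeration interface |
| 1998 | Java 1.2 | Introduces the Iterator interface as the replacement for Enumeration |
| 1998 | C++ STL | The Standard Template Library, standardized in C++98, adopts iterators as its core abstraction |
| 2001 | Python 2.2 | Introduces the iterator protocol and generators |
| 2005 | C# 2.0 | Introduces the yield return syntax |
| 2008 | Python 3.0 | Renames the iterator method next() to __next__(); built-ins such as map, filter, and zip return iterators |
| 2011 | C++11 | Introduces the range-based for loop |
| 2015 | ES6 | JavaScript introduces an iterator protocol and generators |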
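16.2.2 Design Motivation
The core motivations behind the Iterator pattern are:
- Encapsulating internal structure: clients do not need to know the internal representation of the aggregate
- Uniform traversal interface: different kinds of aggregates can be traversed in the same way
- Multiple traversals: the same aggregate can support several traversal strategies
- Lazy evaluation: elements can be produced on demand, which reduces memory use
16.3 UML Structure Diagrams
16.3.1 Standard Structure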
Iterator Pattern Structure

  ┌──────────────────┐           ┌──────────────────┐
  │  <<interface>>   │           │  <<interface>>   │
  │     Iterable     │           │     Iterator     │
  ├──────────────────┤  creates  ├──────────────────┤
  │ + __iter__()     │──────────>│ + __next__()     │
  │                  │           │ + __iter__()     │
  └────────┬─────────┘           └────────┬─────────┘
           │ implements                   │ implements
           ▼                              ▼
  ┌──────────────────┐           ┌──────────────────┐
  │ ConcreteIterable │           │ ConcreteIterator │
  ├──────────────────┤           ├──────────────────┤
  │ - _data: List    │           │ - _iterable      │
  │ - _iterator_cls  │           │ - _index: int    │
  ├──────────────────┤           ├──────────────────┤
  │ + __iter__()     │           │ + __next__()     │
  │ + add(item)      │           │ + __iter__()     │
  └──────────────────┘           └──────────────────┘

16.3.2 The Python Iterator Protocol
Python Iterator Protocol

  ┌──────────────────────────────────────────────────────────┐
  │ Iterable Protocol                                        │
  │                                                          │
  │ class Iterable:                                          │
  │     def __iter__(self) -> Iterator:                      │
  │         """Return a new iterator instance."""            │
  └────────────────────────────┬─────────────────────────────┘
                               │ returns
                               ▼
  ┌──────────────────────────────────────────────────────────┐
  │ Iterator Protocol                                        │
  │                                                          │
  │ class Iterator:                                          │
  │     def __iter__(self) -> Iterator:                      │
  │         """Return self (iterators are iterable)."""      │
  │                                                          │
  │     def __next__(self) -> T:                             │
  │         """Return the next element; raise StopIteration  │
  │         when no elements remain."""                      │
  └──────────────────────────────────────────────────────────┘

16.3.3 Iterator State Transition Diagram
Iterator State Machine

  ┌───────────────┐
  │ Initial State │
  └───────┬───────┘
          │ __next__()
          ▼
  ┌───────────────┐   has more elements   ┌────────────────┐
  │   Iterating   │──────────────────────>│ Return Element │
  │     State     │<──────────────────────│                │
  └───────┬───────┘       __next__()      └────────────────┘
          │ no more elements
          ▼
  ┌───────────────┐
  │     Raise     │
  │ StopIteration │
  └───────┬───────┘
          │ (cannot iterate again)
          ▼
  ┌───────────────┐
  │   Exhausted   │
  │     State     │
  └───────────────┘

16.4 Standard Implementation
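Before the class-based implementations, it may help to see how a `for` statement drives the protocol and the state machine described in Section 16.3. The function `manual_for` below is a hypothetical helper written for this illustration; it is roughly what `for item in iterable: body(item)` does, expressed with `iter()`, `next()`, and `StopIteration`.

```python
def manual_for(iterable, body):
    """Roughly what `for item in iterable: body(item)` does under the hood."""
    iterator = iter(iterable)      # Initial state: ask the iterable for an iterator
    while True:
        try:
            item = next(iterator)  # Iterating: request the next element
        except StopIteration:      # No more elements: the loop ends quietly
            break
        body(item)
    return iterator                # Returned only so the exhausted state can be inspected


seen = []
exhausted = manual_for([10, 20, 30], seen.append)

# An exhausted iterator stays exhausted: iterating it again yields nothing.
second_pass = list(exhausted)

# An iterator is itself iterable: iter() on it returns the same object.
same_object = iter(exhausted) is exhausted
```

The last two checks correspond to the Exhausted state in the diagram and to the rule that an iterator's `__iter__` returns `self`.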
16.4.1 An ABC-Based Iterator Framework
from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar

T = TypeVar('T')

class IteratorProtocol(ABC, Generic[T]):
    """Abstract base class for iterators."""

    @abstractmethod
    def __iter__(self) -> 'IteratorProtocol[T]':
        """Return the iterator itself."""

    @abstractmethod
    def __next__(self) -> T:
        """Return the next element, or raise StopIteration when exhausted."""

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>"

class IterableProtocol(ABC, Generic[T]):
    """Abstract base class for iterable objects."""

    @abstractmethod
    def __iter__(self) -> IteratorProtocol[T]:
        """Return a new iterator."""

class SequenceIterator(IteratorProtocol[T]):
    """Iterator over a list."""

    def __init__(self, sequence: List[T]):
        self._sequence = sequence
        self._index = 0

    def __iter__(self) -> 'SequenceIterator[T]':
        return self

    def __next__(self) -> T:
        if self._index >= len(self._sequence):
            raise StopIteration
        value = self._sequence[self._index]
        self._index += 1
        return value

class SequenceIterable(IterableProtocol[T]):
    """Iterable collection backed by a list."""

    def __init__(self, items: Optional[List[T]] = None):
        self._items: List[T] = items if items is not None else []

    def add(self, item: T) -> 'SequenceIterable[T]':
        self._items.append(item)
        return self  # returning self allows call chaining

    def __iter__(self) -> SequenceIterator[T]:
        # Iterate over a snapshot so later add() calls do not affect this iterator.
        return SequenceIterator(self._items.copy())

    def __len__(self) -> int:
        return len(self._items)

iterable = SequenceIterable[int]()
iterable.add(1).add(2).add(3).add(4).add(5)
print("Standard iterator pattern implementation:")
for item in iterable:
    print(f"  Element: {item}")
16.4.2 External vs. Internal Iterators
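The distinction can first be seen with built-in types alone. In this minimal sketch, external iteration means the caller pulls elements and decides when to stop, while internal iteration means the caller hands over a callback and the traversal is driven for it. The function `for_each` is a hypothetical stand-in for an internal iterator.

```python
numbers = [1, 2, 3, 4, 5]

# External iteration: the caller pulls elements and may stop at any point.
iterator = iter(numbers)
pulled = []
for value in iterator:
    if value > 3:
        break                 # the caller chooses to stop early
    pulled.append(value)
remaining = list(iterator)    # 4 was already consumed by the loop, so only 5 is left


# Internal iteration: the traversal loop lives on the collection's side.
def for_each(items, action):
    """Call action(item) for every item; the caller never sees the loop."""
    for item in items:
        action(item)


visited = []
for_each(numbers, visited.append)
```

External iteration gives the caller more control (pausing, early exit, interleaving two iterators), whereas internal iteration keeps client code shorter at the cost of that control.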
# Reuses IteratorProtocol from Section 16.4.1.
from enum import Enum, auto
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar('T')
R = TypeVar('R')

class IteratorType(Enum):
    """Kinds of iterator."""
    EXTERNAL = auto()
    INTERNAL = auto()

class ExternalIterator(IteratorProtocol[T]):
    """External iterator: the client controls the traversal."""

    def __init__(self, data: List[T]):
        self._data = data
        self._index = 0

    def __iter__(self) -> 'ExternalIterator[T]':
        return self

    def __next__(self) -> T:
        if self._index >= len(self._data):
            raise StopIteration
        value = self._data[self._index]
        self._index += 1
        return value

    def has_next(self) -> bool:
        """Return True if at least one more element is available."""
        return self._index < len(self._data)

    def peek(self) -> Optional[T]:
        """Return the next element without advancing, or None at the end."""
        if self._index < len(self._data):
            return self._data[self._index]
        return None

    def reset(self) -> None:
        """Move the iterator back to the first element."""
        self._index = 0

class InternalIterable(Generic[T]):
    """Internal iterator: the collection controls the traversal."""

    def __init__(self, data: List[T]):
        self._data = data

    def for_each(self, action: Callable[[T], None]) -> None:
        """Run an action on every element."""
        for item in self._data:
            action(item)

    def map(self, transformer: Callable[[T], R]) -> 'InternalIterable[R]':
        """Transform every element."""
        return InternalIterable([transformer(item) for item in self._data])

    def filter(self, predicate: Callable[[T], bool]) -> 'InternalIterable[T]':
        """Keep the elements that satisfy the predicate."""
        return InternalIterable([item for item in self._data if predicate(item)])

    def reduce(self, initial: R, accumulator: Callable[[R, T], R]) -> R:
        """Fold the elements into a single value."""
        result = initial
        for item in self._data:
            result = accumulator(result, item)
        return result

    def find_first(self, predicate: Callable[[T], bool]) -> Optional[T]:
        """Return the first element that satisfies the predicate, or None."""
        for item in self._data:
            if predicate(item):
                return item
        return None

    def take_while(self, predicate: Callable[[T], bool]) -> 'InternalIterable[T]':
        """Take the leading run of elements that satisfy the predicate."""
        result = []
        for item in self._data:
            if not predicate(item):
                break
            result.append(item)
        return InternalIterable(result)

print("\nExternal iterator example:")
external_iter = ExternalIterator([1, 2, 3, 4, 5])
while external_iter.has_next():
    current = next(external_iter)
    peeked = external_iter.peek()
    print(f"  Current: {current}, next: {peeked}")
external_iter.reset()
print(f"  First element after reset: {next(external_iter)}")

print("\nInternal iterator example:")
internal_iter = InternalIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = (
    internal_iter
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * x)
    .reduce(0, lambda acc, x: acc + x)
)
print(f"  Sum of squares of even numbers: {result}")
internal_iter.for_each(lambda x: print(f"  Processing: {x}") if x <= 3 else None)
16.4.3 Generator-Based Implementation
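A generator function is a compact way to write an iterator: Python supplies `__iter__` and `__next__`, and the local variables play the role of the iterator's state. The sketch below writes the same countdown twice, once as a class and once as a generator. Both `Countdown` and `countdown` are illustrative names that do not appear elsewhere in this chapter.

```python
class Countdown:
    """Hand-written iterator: the state lives in an attribute."""

    def __init__(self, start: int):
        self._current = start

    def __iter__(self) -> "Countdown":
        return self

    def __next__(self) -> int:
        if self._current <= 0:
            raise StopIteration
        value = self._current
        self._current -= 1
        return value


def countdown(start: int):
    """Generator version: the state lives in the local variable `current`."""
    current = start
    while current > 0:
        yield current
        current -= 1
    # Falling off the end makes the generator raise StopIteration automatically.


from_class = list(Countdown(3))
from_generator = list(countdown(3))

gen = countdown(1)
generator_is_iterator = iter(gen) is gen and hasattr(gen, "__next__")
```

The two versions produce the same sequence; the generator simply lets the interpreter do the bookkeeping that the class does by hand.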
from typing import Generator, List, Optional, Tuple, TypeVar

T = TypeVar('T')

class GeneratorIterator:
    """A collection of generator-based iterator utilities."""

    @staticmethod
    def count(start: int = 0, step: int = 1) -> Generator[int, None, None]:
        """Infinite counter."""
        n = start
        while True:
            yield n
            n += step

    @staticmethod
    def cycle(iterable: List[T]) -> Generator[T, None, None]:
        """Repeat the elements of a list forever (yields nothing for an empty list)."""
        while iterable:
            for item in iterable:
                yield item

    @staticmethod
    def repeat(item: T, times: Optional[int] = None) -> Generator[T, None, None]:
        """Yield the same item `times` times, or forever if `times` is None."""
        if times is None:
            while True:
                yield item
        else:
            for _ in range(times):
                yield item

    @staticmethod
    def range_custom(start: int, stop: Optional[int] = None, step: int = 1) -> Generator[int, None, None]:
        """Custom range generator; like range(), it also supports negative steps."""
        if step == 0:
            raise ValueError("step must not be zero")
        if stop is None:
            start, stop = 0, start
        n = start
        while (n < stop) if step > 0 else (n > stop):
            yield n
            n += step

    @staticmethod
    def chunks(iterable: List[T], size: int) -> Generator[List[T], None, None]:
        """Yield consecutive chunks of at most `size` elements."""
        for i in range(0, len(iterable), size):
            yield iterable[i:i + size]

    @staticmethod
    def pairwise(iterable: List[T]) -> Generator[Tuple[T, T], None, None]:
        """Yield adjacent pairs (a, b), (b, c), ..."""
        it = iter(iterable)
        try:
            prev = next(it)
        except StopIteration:
            return
        for current in it:
            yield (prev, current)
            prev = current

    @staticmethod
    def sliding_window(iterable: List[T], size: int) -> Generator[tuple, None, None]:
        """Yield every window of `size` consecutive elements."""
        if size <= 0 or size > len(iterable):
            return
        window = tuple(iterable[:size])
        yield window
        for i in range(size, len(iterable)):
            window = window[1:] + (iterable[i],)
            yield window

print("\nGenerator iterator examples:")
print("  Infinite counter (first 5):")
for i, n in enumerate(GeneratorIterator.count(10, 2)):
    if i >= 5:
        break
    print(f"    {n}")
print("  Chunk generator:")
data = list(range(1, 11))
for chunk in GeneratorIterator.chunks(data, 3):
    print(f"    {chunk}")
print("  Adjacent pairs:")
for pair in GeneratorIterator.pairwise([1, 2, 3, 4, 5]):
    print(f"    {pair}")
print("  Sliding window:")
for window in GeneratorIterator.sliding_window([1, 2, 3, 4, 5], 3):
    print(f"    {window}")
16.5 Enterprise Application Examples
16.5.1 Database Pagination Iterators
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Generic, Iterator, List, Optional, TypeVar

T = TypeVar('T')

@dataclass
class Page:
    """A single page of results."""
    number: int
    size: int
    items: List[Any]
    total: int
    has_next: bool

    @property
    def has_previous(self) -> bool:
        return self.number > 1

    @property
    def total_pages(self) -> int:
        # Ceiling division: a partially filled last page still counts as a page.
        return (self.total + self.size - 1) // self.size
class DatabasePageIterator(Generic[T]):
    """Offset-based pagination iterator: fetches one page at a time on demand."""

    def __init__(
        self,
        fetch_func: Callable[[int, int], List[T]],
        count_func: Callable[[], int],
        page_size: int = 100,
        prefetch: bool = True
    ):
        self._fetch_func = fetch_func
        self._count_func = count_func
        self._page_size = page_size
        self._prefetch = prefetch  # stored but not used by this simplified version
        self._current_page = 0
        self._total_count: Optional[int] = None
        self._current_items: List[T] = []
        self._current_index = 0
        self._exhausted = False

    def __iter__(self) -> Iterator[T]:
        return self

    def __next__(self) -> T:
        if self._exhausted:
            raise StopIteration
        if self._current_index >= len(self._current_items):
            self._load_next_page()
            if not self._current_items:
                # An empty page means there is no more data.
                self._exhausted = True
                raise StopIteration
        item = self._current_items[self._current_index]
        self._current_index += 1
        return item

    def _load_next_page(self) -> None:
        self._current_page += 1
        self._current_items = self._fetch_func(self._current_page, self._page_size)
        self._current_index = 0

    def get_current_page_info(self) -> Page:
        """Return information about the current page."""
        if self._total_count is None:
            self._total_count = self._count_func()  # count lazily, once
        return Page(
            number=self._current_page,
            size=self._page_size,
            items=self._current_items.copy(),
            total=self._total_count,
            has_next=len(self._current_items) == self._page_size
        )

    def reset(self) -> None:
        """Reset the iterator to its initial state."""
        self._current_page = 0
        self._current_items = []
        self._current_index = 0
        self._exhausted = False
class CursorBasedIterator(Generic[T]):
    """Cursor-based iterator (suited to large datasets)."""

    def __init__(
        self,
        fetch_func: Callable[[Optional[Any], int], List[T]],
        get_cursor: Callable[[T], Any],
        page_size: int = 100
    ):
        self._fetch_func = fetch_func
        self._get_cursor = get_cursor
        self._page_size = page_size
        self._cursor: Optional[Any] = None
        self._current_items: List[T] = []
        self._current_index = 0
        self._exhausted = False

    def __iter__(self) -> Iterator[T]:
        return self

    def __next__(self) -> T:
        if self._exhausted:
            raise StopIteration
        if self._current_index >= len(self._current_items):
            self._load_next_batch()
            if not self._current_items:
                self._exhausted = True
                raise StopIteration
        item = self._current_items[self._current_index]
        self._current_index += 1
        return item

    def _load_next_batch(self) -> None:
        self._current_items = self._fetch_func(self._cursor, self._page_size)
        self._current_index = 0
        if self._current_items:
            # The cursor is derived from the last item of the batch just fetched.
            self._cursor = self._get_cursor(self._current_items[-1])
@dataclass
class User:
    id: int
    name: str
    email: str
    created_at: datetime = field(default_factory=datetime.now)

# In-memory stand-in for a database table.
user_database = [
    User(id=i, name=f"User{i}", email=f"user{i}@example.com")
    for i in range(1, 251)
]

def fetch_users_page(page: int, size: int) -> List[User]:
    start = (page - 1) * size
    end = start + size
    print(f"  [DB query] fetching page {page}, {size} rows per page")
    return user_database[start:end]

def count_users() -> int:
    return len(user_database)

def fetch_users_cursor(cursor: Optional[int], size: int) -> List[User]:
    # The cursor is the id of the last row seen. Because ids here are contiguous
    # and start at 1, that id equals the list index of the next row.
    start = cursor if cursor else 0
    end = start + size
    print(f"  [DB query] fetching {size} rows starting from cursor {cursor}")
    return user_database[start:end]

print("\nDatabase pagination iterator examples:")
print("  Offset pagination iterator:")
page_iter = DatabasePageIterator(
    fetch_func=fetch_users_page,
    count_func=count_users,
    page_size=50
)
count = 0
for user in page_iter:
    count += 1
    if count <= 3:
        print(f"    User: {user.name}")
    elif count == 4:
        print(f"    ... (stopping early; {len(user_database)} records available)")
        break

print("\n  Cursor iterator:")
cursor_iter = CursorBasedIterator(
    fetch_func=fetch_users_cursor,
    get_cursor=lambda u: u.id,
    page_size=50
)
count = 0
for user in cursor_iter:
    count += 1
    if count <= 3:
        print(f"    User: {user.name}")
    elif count == 4:
        print(f"    ... (stopping early; {len(user_database)} records available)")
        break
16.5.2 Streaming Data Iterators
import time
from collections import deque
from dataclasses import dataclass
from typing import Callable, Generic, Iterator, List, Optional, TypeVar

T = TypeVar('T')
R = TypeVar('R')

@dataclass
class StreamConfig:
    """Stream processing configuration."""
    buffer_size: int = 1000
    batch_timeout: float = 1.0
    max_retries: int = 3

class StreamIterator(Generic[T]):
    """Iterator over a streaming source; the source returns None to signal the end."""

    def __init__(
        self,
        source: Callable[[], Optional[T]],
        config: Optional[StreamConfig] = None
    ):
        self._source = source
        self._config = config or StreamConfig()
        self._buffer: deque = deque(maxlen=self._config.buffer_size)
        self._exhausted = False

    def __iter__(self) -> Iterator[T]:
        return self

    def __next__(self) -> T:
        if self._buffer:
            return self._buffer.popleft()  # serve buffered items first
        if self._exhausted:
            raise StopIteration
        item = self._source()
        if item is None:
            self._exhausted = True
            raise StopIteration
        return item

    def buffer_items(self, count: int) -> int:
        """Pre-buffer up to `count` items and return how many were buffered.

        Stops when the buffer is full, because a deque with maxlen would
        otherwise silently drop its oldest items.
        """
        buffered = 0
        while (buffered < count and not self._exhausted
               and len(self._buffer) < self._config.buffer_size):
            item = self._source()
            if item is None:
                self._exhausted = True
                break
            self._buffer.append(item)
            buffered += 1
        return buffered
class BatchIterator(Generic[T]):
    """Group the elements of a source iterator into batches (lists)."""

    def __init__(
        self,
        source: Iterator[T],
        batch_size: int = 100,
        timeout: Optional[float] = None
    ):
        self._source = source
        self._batch_size = batch_size
        self._timeout = timeout
        self._exhausted = False

    def __iter__(self) -> Iterator[List[T]]:
        return self

    def __next__(self) -> List[T]:
        if self._exhausted:
            raise StopIteration
        batch: List[T] = []
        start_time = time.monotonic()  # monotonic clock: unaffected by system clock changes
        while len(batch) < self._batch_size:
            try:
                item = next(self._source)
                batch.append(item)
            except StopIteration:
                self._exhausted = True
                break
            # The timeout is only checked between items; a blocking source is not interrupted.
            if self._timeout and (time.monotonic() - start_time) >= self._timeout:
                break
        if not batch:
            raise StopIteration
        return batch
class TransformIterator(Generic[T, R]):
    """Apply a transformation to every element of a source iterator."""

    def __init__(
        self,
        source: Iterator[T],
        transformer: Callable[[T], R],
        error_handler: Optional[Callable[[Exception, T], Optional[R]]] = None
    ):
        self._source = source
        self._transformer = transformer
        self._error_handler = error_handler

    def __iter__(self) -> Iterator[R]:
        return self

    def __next__(self) -> R:
        while True:
            item = next(self._source)  # StopIteration propagates to the caller
            try:
                return self._transformer(item)
            except Exception as e:
                if self._error_handler is None:
                    raise
                result = self._error_handler(e, item)
                if result is not None:
                    return result
                # The handler returned None: skip this element and try the next one.

class FilterIterator(Generic[T]):
    """Yield only the elements that satisfy a predicate."""

    def __init__(
        self,
        source: Iterator[T],
        predicate: Callable[[T], bool]
    ):
        self._source = source
        self._predicate = predicate

    def __iter__(self) -> Iterator[T]:
        return self

    def __next__(self) -> T:
        while True:
            item = next(self._source)
            if self._predicate(item):
                return item

class ChainIterator(Generic[T]):
    """Iterate over several iterators one after another."""

    def __init__(self, *iterators: Iterator[T]):
        self._iterators = list(iterators)
        self._current_index = 0

    def __iter__(self) -> Iterator[T]:
        return self

    def __next__(self) -> T:
        while self._current_index < len(self._iterators):
            try:
                return next(self._iterators[self._current_index])
            except StopIteration:
                self._current_index += 1  # move on to the next iterator
        raise StopIteration
print("\nStreaming data iterator examples:")
print("  Batch iterator:")
batch_iter = BatchIterator(iter(range(1, 101)), batch_size=25)
for i, batch in enumerate(batch_iter, 1):
    print(f"    Batch {i}: {batch[:3]}... ({len(batch)} items)")

print("\n  Transform iterator:")
transform_iter = TransformIterator(
    iter(range(1, 11)),
    lambda x: x ** 2,
    lambda e, x: None
)
print(f"    Squares: {list(transform_iter)}")

print("\n  Filter iterator:")
filter_iter = FilterIterator(
    iter(range(1, 21)),
    lambda x: x % 2 == 0
)
print(f"    Even numbers: {list(filter_iter)}")

print("\n  Chain iterator:")
chain_iter = ChainIterator(
    iter([1, 2, 3]),
    iter(['a', 'b', 'c']),
    iter([True, False])
)
print(f"    Chained result: {list(chain_iter)}")
16.5.3 Tree Traversal Iterators
from collections import deque
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Generic, Iterator, List, Optional, TypeVar

T = TypeVar('T')

class TraversalOrder(Enum):
    """Traversal orders."""
    PREORDER = auto()
    INORDER = auto()
    POSTORDER = auto()
    LEVELORDER = auto()
    REVERSE_PREORDER = auto()
    REVERSE_INORDER = auto()
    REVERSE_POSTORDER = auto()
    REVERSE_LEVELORDER = auto()

@dataclass
class TreeNode(Generic[T]):
    """Tree node with binary links (left/right) and a general list of children."""
    value: T
    left: Optional['TreeNode[T]'] = None
    right: Optional['TreeNode[T]'] = None
    children: List['TreeNode[T]'] = field(default_factory=list)

    def add_child(self, child: 'TreeNode[T]') -> 'TreeNode[T]':
        self.children.append(child)
        return self

    @property
    def is_leaf(self) -> bool:
        return not self.children and not self.left and not self.right
class TreeIterator(Generic[T]):
    """Iterate over the values of a tree in a chosen traversal order."""

    def __init__(
        self,
        root: Optional[TreeNode[T]],
        order: TraversalOrder = TraversalOrder.PREORDER
    ):
        self._root = root
        self._order = order

    def __iter__(self) -> Iterator[T]:
        if self._root is None:
            return iter(())
        # Dispatch table: one generator method per traversal order.
        traversals = {
            TraversalOrder.PREORDER: self._preorder,
            TraversalOrder.INORDER: self._inorder,
            TraversalOrder.POSTORDER: self._postorder,
            TraversalOrder.LEVELORDER: self._levelorder,
            TraversalOrder.REVERSE_PREORDER: self._reverse_preorder,
            TraversalOrder.REVERSE_INORDER: self._reverse_inorder,
            TraversalOrder.REVERSE_POSTORDER: self._reverse_postorder,
            TraversalOrder.REVERSE_LEVELORDER: self._reverse_levelorder,
        }
        return traversals[self._order](self._root)

    def _preorder(self, node: TreeNode[T]) -> Iterator[T]:
        yield node.value
        for child in node.children:
            yield from self._preorder(child)
        if node.left:
            yield from self._preorder(node.left)
        if node.right:
            yield from self._preorder(node.right)

    def _inorder(self, node: TreeNode[T]) -> Iterator[T]:
        if node.left:
            yield from self._inorder(node.left)
        yield node.value
        for child in node.children:
            yield from self._inorder(child)
        if node.right:
            yield from self._inorder(node.right)

    def _postorder(self, node: TreeNode[T]) -> Iterator[T]:
        for child in node.children:
            yield from self._postorder(child)
        if node.left:
            yield from self._postorder(node.left)
        if node.right:
            yield from self._postorder(node.right)
        yield node.value

    def _levelorder(self, node: TreeNode[T]) -> Iterator[T]:
        queue = deque([node])
        while queue:
            current = queue.popleft()
            yield current.value
            queue.extend(current.children)
            if current.left:
                queue.append(current.left)
            if current.right:
                queue.append(current.right)

    # Each reverse traversal yields the exact reverse of its forward counterpart.
    def _reverse_preorder(self, node: TreeNode[T]) -> Iterator[T]:
        if node.right:
            yield from self._reverse_preorder(node.right)
        if node.left:
            yield from self._reverse_preorder(node.left)
        for child in reversed(node.children):
            yield from self._reverse_preorder(child)
        yield node.value

    def _reverse_inorder(self, node: TreeNode[T]) -> Iterator[T]:
        if node.right:
            yield from self._reverse_inorder(node.right)
        for child in reversed(node.children):
            yield from self._reverse_inorder(child)
        # The value comes after the children, mirroring _inorder.
        yield node.value
        if node.left:
            yield from self._reverse_inorder(node.left)

    def _reverse_postorder(self, node: TreeNode[T]) -> Iterator[T]:
        yield node.value
        if node.right:
            yield from self._reverse_postorder(node.right)
        if node.left:
            yield from self._reverse_postorder(node.left)
        for child in reversed(node.children):
            yield from self._reverse_postorder(child)

    def _reverse_levelorder(self, node: TreeNode[T]) -> Iterator[T]:
        # Not lazy: the whole tree is visited before the first value is yielded.
        stack = []
        queue = deque([node])
        while queue:
            current = queue.popleft()
            stack.append(current.value)
            if current.right:
                queue.append(current.right)
            if current.left:
                queue.append(current.left)
            for child in reversed(current.children):
                queue.append(child)
        yield from reversed(stack)
class TreePathIterator(Generic[T]):
    """Iterate over every root-to-leaf path."""

    def __init__(self, root: Optional[TreeNode[T]]):
        self._root = root

    def __iter__(self) -> Iterator[List[T]]:
        if self._root is None:
            return
        yield from self._find_paths(self._root, [])

    def _find_paths(self, node: TreeNode[T], path: List[T]) -> Iterator[List[T]]:
        current_path = path + [node.value]  # new list, so sibling branches do not share state
        if node.is_leaf:
            yield current_path
        for child in node.children:
            yield from self._find_paths(child, current_path)
        if node.left:
            yield from self._find_paths(node.left, current_path)
        if node.right:
            yield from self._find_paths(node.right, current_path)

class TreeSearchIterator(Generic[T]):
    """Iterate over the nodes whose value satisfies a predicate (preorder)."""

    def __init__(
        self,
        root: Optional[TreeNode[T]],
        predicate: Callable[[T], bool]
    ):
        self._root = root
        self._predicate = predicate

    def __iter__(self) -> Iterator[TreeNode[T]]:
        if self._root is None:
            return
        yield from self._search(self._root)

    def _search(self, node: TreeNode[T]) -> Iterator[TreeNode[T]]:
        if self._predicate(node.value):
            yield node
        for child in node.children:
            yield from self._search(child)
        if node.left:
            yield from self._search(node.left)
        if node.right:
            yield from self._search(node.right)
root = TreeNode(1, children=[
    TreeNode(2, children=[
        TreeNode(4),
        TreeNode(5)
    ]),
    TreeNode(3, children=[
        TreeNode(6),
        TreeNode(7)
    ])
])
print("\nTree traversal iterator examples:")
print("  Preorder traversal:")
print(f"    {list(TreeIterator(root, TraversalOrder.PREORDER))}")
print("  Postorder traversal:")
print(f"    {list(TreeIterator(root, TraversalOrder.POSTORDER))}")
print("  Level-order traversal:")
print(f"    {list(TreeIterator(root, TraversalOrder.LEVELORDER))}")
print("  Path iterator:")
for path in TreePathIterator(root):
    print(f"    {path}")
print("  Search iterator (value > 4):")
for node in TreeSearchIterator(root, lambda x: x > 4):
    print(f"    Found node: {node.value}")
16.5.4 File Processing Iterators
import csv
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

@dataclass
class FileInfo:
    """File metadata."""
    path: Path
    size: int
    line_count: int
    encoding: str

class FileLineIterator:
    """Iterate over the lines of a text file."""

    def __init__(
        self,
        file_path: str,
        encoding: str = 'utf-8',
        skip_empty: bool = False,
        strip_whitespace: bool = True
    ):
        self._file_path = file_path
        self._encoding = encoding
        self._skip_empty = skip_empty
        self._strip_whitespace = strip_whitespace
        self._file = None
        self._line_number = 0

    def __iter__(self) -> Iterator[str]:
        self.close()  # do not leak a handle if iteration is restarted
        self._file = open(self._file_path, 'r', encoding=self._encoding)
        self._line_number = 0
        return self

    def __next__(self) -> str:
        if self._file is None:
            raise StopIteration
        while True:
            line = self._file.readline()
            if not line:
                self.close()
                raise StopIteration
            self._line_number += 1
            if self._strip_whitespace:
                line = line.rstrip('\n\r')  # only line endings are stripped
            if self._skip_empty and not line.rstrip('\n\r'):
                continue
            return line

    def close(self) -> None:
        """Close the underlying file; call this if iteration is abandoned early."""
        if self._file is not None:
            self._file.close()
            self._file = None

    @property
    def line_number(self) -> int:
        return self._line_number
class CSVIterator:
    """Iterate over the rows of a CSV file as dictionaries.

    Parsing is delegated to csv.reader so that quoted fields containing
    the delimiter are handled correctly.
    """

    def __init__(
        self,
        file_path: str,
        delimiter: str = ',',
        encoding: str = 'utf-8',
        has_header: bool = True
    ):
        self._file_path = file_path
        self._delimiter = delimiter
        self._encoding = encoding
        self._has_header = has_header
        self._file = None
        self._reader = None
        self._headers: Optional[List[str]] = None
        self._row_number = 0

    def __iter__(self) -> Iterator[Dict[str, str]]:
        # newline='' is what the csv module expects for files it reads.
        self._file = open(self._file_path, 'r', encoding=self._encoding, newline='')
        self._reader = csv.reader(self._file, delimiter=self._delimiter)
        self._row_number = 0
        if self._has_header:
            self._headers = next(self._reader, None)
        return self

    def __next__(self) -> Dict[str, str]:
        if self._file is None:
            raise StopIteration
        try:
            values = next(self._reader)
        except StopIteration:
            self._file.close()
            self._file = None
            raise
        self._row_number += 1
        if self._headers:
            return dict(zip(self._headers, values))
        # Without a header row, use column positions as keys.
        return {str(i): v for i, v in enumerate(values)}

    @property
    def row_number(self) -> int:
        return self._row_number
class JSONLinesIterator:
    """Iterate over the records of a JSON Lines file."""

    def __init__(self, file_path: str, encoding: str = 'utf-8'):
        self._file_path = file_path
        self._encoding = encoding
        self._file = None
        self._line_number = 0

    def __iter__(self) -> Iterator[Dict[str, Any]]:
        self._file = open(self._file_path, 'r', encoding=self._encoding)
        self._line_number = 0
        return self

    def __next__(self) -> Dict[str, Any]:
        if self._file is None:
            raise StopIteration
        while True:
            line = self._file.readline()
            if not line:
                self._file.close()
                self._file = None
                raise StopIteration
            self._line_number += 1
            if not line.strip():
                continue  # skip blank lines instead of failing in json.loads
            return json.loads(line)

    @property
    def line_number(self) -> int:
        return self._line_number
class LogFileIterator:
    """Iterate over log lines; when a pattern is given, non-matching lines are skipped."""

    def __init__(
        self,
        file_path: str,
        pattern: Optional[str] = None,
        encoding: str = 'utf-8'
    ):
        self._file_path = file_path
        self._pattern = re.compile(pattern) if pattern else None
        self._encoding = encoding
        self._file = None
        self._line_number = 0

    def __iter__(self) -> Iterator[Dict[str, Any]]:
        self._file = open(self._file_path, 'r', encoding=self._encoding)
        self._line_number = 0
        return self

    def __next__(self) -> Dict[str, Any]:
        if self._file is None:
            raise StopIteration
        while True:
            line = self._file.readline()
            if not line:
                self._file.close()
                self._file = None
                raise StopIteration
            self._line_number += 1
            line = line.rstrip('\n\r')
            if self._pattern:
                # match() anchors the pattern at the start of the line.
                match = self._pattern.match(line)
                if match:
                    return {
                        'line_number': self._line_number,
                        'raw': line,
                        'groups': match.groups(),
                        'named': match.groupdict()
                    }
            else:
                return {
                    'line_number': self._line_number,
                    'raw': line
                }

    @property
    def line_number(self) -> int:
        return self._line_number
class DirectoryIterator:
    """Iterate over the paths in a directory that match a glob pattern."""

    def __init__(
        self,
        directory: str,
        pattern: str = '*',
        recursive: bool = False,
        include_dirs: bool = False
    ):
        self._directory = Path(directory)
        self._pattern = pattern
        self._recursive = recursive
        self._include_dirs = include_dirs
        self._iterator = None

    def __iter__(self) -> Iterator[Path]:
        if self._recursive:
            self._iterator = self._directory.rglob(self._pattern)
        else:
            self._iterator = self._directory.glob(self._pattern)
        return self

    def __next__(self) -> Path:
        if self._iterator is None:
            raise StopIteration
        while True:
            path = next(self._iterator)  # StopIteration propagates when the glob is exhausted
            if self._include_dirs or path.is_file():
                return path
import os
import tempfile

print("\nFile processing iterator examples:")
test_content = """Name,Age,City
Zhang San,25,Beijing
Li Si,30,Shanghai
Wang Wu,28,Guangzhou
"""
with tempfile.TemporaryDirectory() as tmpdir:
    csv_file = os.path.join(tmpdir, 'test.csv')
    with open(csv_file, 'w', encoding='utf-8') as f:
        f.write(test_content)
    print("  CSV iterator:")
    csv_iter = CSVIterator(csv_file)
    for row in csv_iter:
        print(f"    Row {csv_iter.row_number}: {row}")
    print("\n  Directory iterator:")
    dir_iter = DirectoryIterator(tmpdir, '*.csv')
    for path in dir_iter:
        print(f"    File: {path.name}")
16.6 Pattern Variants
16.6.1 Lazy Iterators
from functools import wraps
from itertools import islice
from typing import Callable, Generic, Iterator, List, Optional, TypeVar

T = TypeVar('T')
R = TypeVar('R')

class LazyIterator(Generic[T]):
    """Lazy iterator: nothing is computed until elements are requested.

    The source iterator is created on first use and then cached, so a
    LazyIterator can be consumed only once; the derived iterators returned
    by map/filter/take/drop each start again from the source factory.
    """

    def __init__(self, source: Callable[[], Iterator[T]]):
        self._source_factory = source
        self._iterator: Optional[Iterator[T]] = None

    def __iter__(self) -> Iterator[T]:
        if self._iterator is None:
            self._iterator = self._source_factory()
        return self._iterator

    def __next__(self) -> T:
        if self._iterator is None:
            self._iterator = self._source_factory()
        return next(self._iterator)

    def map(self, func: Callable[[T], R]) -> 'LazyIterator[R]':
        """Lazy map."""
        def new_source():
            return (func(x) for x in self._source_factory())
        return LazyIterator(new_source)

    def filter(self, predicate: Callable[[T], bool]) -> 'LazyIterator[T]':
        """Lazy filter."""
        def new_source():
            return (x for x in self._source_factory() if predicate(x))
        return LazyIterator(new_source)

    def take(self, n: int) -> 'LazyIterator[T]':
        """Take the first n elements."""
        def new_source():
            # islice stops after n elements without pulling an extra one from the source.
            return islice(self._source_factory(), n)
        return LazyIterator(new_source)

    def drop(self, n: int) -> 'LazyIterator[T]':
        """Skip the first n elements."""
        def new_source():
            return islice(self._source_factory(), n, None)
        return LazyIterator(new_source)

    def to_list(self) -> List[T]:
        """Convert to a list (this triggers the computation)."""
        return list(iter(self))

def lazy(func: Callable[..., Iterator[T]]) -> Callable[..., LazyIterator[T]]:
    """Decorator that turns a generator function into a LazyIterator factory."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return LazyIterator(lambda: func(*args, **kwargs))
    return wrapper

print("\nLazy iterator example:")

@lazy
def generate_numbers(n: int) -> Iterator[int]:
    print(f"  [compute] generating {n} numbers")
    for i in range(n):
        print(f"  [compute] generating number {i+1}")
        yield i

lazy_nums = generate_numbers(10)
print("  Lazy iterator created (nothing computed yet)")
transformed = (
    lazy_nums
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * x)
    .take(3)
)
print("  Transformations chained (nothing computed yet)")
print("  Triggering the computation:")
result = transformed.to_list()
print(f"  Result: {result}")
16.6.2 Resettable Iterators
from collections import deque
from itertools import islice
from typing import Deque, Generic, Iterator, List, Optional, TypeVar

T = TypeVar('T')


class ResettableIterator(Generic[T]):
    """Resettable iterator with checkpoint and rollback support."""

    def __init__(self, iterable: List[T]):
        self._data = iterable
        self._index = 0
        self._history: List[T] = []
        self._checkpoint_index = 0

    def __iter__(self) -> 'ResettableIterator[T]':
        return self

    def __next__(self) -> T:
        if self._index >= len(self._data):
            raise StopIteration
        value = self._data[self._index]
        self._history.append(value)
        self._index += 1
        return value

    def reset(self) -> None:
        """Reset to the initial state."""
        self._index = 0
        self._history.clear()
        self._checkpoint_index = 0

    def checkpoint(self) -> int:
        """Create a checkpoint and return its ID (the current position)."""
        self._checkpoint_index = self._index
        return self._checkpoint_index

    def rollback(self, checkpoint_id: Optional[int] = None) -> None:
        """Roll back to the given checkpoint, or to the most recent one."""
        if checkpoint_id is None:
            checkpoint_id = self._checkpoint_index
        # Reject positions that lie outside the underlying data.
        if not 0 <= checkpoint_id <= len(self._data):
            raise ValueError(f"invalid checkpoint: {checkpoint_id}")
        self._index = checkpoint_id

    def get_history(self) -> List[T]:
        """Return a copy of the elements yielded so far."""
        return self._history.copy()

    @property
    def position(self) -> int:
        """Current position."""
        return self._index


class PeekableIterator(Generic[T]):
    """Iterator that can look ahead without consuming elements."""

    def __init__(self, iterator: Iterator[T]):
        self._iterator = iterator
        # Elements read ahead but not yet returned; a deque pops from the
        # left in O(1), unlike list.pop(0).
        self._buffer: Deque[T] = deque()
        self._exhausted = False

    def __iter__(self) -> 'PeekableIterator[T]':
        return self

    def __next__(self) -> T:
        if self._buffer:
            return self._buffer.popleft()
        return next(self._iterator)

    def peek(self, n: int = 1) -> List[T]:
        """Return up to n upcoming elements without consuming them."""
        while len(self._buffer) < n and not self._exhausted:
            try:
                self._buffer.append(next(self._iterator))
            except StopIteration:
                self._exhausted = True
        return list(islice(self._buffer, n))

    def has_next(self) -> bool:
        """Check whether another element is available."""
        return bool(self.peek(1))
print("\nResettable iterator example:")
resettable = ResettableIterator([1, 2, 3, 4, 5])
print(" First pass:")
for item in resettable:
    print(f" {item}")
    if item == 2:
        checkpoint = resettable.checkpoint()
        print(f" Checkpoint created: {checkpoint}")
print(f" Current position: {resettable.position}")
print(f" History: {resettable.get_history()}")
resettable.rollback()
print(" Continuing after rollback:")
for item in resettable:
    print(f" {item}")

print("\nPeekable iterator example:")
peekable = PeekableIterator(iter([1, 2, 3, 4, 5]))
print(f" Peek at the first 3: {peekable.peek(3)}")
print(f" Next element: {next(peekable)}")
print(f" Peek again: {peekable.peek(2)}")
print(f" Has next: {peekable.has_next()}")

16.6.3 Parallel Iterators
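When elements are handed to a thread pool, the order of the results depends on how they are collected. The sketch below uses only the standard concurrent.futures module to contrast Executor.map, which returns results in input order, with as_completed, which returns them in completion order. The function slow_square and its sleep times are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x: int) -> int:
    # Larger inputs sleep less, so they tend to finish before smaller ones.
    time.sleep((4 - x) * 0.05)
    return x * x


inputs = [1, 2, 3]

# Executor.map yields results in the order of the inputs.
with ThreadPoolExecutor(max_workers=3) as executor:
    ordered = list(executor.map(slow_square, inputs))

# as_completed yields futures as they finish, whatever the input order was.
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_square, x) for x in inputs]
    by_completion = [future.result() for future in as_completed(futures)]

print(ordered)                # [1, 4, 9]
print(sorted(by_completion))  # same values; the unsorted order may vary
```

If the caller needs each result to line up with its input, Executor.map (or indexing the futures by input) is the safer choice; as_completed suits cases where results should be handled as soon as they are ready.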
from concurrent.futures import ThreadPoolExecutor
from itertools import zip_longest
from typing import Callable, Generic, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar('T')
R = TypeVar('R')


class ParallelIterator(Generic[T]):
    """Parallel iterator backed by a thread pool."""

    def __init__(self, iterable: List[T], max_workers: int = 4):
        self._data = iterable
        self._max_workers = max_workers

    def map_parallel(self, func: Callable[[T], R], chunk_size: int = 1) -> List[R]:
        """Parallel map; results are returned in input order."""
        with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
            # executor.map preserves input order, whereas as_completed would
            # return results in completion order. chunksize has no effect on
            # ThreadPoolExecutor; it is passed through for API compatibility.
            return list(executor.map(func, self._data, chunksize=chunk_size))

    def filter_parallel(self, predicate: Callable[[T], bool]) -> List[T]:
        """Parallel filter; input order is preserved."""
        with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
            # dict keeps insertion order, so items come back in input order.
            futures = {
                executor.submit(predicate, item): item
                for item in self._data
            }
            return [
                item for future, item in futures.items()
                if future.result()
            ]


class ZipIterator:
    """Zip-style iterators."""

    @staticmethod
    def zip_strict(*iterables: Iterable) -> Iterator[Tuple]:
        """Strict zip: all inputs must have the same length."""
        iterators = [iter(it) for it in iterables]
        if not iterators:
            return  # nothing to zip; avoids yielding () forever
        while True:
            items = []
            exhausted = []
            for i, it in enumerate(iterators):
                try:
                    items.append(next(it))
                except StopIteration:
                    exhausted.append(i)
            if exhausted:
                if items:
                    # Some inputs ended while others still had elements.
                    raise ValueError(
                        f"iterables differ in length; indexes {exhausted} are exhausted"
                    )
                return
            yield tuple(items)

    @staticmethod
    def zip_longest_custom(*iterables: Iterable, fillvalue: object = None) -> Iterator[Tuple]:
        """Longest zip: pad shorter inputs with fillvalue."""
        return zip_longest(*iterables, fillvalue=fillvalue)

    @staticmethod
    def zip_dict(**iterables: Iterable) -> Iterator[dict]:
        """Zip keyword arguments into dicts; stops at the shortest input."""
        keys = list(iterables.keys())
        iterators = [iter(it) for it in iterables.values()]
        if not iterators:
            return  # no inputs; avoids yielding {} forever
        while True:
            try:
                values = [next(it) for it in iterators]
                yield dict(zip(keys, values))
            except StopIteration:
                return


class ProductIterator:
    """Cartesian product iterator."""

    @staticmethod
    def product(*iterables: Iterable, repeat: int = 1) -> Iterator[Tuple]:
        """Cartesian product."""
        pools = [tuple(pool) for pool in iterables] * repeat
        if not pools:
            yield ()
            return

        def backtrack(index: int, current: List):
            # Extend the partial tuple one pool at a time.
            if index == len(pools):
                yield tuple(current)
                return
            for item in pools[index]:
                yield from backtrack(index + 1, current + [item])

        yield from backtrack(0, [])
print("\nParallel iterator example:")
parallel_iter = ParallelIterator(list(range(1, 11)), max_workers=4)
squares = parallel_iter.map_parallel(lambda x: x ** 2)
print(f" Parallel squares: {sorted(squares)}")
evens = parallel_iter.filter_parallel(lambda x: x % 2 == 0)
print(f" Parallel filter for even numbers: {sorted(evens)}")

print("\nZip iterator example:")
names = ['张三', '李四', '王五']
ages = [25, 30, 28]
cities = ['北京', '上海', '广州']
print(" Zip into dicts:")
for person in ZipIterator.zip_dict(name=names, age=ages, city=cities):
    print(f" {person}")

print("\nCartesian product iterator example:")
colors = ['红', '蓝']
sizes = ['S', 'M', 'L']
print(" Color × size:")
for combo in ProductIterator.product(colors, sizes):
    print(f" {combo}")

16.7 Anti-Patterns and Best Practices
16.7.1 Common Anti-Patterns
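Several of the anti-patterns in this section come down to confusing an iterable, which hands out a fresh iterator on each request, with an iterator, which is consumed as it is read. In Python an iterator returns itself from iter(), which gives a simple way to tell the two apart. The helper name is_one_shot below is hypothetical and exists only for this sketch.

```python
from typing import Iterable


def is_one_shot(obj: Iterable) -> bool:
    """Return True if obj is its own iterator and can be consumed only once."""
    return iter(obj) is obj


numbers = [1, 2, 3]
numbers_iter = iter(numbers)

print(is_one_shot(numbers))       # False: a list returns a new iterator each time
print(is_one_shot(numbers_iter))  # True: an iterator returns itself

first_pass = list(numbers_iter)
second_pass = list(numbers_iter)  # the iterator is already exhausted
print(first_pass, second_pass)    # [1, 2, 3] []
```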
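from typing import Iterator, Optional


class IteratorAntiPatterns:
    """Examples of iterator anti-patterns."""

    @staticmethod
    def anti_pattern_1_list_conversion():
        """Anti-pattern 1: unnecessary conversion to a list."""
        data = range(1000000)
        # Bad: materializes one million results in memory at once.
        bad_result = list(x * 2 for x in data)
        # Good: a generator expression produces values on demand.
        good_result = (x * 2 for x in data)
        return bad_result, good_result

    @staticmethod
    def anti_pattern_2_multiple_iteration():
        """Anti-pattern 2: iterating over the same iterator more than once."""
        data = iter([1, 2, 3, 4, 5])
        print(" First pass:", list(data))
        print(" Second pass:", list(data))  # empty: the iterator is exhausted
        good_data = [1, 2, 3, 4, 5]
        print(" Correct approach, first pass:", list(good_data))
        print(" Correct approach, second pass:", list(good_data))

    @staticmethod
    def anti_pattern_3_modify_during_iteration():
        """Anti-pattern 3: modifying a collection while iterating over it."""
        data = [1, 2, 3, 4, 5]
        # Bad: removing from a list during iteration raises no error; the
        # element after the removed one is silently skipped.
        bad_data = data.copy()
        visited = []
        for item in bad_data:
            visited.append(item)
            if item == 3:
                bad_data.remove(item)
        print(f" Visited: {visited} (4 was skipped)")
        # A dict, by contrast, raises RuntimeError when its size changes.
        bad_dict = {1: 'a', 2: 'b'}
        try:
            for key in bad_dict:
                del bad_dict[key]
        except RuntimeError as e:
            print(f" Error: {e}")
        # Good: collect the items first, then remove them after the loop.
        good_data = data.copy()
        to_remove = []
        for item in good_data:
            if item == 3:
                to_remove.append(item)
        for item in to_remove:
            good_data.remove(item)
        print(f" Correct result: {good_data}")
        # Better: build a new list with a comprehension.
        better_data = [x for x in data if x != 3]
        print(f" Better approach: {better_data}")

    @staticmethod
    def anti_pattern_4_infinite_iterator():
        """Anti-pattern 4: consuming an infinite iterator without a bound."""
        def infinite_counter() -> Iterator[int]:
            n = 0
            while True:
                yield n
                n += 1

        counter = infinite_counter()
        print(" Bounding the number of iterations:")
        for i, n in enumerate(counter):
            if i >= 5:
                break
            print(f" {n}")

    @staticmethod
    def anti_pattern_5_exception_in_iterator():
        """Anti-pattern 5: poor exception handling inside an iterator."""
        # None * 2 raises TypeError (a str such as 'three' would not,
        # because str * int is valid), so None stands in for the bad element.
        def bad_generator() -> Iterator[int]:
            data = [1, 2, None, 4, 5]
            for item in data:
                yield item * 2  # TypeError here ends the whole iteration

        def good_generator() -> Iterator[Optional[int]]:
            data = [1, 2, None, 4, 5]
            for item in data:
                # Keep the try block around the computation only, not the
                # yield, so exceptions thrown into the generator are untouched.
                try:
                    value = item * 2
                except TypeError:
                    value = None  # mark the bad element and carry on
                yield value

        print(" Poor handling:")
        try:
            for item in bad_generator():
                print(f" {item}")
        except TypeError as e:
            print(f" Exception: {e}")
        print(" Correct handling:")
        for item in good_generator():
            print(f" {item}")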
print("\nIterator anti-pattern examples:")
print(" Anti-pattern 2 - multiple iteration:")
IteratorAntiPatterns.anti_pattern_2_multiple_iteration()
print("\n Anti-pattern 3 - modifying during iteration:")
IteratorAntiPatterns.anti_pattern_3_modify_during_iteration()
print("\n Anti-pattern 4 - infinite iterator:")
IteratorAntiPatterns.anti_pattern_4_infinite_iterator()
print("\n Anti-pattern 5 - exception handling:")
IteratorAntiPatterns.anti_pattern_5_exception_in_iterator()

16.7.2 Best Practices
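Two of the practices in this section, preferring generators and managing resources with a context manager, combine naturally: a generator can open a file inside a with block and yield its lines one at a time. The sketch below assumes a UTF-8 text file; read_lines is a hypothetical helper, and the temporary file exists only to keep the example self-contained.

```python
import os
import tempfile
from typing import Iterator


def read_lines(path: str) -> Iterator[str]:
    """Yield lines lazily; the file is closed once the generator finishes."""
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\n")


# Write a small temporary file to read back.
with tempfile.NamedTemporaryFile(
    "w", suffix=".txt", delete=False, encoding="utf-8"
) as tmp:
    tmp.write("alpha\nbeta\ngamma\n")
    tmp_path = tmp.name

lines = list(read_lines(tmp_path))  # exhausting the generator closes the file
os.remove(tmp_path)
print(lines)  # ['alpha', 'beta', 'gamma']
```

One caveat of this design is that the file stays open until the generator is exhausted, closed explicitly with close(), or garbage-collected, so a caller that stops early should close the generator.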
from typing import Generator, Iterator, List
import itertools


class IteratorBestPractices:
    """Iterator best practices."""

    @staticmethod
    def practice_1_use_generators():
        """Practice 1: prefer generators."""
        def bad_range(n: int) -> List[int]:
            # Builds the whole list before returning anything.
            result = []
            for i in range(n):
                result.append(i)
            return result

        def good_range(n: int) -> Iterator[int]:
            # Produces one value at a time, on demand.
            for i in range(n):
                yield i

        print(" The list version holds all n values in memory at once")
        print(" The generator version is lazy and memory-friendly")

    @staticmethod
    def practice_2_use_itertools():
        """Practice 2: make good use of the itertools module."""
        data = [1, 2, 3, 4, 5]
        print(" chain - link several iterables:")
        print(f" {list(itertools.chain([1, 2], [3, 4], [5]))}")
        print(" islice - slice an iterator:")
        print(f" {list(itertools.islice(range(100), 5, 10))}")
        print(" takewhile - take while the predicate holds:")
        print(f" {list(itertools.takewhile(lambda x: x < 4, data))}")
        print(" dropwhile - skip while the predicate holds:")
        print(f" {list(itertools.dropwhile(lambda x: x < 4, data))}")
        print(" groupby - group:")
        # groupby only groups consecutive items, so sort by the key first.
        data_sorted = sorted(['apple', 'banana', 'apricot', 'blueberry', 'cherry'])
        for key, group in itertools.groupby(data_sorted, key=lambda x: x[0]):
            print(f" {key}: {list(group)}")

    @staticmethod
    def practice_3_context_manager():
        """Practice 3: manage resources with a context manager."""
        class SafeFileIterator:
            def __init__(self, file_path: str):
                self._file_path = file_path
                self._file = None

            def __enter__(self):
                self._file = open(self._file_path, 'r')
                return self

            def __exit__(self, exc_type, exc_val, exc_tb):
                if self._file:
                    self._file.close()
                return False  # do not suppress exceptions

            def __iter__(self):
                if self._file is None:
                    raise RuntimeError("use SafeFileIterator inside a with statement")
                return self._file

        print(" A context manager ensures the resource is released correctly")

    @staticmethod
    def practice_4_type_hints():
        """Practice 4: use type hints."""
        def typed_generator(n: int) -> Generator[int, None, None]:
            for i in range(n):
                yield i

        def typed_iterator(data: List[str]) -> Iterator[str]:
            return iter(data)

        print(" Type hints improve readability and IDE support")

    @staticmethod
    def practice_5_documentation():
        """Practice 5: add documentation."""
        def documented_generator(n: int) -> Iterator[int]:
            """
            Generate the integers from 0 to n-1.

            Args:
                n: the number of integers to generate

            Yields:
                int: the next integer in the sequence

            Examples:
                >>> list(documented_generator(3))
                [0, 1, 2]
            """
            for i in range(n):
                yield i

        print(" Documentation improves maintainability")
print("\nIterator best-practice examples:")
print(" Practice 2 - make good use of itertools:")
IteratorBestPractices.practice_2_use_itertools()

16.8 Decision Guide
16.8.1 Decision Tree for Choosing an Iterator Type
┌─────────────────────────────────────────────────────────────────────┐
│                     Iterator Type Decision Tree                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│         Need to traverse the elements of a collection?              │
│                                │                                    │
│                 ┌──────────────┴──────────────┐                     │
│                 │                             │                     │
│                Yes                           No                     │
│                 │                             │                     │
│                 ▼                             ▼                     │
│        ┌─────────────────┐           No iterator needed             │
│        │ How much data?  │                                          │
│        └────────┬────────┘                                          │
│                 │                                                   │
│     ┌───────────┼───────────┐                                       │
│     │           │           │                                       │
│   Small      Medium       Large                                     │
│     │           │           │                                       │
│     ▼           ▼           ▼                                       │
│   List      Generator   Streaming iterator                          │
│ iterator                + pagination                                │
│                                                                     │
│                Need to traverse more than once?                     │
│                                │                                    │
│                 ┌──────────────┴──────────────┐                     │
│                 │                             │                     │
│                Yes                           No                     │
│                 │                             │                     │
│                 ▼                             ▼                     │
│        Resettable iterator            Standard iterator             │
│             or a list                                               │
│                                                                     │
│               Need to peek at upcoming elements?                    │
│                                │                                    │
│                 ┌──────────────┴──────────────┐                     │
│                 │                             │                     │
│                Yes                           No                     │
│                 │                             │                     │
│                 ▼                             ▼                     │
│         Peekable iterator             Standard iterator             │
│                                                                     │
│                    Need parallel processing?                        │
│                                │                                    │
│                 ┌──────────────┴──────────────┐                     │
│                 │                             │                     │
│                Yes                           No                     │
│                 │                             │                     │
│                 ▼                             ▼                     │
│         Parallel iterator             Standard iterator             │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

16.8.2 Choosing an Implementation
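| Scenario | Recommended implementation | Reason |
|---|---|---|
| Simple sequence traversal | Generator function | Concise code, memory-efficient |
| State management required | Iterator class | Can maintain complex state |
| Large datasets | Streaming iterator + pagination | Avoids running out of memory |
| Multiple traversals required | List or resettable iterator | Supports repeated access |
| Tree structures | Recursive generator | Expresses the traversal logic naturally |
| File processing | Context manager + iterator | Ensures resources are released correctly |
| Parallel processing | ThreadPoolExecutor + iterator | Speeds up I/O-bound work (CPU-bound work gains little under CPython's GIL) |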
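The tree-structure row of the table can be illustrated with a short recursive generator. The sketch below is a minimal example of the idea, not a complete tree API; TreeNode and walk_preorder are hypothetical names, and the tree is a toy n-ary tree built inline.

```python
from dataclasses import dataclass, field
from typing import Iterator, List


@dataclass
class TreeNode:
    """Hypothetical n-ary tree node used only for this example."""
    value: str
    children: List["TreeNode"] = field(default_factory=list)


def walk_preorder(node: TreeNode) -> Iterator[str]:
    """Yield node values depth-first, parent before children."""
    yield node.value
    for child in node.children:
        # yield from delegates to the recursive generator for each subtree.
        yield from walk_preorder(child)


tree = TreeNode("root", [
    TreeNode("a", [TreeNode("a1"), TreeNode("a2")]),
    TreeNode("b"),
])
order = list(walk_preorder(tree))
print(order)  # ['root', 'a', 'a1', 'a2', 'b']
```

Because each level of the tree adds a generator frame, very deep trees can hit Python's recursion limit; an explicit stack is the usual alternative in that case.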
16.9 Quick Reference Card
16.9.1 Core Concepts at a Glance
┌─────────────────────────────────────────────────────────────────────┐
│                  Iterator Pattern Quick Reference                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  [Definition]                                                       │
│  Provide a way to access the elements of an aggregate object        │
│  sequentially without exposing its internal representation.         │
│                                                                     │
│  [Python iterator protocol]                                         │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ class Iterator:                                                 │ │
│ │     def __iter__(self) -> Iterator # returns self               │ │
│ │     def __next__(self) -> T        # next item or StopIteration │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│                                                                     │
│  [Core components]                                                  │
│  • Iterator: interface that declares the traversal methods          │
│  • ConcreteIterator: concrete iterator implementation               │
│  • Aggregate: interface that declares how iterators are created     │
│  • ConcreteAggregate: concrete aggregate implementation             │
│                                                                     │
│  [Common itertools functions]                                       │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ count(start, step)       # infinite counter                     │ │
│ │ cycle(iterable)          # infinite cycle                       │ │
│ │ repeat(item, times)      # repeat an element                    │ │
│ │ chain(*iterables)        # chain iterables                      │ │
│ │ islice(iterable, ...)    # slice an iterator                    │ │
│ │ takewhile(pred, iter)    # take while predicate holds           │ │
│ │ dropwhile(pred, iter)    # drop while predicate holds           │ │
│ │ groupby(iter, key)       # group consecutive items              │ │
│ │ permutations(iter, r)    # permutations                         │ │
│ │ combinations(iter, r)    # combinations                         │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│                                                                     │
│  [Generator syntax]                                                 │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ def generator():                                                │ │
│ │     yield value               # produce a value                 │ │
│ │     yield from iterable       # delegate to a sub-iterator      │ │
│ │                                                                 │ │
│ │ gen = (x for x in iterable)   # generator expression            │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│                                                                     │
│  [When to use]                                                      │
│  ✓ Traverse an aggregate without exposing its representation        │
│  ✓ Support multiple traversal strategies                            │
│  ✓ Process large datasets or streaming data                         │
│  ✓ Defer computation until it is needed                             │
│                                                                     │
│  [Advantages]                                                       │
│  + Uniform traversal interface                                      │
│  + Decouples traversal logic from the collection                    │
│  + Supports lazy evaluation                                         │
│  + Memory-efficient                                                 │
│                                                                     │
│  [Disadvantages]                                                    │
│  - Usually forward-only traversal                                   │
│  - Iterators usually cannot be reset                                │
│  - Can add complexity to the system                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

16.9.2 Code Templates at a Glance
from abc import ABC, abstractmethod
from typing import Generator, Generic, List, TypeVar

T = TypeVar('T')


class IteratorTemplate(ABC, Generic[T]):
    """Iterator template."""

    @abstractmethod
    def __iter__(self) -> 'IteratorTemplate[T]':
        return self

    @abstractmethod
    def __next__(self) -> T:
        raise StopIteration


class SequenceIteratorTemplate(IteratorTemplate[T]):
    """Sequence iterator template."""

    def __init__(self, sequence: List[T]):
        self._sequence = sequence
        self._index = 0

    def __iter__(self) -> 'SequenceIteratorTemplate[T]':
        return self

    def __next__(self) -> T:
        if self._index >= len(self._sequence):
            raise StopIteration
        value = self._sequence[self._index]
        self._index += 1
        return value


def generator_template(start: int, end: int) -> Generator[int, None, None]:
    """Generator template."""
    current = start
    while current < end:
        yield current
        current += 1


def recursive_generator_template(data: List[T]) -> Generator[T, None, None]:
    """Recursive generator template."""
    # Illustrates the head/tail recursion shape only: each data[1:] copies the
    # list, and long inputs can exceed Python's recursion limit.
    if not data:
        return
    yield data[0]
    yield from recursive_generator_template(data[1:])

16.10 Summary
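The Iterator pattern is one of the most fundamental design patterns in Python, and the language supports the iterator protocol natively. Starting from a formal definition, this chapter examined the mathematical foundations of iterators, the type-theoretic view, and their algebraic properties.
Key points:
- Formal understanding: an iterator can be formalized as a state transition system $\mathcal{I} = \langle S, s_0, \Sigma, \delta, F \rangle$ that supports algebraic operations such as map, filter, and fold.
- Native Python support: the iterator protocol is implemented through the __iter__ and __next__ methods, and generators offer a more concise way to implement it.
- Multiple implementation styles: external iterators (the client controls traversal), internal iterators (the collection controls traversal), generators, lazy iterators, resettable iterators, and others.
- Enterprise applications: paginated database iterators, stream processing, tree traversal, file processing, and similar scenarios.
- Best practices: prefer generators, make good use of the itertools module, manage resources with context managers, and add type hints and documentation.
The Iterator pattern is closely tied to generators and lazy evaluation, and it underpins functional programming and stream processing. Mastering it is essential for handling large datasets and building efficient data-processing pipelines.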
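As a closing illustration of the data-processing pipelines mentioned above, the sketch below chains three small generator stages and pulls only as many items as the consumer asks for. The stage names (parse_ints, only_even, squared) and the sample input are assumptions made for this example.

```python
from itertools import islice
from typing import Iterable, Iterator


def parse_ints(lines: Iterable[str]) -> Iterator[int]:
    """Convert text lines to integers, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if line:
            yield int(line)


def only_even(numbers: Iterable[int]) -> Iterator[int]:
    """Keep the even numbers."""
    return (n for n in numbers if n % 2 == 0)


def squared(numbers: Iterable[int]) -> Iterator[int]:
    """Square each number."""
    return (n * n for n in numbers)


raw_lines = ["1", "2", "", "3", "4", "5", "6"]  # stand-in for a large stream
pipeline = squared(only_even(parse_ints(raw_lines)))

# islice stops after two results, so the later lines are never parsed here.
first_two = list(islice(pipeline, 2))
print(first_two)  # [4, 16]
```

Each stage holds one element at a time, so the same structure works for inputs that do not fit in memory, such as a file object passed in place of raw_lines.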