第14章 生成器与迭代器
学习目标
完成本章学习后,读者应能够:
- 理解迭代协议:掌握迭代器协议(__iter__/__next__)与可迭代协议(__iter__)的设计原理与实现机制
- 精通生成器函数:深入理解yield的执行模型、生成器状态机、协程式通信(send/throw/close)
- 运用yield from:理解子生成器委托机制及其在异步编程中的基础作用
- 掌握生成器表达式:理解惰性求值的内存优势与适用场景
- 运用itertools:熟练使用标准库迭代器工具构建高效的数据处理管道
- 设计迭代器架构:能够为实际项目设计自定义迭代器,处理无限序列、数据流管道等场景
14.1 迭代协议与底层机制
14.1.1 迭代器协议
Python的迭代基于两个核心协议:
- 可迭代协议(Iterable Protocol):对象实现__iter__()方法,返回一个迭代器
- 迭代器协议(Iterator Protocol):对象实现__next__()方法,返回下一个元素;元素耗尽时抛出StopIteration
from collections.abc import Iterable, Iterator
class Counter:
def __init__(self, max_count):
self.max_count = max_count
self.count = 0
def __iter__(self):
return self
def __next__(self):
if self.count < self.max_count:
self.count += 1
return self.count
raise StopIteration
counter = Counter(5)
print(f"是否可迭代: {isinstance(counter, Iterable)}")
print(f"是否迭代器: {isinstance(counter, Iterator)}")
for num in counter:
print(num, end=" ")
print()
counter2 = Counter(3)
print(next(counter2))
print(next(counter2))
print(next(counter2))
try:
next(counter2)
except StopIteration:
    print("迭代结束")

关键理解:迭代器自身也是可迭代的(__iter__返回self),但可迭代对象不一定是迭代器。例如列表是可迭代的,但不是迭代器——每次调用iter(list)都返回一个新的迭代器对象。
my_list = [1, 2, 3]
it1 = iter(my_list)
it2 = iter(my_list)
print(f"it1和it2是同一对象: {it1 is it2}")
print(f"it1: {next(it1)}, it2: {next(it2)}")
counter = Counter(3)
it3 = iter(counter)
it4 = iter(counter)
print(f"it3和it4是同一对象: {it3 is it4}")

14.1.2 for循环的底层机制
for循环的本质是调用iter()获取迭代器,然后反复调用next()直到StopIteration:
def manual_for(iterable, func):
iterator = iter(iterable)
while True:
try:
item = next(iterator)
func(item)
except StopIteration:
break
manual_for([1, 2, 3], lambda x: print(x * 2, end=" "))
print()
def for_loop_equivalent(iterable):
result = []
iterator = iter(iterable)
while True:
try:
item = next(iterator)
result.append(item)
except StopIteration:
break
return result
print(for_loop_equivalent(range(5)))

14.1.3 分离可迭代对象与迭代器
最佳实践是将可迭代对象和迭代器分离为两个类。这样可迭代对象可以多次迭代,每次创建独立的迭代器:
class Range:
def __init__(self, start, stop=None, step=1):
if stop is None:
start, stop = 0, start
self.start = start
self.stop = stop
self.step = step
def __iter__(self):
return RangeIterator(self.start, self.stop, self.step)
def __len__(self):
return max(0, (self.stop - self.start + self.step - 1) // self.step if self.step > 0
else (self.start - self.stop - self.step - 1) // (-self.step))
def __contains__(self, value):
if self.step > 0:
return self.start <= value < self.stop and (value - self.start) % self.step == 0
return self.stop < value <= self.start and (self.start - value) % (-self.step) == 0
class RangeIterator:
def __init__(self, start, stop, step):
self.current = start
self.stop = stop
self.step = step
def __iter__(self):
return self
def __next__(self):
if self.step > 0 and self.current >= self.stop:
raise StopIteration
if self.step < 0 and self.current <= self.stop:
raise StopIteration
result = self.current
self.current += self.step
return result
r = Range(2, 10, 2)
print(f"Range(2,10,2): {list(r)}")
print(f"再次迭代: {list(r)}")
print(f"长度: {len(r)}")
print(f"6 in Range: {6 in r}")

14.1.4 迭代器的常见陷阱
迭代器只能消费一次:
it = iter([1, 2, 3])
print(f"第一次: {list(it)}")
print(f"第二次: {list(it)}")
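若确实需要遍历两次,除了先物化为列表,也可以用itertools.tee复制迭代器。下面是一个演示用的小草图:

```python
from itertools import tee

source = iter([1, 2, 3])
it_a, it_b = tee(source)  # 复制出两个相互独立的迭代器
print(f"副本A: {list(it_a)}")
print(f"副本B: {list(it_b)}")
```

注意:调用tee之后不应再直接使用原迭代器source,否则副本会丢失被消费的元素。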
def process_twice(iterable):
data = list(iterable)
print(f"第一遍: {data}")
print(f"第二遍: {data}")
process_twice(iter([1, 2, 3]))

迭代器不是序列:迭代器不支持len()、索引和切片,因为它可能是无限的或长度未知的。
it = iter([1, 2, 3])
try:
print(len(it))
except TypeError as e:
print(f"迭代器不支持len: {e}")
try:
print(it[0])
except TypeError as e:
    print(f"迭代器不支持索引: {e}")

14.2 生成器函数
14.2.1 yield的执行模型
生成器函数是包含yield语句的函数。调用生成器函数不会执行函数体,而是返回一个生成器对象。每次调用next()时,函数从上次暂停的位置恢复执行,直到遇到下一个yield或函数结束。
def simple_generator():
print(" 第1步: 开始")
yield 1
print(" 第2步: 继续")
yield 2
print(" 第3步: 结束")
yield 3
gen = simple_generator()
print(f"生成器类型: {type(gen)}")
print(f"是否迭代器: {isinstance(gen, Iterator)}")
print(f"获取: {next(gen)}")
print(f"获取: {next(gen)}")
print(f"获取: {next(gen)}")
try:
next(gen)
except StopIteration:
    print("生成器耗尽")

生成器的内部状态:
import inspect
def my_generator():
yield 1
yield 2
yield 3
gen = my_generator()
print(f"创建后: {inspect.getgeneratorstate(gen)}")
next(gen)
print(f"第一次next后: {inspect.getgeneratorstate(gen)}")
next(gen)
print(f"第二次next后: {inspect.getgeneratorstate(gen)}")
next(gen)
print(f"第三次next后: {inspect.getgeneratorstate(gen)}")
try:
next(gen)
except StopIteration:
pass
print(f"耗尽后: {inspect.getgeneratorstate(gen)}")

生成器状态有四种:
- GEN_CREATED:已创建但尚未启动
- GEN_RUNNING:正在执行(多线程中可见)
- GEN_SUSPENDED:在yield处暂停
- GEN_CLOSED:已关闭
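其中GEN_RUNNING只能在生成器正在执行时观察到,例如在生成器体内部查看自身的状态。下面是一个演示用的小草图:

```python
import inspect

def self_inspect():
    # 生成器体执行期间, 从内部查看自身(模块级变量gen)的状态
    yield inspect.getgeneratorstate(gen)

gen = self_inspect()
state = next(gen)
print(state)  # GEN_RUNNING
```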
14.2.2 生成器与协程
生成器不仅是迭代器,还是协程(Coroutine)的基础。通过send()、throw()和close()方法,生成器可以接收外部输入、处理异常和响应关闭请求。
send方法:向生成器发送值,该值成为yield表达式的结果:
def accumulator():
total = 0
while True:
value = yield total
if value is None:
break
total += value
gen = accumulator()
next(gen)
print(gen.send(10))
print(gen.send(20))
print(gen.send(30))
gen.close()

协程式生成器模式:send使生成器成为数据消费者而非仅是生产者:
def processor():
buffer = []
while True:
data = yield buffer
if data is None:
break
buffer.append(data.upper())
gen = processor()
next(gen)
print(gen.send("hello"))
print(gen.send("world"))
print(gen.send("python"))
gen.close()

throw方法:在生成器暂停处抛出指定异常:
def error_handler():
try:
while True:
value = yield
print(f" 处理: {value}")
except ValueError as e:
print(f" 捕获ValueError: {e}")
yield "已恢复"
except Exception as e:
print(f" 捕获异常: {type(e).__name__}: {e}")
gen = error_handler()
next(gen)
gen.send("data1")
gen.send("data2")
result = gen.throw(ValueError("无效值"))
print(f"throw返回: {result}")
gen.close()

close方法:在生成器暂停处抛出GeneratorExit异常:
def cleanup_generator():
try:
while True:
value = yield
print(f" 处理: {value}")
except GeneratorExit:
print(" 清理资源...")
finally:
print(" 生成器已关闭")
gen = cleanup_generator()
next(gen)
gen.send("data")
gen.close()
print(f"关闭后状态: {inspect.getgeneratorstate(gen)}")

14.2.3 生成器的高级应用
无限序列:生成器天然适合表示无限序列:
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
fib = fibonacci()
first_10 = [next(fib) for _ in range(10)]
print(f"前10个斐波那契数: {first_10}")
def primes():
seen = []
n = 2
while True:
if all(n % p != 0 for p in seen):
seen.append(n)
yield n
n += 1
prime_gen = primes()
first_20_primes = [next(prime_gen) for _ in range(20)]
print(f"前20个素数: {first_20_primes}")
def count(start=0, step=1):
n = start
while True:
yield n
n += step
evens = count(0, 2)
print(f"前5个偶数: {[next(evens) for _ in range(5)]}")

数据管道:生成器可以串联形成数据处理管道:
def read_lines(filepath):
with open(filepath, "r", encoding="utf-8") as f:
for line in f:
yield line.rstrip("\n")
def filter_lines(lines, predicate):
for line in lines:
if predicate(line):
yield line
def transform_lines(lines, transform):
for line in lines:
yield transform(line)
def limit(lines, n):
for i, line in enumerate(lines):
if i >= n:
break
yield line
def strip(lines):
for line in lines:
yield line.strip()
def non_empty(lines):
for line in lines:
if line:
yield line
data = [
" Hello World ",
"",
" Python Programming ",
" ",
" Generator Pipeline ",
"",
]
pipeline = limit(
    transform_lines(
        filter_lines(strip(data), bool),
        str.upper
    ),
    3
)
print(list(pipeline))

递归生成器:生成器可以递归调用自身处理嵌套结构:
def flatten(nested):
for item in nested:
if isinstance(item, (list, tuple)):
yield from flatten(item)
else:
yield item
nested = [1, [2, 3], [4, [5, 6, [7]]], 8, (9, 10)]
print(f"展平: {list(flatten(nested))}")
def tree_walker(node, depth=0):
if isinstance(node, dict):
for key, value in node.items():
yield (depth, key)
yield from tree_walker(value, depth + 1)
elif isinstance(node, list):
for i, item in enumerate(node):
yield from tree_walker(item, depth)
else:
yield (depth, node)
tree = {
"root": {
"branch1": ["leaf1", "leaf2"],
"branch2": {
"sub1": "leaf3",
"sub2": ["leaf4", "leaf5"]
}
}
}
for depth, value in tree_walker(tree):
    print(" " * depth + f"→ {value}")

14.3 生成器表达式
14.3.1 语法与性能
生成器表达式是列表推导式的惰性版本,使用圆括号而非方括号:
import sys
list_comp = [x ** 2 for x in range(10000)]
gen_exp = (x ** 2 for x in range(10000))
print(f"列表推导式内存: {sys.getsizeof(list_comp)} 字节")
print(f"生成器表达式内存: {sys.getsizeof(gen_exp)} 字节")
print(f"内存比: {sys.getsizeof(list_comp) / sys.getsizeof(gen_exp):.1f}x")
total = sum(x ** 2 for x in range(1000000))
print(f"1到1000000的平方和: {total}")
max_val = max(x ** 2 for x in range(1000))
print(f"最大平方值: {max_val}")
has_match = any(x > 100 for x in range(50, 150))
print(f"存在大于100的数: {has_match}")
all_positive = all(x > 0 for x in [1, 2, 3, 4, 5])
print(f"全部为正: {all_positive}")

14.3.2 嵌套生成器表达式
matrix = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
]
flat = (x for row in matrix for x in row)
print(f"展平矩阵: {list(flat)}")
diagonal = (matrix[i][i] for i in range(len(matrix)))
print(f"对角线: {list(diagonal)}")
transposed = (
[row[i] for row in matrix]
for i in range(len(matrix[0]))
)
print(f"转置: {list(transposed)}")

14.3.3 生成器表达式的局限
生成器表达式只能包含单个表达式,不能包含复杂语句。对于复杂逻辑,应使用生成器函数:
# 生成器表达式:简单转换
squares = (x ** 2 for x in range(10))
# 生成器函数:复杂逻辑
def process_data(data):
for item in data:
cleaned = item.strip()
if not cleaned:
continue
try:
value = float(cleaned)
except ValueError:
continue
yield value * 2
raw_data = [" 1.5 ", "", "abc", " 2.3 ", " ", "4.0"]
print(f"处理结果: {list(process_data(raw_data))}")

14.4 yield from:子生成器委托
14.4.1 基本机制
yield from是Python 3.3引入的语法,用于将生成器的部分操作委托给子生成器。它不仅简化了嵌套生成器的写法,还建立了主生成器与子生成器之间的双向通信通道。
def chain(*iterables):
for iterable in iterables:
yield from iterable
result = chain([1, 2], "ab", range(3))
print(f"链接: {list(result)}")
def flatten(nested):
for item in nested:
if isinstance(item, (list, tuple)):
yield from flatten(item)
else:
yield item
nested = [1, [2, [3, 4]], [5], 6]
print(f"展平: {list(flatten(nested))}")

14.4.2 yield from的完整语义
yield from不仅仅是for item in subgen: yield item的语法糖,它还处理了以下细节:
- 将send()的值传递给子生成器
- 将throw()的异常传递给子生成器
- 当子生成器完成时,其return值成为yield from表达式的值
- 正确处理StopIteration
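其中异常传递这一条可以用下面的小例子演示(演示用的假设生成器):外层对委托生成器调用throw(),异常会被转发到子生成器内部:

```python
def sub():
    try:
        while True:
            yield
    except ValueError:
        # 通过yield from转发进来的异常在子生成器内部被捕获
        yield "sub捕获了ValueError"

def delegator():
    yield from sub()

g = delegator()
next(g)                        # 推进到sub内部的yield处
result = g.throw(ValueError())  # 异常穿过yield from进入sub
print(result)
```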
def subgen():
yield 1
yield 2
return "subgen完成"
def main_gen():
result = yield from subgen()
print(f" 子生成器返回: {result!r}")
yield "main继续"
gen = main_gen()
print(next(gen))
print(next(gen))
print(next(gen))

双向通信示例:
def echo_subgen():
while True:
received = yield
if received is None:
break
yield f"子生成器处理: {received}"
def main_gen():
yield from echo_subgen()
yield "主生成器结束"
gen = main_gen()
next(gen)
print(gen.send("hello"))
next(gen)
print(gen.send("world"))
next(gen)
print(gen.send(None))

14.4.3 yield from与归约模式
yield from的一个重要应用是归约(Reduction)模式——子生成器产生部分结果,主生成器汇总:
def average_gen():
total = 0
count = 0
while True:
value = yield
if value is None:
break
total += value
count += 1
return total / count if count else 0
def aggregator():
    avg = yield from average_gen()
    print(f" 平均值: {avg}")
    return avg
gen = aggregator()
next(gen)
for v in [10, 20, 30, 40, 50]:
gen.send(v)
try:
gen.send(None)
except StopIteration as e:
    print(f"最终结果: {e.value}")

14.5 itertools模块详解
14.5.1 无限迭代器
from itertools import count, cycle, repeat
# count: 无限计数
for i in count(10, 2):
if i > 20:
break
print(i, end=" ")
print()
# cycle: 无限循环
colors = cycle(["red", "green", "blue"])
for i, color in enumerate(colors):
if i >= 7:
break
print(color, end=" ")
print()
# repeat: 重复
for val in repeat("hello", 3):
print(val, end=" ")
print()
# 实际应用:为序列生成索引
items = ["a", "b", "c", "d"]
for idx, item in zip(count(), items):
    print(f" {idx}: {item}")

14.5.2 终止迭代器
from itertools import (
accumulate, chain, compress, dropwhile,
filterfalse, groupby, islice, starmap,
takewhile, tee, zip_longest
)
# accumulate: 累积计算
numbers = [1, 2, 3, 4, 5]
print(f"累加: {list(accumulate(numbers))}")
print(f"累积最大: {list(accumulate(numbers, max))}")
print(f"累积乘积: {list(accumulate(numbers, lambda x, y: x * y))}")
# chain: 链接多个可迭代对象
print(f"链接: {list(chain([1, 2], [3, 4], [5, 6]))}")
print(f"展平: {list(chain.from_iterable([[1, 2], [3, 4], [5, 6]]))}")
# compress: 按选择器过滤
data = ["a", "b", "c", "d"]
selectors = [True, False, True, False]
print(f"压缩: {list(compress(data, selectors))}")
# dropwhile: 跳过直到条件为False
numbers = [1, 2, 3, 4, 5, 1, 2]
print(f"跳过: {list(dropwhile(lambda x: x < 3, numbers))}")
# filterfalse: 反向过滤
print(f"奇数: {list(filterfalse(lambda x: x % 2 == 0, range(10)))}")
# groupby: 分组
data = [("a", 1), ("a", 2), ("b", 1), ("b", 2), ("c", 1)]
for key, group in groupby(data, lambda x: x[0]):
print(f" 组 {key}: {list(group)}")
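需要注意,groupby只合并相邻的相同键;对乱序数据通常应先sorted再分组。下面用一组假设的数据演示这一陷阱:

```python
from itertools import groupby

unsorted_keys = ["a", "b", "a", "b"]
# 未排序: 相同键不相邻, 会被拆成多个组
raw_groups = [k for k, _ in groupby(unsorted_keys)]
print(f"未排序直接分组: {raw_groups}")

# 先排序再分组, 每个键只出现一次
sorted_groups = [k for k, _ in groupby(sorted(unsorted_keys))]
print(f"排序后再分组: {sorted_groups}")
```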
# islice: 切片
numbers = range(100)
print(f"前10个: {list(islice(numbers, 10))}")
print(f"10-20: {list(islice(numbers, 10, 20))}")
print(f"0-20步长2: {list(islice(numbers, 0, 20, 2))}")
# starmap: 解参映射
data = [(2, 3), (4, 5), (6, 7)]
print(f"幂运算: {list(starmap(pow, data))}")
# takewhile: 取到条件为False
numbers = [1, 2, 3, 4, 5, 1, 2]
print(f"取到: {list(takewhile(lambda x: x < 3, numbers))}")
# tee: 复制迭代器
numbers = iter([1, 2, 3, 4, 5])
it1, it2 = tee(numbers, 2)
print(f"副本1: {list(it1)}")
print(f"副本2: {list(it2)}")
# zip_longest: 不等长zip
print(f"不等长zip: {list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue=None))}")

14.5.3 组合迭代器
from itertools import combinations, combinations_with_replacement, permutations, product
# 排列
print(f"排列(3选2): {list(permutations([1, 2, 3], 2))}")
# 组合
print(f"组合(3选2): {list(combinations([1, 2, 3], 2))}")
# 可重复组合
print(f"可重复组合(3选2): {list(combinations_with_replacement([1, 2, 3], 2))}")
# 笛卡尔积
print(f"笛卡尔积: {list(product([1, 2], ['a', 'b']))}")
# 实际应用:密码组合生成
import string
def generate_passwords(chars, length):
for combo in product(chars, repeat=length):
yield "".join(combo)
digits = string.digits
count = 0
for pwd in generate_passwords(digits, 3):
count += 1
print(f"3位数字密码总数: {count}")

14.5.4 构建数据处理管道
from itertools import islice, chain, groupby
from collections import Counter
def analyze_text(text):
words = text.lower().split()
word_lengths = (len(w.strip(".,!?;:")) for w in words)
length_dist = Counter(word_lengths)
print("词长分布:")
for length, count in sorted(length_dist.items()):
print(f" {length}个字符: {'█' * count} ({count})")
avg_length = sum(len(w.strip(".,!?;:")) for w in words) / len(words) if words else 0
print(f"平均词长: {avg_length:.2f}")
longest = max(words, key=lambda w: len(w.strip(".,!?;:")))
print(f"最长词: {longest}")
text = """
Python is an interpreted high-level general-purpose programming language.
Its design philosophy emphasizes code readability with its use of significant indentation.
Its language constructs as well as its object-oriented approach aim to help programmers
write clear logical code for small and large-scale projects.
"""
analyze_text(text)

14.6 自定义迭代器设计模式
14.6.1 惰性求值迭代器
class LazyRange:
def __init__(self, start, stop=None, step=1):
if stop is None:
start, stop = 0, start
self.start = start
self.stop = stop
self.step = step
def __iter__(self):
current = self.start
if self.step > 0:
while current < self.stop:
yield current
current += self.step
elif self.step < 0:
while current > self.stop:
yield current
current += self.step
def __contains__(self, value):
if self.step > 0:
return self.start <= value < self.stop and (value - self.start) % self.step == 0
return self.stop < value <= self.start and (self.start - value) % (-self.step) == 0
def __reversed__(self):
if self.step > 0:
last = self.stop - 1 - (self.stop - 1 - self.start) % self.step
return LazyRange(last, self.start - 1, -self.step)
first = self.stop + 1 + (self.start - self.stop - 1) % (-self.step)
return LazyRange(first, self.start + 1, -self.step)
r = LazyRange(0, 10, 2)
print(f"正向: {list(r)}")
print(f"反向: {list(reversed(r))}")
print(f"6 in range: {6 in r}")

14.6.2 缓冲迭代器
class BufferedIterator:
def __init__(self, iterable, buffer_size=1):
self._iterator = iter(iterable)
self._buffer = []
self._buffer_size = buffer_size
self._exhausted = False
def __iter__(self):
return self
def __next__(self):
if self._buffer:
return self._buffer.pop(0)
if self._exhausted:
raise StopIteration
try:
for _ in range(self._buffer_size):
self._buffer.append(next(self._iterator))
except StopIteration:
self._exhausted = True
if self._buffer:
return self._buffer.pop(0)
raise StopIteration
def peek(self):
if not self._buffer and not self._exhausted:
try:
self._buffer.append(next(self._iterator))
except StopIteration:
self._exhausted = True
if self._buffer:
return self._buffer[0]
raise StopIteration
def has_next(self):
try:
self.peek()
return True
except StopIteration:
return False
data = [1, 2, 3, 4, 5]
buf = BufferedIterator(data, buffer_size=2)
print(f"peek: {buf.peek()}")
print(f"next: {next(buf)}")
print(f"peek: {buf.peek()}")
print(f"has_next: {buf.has_next()}")
print(f"剩余: {list(buf)}")

14.6.3 分块迭代器
def chunked(iterable, size):
it = iter(iterable)
while True:
chunk = []
try:
for _ in range(size):
chunk.append(next(it))
except StopIteration:
if chunk:
yield chunk
break
yield chunk
data = list(range(10))
for chunk in chunked(data, 3):
print(f" 块: {chunk}")
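chunked也可以借助islice与iter的双参数形式写得更紧凑,下面是一个等价实现的草图:

```python
from itertools import islice

def chunked_islice(iterable, size):
    it = iter(iterable)
    # iter的双参数形式: 反复调用第一个参数, 直到其返回哨兵值[]为止
    return iter(lambda: list(islice(it, size)), [])

print(list(chunked_islice(range(10), 3)))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```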
def sliding_window(iterable, size):
it = iter(iterable)
window = []
for _ in range(size):
try:
window.append(next(it))
except StopIteration:
return
yield tuple(window)
for item in it:
window = window[1:] + [item]
yield tuple(window)
data = [1, 2, 3, 4, 5, 6, 7]
print(f"滑动窗口(3): {list(sliding_window(data, 3))}")

14.6.4 合并迭代器
def merge_sorted(*iterables, key=None):
import heapq
    if key is None:
        yield from heapq.merge(*iterables)
        return
keyed = []
for it in iterables:
it = iter(it)
try:
first = next(it)
keyed.append([key(first), first, it])
except StopIteration:
pass
while keyed:
keyed.sort(key=lambda x: x[0])
k, value, it = keyed[0]
yield value
try:
next_val = next(it)
keyed[0] = [key(next_val), next_val, it]
except StopIteration:
keyed.pop(0)
a = [1, 3, 5, 7, 9]
b = [2, 4, 6, 8, 10]
c = [0, 11, 12]
print(f"合并: {list(merge_sorted(a, b, c))}")

14.7 生成器与内存优化
14.7.1 大文件处理
def read_large_file(filepath, chunk_size=8192):
with open(filepath, "r", encoding="utf-8") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
def read_lines_lazy(filepath):
with open(filepath, "r", encoding="utf-8") as f:
for line in f:
yield line.rstrip("\n")
def count_lines(filepath):
return sum(1 for _ in read_lines_lazy(filepath))
def grep(pattern, filepath):
import re
regex = re.compile(pattern)
for line in read_lines_lazy(filepath):
if regex.search(line):
            yield line

14.7.2 流式数据处理
def stream_processor(data_stream):
pipeline = data_stream
pipeline = (line.strip() for line in pipeline)
pipeline = (line for line in pipeline if line)
pipeline = (line.lower() for line in pipeline)
pipeline = (line.split() for line in pipeline)
pipeline = (len(words) for words in pipeline)
return pipeline
sample_data = [
" Hello World ",
"",
" Python Programming ",
" Generator Pipeline ",
"",
]
word_counts = stream_processor(iter(sample_data))
print(f"每行词数: {list(word_counts)}")

14.7.3 生成器与递归的对比
import sys
import time
def fibonacci_recursive(n):
if n <= 1:
return n
return fibonacci_recursive(n - 1) + fibonacci_recursive(n - 2)
def fibonacci_generator(limit):
a, b = 0, 1
for _ in range(limit):
yield a
a, b = b, a + b
def fibonacci_memo(limit):
cache = [0, 1]
for i in range(2, limit):
cache.append(cache[i-1] + cache[i-2])
return cache[:limit]
n = 30
start = time.perf_counter()
result_rec = [fibonacci_recursive(i) for i in range(n)]
time_rec = time.perf_counter() - start
start = time.perf_counter()
result_gen = list(fibonacci_generator(n))
time_gen = time.perf_counter() - start
start = time.perf_counter()
result_memo = fibonacci_memo(n)
time_memo = time.perf_counter() - start
print(f"递归: {time_rec:.6f}秒")
print(f"生成器: {time_gen:.6f}秒")
print(f"记忆化: {time_memo:.6f}秒")
print(f"结果一致: {result_rec == result_gen == result_memo}")

14.8 前沿技术动态
14.8.1 生成器与异步编程
生成器是Python异步编程(async/await)的基础。在Python 3.5之前,协程就是基于生成器实现的:
import types
@types.coroutine
def legacy_coroutine():
    result = yield  # 在asyncio中只能yield None或Future; 裸yield表示让出控制权
print(f" 收到: {result}")
return "完成"
async def modern_coroutine():
result = await legacy_coroutine()
print(f" 现代协程结果: {result}")
import asyncio
asyncio.run(modern_coroutine())

14.8.2 生成器与结构化并发
Python 3.11+引入了TaskGroup,结合生成器可以构建更安全的并发模式:
import asyncio
async def fetch_item(item_id):
await asyncio.sleep(0.1)
return f"data-{item_id}"
async def fetch_all(item_ids):
async def fetch_and_collect(item_id, results):
result = await fetch_item(item_id)
results.append(result)
results = []
async with asyncio.TaskGroup() as tg:
for item_id in item_ids:
tg.create_task(fetch_and_collect(item_id, results))
return results
async def main():
data = await fetch_all(range(5))
print(f"获取结果: {data}")
asyncio.run(main())

14.8.3 生成器在数据科学中的应用
def batch_generator(data, batch_size, shuffle=False):
import random
indices = list(range(len(data)))
if shuffle:
random.shuffle(indices)
for start in range(0, len(indices), batch_size):
batch_indices = indices[start:start + batch_size]
yield [data[i] for i in batch_indices]
class DataStream:
def __init__(self, source, transform=None, filter_fn=None):
self.source = source
self.transform = transform
self.filter_fn = filter_fn
def __iter__(self):
for item in self.source:
if self.filter_fn and not self.filter_fn(item):
continue
if self.transform:
item = self.transform(item)
yield item
def map(self, transform):
return DataStream(self, transform=transform)
def filter(self, filter_fn):
return DataStream(self, filter_fn=filter_fn)
def take(self, n):
return list(islice(self, n))
def collect(self):
return list(self)
from itertools import islice
data = list(range(100))
stream = DataStream(
data,
transform=lambda x: x ** 2,
filter_fn=lambda x: x % 2 == 0
)
print(f"前5个偶数的平方: {stream.take(5)}")

14.9 本章小结
本章深入探讨了Python生成器与迭代器机制:
- 迭代协议:可迭代协议与迭代器协议的区分、for循环的底层机制、分离可迭代对象与迭代器的最佳实践
- 生成器函数:yield的执行模型、生成器状态机、send/throw/close的协程式通信
- 生成器表达式:惰性求值的内存优势、嵌套表达式、与列表推导式的对比
- yield from:子生成器委托机制、双向通信、归约模式
- itertools:无限迭代器、终止迭代器、组合迭代器、数据处理管道
- 自定义迭代器:惰性求值、缓冲、分块、合并等设计模式
- 内存优化:大文件处理、流式数据处理、生成器与递归的对比
14.10 习题与项目练习
基础习题
- 迭代器实现:实现一个PermutationIterator类,给定一个序列,按字典序逐个返回所有排列。
- 生成器管道:使用生成器实现一个CSV文件处理管道,支持过滤、映射、聚合操作,且全程惰性求值。
- 素数筛:使用生成器实现埃拉托斯特尼筛法(Sieve of Eratosthenes),生成无限素数序列。
进阶习题
- 协程式生成器:实现一个基于send的词频统计生成器,支持动态添加文本、查询词频、重置统计。
- 合并K路有序流:实现merge_k_sorted(k_streams),合并K个已排序的生成器为一个有序生成器,要求空间复杂度为O(K)。
- 迭代器适配器:实现一组迭代器适配器:peekable(可预览)、partition(分流)、roundrobin(轮流取值)。
项目练习
日志分析引擎:使用生成器管道实现一个日志分析系统,支持:
- 惰性读取大日志文件
- 多条件过滤(时间范围、级别、关键词)
- 实时统计(QPS、错误率、响应时间分布)
- 结果导出(JSON/CSV)
- 支持管道组合与复用
数据流处理框架:设计一个基于生成器的数据流处理框架,包括:
- Source:数据源抽象(文件、网络、数据库)
- Transform:数据转换算子(map、filter、flatmap、window)
- Sink:数据输出(文件、数据库、控制台)
- 支持背压(Backpressure)机制
- 支持检查点(Checkpoint)与故障恢复
14.11 延伸阅读
14.11.1 迭代器与生成器理论
- PEP 234 — Iterators (https://peps.python.org/pep-0234/) — 迭代器协议设计
- PEP 255 — Simple Generators (https://peps.python.org/pep-0255/) — 生成器语法设计
- PEP 380 — Syntax for Delegating to a Subgenerator (https://peps.python.org/pep-0380/) — yield from语法
- 《Fluent Python》第14-17章 — 迭代器、生成器与协程深度解析
14.11.2 itertools与函数式编程
- itertools官方文档 (https://docs.python.org/3/library/itertools.html) — 迭代器工具详解
- more-itertools库 (https://more-itertools.readthedocs.io/) — 扩展迭代器工具
- 《Functional Programming in Python》 (David Beazley) — 函数式编程实践
14.11.3 协程与异步编程
- PEP 492 — Coroutines with async and await syntax (https://peps.python.org/pep-0492/) — 异步语法设计
- 《Python Concurrency with asyncio》 (Matthew Fowler) — 异步编程实战
- asyncio官方文档 (https://docs.python.org/3/library/asyncio.html) — 异步IO框架
14.11.4 数据处理实践
- Generator Tricks for Systems Programmers (David Beazley) — 生成器高级技巧
- Pandas迭代器文档 (https://pandas.pydata.org/docs/user_guide/io.html) — 大数据处理
- Dask数据流 (https://docs.dask.org/) — 并行计算框架
下一章:第15章 异常处理