
Chapter 14: Generators and Iterators

Learning Objectives

After completing this chapter, readers should be able to:

  1. Understand the iteration protocols: master the design and implementation of the iterator protocol (__iter__/__next__) and the iterable protocol (__iter__)
  2. Master generator functions: develop a deep understanding of yield's execution model, the generator state machine, and coroutine-style communication (send/throw/close)
  3. Apply yield from: understand subgenerator delegation and its foundational role in asynchronous programming
  4. Master generator expressions: understand the memory advantages of lazy evaluation and when to use it
  5. Apply itertools: fluently use the standard library's iterator tools to build efficient data-processing pipelines
  6. Design iterator architectures: design custom iterators for real projects, handling infinite sequences, streaming data pipelines, and similar scenarios

14.1 Iteration Protocols and Their Underlying Mechanics

14.1.1 The Iterator Protocol

Iteration in Python is built on two core protocols:

  • Iterable protocol: the object implements __iter__(), which returns an iterator
  • Iterator protocol: the object implements __next__(), which returns the next element and raises StopIteration when the elements are exhausted
python
from collections.abc import Iterable, Iterator

class Counter:
    def __init__(self, max_count):
        self.max_count = max_count
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.count < self.max_count:
            self.count += 1
            return self.count
        raise StopIteration

counter = Counter(5)
print(f"Is iterable: {isinstance(counter, Iterable)}")
print(f"Is an iterator: {isinstance(counter, Iterator)}")

for num in counter:
    print(num, end=" ")
print()

counter2 = Counter(3)
print(next(counter2))
print(next(counter2))
print(next(counter2))
try:
    next(counter2)
except StopIteration:
    print("Iteration finished")

Key insight: an iterator is itself iterable (__iter__ returns self), but an iterable is not necessarily an iterator. A list, for example, is iterable but not an iterator: each call to iter(list) returns a fresh iterator object.

python
my_list = [1, 2, 3]
it1 = iter(my_list)
it2 = iter(my_list)
print(f"it1 and it2 are the same object: {it1 is it2}")
print(f"it1: {next(it1)}, it2: {next(it2)}")

counter = Counter(3)
it3 = iter(counter)
it4 = iter(counter)
print(f"it3 and it4 are the same object: {it3 is it4}")

14.1.2 How the for Loop Works Under the Hood

A for loop is, in essence, a call to iter() to obtain an iterator, followed by repeated calls to next() until StopIteration is raised.

python
def manual_for(iterable, func):
    iterator = iter(iterable)
    while True:
        try:
            item = next(iterator)
            func(item)
        except StopIteration:
            break

manual_for([1, 2, 3], lambda x: print(x * 2, end=" "))
print()

def for_loop_equivalent(iterable):
    result = []
    iterator = iter(iterable)
    while True:
        try:
            item = next(iterator)
            result.append(item)
        except StopIteration:
            break
    return result

print(for_loop_equivalent(range(5)))

14.1.3 Separating Iterables from Iterators

The best practice is to split the iterable and the iterator into two classes. The iterable can then be iterated multiple times, with an independent iterator created for each pass:

python
class Range:
    def __init__(self, start, stop=None, step=1):
        if stop is None:
            start, stop = 0, start
        self.start = start
        self.stop = stop
        self.step = step

    def __iter__(self):
        return RangeIterator(self.start, self.stop, self.step)

    def __len__(self):
        return max(0, (self.stop - self.start + self.step - 1) // self.step if self.step > 0
                   else (self.start - self.stop - self.step - 1) // (-self.step))

    def __contains__(self, value):
        if self.step > 0:
            return self.start <= value < self.stop and (value - self.start) % self.step == 0
        return self.stop < value <= self.start and (self.start - value) % (-self.step) == 0

class RangeIterator:
    def __init__(self, start, stop, step):
        self.current = start
        self.stop = stop
        self.step = step

    def __iter__(self):
        return self

    def __next__(self):
        if self.step > 0 and self.current >= self.stop:
            raise StopIteration
        if self.step < 0 and self.current <= self.stop:
            raise StopIteration
        result = self.current
        self.current += self.step
        return result

r = Range(2, 10, 2)
print(f"Range(2,10,2): {list(r)}")
print(f"Second pass: {list(r)}")
print(f"Length: {len(r)}")
print(f"6 in Range: {6 in r}")

14.1.4 Common Iterator Pitfalls

An iterator can only be consumed once

python
it = iter([1, 2, 3])
print(f"First pass: {list(it)}")
print(f"Second pass: {list(it)}")

def process_twice(iterable):
    data = list(iterable)
    print(f"Pass 1: {data}")
    print(f"Pass 2: {data}")

process_twice(iter([1, 2, 3]))

An iterator is not a sequence: iterators support neither len(), indexing, nor slicing, because an iterator may be infinite or of unknown length.

python
it = iter([1, 2, 3])
try:
    print(len(it))
except TypeError as e:
    print(f"Iterators do not support len: {e}")

try:
    print(it[0])
except TypeError as e:
    print(f"Iterators do not support indexing: {e}")
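Since iterators support neither len() nor slicing, the standard way to take a bounded prefix is itertools.islice, which is covered in more depth in 14.5.2. A minimal illustration:

```python
from itertools import islice

it = iter(range(100))
print(list(islice(it, 3)))   # takes the first three items: [0, 1, 2]
print(next(it))              # the underlying iterator has advanced: 3
```
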

14.2 Generator Functions

14.2.1 The Execution Model of yield

A generator function is a function that contains a yield statement. Calling a generator function does not execute its body; instead it returns a generator object. On each call to next(), execution resumes from where it last paused and runs until the next yield or until the function returns.

python
def simple_generator():
    print("  Step 1: start")
    yield 1
    print("  Step 2: continue")
    yield 2
    print("  Step 3: finish")
    yield 3

gen = simple_generator()
print(f"Generator type: {type(gen)}")
print(f"Is an iterator: {isinstance(gen, Iterator)}")

print(f"Got: {next(gen)}")
print(f"Got: {next(gen)}")
print(f"Got: {next(gen)}")
try:
    next(gen)
except StopIteration:
    print("Generator exhausted")

The internal state of a generator

python
import inspect

def my_generator():
    yield 1
    yield 2
    yield 3

gen = my_generator()
print(f"After creation: {inspect.getgeneratorstate(gen)}")

next(gen)
print(f"After first next: {inspect.getgeneratorstate(gen)}")

next(gen)
print(f"After second next: {inspect.getgeneratorstate(gen)}")

next(gen)
print(f"After third next: {inspect.getgeneratorstate(gen)}")

try:
    next(gen)
except StopIteration:
    pass
print(f"After exhaustion: {inspect.getgeneratorstate(gen)}")

A generator can be in one of four states:

  • GEN_CREATED: created but not yet started
  • GEN_RUNNING: currently executing (only observable while the body is running, e.g. from inside the generator itself or from another thread)
  • GEN_SUSPENDED: paused at a yield
  • GEN_CLOSED: closed
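The GEN_RUNNING state never shows up in the snippet above because the generator is always paused when we inspect it. A minimal way to observe it (a sketch, not from the original text) is to ask the generator about itself from inside its own body:

```python
import inspect

def self_aware():
    # While this line executes, gen's own state is GEN_RUNNING
    yield inspect.getgeneratorstate(gen)

gen = self_aware()
print(next(gen))                        # GEN_RUNNING
print(inspect.getgeneratorstate(gen))   # GEN_SUSPENDED
```
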

14.2.2 Generators and Coroutines

Generators are not only iterators; they are also the foundation of coroutines. Through the send(), throw(), and close() methods, a generator can receive external input, handle injected exceptions, and respond to close requests.

The send method sends a value into the generator; that value becomes the result of the yield expression at which the generator is paused:

python
def accumulator():
    total = 0
    while True:
        value = yield total
        if value is None:
            break
        total += value

gen = accumulator()
next(gen)

print(gen.send(10))
print(gen.send(20))
print(gen.send(30))

gen.close()
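Forgetting the initial next() before the first send() raises TypeError ("can't send non-None value to a just-started generator"). A common remedy is a small priming decorator; the sketch below (the names primed and running_total are hypothetical) advances the generator to its first yield automatically:

```python
import functools

def primed(genfunc):
    """Hypothetical helper: return generators already advanced to the first yield."""
    @functools.wraps(genfunc)
    def wrapper(*args, **kwargs):
        gen = genfunc(*args, **kwargs)
        next(gen)  # run up to the first yield so send() works immediately
        return gen
    return wrapper

@primed
def running_total():
    total = 0
    while True:
        total += yield total

acc = running_total()      # no manual next() needed
print(acc.send(10))  # 10
print(acc.send(5))   # 15
```
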

The coroutine-style generator pattern: send turns a generator into a consumer of data, not just a producer:

python
def processor():
    buffer = []
    while True:
        data = yield buffer
        if data is None:
            break
        buffer.append(data.upper())

gen = processor()
next(gen)

print(gen.send("hello"))
print(gen.send("world"))
print(gen.send("python"))

gen.close()

The throw method raises the given exception at the point where the generator is paused:

python
def error_handler():
    try:
        while True:
            value = yield
            print(f"  Processing: {value}")
    except ValueError as e:
        print(f"  Caught ValueError: {e}")
        yield "recovered"
    except Exception as e:
        print(f"  Caught exception: {type(e).__name__}: {e}")

gen = error_handler()
next(gen)

gen.send("data1")
gen.send("data2")

# Since Python 3.12 the multi-argument form gen.throw(type, value) is
# deprecated; pass an exception instance instead.
result = gen.throw(ValueError("invalid value"))
print(f"throw returned: {result}")

gen.close()

The close method raises GeneratorExit at the point where the generator is paused:

python
def cleanup_generator():
    try:
        while True:
            value = yield
            print(f"  Processing: {value}")
    except GeneratorExit:
        print("  Cleaning up resources...")
    finally:
        print("  Generator closed")

gen = cleanup_generator()
next(gen)
gen.send("data")
gen.close()
print(f"State after close: {inspect.getgeneratorstate(gen)}")

14.2.3 Advanced Generator Applications

Infinite sequences: generators are a natural fit for representing infinite sequences:

python
def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

fib = fibonacci()
first_10 = [next(fib) for _ in range(10)]
print(f"First 10 Fibonacci numbers: {first_10}")

def primes():
    seen = []
    n = 2
    while True:
        if all(n % p != 0 for p in seen):
            seen.append(n)
            yield n
        n += 1

prime_gen = primes()
first_20_primes = [next(prime_gen) for _ in range(20)]
print(f"First 20 primes: {first_20_primes}")

def count(start=0, step=1):
    n = start
    while True:
        yield n
        n += step

evens = count(0, 2)
print(f"First 5 even numbers: {[next(evens) for _ in range(5)]}")

Data pipelines: generators can be chained into data-processing pipelines:

python
def read_lines(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

def filter_lines(lines, predicate):
    for line in lines:
        if predicate(line):
            yield line

def transform_lines(lines, transform):
    for line in lines:
        yield transform(line)

def limit(lines, n):
    for i, line in enumerate(lines):
        if i >= n:
            break
        yield line

def strip(lines):
    for line in lines:
        yield line.strip()

def non_empty(lines):
    for line in lines:
        if line:
            yield line

data = [
    "  Hello World  ",
    "",
    "  Python Programming  ",
    "  ",
    "  Generator Pipeline  ",
    "",
]

pipeline = limit(
    transform_lines(
        filter_lines(data, bool),
        str.upper
    ),
    3
)
print(list(pipeline))

Recursive generators: a generator can call itself recursively to walk nested structures:

python
def flatten(nested):
    for item in nested:
        if isinstance(item, (list, tuple)):
            yield from flatten(item)
        else:
            yield item

nested = [1, [2, 3], [4, [5, 6, [7]]], 8, (9, 10)]
print(f"Flattened: {list(flatten(nested))}")

def tree_walker(node, depth=0):
    if isinstance(node, dict):
        for key, value in node.items():
            yield (depth, key)
            yield from tree_walker(value, depth + 1)
    elif isinstance(node, list):
        for i, item in enumerate(node):
            yield from tree_walker(item, depth)
    else:
        yield (depth, node)

tree = {
    "root": {
        "branch1": ["leaf1", "leaf2"],
        "branch2": {
            "sub1": "leaf3",
            "sub2": ["leaf4", "leaf5"]
        }
    }
}

for depth, value in tree_walker(tree):
    print("  " * depth + f"→ {value}")

14.3 Generator Expressions

14.3.1 Syntax and Performance

A generator expression is the lazy counterpart of a list comprehension, written with parentheses instead of brackets:

python
import sys

list_comp = [x ** 2 for x in range(10000)]
gen_exp = (x ** 2 for x in range(10000))

print(f"List comprehension memory: {sys.getsizeof(list_comp)} bytes")
print(f"Generator expression memory: {sys.getsizeof(gen_exp)} bytes")
print(f"Memory ratio: {sys.getsizeof(list_comp) / sys.getsizeof(gen_exp):.1f}x")

total = sum(x ** 2 for x in range(1000000))
print(f"Sum of squares of 0..999999: {total}")

max_val = max(x ** 2 for x in range(1000))
print(f"Largest square: {max_val}")

has_match = any(x > 100 for x in range(50, 150))
print(f"Contains a number greater than 100: {has_match}")

all_positive = all(x > 0 for x in [1, 2, 3, 4, 5])
print(f"All positive: {all_positive}")
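One subtlety worth knowing: in a generator expression, only the outermost iterable is evaluated immediately (iter() is called when the expression is created); everything else is evaluated on demand. A small illustration:

```python
data = [1, 2, 3]
gen = (x * 2 for x in data)
data = [10, 20]        # rebinding the name has no effect:
                       # gen already holds an iterator over the old list
print(list(gen))       # [2, 4, 6]
```
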

14.3.2 Nested Generator Expressions

python
matrix = [
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9],
]

flat = (x for row in matrix for x in row)
print(f"Flattened matrix: {list(flat)}")

diagonal = (matrix[i][i] for i in range(len(matrix)))
print(f"Diagonal: {list(diagonal)}")

transposed = (
    [row[i] for row in matrix]
    for i in range(len(matrix[0]))
)
print(f"Transposed: {list(transposed)}")

14.3.3 Limitations of Generator Expressions

A generator expression can contain only a single expression, not full statements. For more complex logic, use a generator function:

python
# Generator expression: a simple transformation
squares = (x ** 2 for x in range(10))

# Generator function: more complex logic
def process_data(data):
    for item in data:
        cleaned = item.strip()
        if not cleaned:
            continue
        try:
            value = float(cleaned)
        except ValueError:
            continue
        yield value * 2

raw_data = [" 1.5 ", "", "abc", " 2.3 ", "  ", "4.0"]
print(f"Processed: {list(process_data(raw_data))}")

14.4 yield from: Subgenerator Delegation

14.4.1 The Basic Mechanism

yield from, introduced in Python 3.3, delegates part of a generator's operation to a subgenerator. It not only simplifies nested generators but also establishes a bidirectional communication channel between the delegating generator and the subgenerator.

python
def chain(*iterables):
    for iterable in iterables:
        yield from iterable

result = chain([1, 2], "ab", range(3))
print(f"Chained: {list(result)}")

def flatten(nested):
    for item in nested:
        if isinstance(item, (list, tuple)):
            yield from flatten(item)
        else:
            yield item

nested = [1, [2, [3, 4]], [5], 6]
print(f"Flattened: {list(flatten(nested))}")

14.4.2 The Full Semantics of yield from

yield from is not merely syntactic sugar for "for item in subgen: yield item"; it also takes care of the following details:

  1. Values passed in via send() are forwarded to the subgenerator
  2. Exceptions raised via throw() are forwarded to the subgenerator
  3. When the subgenerator finishes, its return value becomes the value of the yield from expression
  4. StopIteration is handled correctly
python
def subgen():
    yield 1
    yield 2
    return "subgen finished"

def main_gen():
    result = yield from subgen()
    print(f"  Subgenerator returned: {result!r}")
    yield "main continues"

gen = main_gen()
print(next(gen))
print(next(gen))
print(next(gen))
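To see what point 3 buys you, compare a plain for loop, which silently discards the subgenerator's return value, with yield from, which captures it (a sketch):

```python
def subgen2():
    yield 1
    return "done"

def manual():
    for item in subgen2():    # the return value is lost
        yield item

def delegated():
    result = yield from subgen2()   # the return value is captured
    yield result

print(list(manual()))      # [1]
print(list(delegated()))   # [1, 'done']
```
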

Bidirectional communication example

python
def echo_subgen():
    while True:
        received = yield
        if received is None:
            break
        yield f"Subgenerator processed: {received}"

def main_gen():
    yield from echo_subgen()
    yield "Main generator finished"

gen = main_gen()
next(gen)

print(gen.send("hello"))
next(gen)
print(gen.send("world"))
next(gen)
print(gen.send(None))

14.4.3 yield from and the Reduction Pattern

An important application of yield from is the reduction pattern: a subgenerator accumulates partial results, and the delegating generator collects the total:

python
def average_gen():
    total = 0
    count = 0
    while True:
        value = yield
        if value is None:
            break
        total += value
        count += 1
    return total / count if count else 0

def aggregator(values):
    avg = yield from average_gen()
    print(f"  Average: {avg}")
    return avg

gen = aggregator(None)
next(gen)
for v in [10, 20, 30, 40, 50]:
    gen.send(v)
try:
    gen.send(None)
except StopIteration as e:
    print(f"Final result: {e.value}")

14.5 The itertools Module in Depth

14.5.1 Infinite Iterators

python
from itertools import count, cycle, repeat

# count: count without end
for i in count(10, 2):
    if i > 20:
        break
    print(i, end=" ")
print()

# cycle: cycle forever
colors = cycle(["red", "green", "blue"])
for i, color in enumerate(colors):
    if i >= 7:
        break
    print(color, end=" ")
print()

# repeat: repeat a value
for val in repeat("hello", 3):
    print(val, end=" ")
print()

# Practical use: generate indices for a sequence
items = ["a", "b", "c", "d"]
for idx, item in zip(count(), items):
    print(f"  {idx}: {item}")

14.5.2 Terminating Iterators

python
from itertools import (
    accumulate, chain, compress, dropwhile,
    filterfalse, groupby, islice, starmap,
    takewhile, tee, zip_longest
)

# accumulate: running totals and other accumulations
numbers = [1, 2, 3, 4, 5]
print(f"Running sum: {list(accumulate(numbers))}")
print(f"Running max: {list(accumulate(numbers, max))}")
print(f"Running product: {list(accumulate(numbers, lambda x, y: x * y))}")

# chain: concatenate several iterables
print(f"Chained: {list(chain([1, 2], [3, 4], [5, 6]))}")
print(f"Flattened: {list(chain.from_iterable([[1, 2], [3, 4], [5, 6]]))}")

# compress: filter by a selector sequence
data = ["a", "b", "c", "d"]
selectors = [True, False, True, False]
print(f"Compressed: {list(compress(data, selectors))}")

# dropwhile: skip items until the predicate first fails
numbers = [1, 2, 3, 4, 5, 1, 2]
print(f"After drop: {list(dropwhile(lambda x: x < 3, numbers))}")

# filterfalse: the inverse of filter
print(f"Odd numbers: {list(filterfalse(lambda x: x % 2 == 0, range(10)))}")

# groupby: group consecutive items by key
data = [("a", 1), ("a", 2), ("b", 1), ("b", 2), ("c", 1)]
for key, group in groupby(data, lambda x: x[0]):
    print(f"  Group {key}: {list(group)}")

# islice: slice an iterator
numbers = range(100)
print(f"First 10: {list(islice(numbers, 10))}")
print(f"10-20: {list(islice(numbers, 10, 20))}")
print(f"0-20 step 2: {list(islice(numbers, 0, 20, 2))}")

# starmap: map with argument unpacking
data = [(2, 3), (4, 5), (6, 7)]
print(f"Powers: {list(starmap(pow, data))}")

# takewhile: take items until the predicate first fails
numbers = [1, 2, 3, 4, 5, 1, 2]
print(f"Taken: {list(takewhile(lambda x: x < 3, numbers))}")

# tee: duplicate an iterator
numbers = iter([1, 2, 3, 4, 5])
it1, it2 = tee(numbers, 2)
print(f"Copy 1: {list(it1)}")
print(f"Copy 2: {list(it2)}")

# zip_longest: zip sequences of unequal length
print(f"zip_longest: {list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue=None))}")
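A frequent pitfall with groupby: it only merges runs of consecutive equal keys, so data that is not already ordered by the grouping key must be sorted with that same key first (a sketch):

```python
from itertools import groupby

pairs = [("a", 1), ("b", 1), ("a", 2)]

# Unsorted input: "a" shows up as two separate groups
print([k for k, _ in groupby(pairs, key=lambda p: p[0])])  # ['a', 'b', 'a']

# Sort by the grouping key first to get one group per key
pairs.sort(key=lambda p: p[0])
print([k for k, _ in groupby(pairs, key=lambda p: p[0])])  # ['a', 'b']
```
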

14.5.3 Combinatoric Iterators

python
from itertools import combinations, combinations_with_replacement, permutations, product

# Permutations
print(f"Permutations (2 of 3): {list(permutations([1, 2, 3], 2))}")

# Combinations
print(f"Combinations (2 of 3): {list(combinations([1, 2, 3], 2))}")

# Combinations with repetition
print(f"With replacement (2 of 3): {list(combinations_with_replacement([1, 2, 3], 2))}")

# Cartesian product
print(f"Cartesian product: {list(product([1, 2], ['a', 'b']))}")

# Practical use: generating candidate passwords
import string

def generate_passwords(chars, length):
    for combo in product(chars, repeat=length):
        yield "".join(combo)

digits = string.digits
count = 0
for pwd in generate_passwords(digits, 3):
    count += 1
print(f"Number of 3-digit numeric passwords: {count}")
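The number of results these iterators produce can be cross-checked against math.perm and math.comb (available since Python 3.8):

```python
import math
from itertools import combinations, permutations

items = [1, 2, 3, 4]
print(len(list(permutations(items, 2))), math.perm(4, 2))   # 12 12
print(len(list(combinations(items, 2))), math.comb(4, 2))   # 6 6
```
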

14.5.4 Building a Data-Processing Pipeline

python
from collections import Counter

def analyze_text(text):
    words = text.lower().split()
    word_lengths = (len(w.strip(".,!?;:")) for w in words)
    length_dist = Counter(word_lengths)

    print("Word-length distribution:")
    for length, count in sorted(length_dist.items()):
        print(f"  {length} characters: {'█' * count} ({count})")

    avg_length = sum(len(w.strip(".,!?;:")) for w in words) / len(words) if words else 0
    print(f"Average word length: {avg_length:.2f}")

    longest = max(words, key=lambda w: len(w.strip(".,!?;:")))
    print(f"Longest word: {longest}")

text = """
Python is an interpreted high-level general-purpose programming language.
Its design philosophy emphasizes code readability with its use of significant indentation.
Its language constructs as well as its object-oriented approach aim to help programmers
write clear logical code for small and large-scale projects.
"""

analyze_text(text)

14.6 Custom Iterator Design Patterns

14.6.1 Lazily Evaluated Iterators

python
class LazyRange:
    def __init__(self, start, stop=None, step=1):
        if stop is None:
            start, stop = 0, start
        self.start = start
        self.stop = stop
        self.step = step

    def __iter__(self):
        current = self.start
        if self.step > 0:
            while current < self.stop:
                yield current
                current += self.step
        elif self.step < 0:
            while current > self.stop:
                yield current
                current += self.step

    def __contains__(self, value):
        if self.step > 0:
            return self.start <= value < self.stop and (value - self.start) % self.step == 0
        return self.stop < value <= self.start and (self.start - value) % (-self.step) == 0

    def __reversed__(self):
        if self.step > 0:
            last = self.stop - 1 - (self.stop - 1 - self.start) % self.step
            return LazyRange(last, self.start - 1, -self.step)
        first = self.stop + 1 + (self.start - self.stop - 1) % (-self.step)
        return LazyRange(first, self.start + 1, -self.step)

r = LazyRange(0, 10, 2)
print(f"Forward: {list(r)}")
print(f"Reversed: {list(reversed(r))}")
print(f"6 in range: {6 in r}")

14.6.2 A Buffered Iterator

python
class BufferedIterator:
    def __init__(self, iterable, buffer_size=1):
        self._iterator = iter(iterable)
        self._buffer = []
        self._buffer_size = buffer_size
        self._exhausted = False

    def __iter__(self):
        return self

    def __next__(self):
        if self._buffer:
            return self._buffer.pop(0)
        if self._exhausted:
            raise StopIteration
        try:
            for _ in range(self._buffer_size):
                self._buffer.append(next(self._iterator))
        except StopIteration:
            self._exhausted = True
        if self._buffer:
            return self._buffer.pop(0)
        raise StopIteration

    def peek(self):
        if not self._buffer and not self._exhausted:
            try:
                self._buffer.append(next(self._iterator))
            except StopIteration:
                self._exhausted = True
        if self._buffer:
            return self._buffer[0]
        raise StopIteration

    def has_next(self):
        try:
            self.peek()
            return True
        except StopIteration:
            return False

data = [1, 2, 3, 4, 5]
buf = BufferedIterator(data, buffer_size=2)
print(f"peek: {buf.peek()}")
print(f"next: {next(buf)}")
print(f"peek: {buf.peek()}")
print(f"has_next: {buf.has_next()}")
print(f"Remaining: {list(buf)}")

14.6.3 Chunking Iterators

python
def chunked(iterable, size):
    it = iter(iterable)
    while True:
        chunk = []
        try:
            for _ in range(size):
                chunk.append(next(it))
        except StopIteration:
            if chunk:
                yield chunk
            break
        yield chunk

data = list(range(10))
for chunk in chunked(data, 3):
    print(f"  Chunk: {chunk}")

def sliding_window(iterable, size):
    it = iter(iterable)
    window = []
    for _ in range(size):
        try:
            window.append(next(it))
        except StopIteration:
            return
    yield tuple(window)
    for item in it:
        window = window[1:] + [item]
        yield tuple(window)

data = [1, 2, 3, 4, 5, 6, 7]
print(f"Sliding window (3): {list(sliding_window(data, 3))}")

14.6.4 Merging Iterators

python
def merge_sorted(*iterables, key=None):
    import heapq
    if key is None:
        return heapq.merge(*iterables)
    
    keyed = []
    for it in iterables:
        it = iter(it)
        try:
            first = next(it)
            keyed.append([key(first), first, it])
        except StopIteration:
            pass
    
    while keyed:
        keyed.sort(key=lambda x: x[0])
        k, value, it = keyed[0]
        yield value
        try:
            next_val = next(it)
            keyed[0] = [key(next_val), next_val, it]
        except StopIteration:
            keyed.pop(0)

a = [1, 3, 5, 7, 9]
b = [2, 4, 6, 8, 10]
c = [0, 11, 12]

print(f"Merged: {list(merge_sorted(a, b, c))}")
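Note that heapq.merge has accepted a key argument since Python 3.5, so the manual keyed branch above is mainly illustrative; the same result is available directly (a sketch):

```python
import heapq

a = [(1, "x"), (3, "y")]
b = [(2, "z")]
print(list(heapq.merge(a, b, key=lambda t: t[0])))  # [(1, 'x'), (2, 'z'), (3, 'y')]
```
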

14.7 Generators and Memory Optimization

14.7.1 Processing Large Files

python
def read_large_file(filepath, chunk_size=8192):
    with open(filepath, "r", encoding="utf-8") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def read_lines_lazy(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

def count_lines(filepath):
    return sum(1 for _ in read_lines_lazy(filepath))

def grep(pattern, filepath):
    import re
    regex = re.compile(pattern)
    for line in read_lines_lazy(filepath):
        if regex.search(line):
            yield line
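These helpers can be exercised end to end with a temporary file; the sketch below re-states read_lines_lazy so it is self-contained, and the log contents are made up for the demo:

```python
import os
import re
import tempfile

def read_lines_lazy(filepath):
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

# Write a throwaway log file
with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False, encoding="utf-8") as f:
    f.write("INFO start\nERROR disk full\nINFO done\n")
    path = f.name

# grep-style lazy filtering over the file
errors = [line for line in read_lines_lazy(path) if re.search(r"ERROR", line)]
print(errors)  # ['ERROR disk full']
os.remove(path)
```
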

14.7.2 Streaming Data Processing

python
def stream_processor(data_stream):
    pipeline = data_stream
    pipeline = (line.strip() for line in pipeline)
    pipeline = (line for line in pipeline if line)
    pipeline = (line.lower() for line in pipeline)
    pipeline = (line.split() for line in pipeline)
    pipeline = (len(words) for words in pipeline)
    return pipeline

sample_data = [
    "  Hello World  ",
    "",
    "  Python Programming  ",
    "  Generator Pipeline  ",
    "",
]

word_counts = stream_processor(iter(sample_data))
print(f"Words per line: {list(word_counts)}")

14.7.3 Generators versus Naive Recursion

python
import sys
import time

def fibonacci_recursive(n):
    if n <= 1:
        return n
    return fibonacci_recursive(n - 1) + fibonacci_recursive(n - 2)

def fibonacci_generator(limit):
    a, b = 0, 1
    for _ in range(limit):
        yield a
        a, b = b, a + b

def fibonacci_memo(limit):
    cache = [0, 1]
    for i in range(2, limit):
        cache.append(cache[i-1] + cache[i-2])
    return cache[:limit]

n = 30
start = time.perf_counter()
result_rec = [fibonacci_recursive(i) for i in range(n)]
time_rec = time.perf_counter() - start

start = time.perf_counter()
result_gen = list(fibonacci_generator(n))
time_gen = time.perf_counter() - start

start = time.perf_counter()
result_memo = fibonacci_memo(n)
time_memo = time.perf_counter() - start

print(f"Naive recursion: {time_rec:.6f}s")
print(f"Generator: {time_gen:.6f}s")
print(f"Memoization: {time_memo:.6f}s")
print(f"Results agree: {result_rec == result_gen == result_memo}")
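A fourth option worth knowing: functools.lru_cache memoizes the recursive version transparently, collapsing its exponential cost to linear:

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def fib(n):
    # Each distinct n is computed once; repeats hit the cache
    return n if n <= 1 else fib(n - 1) + fib(n - 2)

print([fib(i) for i in range(10)])  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```
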

14.8 Recent Developments and Trends

14.8.1 Generators and Asynchronous Programming

Generators are the foundation of Python's asynchronous programming model (async/await). Before Python 3.5, coroutines were implemented directly on top of generators:

python
import types

@types.coroutine
def legacy_coroutine():
    result = yield "paused"
    print(f"  Received: {result}")
    return "finished"

async def modern_coroutine():
    result = await legacy_coroutine()
    print(f"  Modern coroutine result: {result}")

# Note: this cannot run under asyncio.run, because the event loop only
# accepts Futures from yield. Driving it by hand instead also makes the
# generator machinery behind await visible:
coro = modern_coroutine()
print(coro.send(None))       # "paused" propagates up through await
try:
    coro.send("input data")  # resumes the legacy coroutine at its yield
except StopIteration:
    pass

14.8.2 Generators and Structured Concurrency

Python 3.11+ introduces TaskGroup; combined with generators, it enables safer concurrency patterns:

python
import asyncio

async def fetch_item(item_id):
    await asyncio.sleep(0.1)
    return f"data-{item_id}"

async def fetch_all(item_ids):
    async def fetch_and_collect(item_id, results):
        result = await fetch_item(item_id)
        results.append(result)

    results = []
    async with asyncio.TaskGroup() as tg:
        for item_id in item_ids:
            tg.create_task(fetch_and_collect(item_id, results))
    return results

async def main():
    data = await fetch_all(range(5))
    print(f"Fetched: {data}")

asyncio.run(main())

14.8.3 Generators in Data Science

python
def batch_generator(data, batch_size, shuffle=False):
    import random
    indices = list(range(len(data)))
    if shuffle:
        random.shuffle(indices)
    
    for start in range(0, len(indices), batch_size):
        batch_indices = indices[start:start + batch_size]
        yield [data[i] for i in batch_indices]

class DataStream:
    def __init__(self, source, transform=None, filter_fn=None):
        self.source = source
        self.transform = transform
        self.filter_fn = filter_fn

    def __iter__(self):
        for item in self.source:
            if self.filter_fn and not self.filter_fn(item):
                continue
            if self.transform:
                item = self.transform(item)
            yield item

    def map(self, transform):
        return DataStream(self, transform=transform)

    def filter(self, filter_fn):
        return DataStream(self, filter_fn=filter_fn)

    def take(self, n):
        return list(islice(self, n))

    def collect(self):
        return list(self)

from itertools import islice

data = list(range(100))
stream = DataStream(
    data,
    transform=lambda x: x ** 2,
    filter_fn=lambda x: x % 2 == 0
)
print(f"Squares of the first 5 even numbers: {stream.take(5)}")

14.9 Chapter Summary

This chapter examined Python's generator and iterator machinery in depth:

  1. Iteration protocols: the distinction between the iterable and iterator protocols, how the for loop works underneath, and the best practice of separating iterables from iterators
  2. Generator functions: the execution model of yield, the generator state machine, and coroutine-style communication via send/throw/close
  3. Generator expressions: the memory advantages of lazy evaluation, nested expressions, and comparison with list comprehensions
  4. yield from: subgenerator delegation, bidirectional communication, and the reduction pattern
  5. itertools: infinite, terminating, and combinatoric iterators, and data-processing pipelines
  6. Custom iterators: design patterns for lazy evaluation, buffering, chunking, and merging
  7. Memory optimization: large-file processing, streaming, and generators versus naive recursion

14.10 Exercises and Projects

Basic exercises

  1. Iterator implementation: implement a PermutationIterator class that, given a sequence, returns its permutations one by one in lexicographic order.

  2. Generator pipeline: build a CSV-processing pipeline from generators that supports filtering, mapping, and aggregation, with lazy evaluation throughout.

  3. Prime sieve: implement the Sieve of Eratosthenes with generators, producing an unbounded sequence of primes.

Advanced exercises

  1. Coroutine-style generator: implement a send-based word-frequency counter that supports adding text dynamically, querying frequencies, and resetting the statistics.

  2. Merging K sorted streams: implement merge_k_sorted(k_streams), which merges K sorted generators into one sorted generator using O(K) space.

  3. Iterator adapters: implement a set of iterator adapters: peekable (look ahead), partition (split a stream), and roundrobin (take values in turn).

Projects

  1. Log-analysis engine: build a log-analysis system from generator pipelines that supports:

    • Lazy reading of large log files
    • Multi-criteria filtering (time range, level, keywords)
    • Live statistics (QPS, error rate, response-time distribution)
    • Result export (JSON/CSV)
    • Composable, reusable pipelines
  2. Stream-processing framework: design a generator-based stream-processing framework comprising:

    • Source: data-source abstractions (file, network, database)
    • Transform: transformation operators (map, filter, flatmap, window)
    • Sink: output targets (file, database, console)
    • Backpressure support
    • Checkpointing and failure recovery

14.11 Further Reading

14.11.1 Iterator and Generator Theory

14.11.2 itertools and Functional Programming

14.11.3 Coroutines and Asynchronous Programming

14.11.4 Data Processing in Practice


Next chapter: Chapter 15, Exception Handling

Youth Creative Programming - Senior High Python Group - Jiangsu Sucheng Secondary Vocational School