Skip to content

第8章 队列与广度优先搜索理论

本章导读

队列是最基础的线性数据结构之一,其FIFO(先进先出)特性使其成为广度优先搜索、任务调度、消息传递等核心应用的基础。本章从形式化角度深入分析队列的数学基础,包括队列ADT规范、循环队列理论、优先队列的堆实现,以及BFS算法的正确性证明。

学习目标

完成本章学习后,读者将能够:

  • 掌握队列ADT的形式化规范与公理系统
  • 理解循环队列的数学模型与实现
  • 掌握优先队列的堆实现与复杂度分析
  • 理解BFS算法的正确性证明与应用
  • 掌握队列在系统设计中的应用模式

8.1 队列的形式化理论

8.1.1 队列的数学定义

定义 8.1(队列) 队列是一个有序序列 $Q = \langle q_1, q_2, \ldots, q_n \rangle$,支持在一端(队尾)插入、另一端(队首)删除。

定义 8.2(队列操作) 队列支持以下基本操作:

  • $enqueue(Q, x) = \langle q_1, \ldots, q_n, x \rangle$:将 $x$ 加入队尾
  • $dequeue(Q) = \langle q_2, \ldots, q_n \rangle$:移除队首元素
  • $front(Q) = q_1$:返回队首元素
  • $isEmpty(Q) = (n = 0)$:判断队列是否为空

定义 8.3(FIFO性质) 队列满足先进先出性质:最先入队的元素最先出队。

$$\forall Q, x: front(enqueue(Q, x)) = front(Q) \quad \text{(当 } Q \neq \emptyset \text{)}$$

$$\forall Q, x: dequeue(enqueue(Q, x)) = enqueue(dequeue(Q), x) \quad \text{(当 } Q \neq \emptyset \text{)}$$

8.1.2 队列ADT规范

ADT 8.1(Queue)

ADT Queue[T]
  Types: Queue[T]
  
  Operations:
    empty: → Queue[T]
    enqueue: Queue[T] × T → Queue[T]
    dequeue: Queue[T] → Queue[T]
    front: Queue[T] → T
    isEmpty: Queue[T] → Boolean
    size: Queue[T] → ℕ
  
  Axioms:
    ∀q: Queue[T], ∀x: T
    (A1) isEmpty(empty) = true
    (A2) isEmpty(enqueue(q, x)) = false
    (A3) front(enqueue(q, x)) = front(q)       (当 ¬isEmpty(q))
    (A4) front(enqueue(empty, x)) = x
    (A5) dequeue(enqueue(q, x)) = enqueue(dequeue(q), x)  (当 ¬isEmpty(q))
    (A6) dequeue(enqueue(empty, x)) = empty
    (A7) size(empty) = 0
    (A8) size(enqueue(q, x)) = size(q) + 1
    (A9) dequeue(empty) = error
    (A10) front(empty) = error

定理 8.1(队列ADT一致性) 队列ADT的公理系统是一致的。

证明:构造列表模型。设队列为列表 $L$,队首为列表头部,队尾为列表末尾。

  • $empty = []$
  • $enqueue(L, x) = L + [x]$
  • $dequeue(L) = L[1..|L|-1]$(当 $L \neq []$)
  • $front(L) = L[0]$(当 $L \neq []$)
  • $isEmpty(L) = (L = [])$

可直接验证所有公理成立。∎

定理 8.2(队列操作的顺序性) 对于任意队列 $Q$ 和元素序列 $x_1, x_2, \ldots, x_k$:

$$front(dequeue^i(enqueue(Q, x_1) \circ \ldots \circ enqueue(x_k))) = x_{i+1}$$

其中 $0 \leq i < k$,$dequeue^i$ 表示连续 $i$ 次 $dequeue$ 操作。

证明:对 $i$ 进行归纳。

基础情况($i = 0$):$front(enqueue(Q, x_1) \circ \ldots) = x_1$,由公理 (A3) 和 (A4)。

归纳步骤:假设对 $i$ 成立,则对 $i + 1$:

$$front(dequeue^{i+1}(enqueue(Q, x_1) \circ \ldots \circ enqueue(x_k)))$$ $$= front(dequeue(dequeue^i(enqueue(Q, x_1) \circ \ldots \circ enqueue(x_k))))$$

由归纳假设,$dequeue^i$ 后队首为 $x_{i+1}$,故 $dequeue$ 后队首为 $x_{i+2}$。∎

8.1.3 队列与栈的对偶性

定理 8.3(队列-栈对偶性) 队列和栈的操作存在对偶关系:

队列
LIFOFIFO
$top(push(S, x)) = x$$front(enqueue(Q, x)) = front(Q)$(当 $Q \neq \emptyset$)
$pop(push(S, x)) = S$$dequeue(enqueue(Q, x)) \neq Q$(一般情况)

定理 8.4(用栈实现队列) 可以使用两个栈模拟队列,使得每个队列操作的均摊时间为 $O(1)$。

证明:使用栈 $S_1$(入队栈)和 $S_2$(出队栈)。

  • enqueue(x):压入 $S_1$,$O(1)$
  • dequeue():若 $S_2$ 为空,将 $S_1$ 所有元素弹出并压入 $S_2$;然后弹出 $S_2$ 栈顶

每个元素最多被移动两次($S_1 \to S_2$),因此均摊代价为 $O(1)$。∎


8.2 循环队列理论

8.2.1 循环队列的数学模型

定义 8.4(循环队列) 循环队列使用固定大小数组,通过模运算实现环形索引。

定义 8.5(循环队列状态) 循环队列状态为四元组 $(A, f, r, c)$:

  • $A[0..m-1]$:存储数组
  • $f$:队首指针
  • $r$:队尾指针
  • $c$:容量

定义 8.6(循环索引) 循环队列的索引计算:

$$next(i) = (i + 1) \mod c$$

$$prev(i) = (i - 1 + c) \mod c$$

定理 8.5(循环队列大小) 循环队列的元素数量为:

$$size = (r - f + c) \mod c$$

证明:考虑两种情况:

  1. $r \geq f$:元素在 $[f, r)$ 区间,数量为 $r - f$
  2. $r < f$:元素在 $[f, c) \cup [0, r)$,数量为 $(c - f) + r = r - f + c$

统一为 $(r - f + c) \mod c$。∎

8.2.2 循环队列实现

python
from typing import TypeVar, Generic, Optional, Iterator, List

T = TypeVar('T')

class CircularQueue(Generic[T]):
    """
    循环队列实现
    
    时间复杂度:所有操作 O(1)
    空间复杂度:O(capacity)
    
    特点:
    - 固定容量
    - 无假溢出
    - 高效利用空间
    """
    
    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be positive")
        self._capacity = capacity
        self._items: List[Optional[T]] = [None] * capacity
        self._front = 0
        self._rear = 0
        self._size = 0
    
    def enqueue(self, item: T) -> bool:
        """入队 - O(1)"""
        if self.is_full():
            return False
        self._items[self._rear] = item
        self._rear = (self._rear + 1) % self._capacity
        self._size += 1
        return True
    
    def dequeue(self) -> Optional[T]:
        """出队 - O(1)"""
        if self.is_empty():
            return None
        item = self._items[self._front]
        self._items[self._front] = None
        self._front = (self._front + 1) % self._capacity
        self._size -= 1
        return item
    
    def peek(self) -> Optional[T]:
        """查看队首 - O(1)"""
        if self.is_empty():
            return None
        return self._items[self._front]
    
    def is_empty(self) -> bool:
        return self._size == 0
    
    def is_full(self) -> bool:
        return self._size == self._capacity
    
    def __len__(self) -> int:
        return self._size
    
    @property
    def capacity(self) -> int:
        return self._capacity
    
    def __iter__(self) -> Iterator[T]:
        if self.is_empty():
            return
        i = self._front
        for _ in range(self._size):
            yield self._items[i]
            i = (i + 1) % self._capacity
    
    def __repr__(self) -> str:
        return f"CircularQueue({list(self)}, size={self._size}/{self._capacity})"


class DynamicCircularQueue(Generic[T]):
    """
    动态扩容循环队列
    
    当队列满时自动扩容
    """
    
    def __init__(self, initial_capacity: int = 8):
        self._capacity = max(1, initial_capacity)
        self._items: List[Optional[T]] = [None] * self._capacity
        self._front = 0
        self._rear = 0
        self._size = 0
    
    def _resize(self, new_capacity: int) -> None:
        new_items = [None] * new_capacity
        for i in range(self._size):
            new_items[i] = self._items[(self._front + i) % self._capacity]
        self._items = new_items
        self._front = 0
        self._rear = self._size
        self._capacity = new_capacity
    
    def enqueue(self, item: T) -> None:
        if self._size == self._capacity:
            self._resize(2 * self._capacity)
        self._items[self._rear] = item
        self._rear = (self._rear + 1) % self._capacity
        self._size += 1
    
    def dequeue(self) -> Optional[T]:
        if self.is_empty():
            return None
        item = self._items[self._front]
        self._items[self._front] = None
        self._front = (self._front + 1) % self._capacity
        self._size -= 1
        
        if 0 < self._size < self._capacity // 4:
            self._resize(max(1, self._capacity // 2))
        
        return item
    
    def peek(self) -> Optional[T]:
        if self.is_empty():
            return None
        return self._items[self._front]
    
    def is_empty(self) -> bool:
        return self._size == 0
    
    def __len__(self) -> int:
        return self._size
    
    def __iter__(self) -> Iterator[T]:
        for i in range(self._size):
            yield self._items[(self._front + i) % self._capacity]
    
    def __repr__(self) -> str:
        return f"DynamicCircularQueue({list(self)})"

8.3 双端队列理论

8.3.1 双端队列ADT

ADT 8.2(Deque)

ADT Deque[T]
  Types: Deque[T]
  
  Operations:
    empty: → Deque[T]
    addFront: Deque[T] × T → Deque[T]
    addRear: Deque[T] × T → Deque[T]
    removeFront: Deque[T] → Deque[T]
    removeRear: Deque[T] → Deque[T]
    front: Deque[T] → T
    rear: Deque[T] → T
    isEmpty: Deque[T] → Boolean
  
  Axioms:
    ∀d: Deque[T], ∀x: T
    (A1) isEmpty(empty) = true
    (A2) isEmpty(addFront(d, x)) = false
    (A3) front(addFront(d, x)) = x
    (A4) rear(addRear(d, x)) = x
    (A5) removeFront(addFront(d, x)) = d
    (A6) removeRear(addRear(d, x)) = d
    ... (其他公理类似队列)

定理 8.6(双端队列的通用性) 双端队列可以同时模拟栈和队列。

证明

  • 模拟栈:只用 $addFront$ 和 $removeFront$,实现 LIFO
  • 模拟队列:用 $addRear$ 和 $removeFront$,实现 FIFO

因此双端队列是栈和队列的超集。∎

8.3.2 Python deque实现分析

CPython的 collections.deque 使用双向链表块结构:

deque内部结构
┌─────────────────────────────────────────────────────────────┐
│  BLOCK_SIZE = 64 (每个块存储64个元素)                        │
├─────────────────────────────────────────────────────────────┤
│  leftblock ──→ [││││││││││││││││││││]                      │
│                    ↑                                         │
│               leftindex                                      │
│                                                              │
│  rightblock ─→ [││││││││││││││││││││]                      │
│                              ↑                               │
│                         rightindex                           │
└─────────────────────────────────────────────────────────────┘

定理 8.7(deque操作复杂度) Python deque的操作复杂度:

操作时间复杂度
append / appendleft$O(1)$
pop / popleft$O(1)$
访问中间元素$O(n)$
python
from collections import deque
from typing import TypeVar, Generic, Iterator, List, Optional

T = TypeVar('T')

class Deque(Generic[T]):
    """
    双端队列封装
    
    时间复杂度:
    - 两端操作:O(1)
    - 中间访问:O(n)
    """
    
    def __init__(self, iterable=None, maxlen: Optional[int] = None):
        self._deque = deque(iterable or [], maxlen=maxlen)
    
    def add_front(self, item: T) -> None:
        self._deque.appendleft(item)
    
    def add_rear(self, item: T) -> None:
        self._deque.append(item)
    
    def remove_front(self) -> T:
        if self.is_empty():
            raise IndexError("remove from empty deque")
        return self._deque.popleft()
    
    def remove_rear(self) -> T:
        if self.is_empty():
            raise IndexError("remove from empty deque")
        return self._deque.pop()
    
    def front(self) -> T:
        if self.is_empty():
            raise IndexError("front from empty deque")
        return self._deque[0]
    
    def rear(self) -> T:
        if self.is_empty():
            raise IndexError("rear from empty deque")
        return self._deque[-1]
    
    def is_empty(self) -> bool:
        return len(self._deque) == 0
    
    def __len__(self) -> int:
        return len(self._deque)
    
    def __iter__(self) -> Iterator[T]:
        return iter(self._deque)
    
    def rotate(self, n: int) -> None:
        """旋转n步(正数右移,负数左移)"""
        self._deque.rotate(n)
    
    def __repr__(self) -> str:
        return f"Deque({list(self._deque)})"

8.4 优先队列理论

8.4.1 优先队列ADT

定义 8.7(优先队列) 优先队列是一种抽象数据类型,其中每个元素关联一个优先级,出队操作返回优先级最高(或最低)的元素。

ADT 8.3(PriorityQueue)

ADT PriorityQueue[T, P]
  Types: PriorityQueue[T, P]
  
  Operations:
    empty: → PriorityQueue[T, P]
    insert: PriorityQueue[T, P] × T × P → PriorityQueue[T, P]
    extractMax: PriorityQueue[T, P] → (T, P) × PriorityQueue[T, P]
    findMax: PriorityQueue[T, P] → (T, P)
    isEmpty: PriorityQueue[T, P] → Boolean
  
  Axioms:
    ∀pq: PriorityQueue[T, P], ∀x: T, ∀p: P
    (A1) isEmpty(empty) = true
    (A2) isEmpty(insert(pq, x, p)) = false
    (A3) findMax(insert(empty, x, p)) = (x, p)
    (A4) findMax(insert(pq, x, p)) = max((x, p), findMax(pq))

8.4.2 二叉堆实现

定义 8.8(二叉堆) 二叉堆是完全二叉树,满足堆性质:

  • 最大堆:每个节点的值 ≥ 其子节点的值
  • 最小堆:每个节点的值 ≤ 其子节点的值

定理 8.8(堆操作复杂度)

操作时间复杂度
insert$O(\log n)$
extractMax/Min$O(\log n)$
findMax/Min$O(1)$
buildHeap$O(n)$

定理 8.9(buildHeap复杂度) 从无序数组构建堆的时间复杂度为 $O(n)$。

证明:设堆高度为 $h$,第 $i$ 层最多有 $2^i$ 个节点,每个节点最多下沉 $h-i$ 次。

$$T(n) = \sum_{i=0}^{h} 2^i \cdot (h - i)$$

设 $j = h - i$:

$$T(n) = \sum_{j=0}^{h} 2^{h-j} \cdot j = 2^h \sum_{j=0}^{h} \frac{j}{2^j}$$

由于 $\sum_{j=0}^{\infty} \frac{j}{2^j} = 2$,且 $2^h \approx n/2$:

$$T(n) = O(n)$$

python
import heapq
from typing import TypeVar, Generic, List, Tuple, Optional, Callable

T = TypeVar('T')
P = TypeVar('P')

class PriorityQueue(Generic[T]):
    """
    优先队列实现(最小堆)
    
    时间复杂度:
    - push: O(log n)
    - pop: O(log n)
    - peek: O(1)
    
    空间复杂度:O(n)
    """
    
    def __init__(self, key: Optional[Callable[[T], int]] = None):
        self._heap: List[Tuple[int, int, T]] = []
        self._counter = 0
        self._key = key or (lambda x: x)
    
    def push(self, item: T) -> None:
        priority = self._key(item)
        heapq.heappush(self._heap, (priority, self._counter, item))
        self._counter += 1
    
    def pop(self) -> T:
        if self.is_empty():
            raise IndexError("pop from empty priority queue")
        return heapq.heappop(self._heap)[2]
    
    def peek(self) -> T:
        if self.is_empty():
            raise IndexError("peek from empty priority queue")
        return self._heap[0][2]
    
    def is_empty(self) -> bool:
        return len(self._heap) == 0
    
    def __len__(self) -> int:
        return len(self._heap)
    
    def __repr__(self) -> str:
        items = [(p, item) for p, _, item in sorted(self._heap)]
        return f"PriorityQueue({items})"


class MaxHeap(Generic[T]):
    """最大堆实现"""
    
    def __init__(self):
        self._heap: List[T] = []
    
    def _parent(self, i: int) -> int:
        return (i - 1) // 2
    
    def _left(self, i: int) -> int:
        return 2 * i + 1
    
    def _right(self, i: int) -> int:
        return 2 * i + 2
    
    def _sift_up(self, i: int) -> None:
        while i > 0 and self._heap[self._parent(i)] < self._heap[i]:
            self._heap[i], self._heap[self._parent(i)] = \
                self._heap[self._parent(i)], self._heap[i]
            i = self._parent(i)
    
    def _sift_down(self, i: int) -> None:
        n = len(self._heap)
        while True:
            largest = i
            l, r = self._left(i), self._right(i)
            
            if l < n and self._heap[l] > self._heap[largest]:
                largest = l
            if r < n and self._heap[r] > self._heap[largest]:
                largest = r
            
            if largest == i:
                break
            
            self._heap[i], self._heap[largest] = self._heap[largest], self._heap[i]
            i = largest
    
    def push(self, item: T) -> None:
        self._heap.append(item)
        self._sift_up(len(self._heap) - 1)
    
    def pop(self) -> T:
        if not self._heap:
            raise IndexError("pop from empty heap")
        
        max_item = self._heap[0]
        self._heap[0] = self._heap[-1]
        self._heap.pop()
        
        if self._heap:
            self._sift_down(0)
        
        return max_item
    
    def peek(self) -> T:
        if not self._heap:
            raise IndexError("peek from empty heap")
        return self._heap[0]
    
    @classmethod
    def from_list(cls, items: List[T]) -> 'MaxHeap[T]':
        heap = cls()
        heap._heap = items[:]
        for i in range(len(items) // 2 - 1, -1, -1):
            heap._sift_down(i)
        return heap
    
    def __len__(self) -> int:
        return len(self._heap)
    
    def __repr__(self) -> str:
        return f"MaxHeap({self._heap})"

8.5 广度优先搜索理论

8.5.1 BFS算法的形式化

定义 8.9(广度优先搜索) BFS从起点开始,按距离递增的顺序访问图中所有可达节点。

算法 8.1(BFS)

BFS(G, s):
    for each vertex u ∈ G.V - {s}:
        u.color = WHITE
        u.d = ∞
        u.π = NIL
    s.color = GRAY
    s.d = 0
    s.π = NIL
    Q = empty queue
    ENQUEUE(Q, s)
    while Q ≠ empty:
        u = DEQUEUE(Q)
        for each v ∈ G.Adj[u]:
            if v.color == WHITE:
                v.color = GRAY
                v.d = u.d + 1
                v.π = u
                ENQUEUE(Q, v)
        u.color = BLACK

定理 8.10(BFS正确性) BFS正确计算从起点 $s$ 到所有可达节点的最短路径距离。

证明:定义 $d(s, v)$ 为从 $s$ 到 $v$ 的最短路径距离。需要证明 BFS 结束时 $v.d = d(s, v)$。

引理1:入队顺序按距离递增。

设 $v_1, v_2$ 先后入队。若 $v_1$ 从 $u_1$ 发现,$v_2$ 从 $u_2$ 发现,则 $v_1.d = u_1.d + 1$,$v_2.d = u_2.d + 1$。由于 $u_1$ 先于 $u_2$ 出队,故 $u_1.d \leq u_2.d$,因此 $v_1.d \leq v_2.d$。

引理2:$v.d \geq d(s, v)$。

对 $v$ 被发现的时刻归纳。$s.d = 0 = d(s, s)$。若 $v$ 从 $u$ 发现,则 $v.d = u.d + 1 \geq d(s, u) + 1 \geq d(s, v)$。

引理3:$v.d \leq d(s, v)$。

设 $s \to \ldots \to u \to v$ 是最短路径。由归纳假设 $u.d = d(s, u)$。当 $u$ 出队时,若 $v$ 未被发现,则 $v.d = u.d + 1 = d(s, u) + 1 = d(s, v)$。

综上,$v.d = d(s, v)$。∎

定理 8.11(BFS复杂度) BFS的时间复杂度为 $O(V + E)$。

证明:每个顶点最多入队一次、出队一次,每次处理邻接表。总操作次数正比于顶点数加边数。∎

8.5.2 BFS实现

python
from collections import deque
from typing import Dict, List, Set, TypeVar, Optional, Tuple

T = TypeVar('T')

def bfs(graph: Dict[T, List[T]], start: T) -> List[T]:
    """
    广度优先搜索
    
    时间复杂度:O(V + E)
    空间复杂度:O(V)
    """
    visited: Set[T] = {start}
    result: List[T] = []
    queue: deque[T] = deque([start])
    
    while queue:
        node = queue.popleft()
        result.append(node)
        
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)
    
    return result


def bfs_shortest_path(graph: Dict[T, List[T]], start: T, end: T) -> Optional[List[T]]:
    """
    BFS求最短路径
    
    时间复杂度:O(V + E)
    """
    if start == end:
        return [start]
    
    visited: Set[T] = {start}
    queue: deque[Tuple[T, List[T]]] = deque([(start, [start])])
    
    while queue:
        node, path = queue.popleft()
        
        for neighbor in graph.get(node, []):
            if neighbor == end:
                return path + [neighbor]
            
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append((neighbor, path + [neighbor]))
    
    return None


def bfs_distances(graph: Dict[T, List[T]], start: T) -> Dict[T, int]:
    """
    BFS计算最短距离
    
    返回:{节点: 距离}
    """
    distances: Dict[T, int] = {start: 0}
    queue: deque[T] = deque([start])
    
    while queue:
        node = queue.popleft()
        
        for neighbor in graph.get(node, []):
            if neighbor not in distances:
                distances[neighbor] = distances[node] + 1
                queue.append(neighbor)
    
    return distances


def bfs_level_order(graph: Dict[T, List[T]], start: T) -> List[List[T]]:
    """
    BFS按层遍历
    
    返回:[[第0层节点], [第1层节点], ...]
    """
    if start not in graph:
        return []
    
    result: List[List[T]] = []
    visited: Set[T] = {start}
    current_level: List[T] = [start]
    
    while current_level:
        result.append(current_level)
        next_level: List[T] = []
        
        for node in current_level:
            for neighbor in graph.get(node, []):
                if neighbor not in visited:
                    visited.add(neighbor)
                    next_level.append(neighbor)
        
        current_level = next_level
    
    return result

8.5.3 BFS应用

python
from typing import List, Tuple

def maze_shortest_path(maze: List[List[str]], start: Tuple[int, int], end: Tuple[int, int]) -> Optional[List[Tuple[int, int]]]:
    """
    迷宫最短路径
    
    maze: '#' 表示墙,'.' 表示通路
    """
    rows, cols = len(maze), len(maze[0])
    directions = [(0, 1), (0, -1), (1, 0), (-1, 0)]
    
    visited = {start}
    queue = deque([(start, [start])])
    
    while queue:
        (r, c), path = queue.popleft()
        
        if (r, c) == end:
            return path
        
        for dr, dc in directions:
            nr, nc = r + dr, c + dc
            
            if (0 <= nr < rows and 0 <= nc < cols and
                maze[nr][nc] != '#' and (nr, nc) not in visited):
                visited.add((nr, nc))
                queue.append(((nr, nc), path + [(nr, nc)]))
    
    return None


def word_ladder(begin: str, end: str, word_list: List[str]) -> int:
    """
    单词接龙最短长度
    
    每次只能改变一个字母
    """
    if end not in word_list:
        return 0
    
    word_set = set(word_list)
    queue = deque([(begin, 1)])
    visited = {begin}
    
    while queue:
        word, length = queue.popleft()
        
        for i in range(len(word)):
            for c in 'abcdefghijklmnopqrstuvwxyz':
                new_word = word[:i] + c + word[i+1:]
                
                if new_word == end:
                    return length + 1
                
                if new_word in word_set and new_word not in visited:
                    visited.add(new_word)
                    queue.append((new_word, length + 1))
    
    return 0


def islands_count(grid: List[List[str]]) -> int:
    """
    岛屿数量
    
    '1' 表示陆地,'0' 表示水
    """
    if not grid:
        return 0
    
    rows, cols = len(grid), len(grid[0])
    visited = set()
    count = 0
    
    for r in range(rows):
        for c in range(cols):
            if grid[r][c] == '1' and (r, c) not in visited:
                count += 1
                queue = deque([(r, c)])
                visited.add((r, c))
                
                while queue:
                    cr, cc = queue.popleft()
                    
                    for dr, dc in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
                        nr, nc = cr + dr, cc + dc
                        
                        if (0 <= nr < rows and 0 <= nc < cols and
                            grid[nr][nc] == '1' and (nr, nc) not in visited):
                            visited.add((nr, nc))
                            queue.append((nr, nc))
    
    return count

8.6 本章小结

8.6.1 核心定理总结

定理内容应用
定理 8.1队列ADT一致性形式化验证
定理 8.4栈实现队列数据结构转换
定理 8.9buildHeap复杂度堆构造优化
定理 8.10BFS正确性最短路径证明
定理 8.11BFS复杂度算法分析

8.6.2 时间复杂度汇总

数据结构入队出队查看队首
列表队列$O(1)$$O(n)$$O(1)$
deque$O(1)$$O(1)$$O(1)$
循环队列$O(1)$$O(1)$$O(1)$
优先队列$O(\log n)$$O(\log n)$$O(1)$

8.6.3 应用场景

应用数据结构算法
BFS队列图遍历
任务调度队列/优先队列FIFO/优先级
滑动窗口双端队列单调队列
缓冲区循环队列环形存储
最短路径队列BFS

8.7 习题

理论题

  1. 证明:使用两个栈实现队列,每个操作的均摊时间为 $O(1)$。

  2. 证明:buildHeap的时间复杂度为 $O(n)$。

  3. 证明:BFS正确计算最短路径距离。

  4. 分析循环队列与线性队列的空间利用率。

设计题

  1. 设计一个支持 $O(1)$ 获取最大值的双端队列。

  2. 实现一个支持并发访问的线程安全队列。

实现题

  1. 实现一个支持动态优先级更新的优先队列。

  2. 实现一个多级反馈队列调度器。


8.8 参考文献

  1. Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2022). Introduction to Algorithms (4th ed.). MIT Press. Chapter 22: Elementary Graph Algorithms.

  2. Knuth, D. E. (1997). The Art of Computer Programming, Volume 1: Fundamental Algorithms (3rd ed.). Addison-Wesley. Section 2.2.2: Queues.

  3. Floyd, R. W. (1964). Algorithm 245: Treesort 3. Communications of the ACM, 7(12), 701.

  4. Williams, J. W. J. (1964). Algorithm 232: Heapsort. Communications of the ACM, 7(6), 347-348.

  5. CPython Source Code. (2024). Modules/_collectionsmodule.c. https://github.com/python/cpython/blob/main/Modules/_collectionsmodule.c


下一章:第9章 链表与指针操作理论

Python技术丛书 - 江苏省宿城中等专业学校