Skip to content

第13章 堆与优先队列理论

本章导读

堆是一种高效的优先队列实现,其核心思想是利用完全二叉树的数组表示实现 $O(\log n)$ 的插入和删除操作。本章从形式化角度深入分析堆的数学基础,包括堆性质的数学定义、堆操作的正确性证明、堆排序的复杂度分析,以及堆在算法设计中的应用。

学习目标

完成本章学习后,读者将能够:

  • 掌握堆性质的形式化定义与不变式
  • 理解堆操作的正确性证明与复杂度分析
  • 掌握buildHeap的线性时间证明
  • 理解堆排序的正确性与稳定性分析
  • 掌握堆在Top-K、合并有序序列等问题中的应用

13.1 堆的形式化定义

13.1.1 堆性质的数学定义

定义 13.1(二叉堆) 二叉堆是一个数组 $A[1..n]$,可以被看作一棵完全二叉树,满足堆性质。

定义 13.2(最大堆性质) 对于最大堆,每个节点 $i$(除根外)满足:

$$A[parent(i)] \geq A[i]$$

即父节点的值不小于子节点的值。

定义 13.3(最小堆性质) 对于最小堆,每个节点 $i$(除根外)满足:

$$A[parent(i)] \leq A[i]$$

即父节点的值不大于子节点的值。

定义 13.4(堆不变式) 堆不变式定义为:

$$Invariant_{max}(A) = \forall i \in [2, n]: A[parent(i)] \geq A[i]$$

$$Invariant_{min}(A) = \forall i \in [2, n]: A[parent(i)] \leq A[i]$$

定理 13.1(堆的根性质) 在最大堆中,根节点存储最大值;在最小堆中,根节点存储最小值。

证明:由堆性质,根节点 $\geq$(或 $\leq$)所有子节点。递归地,子节点 $\geq$(或 $\leq$)其后代。因此根节点 $\geq$(或 $\leq$)所有节点。∎

13.1.2 堆的数组表示

定义 13.5(堆索引计算) 对于数组下标从 $0$ 开始的堆:

  • $parent(i) = \lfloor(i-1)/2\rfloor$
  • $left(i) = 2i + 1$
  • $right(i) = 2i + 2$

定理 13.2(堆的高度) $n$ 个元素的堆的高度为 $h = \lfloor \log_2 n \rfloor$。

证明:堆是完全二叉树。高度为 $h$ 的完全二叉树节点数 $n$ 满足:

$$2^h \leq n < 2^{h+1}$$

因此 $h \leq \log_2 n < h + 1$,即 $h = \lfloor \log_2 n \rfloor$。∎

定理 13.3(叶子节点索引) 在 $n$ 个元素的堆中,叶子节点的索引范围为 $\lfloor n/2 \rfloor, \ldots, n-1$。

证明:节点 $i$ 是叶子节点当且仅当 $left(i) \geq n$,即 $2i + 1 \geq n$,即 $i \geq \lceil n/2 \rceil - 1$。

由于索引从 $0$ 开始,叶子节点索引从 $\lfloor n/2 \rfloor$ 开始。∎

13.1.3 堆ADT规范

ADT 13.1(MaxHeap)

ADT MaxHeap[T: Ordered]
  Types: MaxHeap[T]
  
  Operations:
    empty: → MaxHeap[T]
    insert: MaxHeap[T] × T → MaxHeap[T]
    extractMax: MaxHeap[T] → T × MaxHeap[T]
    findMax: MaxHeap[T] → T
    size: MaxHeap[T] → ℕ
    isEmpty: MaxHeap[T] → Boolean
  
  Axioms:
    ∀h: MaxHeap[T], ∀x: T
    (A1) isEmpty(empty) = true
    (A2) isEmpty(insert(h, x)) = false
    (A3) findMax(insert(empty, x)) = x
    (A4) findMax(insert(h, x)) = max(x, findMax(h))
    (A5) size(empty) = 0
    (A6) size(insert(h, x)) = size(h) + 1
    (A7) extractMax(insert(empty, x)) = (x, empty)
    (A8) 堆不变式在所有操作后保持

13.2 堆操作与正确性证明

13.2.1 下沉操作(Heapify)

算法 13.1(MAX-HEAPIFY)

MAX-HEAPIFY(A, i)
    l = LEFT(i)
    r = RIGHT(i)
    largest = i
    if l ≤ A.heap-size and A[l] > A[i]
        largest = l
    if r ≤ A.heap-size and A[r] > A[largest]
        largest = r
    if largest ≠ i
        exchange A[i] with A[largest]
        MAX-HEAPIFY(A, largest)

定理 13.4(Heapify正确性) 若节点 $i$ 的左右子树都是最大堆,则MAX-HEAPIFY(A, i)执行后,以 $i$ 为根的子树是最大堆。

证明:算法比较 $A[i]$ 与其子节点的值,将最大值交换到根位置。

  • 若 $A[i]$ 已经是最大,无需交换,子树保持最大堆性质
  • 若某个子节点更大,交换后 $A[i]$ 变为最大值,被交换的子节点可能违反堆性质,递归修复

由于每次递归树的高度减少1,且子树原本是最大堆,递归最终终止,整个子树恢复最大堆性质。∎

定理 13.5(Heapify复杂度) MAX-HEAPIFY的时间复杂度为 $O(\log n)$,更精确地为 $O(h)$,其中 $h$ 是节点 $i$ 的高度。

证明:每次递归调用将问题规模从高度 $h$ 减少到 $h-1$。最多递归 $h$ 次,每次常数时间。故总时间 $O(h)$。∎

13.2.2 建堆操作

算法 13.2(BUILD-MAX-HEAP)

BUILD-MAX-HEAP(A)
    A.heap-size = A.length
    for i = ⌊A.length/2⌋ downto 1
        MAX-HEAPIFY(A, i)

定理 13.6(BUILD-MAX-HEAP复杂度) BUILD-MAX-HEAP的时间复杂度为 $O(n)$。

证明:分析每个节点调用Heapify的代价。

高度为 $h$ 的节点最多有 $\lceil n/2^{h+1} \rceil$ 个,每个代价 $O(h)$。

总代价:

$$T(n) = \sum_{h=0}^{\lfloor \log n \rfloor} \lceil \frac{n}{2^{h+1}} \rceil \cdot O(h)$$

$$= O\left(n \sum_{h=0}^{\lfloor \log n \rfloor} \frac{h}{2^h}\right)$$

由于 $\sum_{h=0}^{\infty} \frac{h}{2^h} = 2$,故 $T(n) = O(n)$。∎

13.2.3 插入操作

算法 13.3(HEAP-INSERT)

HEAP-INSERT(A, key)
    A.heap-size = A.heap-size + 1
    A[A.heap-size] = -∞
    HEAP-INCREASE-KEY(A, A.heap-size, key)

HEAP-INCREASE-KEY(A, i, key)
    if key < A[i]
        error "new key is smaller than current key"
    A[i] = key
    while i > 1 and A[PARENT(i)] < A[i]
        exchange A[i] with A[PARENT(i)]
        i = PARENT(i)

定理 13.7(插入正确性) HEAP-INSERT将新元素插入正确位置,保持最大堆性质。

证明:新元素先放在叶子位置,然后上浮到正确位置。

上浮过程中,每次交换将较大元素上移。由于路径上的父节点原本 $\geq$ 其子树,交换后父节点仍然 $\geq$ 另一子树。

最终,新元素到达满足堆性质的位置。∎

定理 13.8(插入复杂度) HEAP-INSERT的时间复杂度为 $O(\log n)$。

证明:上浮操作最多经过从叶子到根的路径,长度为 $O(\log n)$。∎

13.2.4 堆实现

python
from typing import TypeVar, Generic, List, Optional, Callable

T = TypeVar('T')

class BinaryHeap(Generic[T]):
    """
    二叉堆实现
    
    时间复杂度:
    - insert: O(log n)
    - extract: O(log n)
    - peek: O(1)
    - build_heap: O(n)
    
    空间复杂度:O(n)
    """
    
    def __init__(self, comparator: Callable[[T, T], bool] = lambda a, b: a < b):
        """
        Args:
            comparator: 比较函数,a < b 返回True表示最小堆
        """
        self._heap: List[T] = []
        self._compare = comparator
    
    def _parent(self, i: int) -> int:
        return (i - 1) // 2
    
    def _left(self, i: int) -> int:
        return 2 * i + 1
    
    def _right(self, i: int) -> int:
        return 2 * i + 2
    
    def _sift_up(self, i: int) -> None:
        """上浮操作"""
        while i > 0:
            parent = self._parent(i)
            if self._compare(self._heap[parent], self._heap[i]):
                break
            self._heap[i], self._heap[parent] = self._heap[parent], self._heap[i]
            i = parent
    
    def _sift_down(self, i: int) -> None:
        """下沉操作"""
        n = len(self._heap)
        
        while True:
            extreme = i
            left = self._left(i)
            right = self._right(i)
            
            if left < n and self._compare(self._heap[left], self._heap[extreme]):
                extreme = left
            
            if right < n and self._compare(self._heap[right], self._heap[extreme]):
                extreme = right
            
            if extreme == i:
                break
            
            self._heap[i], self._heap[extreme] = self._heap[extreme], self._heap[i]
            i = extreme
    
    def push(self, value: T) -> None:
        """插入元素 - O(log n)"""
        self._heap.append(value)
        self._sift_up(len(self._heap) - 1)
    
    def pop(self) -> T:
        """弹出堆顶元素 - O(log n)"""
        if not self._heap:
            raise IndexError("pop from empty heap")
        
        result = self._heap[0]
        last = self._heap.pop()
        
        if self._heap:
            self._heap[0] = last
            self._sift_down(0)
        
        return result
    
    def peek(self) -> Optional[T]:
        """查看堆顶元素 - O(1)"""
        return self._heap[0] if self._heap else None
    
    def replace(self, value: T) -> Optional[T]:
        """替换堆顶元素并返回原堆顶 - O(log n)"""
        if not self._heap:
            self.push(value)
            return None
        
        result = self._heap[0]
        self._heap[0] = value
        self._sift_down(0)
        return result
    
    def heapify(self, items: List[T]) -> None:
        """从列表建堆 - O(n)"""
        self._heap = items[:]
        n = len(self._heap)
        
        for i in range(n // 2 - 1, -1, -1):
            self._sift_down(i)
    
    def is_valid_heap(self) -> bool:
        """验证堆性质"""
        n = len(self._heap)
        for i in range(n):
            left = self._left(i)
            right = self._right(i)
            
            if left < n and not self._compare(self._heap[i], self._heap[left]):
                return False
            if right < n and not self._compare(self._heap[i], self._heap[right]):
                return False
        
        return True
    
    def __len__(self) -> int:
        return len(self._heap)
    
    def __bool__(self) -> bool:
        return bool(self._heap)
    
    def __repr__(self) -> str:
        return f"BinaryHeap({self._heap})"


class MinHeap(BinaryHeap[T]):
    """最小堆"""
    
    def __init__(self):
        super().__init__(comparator=lambda a, b: a < b)


class MaxHeap(BinaryHeap[T]):
    """最大堆"""
    
    def __init__(self):
        super().__init__(comparator=lambda a, b: a > b)

13.3 堆排序理论

13.3.1 堆排序算法

算法 13.4(HEAPSORT)

HEAPSORT(A)
    BUILD-MAX-HEAP(A)
    for i = A.length downto 2
        exchange A[1] with A[i]
        A.heap-size = A.heap-size - 1
        MAX-HEAPIFY(A, 1)

定理 13.9(堆排序正确性) HEAPSORT输出有序序列。

证明:初始时,BUILD-MAX-HEAP使整个数组满足最大堆性质。

每次迭代:

  1. 将最大元素(根)与最后一个元素交换
  2. 缩小堆大小
  3. 恢复堆性质

循环不变式:每次迭代开始时,$A[i+1..n]$ 包含已排序的最大 $n-i$ 个元素,$A[1..i]$ 是最大堆。

由循环不变式,结束时 $A[1..n]$ 有序。∎

定理 13.10(堆排序复杂度) HEAPSORT的时间复杂度为 $O(n \log n)$。

证明:BUILD-MAX-HEAP为 $O(n)$。$n-1$ 次迭代,每次MAX-HEAPIFY为 $O(\log n)$。

总时间:$O(n) + (n-1) \cdot O(\log n) = O(n \log n)$。∎

13.3.2 堆排序实现

python
def heap_sort(arr: List[T]) -> List[T]:
    """
    堆排序
    
    时间复杂度:O(n log n)
    空间复杂度:O(1) 原地
    
    特点:
    - 不稳定排序
    - 原地排序
    - 最坏情况也是 O(n log n)
    """
    def sift_down(heap: List[T], i: int, n: int) -> None:
        largest = i
        left = 2 * i + 1
        right = 2 * i + 2
        
        if left < n and heap[left] > heap[largest]:
            largest = left
        if right < n and heap[right] > heap[largest]:
            largest = right
        
        if largest != i:
            heap[i], heap[largest] = heap[largest], heap[i]
            sift_down(heap, largest, n)
    
    n = len(arr)
    
    for i in range(n // 2 - 1, -1, -1):
        sift_down(arr, i, n)
    
    for i in range(n - 1, 0, -1):
        arr[0], arr[i] = arr[i], arr[0]
        sift_down(arr, 0, i)
    
    return arr

13.3.3 堆排序与快速排序比较

定理 13.11(堆排序vs快速排序)

特性堆排序快速排序
最坏时间$O(n \log n)$$O(n^2)$
平均时间$O(n \log n)$$O(n \log n)$
空间$O(1)$$O(\log n)$
稳定性不稳定不稳定
缓存友好

证明:堆排序的访问模式是跳跃的(父子节点索引相差较大),导致缓存命中率低。快速排序的分区操作顺序访问数组,缓存友好。∎


13.4 优先队列应用

13.4.1 Top-K问题

问题 13.1 找出数组中最大的 $k$ 个元素。

定理 13.12(Top-K最优算法) 使用大小为 $k$ 的最小堆可以在 $O(n \log k)$ 时间内解决Top-K问题。

证明:维护一个大小为 $k$ 的最小堆。

  • 遍历数组,若元素大于堆顶,替换堆顶
  • 每次操作 $O(\log k)$,共 $n$ 次

总时间 $O(n \log k)$,当 $k \ll n$ 时优于排序的 $O(n \log n)$。∎

python
import heapq
from typing import List, Tuple

def top_k_largest(nums: List[int], k: int) -> List[int]:
    """
    找出最大的k个元素
    
    时间复杂度:O(n log k)
    空间复杂度:O(k)
    """
    if k <= 0:
        return []
    if k >= len(nums):
        return sorted(nums, reverse=True)
    
    min_heap = nums[:k]
    heapq.heapify(min_heap)
    
    for num in nums[k:]:
        if num > min_heap[0]:
            heapq.heapreplace(min_heap, num)
    
    return sorted(min_heap, reverse=True)


def top_k_smallest(nums: List[int], k: int) -> List[int]:
    """找出最小的k个元素"""
    if k <= 0:
        return []
    if k >= len(nums):
        return sorted(nums)
    
    max_heap = [-x for x in nums[:k]]
    heapq.heapify(max_heap)
    
    for num in nums[k:]:
        if num < -max_heap[0]:
            heapq.heapreplace(max_heap, -num)
    
    return sorted([-x for x in max_heap])


def top_k_frequent(nums: List[int], k: int) -> List[int]:
    """
    前K个高频元素
    
    时间复杂度:O(n log k)
    """
    from collections import Counter
    counter = Counter(nums)
    
    return [item for item, _ in counter.most_common(k)]

13.4.2 合并有序序列

问题 13.2 合并 $k$ 个有序链表/数组。

定理 13.13(合并k路有序序列) 使用大小为 $k$ 的最小堆可以在 $O(n \log k)$ 时间内合并 $k$ 个有序序列,其中 $n$ 为总元素数。

python
def merge_k_sorted_lists(lists: List[List[int]]) -> List[int]:
    """
    合并k个有序列表
    
    时间复杂度:O(n log k)
    空间复杂度:O(k)
    """
    import heapq
    
    min_heap = []
    result = []
    
    for i, lst in enumerate(lists):
        if lst:
            heapq.heappush(min_heap, (lst[0], i, 0))
    
    while min_heap:
        val, list_idx, elem_idx = heapq.heappop(min_heap)
        result.append(val)
        
        if elem_idx + 1 < len(lists[list_idx]):
            next_val = lists[list_idx][elem_idx + 1]
            heapq.heappush(min_heap, (next_val, list_idx, elem_idx + 1))
    
    return result

13.4.3 中位数维护

问题 13.3 动态维护数据流的中位数。

定理 13.14(双堆中位数算法) 使用一个最大堆和一个最小堆可以在 $O(\log n)$ 时间内维护中位数。

python
class MedianFinder:
    """
    中位数查找器
    
    使用两个堆:
    - max_heap: 存储较小的一半
    - min_heap: 存储较大的一半
    
    时间复杂度:
    - addNum: O(log n)
    - findMedian: O(1)
    """
    
    def __init__(self):
        self._max_heap = []
        self._min_heap = []
    
    def addNum(self, num: int) -> None:
        import heapq
        
        heapq.heappush(self._max_heap, -num)
        heapq.heappush(self._min_heap, -heapq.heappop(self._max_heap))
        
        if len(self._min_heap) > len(self._max_heap):
            heapq.heappush(self._max_heap, -heapq.heappop(self._min_heap))
    
    def findMedian(self) -> float:
        if len(self._max_heap) > len(self._min_heap):
            return -self._max_heap[0]
        return (-self._max_heap[0] + self._min_heap[0]) / 2

13.5 高级堆结构

13.5.1 二项堆

定义 13.6(二项树) 二项树 $B_k$ 递归定义为:

  • $B_0$ 是单节点
  • $B_k$ 由两棵 $B_{k-1}$ 组成,其中一棵的根是另一棵的根的左子节点

定义 13.7(二项堆) 二项堆是二项树的集合,满足:

  • 每棵二项树满足最小堆性质
  • 任意两棵二项树度数不同

定理 13.15(二项堆操作复杂度)

操作时间复杂度
insert$O(\log n)$ 最坏,$O(1)$ 均摊
findMin$O(\log n)$
extractMin$O(\log n)$
union$O(\log n)$

13.5.2 斐波那契堆

定义 13.8(斐波那契堆) 斐波那契堆是堆有序树的集合,支持懒惰合并。

定理 13.16(斐波那契堆均摊复杂度)

操作均摊时间
insert$O(1)$
findMin$O(1)$
decreaseKey$O(1)$
extractMin$O(\log n)$

证明:使用势能方法。定义势能函数 $\Phi(H) = t(H) + 2m(H)$,其中 $t(H)$ 是树的数量,$m(H)$ 是标记节点的数量。

分析各操作的势能变化,可得上述均摊复杂度。∎

13.5.3 左偏堆

python
class LeftistNode(Generic[T]):
    """左偏堆节点"""
    __slots__ = ['value', 'left', 'right', 'npl']
    
    def __init__(self, value: T):
        self.value = value
        self.left: Optional['LeftistNode[T]'] = None
        self.right: Optional['LeftistNode[T]'] = None
        self.npl: int = 0


class LeftistHeap(Generic[T]):
    """
    左偏堆
    
    特点:
    - 支持高效合并 O(log n)
    - 左偏性质:左子树的npl >= 右子树的npl
    - npl (null path length): 到最近空节点的距离
    
    时间复杂度:
    - merge: O(log n)
    - insert: O(log n)
    - extractMin: O(log n)
    """
    
    def __init__(self):
        self._root: Optional[LeftistNode[T]] = None
        self._size: int = 0
    
    @staticmethod
    def _npl(node: Optional[LeftistNode[T]]) -> int:
        return node.npl if node else -1
    
    def _merge(self, h1: Optional[LeftistNode[T]], h2: Optional[LeftistNode[T]]) -> Optional[LeftistNode[T]]:
        if not h1:
            return h2
        if not h2:
            return h1
        
        if h1.value > h2.value:
            h1, h2 = h2, h1
        
        h1.right = self._merge(h1.right, h2)
        
        if self._npl(h1.left) < self._npl(h1.right):
            h1.left, h1.right = h1.right, h1.left
        
        h1.npl = self._npl(h1.right) + 1
        return h1
    
    def push(self, value: T) -> None:
        new_node = LeftistNode(value)
        self._root = self._merge(self._root, new_node)
        self._size += 1
    
    def pop(self) -> T:
        if not self._root:
            raise IndexError("pop from empty heap")
        
        result = self._root.value
        self._root = self._merge(self._root.left, self._root.right)
        self._size -= 1
        return result
    
    def peek(self) -> Optional[T]:
        return self._root.value if self._root else None
    
    def merge(self, other: 'LeftistHeap[T]') -> None:
        """合并另一个堆"""
        self._root = self._merge(self._root, other._root)
        self._size += other._size
        other._root = None
        other._size = 0
    
    def __len__(self) -> int:
        return self._size

13.6 本章小结

13.6.1 核心定理总结

定理内容应用
定理 13.1堆的根性质优先队列
定理 13.6buildHeap线性时间堆构造
定理 13.9堆排序正确性排序算法
定理 13.12Top-K最优算法选择问题
定理 13.16斐波那契堆均摊复杂度高级应用

13.6.2 堆类型对比

堆类型insertextractMinmergedecreaseKey
二叉堆$O(\log n)$$O(\log n)$$O(n)$$O(\log n)$
二项堆$O(\log n)$/$O(1)$*$O(\log n)$$O(\log n)$$O(\log n)$
斐波那契堆$O(1)$*$O(\log n)$*$O(1)$*$O(1)$*
左偏堆$O(\log n)$$O(\log n)$$O(\log n)$-

*均摊复杂度

13.6.3 应用场景

场景推荐堆类型原因
简单优先队列二叉堆实现简单,常数因子小
频繁合并二项堆/左偏堆高效合并
Dijkstra算法斐波那契堆decreaseKey高效
Top-K问题大小为K的堆空间高效

13.7 习题

理论题

  1. 证明:BUILD-MAX-HEAP的时间复杂度为 $O(n)$。

  2. 证明:堆排序是不稳定排序。

  3. 分析二叉堆insert操作的最好、最坏、平均情况。

  4. 证明:使用大小为 $k$ 的堆解决Top-K问题的时间复杂度为 $O(n \log k)$。

设计题

  1. 设计一个支持 $O(1)$ 时间获取中位数的堆结构。

  2. 实现一个支持持久化的堆(修改不破坏原堆)。

实现题

  1. 实现一个支持删除任意元素的优先队列。

  2. 实现一个双端优先队列(同时支持获取最大和最小)。


13.8 参考文献

  1. Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2022). Introduction to Algorithms (4th ed.). MIT Press. Chapter 6: Heapsort.

  2. Floyd, R. W. (1964). Algorithm 245: Treesort 3. Communications of the ACM, 7(12), 701.

  3. Williams, J. W. J. (1964). Algorithm 232: Heapsort. Communications of the ACM, 7(6), 347-348.

  4. Fredman, M. L., & Tarjan, R. E. (1987). Fibonacci heaps and their uses in improved network optimization algorithms. Journal of the ACM, 34(3), 596-615.

  5. Vuillemin, J. (1978). A data structure for manipulating priority queues. Communications of the ACM, 21(4), 309-315.

  6. Crane, C. A. (1972). Linear lists and priority queues as balanced binary trees. PhD thesis, Stanford University.


下一章:第14章 平衡树理论

Python技术丛书 - 江苏省宿城中等专业学校