mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2973 字
8 分钟
合并迭代器(Merge Iterator)
2026-06-13

一、为什么需要合并迭代器#

假设你在写一个 LSM Tree 存储引擎的读取路径。一条数据可能散落在 Memtable、Immutable Memtable 和多层 SSTable 中,每个来源内部已经按 key 有序排列,但你需要把它们合并成一个全局有序的结果流返回给用户。最直觉的做法是把所有数据读到内存,拼成一个大数组再排序——这在数据量小的时候没问题,但一旦总数据量达到 GB 级别,全部加载再排序的代价就不可接受了:时间 O(n log n),空间 O(n),n 是所有来源的元素总数。

再看另一个场景:分布式日志系统有 10 个节点,每个节点的日志按时间戳有序。你需要把所有日志合并成一条全局时间线。全量排序同样不现实——10 个节点各 1 GB,总共 10 GB 数据全部加载再排序,光是内存就扛不住,更不用说排序本身的开销。

但仔细想想,每个来源本身就是有序的,我们并不需要”重新排序”,只需要从 K 个有序流中依次取出最小的元素。这就像归并排序的合并阶段,但扩展到了 K 路。用大小为 K 的最小堆维护每个流的当前元素,每次弹出堆顶(全局最小值),再从该流补充下一个元素,就能以 O(n log K) 的时间和 O(K) 的空间完成合并——当 K 远小于 n 时,比”全量排序”快得多,内存开销更是天差地别。关键洞察是:有序性是免费的,不要浪费它

二、现实类比#

期末考试结束,三个教室的监考老师各自把试卷按学号排好了序。教务处需要一份全年级按学号排列的成绩单。你面前摆着三叠已排序的试卷,每次从三叠最上面各看一眼,挑出学号最小的那一份放到结果堆,再从被抽走的那叠补充下一份。始终只需要比较三个数字,不需要把所有试卷摊开重排——这就是 K 路合并的直觉。

注意一个细节:你不需要同时看三叠试卷的全部内容,只需要看每叠最上面的一份。这意味着你的”工作台”只需要放三份试卷的空间,而不是三百份。这正是合并迭代器空间复杂度 O(K) 的现实映射——工作台大小只跟叠数有关,跟每叠多厚无关。

三、核心思想#

合并迭代器维护一个大小为 K 的最小堆,堆中每个元素记录两个信息:当前值和它来自哪个流。每次 Next() 操作弹出堆顶(全局最小值),然后从该流取出下一个元素推入堆中,直到所有流耗尽。

flowchart LR S1["流 1: 1, 4, 7"] --> H["最小堆\n(容量 K)"] S2["流 2: 2, 5, 8"] --> H S3["流 3: 3, 6, 9"] --> H H --> O["输出: 1, 2, 3, 4, ..."]

3.1 执行流程#

以三个有序流 [1, 4, 7][2, 5, 8][3, 6, 9] 为例:

  1. 初始化:堆中放入各流首元素,堆为 [1, 2, 3]
  2. 弹出 1(来自流 1),推入流 1 的下一个元素 4,堆变为 [2, 3, 4]
  3. 弹出 2(来自流 2),推入流 2 的下一个元素 5,堆变为 [3, 4, 5]
  4. 弹出 3(来自流 3),推入流 3 的下一个元素 6,堆变为 [4, 5, 6]
  5. 弹出 4(来自流 1),推入流 1 的下一个元素 7,堆变为 [5, 6, 7]
  6. 弹出 5(来自流 2),推入流 2 的下一个元素 8,堆变为 [6, 7, 8]
  7. 弹出 6(来自流 3),推入流 3 的下一个元素 9,堆变为 [7, 8, 9]
  8. 弹出 7(来自流 1),流 1 耗尽,堆变为 [8, 9]
  9. 弹出 8(来自流 2),流 2 耗尽,堆变为 [9]
  10. 弹出 9(来自流 3),流 3 耗尽,堆为空,合并结束

最终输出:[1, 2, 3, 4, 5, 6, 7, 8, 9]。全程堆大小不超过 3,总共 9 次弹出 + 9 次推入,每次堆操作 O(log 3)。

3.2 复杂度#

指标复杂度说明
时间O(n log K)n 为元素总数,K 为流数,每次堆操作 O(log K)
空间O(K)堆中最多 K 个元素
初始化O(K)各流首元素入堆
单步推进O(log K)弹出 + 推入各一次
Note

LevelDB 的 MergingIterator 实际上没有用堆,而是用线性扫描遍历所有子迭代器找最小值。原因是 LevelDB 的子迭代器数量通常不超过 10 个(1 个 Memtable + 1 个 Immutable Memtable + 少量 SSTable),线性扫描 O(K) 比堆操作 O(log K) 的常数因子更小,缓存更友好。源码注释也写得很直白:“We might want to use a heap in case there are lots of children. For now we use a simple array since we expect a very small number of children.”

四、变体与对比#

4.1 K 路合并 vs 拼接排序 vs 两两合并#

方案时间复杂度空间复杂度适用场景
K 路合并(最小堆)O(n log K)O(K)K 路有序流实时合并
拼接 + 全量排序O(n log n)O(n)流无序,或 K 接近 n
两两合并(归并)O(n log K)O(1) 额外K 较小,不需要同时持有所有流

4.2 去重合并#

LSM Tree 的同一 key 可能存在于 Memtable 和多个 SSTable 中。合并时遇到相同 key,需要保留最新版本(按时间戳或序列号判断),丢弃旧版本。这要求堆元素额外携带版本信息,弹出时检查与上一个输出是否 key 相同——相同则跳过,不同则输出。

具体实现中,entry 结构需要增加 seq 字段(序列号)。弹出堆顶后,比较其 key 与上次输出的 key:如果相同,说明这是旧版本,直接丢弃并继续弹出下一个;如果不同,输出并记录。这个去重逻辑让合并迭代器从”多路合并”升级为”多路合并 + 最新版本过滤”,是 LSM Tree 读取语义的核心。

4.3 慢流问题#

如果某个流的数据产生速度远慢于其他流(比如分布式日志聚合中一个节点网络延迟),它会拖慢整个合并进度。常见解法包括:设置超时跳过、基于水位线(Watermark)的窗口化缓冲、以及将慢流降级为异步回填。核心思路都是不让一个慢流阻塞全局推进

这个问题在流式系统中尤为突出。静态数据合并时,所有数据都已就绪,不存在”等”的问题。但在实时流合并中,一个慢流意味着堆中始终缺一个来源的候选,输出不得不暂停等待。水位线方案的做法是:设定一个时间窗口 T,超过 T 未到达的数据视为迟到,合并时跳过。这牺牲了完整性换取了实时性,是流处理中常见的权衡。

五、多语言实现#

5.1 Go 实现#

package mergeiter
import "container/heap"
// 堆元素:当前值 + 来源流编号
type entry struct {
value int
index int // 属于哪个流
}
// 最小堆
type minHeap []entry
func (h minHeap) Len() int { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].value < h[j].value }
func (h minHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any) { *h = append(*h, x.(entry)) }
func (h *minHeap) Pop() any {
old := *h
n := len(old)
v := old[n-1]
*h = old[:n-1]
return v
}
// 合并迭代器
type MergeIterator struct {
heap minHeap
streams [][]int // K 路有序流
cursors []int // 每个流的当前位置
}
func New(streams [][]int) *MergeIterator {
it := &MergeIterator{
streams: streams,
cursors: make([]int, len(streams)),
heap: make(minHeap, 0, len(streams)),
}
// 各流首元素入堆
for i, s := range streams {
if len(s) > 0 {
heap.Push(&it.heap, entry{value: s[0], index: i})
it.cursors[i] = 1
}
}
heap.Init(&it.heap)
return it
}
// 推进到下一个元素,返回是否还有数据
func (it *MergeIterator) Next() (int, bool) {
if it.heap.Len() == 0 {
return 0, false
}
// 弹出全局最小值
top := heap.Pop(&it.heap).(entry)
// 从该流补充下一个元素
if it.cursors[top.index] < len(it.streams[top.index]) {
heap.Push(&it.heap, entry{
value: it.streams[top.index][it.cursors[top.index]],
index: top.index,
})
it.cursors[top.index]++
}
return top.value, true
}

Go 标准库的 container/heap 接口需要实现五个方法,写起来略繁琐但语义清晰。核心逻辑只有 Next():弹出堆顶,从对应流补充一个元素。堆的大小始终不超过 K,每步操作 O(log K)。

5.2 TypeScript 实现#

// 最小堆元素
interface Entry {
value: number;
streamIdx: number;
}
// 最小堆
class MinHeap {
private data: Entry[] = [];
get size(): number {
return this.data.length;
}
push(e: Entry): void {
this.data.push(e);
this.bubbleUp(this.data.length - 1);
}
pop(): Entry | undefined {
if (this.data.length === 0) return undefined;
const top = this.data[0];
const last = this.data.pop()!;
if (this.data.length > 0) {
this.data[0] = last;
this.sinkDown(0);
}
return top;
}
private bubbleUp(i: number): void {
while (i > 0) {
const parent = (i - 1) >> 1;
if (this.data[i].value >= this.data[parent].value) break;
[this.data[i], this.data[parent]] = [this.data[parent], this.data[i]];
i = parent;
}
}
private sinkDown(i: number): void {
const n = this.data.length;
while (true) {
let smallest = i;
const left = 2 * i + 1, right = 2 * i + 2;
if (left < n && this.data[left].value < this.data[smallest].value) smallest = left;
if (right < n && this.data[right].value < this.data[smallest].value) smallest = right;
if (smallest === i) break;
[this.data[i], this.data[smallest]] = [this.data[smallest], this.data[i]];
i = smallest;
}
}
}
// 合并迭代器
class MergeIterator {
private heap: MinHeap = new MinHeap();
private cursors: number[];
constructor(private streams: number[][]) {
this.cursors = new Array(streams.length).fill(0);
// 各流首元素入堆
for (let i = 0; i < streams.length; i++) {
if (streams[i].length > 0) {
this.heap.push({ value: streams[i][0], streamIdx: i });
this.cursors[i] = 1;
}
}
}
next(): { value: number; done: false } | { done: true } {
if (this.heap.size === 0) return { done: true };
const top = this.heap.pop()!;
// 从被弹出的流补充下一个元素
const stream = this.streams[top.streamIdx];
if (this.cursors[top.streamIdx] < stream.length) {
this.heap.push({
value: stream[this.cursors[top.streamIdx]],
streamIdx: top.streamIdx,
});
this.cursors[top.streamIdx]++;
}
return { value: top.value, done: false };
}
}

TypeScript 没有标准库堆,所以手写了一个最小堆。MergeIterator 的逻辑和 Go 版一致:初始化时各流首元素入堆,next() 弹出堆顶并补充。MinHeapbubbleUpsinkDown 是堆的标准操作,分别用于插入和删除后恢复堆性质。

六、生产验证#

6.1 LevelDB MergingIterator#

LevelDB 的读取路径需要把 Memtable、Immutable Memtable 和多层 SSTable 的迭代器合并成一个全局有序视图。MergingIterator 就是完成这个任务的组件。

具体来说,DBImpl::NewInternalIterator 先收集所有子迭代器:活跃 Memtable 的迭代器、Immutable Memtable 的迭代器(刷盘期间存在)、以及 Version::AddIterators 添加的各层 SSTable 迭代器。L0 层每个文件一个迭代器(因为 L0 文件之间 key 范围可能重叠),L1 及以上每层一个两级迭代器(因为同层文件 key 范围不重叠)。所有迭代器一起传给 NewMergingIterator,封装成一个全局有序的迭代器。

  • 仓库:google/leveldb
  • 迭代器实现:table/merger.cc#L14-L149——MergingIterator 类,FindSmallest() 用线性扫描而非堆
  • 组装逻辑:db/db_impl.cc——NewInternalIterator 把 Memtable 和 SSTable 迭代器收集到一起,传给 NewMergingIterator

如前所述,LevelDB 选择线性扫描而非堆,因为子迭代器数量通常不超过 10 个。这是工程务实的体现:理论复杂度 O(K) 不如 O(log K) 好看,但常数因子和缓存行为让它在 K 很小时更快。

6.2 RocksDB Merge Helper#

RocksDB 在 Compaction 过程中需要对同一个 key 的多次写入进行合并(Merge Operator),MergeHelper 负责在遍历有序数据时执行这个合并逻辑。

  • 仓库:facebook/rocksdb
  • 合并辅助:db/merge_helper.cc#L57-L96——TimedFullMergeCommonImpl 模板函数,调用用户定义的 FullMergeV3 并计时统计
  • Compaction 中的合并迭代:RocksDB 的 Compaction 本质上也是一个多路合并过程,把多层 SSTable 的数据合并写出,MergeHelper 在其中处理 Merge 类型的 key

6.3 Go 官方排序库#

Go 标准库的 sort.Searchslices 包虽然没有直接暴露 K 路合并接口,但 internal/abi 和编译器工具链在外部排序场景中广泛使用多路合并。社区中更典型的例子是 VictoriaMetrics 的快速合并:

七、小结#

何时使用合并迭代器#

  • LSM Tree 读取路径:合并 Memtable + 多层 SSTable 的有序数据,这是合并迭代器最经典的应用
  • 外部排序:数据量超出内存时,先分块排序写出,再用 K 路合并归并为全局有序结果
  • 日志聚合:多个节点或分片的有序日志流合并为统一时间线
  • 搜索引擎倒排列表合并:多个 posting list 取交集或并集,天然是 K 路合并

何时不使用合并迭代器#

  • 输入流无序:合并迭代器依赖每个流内部有序,无序流应先排序再合并,或直接用全量排序
  • K = 2:两路合并用双指针更简单,不需要堆,常数因子也更小
  • 随机访问模式:合并迭代器是单向顺序遍历,不支持回退或随机定位
  • K 很大但每个流很短:堆的初始化和每步操作都有常数开销,流太短时堆的优势被稀释,全量排序可能更快

八、参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

合并迭代器(Merge Iterator)
https://blog.souloss.com/posts/programming/data-structures/data-structures-merge-iterator/
作者
Tsukimi
发布于
2026-06-13
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时