一、为什么需要合并迭代器
假设你在写一个 LSM Tree 存储引擎的读取路径。一条数据可能散落在 Memtable、Immutable Memtable 和多层 SSTable 中,每个来源内部已经按 key 有序排列,但你需要把它们合并成一个全局有序的结果流返回给用户。最直觉的做法是把所有数据读到内存,拼成一个大数组再排序——这在数据量小的时候没问题,但一旦总数据量达到 GB 级别,全部加载再排序的代价就不可接受了:时间 O(n log n),空间 O(n),n 是所有来源的元素总数。
再看另一个场景:分布式日志系统有 10 个节点,每个节点的日志按时间戳有序。你需要把所有日志合并成一条全局时间线。全量排序同样不现实——10 个节点各 1 GB,总共 10 GB 数据全部加载再排序,光是内存就扛不住,更不用说排序本身的开销。
但仔细想想,每个来源本身就是有序的,我们并不需要”重新排序”,只需要从 K 个有序流中依次取出最小的元素。这就像归并排序的合并阶段,但扩展到了 K 路。用大小为 K 的最小堆维护每个流的当前元素,每次弹出堆顶(全局最小值),再从该流补充下一个元素,就能以 O(n log K) 的时间和 O(K) 的空间完成合并——当 K 远小于 n 时,比”全量排序”快得多,内存开销更是天差地别。关键洞察是:有序性是免费的,不要浪费它。
二、现实类比
期末考试结束,三个教室的监考老师各自把试卷按学号排好了序。教务处需要一份全年级按学号排列的成绩单。你面前摆着三叠已排序的试卷,每次从三叠最上面各看一眼,挑出学号最小的那一份放到结果堆,再从被抽走的那叠补充下一份。始终只需要比较三个数字,不需要把所有试卷摊开重排——这就是 K 路合并的直觉。
注意一个细节:你不需要同时看三叠试卷的全部内容,只需要看每叠最上面的一份。这意味着你的”工作台”只需要放三份试卷的空间,而不是三百份。这正是合并迭代器空间复杂度 O(K) 的现实映射——工作台大小只跟叠数有关,跟每叠多厚无关。
三、核心思想
合并迭代器维护一个大小为 K 的最小堆,堆中每个元素记录两个信息:当前值和它来自哪个流。每次 Next() 操作弹出堆顶(全局最小值),然后从该流取出下一个元素推入堆中,直到所有流耗尽。
3.1 执行流程
以三个有序流 [1, 4, 7]、[2, 5, 8]、[3, 6, 9] 为例:
- 初始化:堆中放入各流首元素,堆为
[1, 2, 3] - 弹出 1(来自流 1),推入流 1 的下一个元素 4,堆变为
[2, 3, 4] - 弹出 2(来自流 2),推入流 2 的下一个元素 5,堆变为
[3, 4, 5] - 弹出 3(来自流 3),推入流 3 的下一个元素 6,堆变为
[4, 5, 6] - 弹出 4(来自流 1),推入流 1 的下一个元素 7,堆变为
[5, 6, 7] - 弹出 5(来自流 2),推入流 2 的下一个元素 8,堆变为
[6, 7, 8] - 弹出 6(来自流 3),推入流 3 的下一个元素 9,堆变为
[7, 8, 9] - 弹出 7(来自流 1),流 1 耗尽,堆变为
[8, 9] - 弹出 8(来自流 2),流 2 耗尽,堆变为
[9] - 弹出 9(来自流 3),流 3 耗尽,堆为空,合并结束
最终输出:[1, 2, 3, 4, 5, 6, 7, 8, 9]。全程堆大小不超过 3,总共 9 次弹出 + 9 次推入,每次堆操作 O(log 3)。
3.2 复杂度
| 指标 | 复杂度 | 说明 |
|---|---|---|
| 时间 | O(n log K) | n 为元素总数,K 为流数,每次堆操作 O(log K) |
| 空间 | O(K) | 堆中最多 K 个元素 |
| 初始化 | O(K) | 各流首元素入堆 |
| 单步推进 | O(log K) | 弹出 + 推入各一次 |
LevelDB 的 MergingIterator 实际上没有用堆,而是用线性扫描遍历所有子迭代器找最小值。原因是 LevelDB 的子迭代器数量通常不超过 10 个(1 个 Memtable + 1 个 Immutable Memtable + 少量 SSTable),线性扫描 O(K) 比堆操作 O(log K) 的常数因子更小,缓存更友好。源码注释也写得很直白:“We might want to use a heap in case there are lots of children. For now we use a simple array since we expect a very small number of children.”
四、变体与对比
4.1 K 路合并 vs 拼接排序 vs 两两合并
| 方案 | 时间复杂度 | 空间复杂度 | 适用场景 |
|---|---|---|---|
| K 路合并(最小堆) | O(n log K) | O(K) | K 路有序流实时合并 |
| 拼接 + 全量排序 | O(n log n) | O(n) | 流无序,或 K 接近 n |
| 两两合并(归并) | O(n log K) | O(1) 额外 | K 较小,不需要同时持有所有流 |
4.2 去重合并
LSM Tree 的同一 key 可能存在于 Memtable 和多个 SSTable 中。合并时遇到相同 key,需要保留最新版本(按时间戳或序列号判断),丢弃旧版本。这要求堆元素额外携带版本信息,弹出时检查与上一个输出是否 key 相同——相同则跳过,不同则输出。
具体实现中,entry 结构需要增加 seq 字段(序列号)。弹出堆顶后,比较其 key 与上次输出的 key:如果相同,说明这是旧版本,直接丢弃并继续弹出下一个;如果不同,输出并记录。这个去重逻辑让合并迭代器从”多路合并”升级为”多路合并 + 最新版本过滤”,是 LSM Tree 读取语义的核心。
4.3 慢流问题
如果某个流的数据产生速度远慢于其他流(比如分布式日志聚合中一个节点网络延迟),它会拖慢整个合并进度。常见解法包括:设置超时跳过、基于水位线(Watermark)的窗口化缓冲、以及将慢流降级为异步回填。核心思路都是不让一个慢流阻塞全局推进。
这个问题在流式系统中尤为突出。静态数据合并时,所有数据都已就绪,不存在”等”的问题。但在实时流合并中,一个慢流意味着堆中始终缺一个来源的候选,输出不得不暂停等待。水位线方案的做法是:设定一个时间窗口 T,超过 T 未到达的数据视为迟到,合并时跳过。这牺牲了完整性换取了实时性,是流处理中常见的权衡。
五、多语言实现
5.1 Go 实现
package mergeiter
import "container/heap"
// 堆元素:当前值 + 来源流编号type entry struct { value int index int // 属于哪个流}
// 最小堆type minHeap []entry
func (h minHeap) Len() int { return len(h) }func (h minHeap) Less(i, j int) bool { return h[i].value < h[j].value }func (h minHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }func (h *minHeap) Push(x any) { *h = append(*h, x.(entry)) }func (h *minHeap) Pop() any { old := *h n := len(old) v := old[n-1] *h = old[:n-1] return v}
// 合并迭代器type MergeIterator struct { heap minHeap streams [][]int // K 路有序流 cursors []int // 每个流的当前位置}
func New(streams [][]int) *MergeIterator { it := &MergeIterator{ streams: streams, cursors: make([]int, len(streams)), heap: make(minHeap, 0, len(streams)), } // 各流首元素入堆 for i, s := range streams { if len(s) > 0 { heap.Push(&it.heap, entry{value: s[0], index: i}) it.cursors[i] = 1 } } heap.Init(&it.heap) return it}
// 推进到下一个元素,返回是否还有数据func (it *MergeIterator) Next() (int, bool) { if it.heap.Len() == 0 { return 0, false } // 弹出全局最小值 top := heap.Pop(&it.heap).(entry) // 从该流补充下一个元素 if it.cursors[top.index] < len(it.streams[top.index]) { heap.Push(&it.heap, entry{ value: it.streams[top.index][it.cursors[top.index]], index: top.index, }) it.cursors[top.index]++ } return top.value, true}Go 标准库的 container/heap 接口需要实现五个方法,写起来略繁琐但语义清晰。核心逻辑只有 Next():弹出堆顶,从对应流补充一个元素。堆的大小始终不超过 K,每步操作 O(log K)。
5.2 TypeScript 实现
// 最小堆元素interface Entry { value: number; streamIdx: number;}
// 最小堆class MinHeap { private data: Entry[] = [];
get size(): number { return this.data.length; }
push(e: Entry): void { this.data.push(e); this.bubbleUp(this.data.length - 1); }
pop(): Entry | undefined { if (this.data.length === 0) return undefined; const top = this.data[0]; const last = this.data.pop()!; if (this.data.length > 0) { this.data[0] = last; this.sinkDown(0); } return top; }
private bubbleUp(i: number): void { while (i > 0) { const parent = (i - 1) >> 1; if (this.data[i].value >= this.data[parent].value) break; [this.data[i], this.data[parent]] = [this.data[parent], this.data[i]]; i = parent; } }
private sinkDown(i: number): void { const n = this.data.length; while (true) { let smallest = i; const left = 2 * i + 1, right = 2 * i + 2; if (left < n && this.data[left].value < this.data[smallest].value) smallest = left; if (right < n && this.data[right].value < this.data[smallest].value) smallest = right; if (smallest === i) break; [this.data[i], this.data[smallest]] = [this.data[smallest], this.data[i]]; i = smallest; } }}
// 合并迭代器class MergeIterator { private heap: MinHeap = new MinHeap(); private cursors: number[];
constructor(private streams: number[][]) { this.cursors = new Array(streams.length).fill(0); // 各流首元素入堆 for (let i = 0; i < streams.length; i++) { if (streams[i].length > 0) { this.heap.push({ value: streams[i][0], streamIdx: i }); this.cursors[i] = 1; } } }
next(): { value: number; done: false } | { done: true } { if (this.heap.size === 0) return { done: true }; const top = this.heap.pop()!; // 从被弹出的流补充下一个元素 const stream = this.streams[top.streamIdx]; if (this.cursors[top.streamIdx] < stream.length) { this.heap.push({ value: stream[this.cursors[top.streamIdx]], streamIdx: top.streamIdx, }); this.cursors[top.streamIdx]++; } return { value: top.value, done: false }; }}TypeScript 没有标准库堆,所以手写了一个最小堆。MergeIterator 的逻辑和 Go 版一致:初始化时各流首元素入堆,next() 弹出堆顶并补充。MinHeap 的 bubbleUp 和 sinkDown 是堆的标准操作,分别用于插入和删除后恢复堆性质。
六、生产验证
6.1 LevelDB MergingIterator
LevelDB 的读取路径需要把 Memtable、Immutable Memtable 和多层 SSTable 的迭代器合并成一个全局有序视图。MergingIterator 就是完成这个任务的组件。
具体来说,DBImpl::NewInternalIterator 先收集所有子迭代器:活跃 Memtable 的迭代器、Immutable Memtable 的迭代器(刷盘期间存在)、以及 Version::AddIterators 添加的各层 SSTable 迭代器。L0 层每个文件一个迭代器(因为 L0 文件之间 key 范围可能重叠),L1 及以上每层一个两级迭代器(因为同层文件 key 范围不重叠)。所有迭代器一起传给 NewMergingIterator,封装成一个全局有序的迭代器。
- 仓库:google/leveldb
- 迭代器实现:
table/merger.cc#L14-L149——MergingIterator类,FindSmallest()用线性扫描而非堆 - 组装逻辑:
db/db_impl.cc——NewInternalIterator把 Memtable 和 SSTable 迭代器收集到一起,传给NewMergingIterator
如前所述,LevelDB 选择线性扫描而非堆,因为子迭代器数量通常不超过 10 个。这是工程务实的体现:理论复杂度 O(K) 不如 O(log K) 好看,但常数因子和缓存行为让它在 K 很小时更快。
6.2 RocksDB Merge Helper
RocksDB 在 Compaction 过程中需要对同一个 key 的多次写入进行合并(Merge Operator),MergeHelper 负责在遍历有序数据时执行这个合并逻辑。
- 仓库:facebook/rocksdb
- 合并辅助:
db/merge_helper.cc#L57-L96——TimedFullMergeCommonImpl模板函数,调用用户定义的FullMergeV3并计时统计 - Compaction 中的合并迭代:RocksDB 的 Compaction 本质上也是一个多路合并过程,把多层 SSTable 的数据合并写出,
MergeHelper在其中处理 Merge 类型的 key
6.3 Go 官方排序库
Go 标准库的 sort.Search 和 slices 包虽然没有直接暴露 K 路合并接口,但 internal/abi 和编译器工具链在外部排序场景中广泛使用多路合并。社区中更典型的例子是 VictoriaMetrics 的快速合并:
- 仓库:VictoriaMetrics/VictoriaMetrics
- 合并实现:
lib/merge/——用于合并多个时间序列分片的有序数据,底层正是 K 路最小堆合并
七、小结
何时使用合并迭代器
- LSM Tree 读取路径:合并 Memtable + 多层 SSTable 的有序数据,这是合并迭代器最经典的应用
- 外部排序:数据量超出内存时,先分块排序写出,再用 K 路合并归并为全局有序结果
- 日志聚合:多个节点或分片的有序日志流合并为统一时间线
- 搜索引擎倒排列表合并:多个 posting list 取交集或并集,天然是 K 路合并
何时不使用合并迭代器
- 输入流无序:合并迭代器依赖每个流内部有序,无序流应先排序再合并,或直接用全量排序
- K = 2:两路合并用双指针更简单,不需要堆,常数因子也更小
- 随机访问模式:合并迭代器是单向顺序遍历,不支持回退或随机定位
- K 很大但每个流很短:堆的初始化和每步操作都有常数开销,流太短时堆的优势被稀释,全量排序可能更快
八、参考资料
- LevelDB table/merger.cc - MergingIterator 实现,线性扫描找最小值
- RocksDB db/merge_helper.cc - TimedFullMerge 合并操作实现
- VictoriaMetrics lib/merge - 时序数据 K 路合并,生产级最小堆实现
- Introduction to Algorithms (CLRS) - 第 6 章堆排序、第 21 章多路归并
- The Log-Structured Merge-Tree - O’Neil et al., 1996, LSM Tree 原始论文,合并读取的理论基础
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






