一、为什么需要批处理
你的应用需要往数据库插入 1000 条记录。如果逐条执行,每条 INSERT 都是一次网络往返、一次 SQL 解析、一次事务提交——1000 次开销叠加,总耗时可能达到几十秒。而一次批量 INSERT 只要一次网络往返、一次解析、一次提交,耗时可能不到一秒。
逐条处理的问题不只是慢。频繁的小操作还会给下游系统带来巨大压力:每次请求都有固定的上下文切换、连接获取、协议封装等开销。当吞吐量是瓶颈时,这些固定开销就会被无限放大。
让我们把开销拆开来看。一次数据库写入的耗时大致分为两部分:固定开销和有效负载。固定开销包括 TCP 握手(约 1-3ms)、SQL 解析与执行计划生成(约 0.5-2ms)、事务提交与 WAL 写入(约 1-5ms)。有效负载才是真正写入数据的时间,通常不到 0.1ms。也就是说,一条 INSERT 的总耗时中,固定开销可能占 95% 以上。逐条执行 1000 次时,这 95% 的固定开销被重复了 1000 次;批量执行时,它只发生一次,被 1000 条记录分摊。
网络请求中这个比例更夸张。一次 HTTP 请求的固定开销包括 DNS 解析、TLS 握手、TCP 慢启动,加起来可能 50-200ms,而实际传输一个小的 JSON 体可能只要 1ms。调用 100 次外部 API,逐次调用意味着 5-20 秒花在连接建立上,批量接口可能一次搞定。
批处理的思路是把多个小操作攒成一个大操作一起执行,把每次操作的固定开销分摊到整个批次上。代价是单个操作要等一会儿才能被执行——增加了延迟;收益是整体吞吐量大幅提升。这是延迟和吞吐量之间的经典权衡:你用少量延迟换取大量吞吐。在写入密集型场景中,这个交换几乎总是划算的——一条记录晚 50ms 写入数据库,用户根本感知不到;但吞吐量从 100 QPS 提升到 5000 QPS,系统能承载的流量就完全不同了。
二、现实类比
装洗碗机。你不会洗一个盘子就开一次机——整天的碗碟攒起来,一次洗完。每个盘子分摊的水费、电费、时间都远低于单独清洗。洗碗机不会因为你只有一个盘子就提前启动,它有自己的”批次大小”或”最长等待时间”。
这个类比还揭示了批处理的两个关键参数:洗碗机有容量上限(对应 maxSize),也有你的耐心上限——你不会让一个脏碗在池子里等三天(对应 maxWait)。容量满了就启动,时间到了也启动,两者缺一不可。
三、核心思想
批处理器维护一个待处理队列,按照两种策略触发刷新:
- 大小触发:队列中的项数达到
maxSize时立即刷新 - 时间触发:距离上次刷新超过
maxWait时强制刷新,避免项目在队列中无限等待
关键属性:
| 属性 | 值 | 说明 |
|---|---|---|
| 吞吐量 | 摊薄每项开销 | N 项接近 1 项的成本 |
| 延迟 | 每项略增 | 等待批次填满或计时器触发 |
| 刷写触发 | 双重条件 | 大小阈值 / 时间截止 / 显式刷写 |
| 空间 | O(批次大小) | 待处理项的有界缓冲区 |
触发策略对比:
| 触发条件 | 行为 | 适用场景 |
|---|---|---|
| 仅大小 | 批次满才刷 | 流量稳定且充足 |
| 仅时间 | 定时刷写 | 流量低但延迟敏感 |
| 大小 + 时间 | 满或超时均刷 | 生产环境标配 |
| 显式调用 | 手动控制 | 需要精确控制刷写时机 |
3.1 双重触发的必要性
为什么必须同时有大小和时间两种触发?考虑两种极端情况:
高流量时,队列在几毫秒内就填满了,大小触发频繁生效,时间触发几乎不会启动。此时批处理接近最优——每批都是满的,固定开销被充分摊薄。吞吐量接近理论最大值。
低流量时,可能好几分钟才有一条记录入队。如果只有大小触发,这条记录就会一直等在队列里,直到凑满批次——用户可能等了几分钟还没看到写入结果。时间触发就是为这种情况设计的:不管队列里有多少项,到了 maxWait 就强制刷写。单条记录的延迟被控制在 maxWait 以内。
Kafka 的生产者配置把这两个参数分别叫 batch.size(大小触发,默认 16KB)和 linger.ms(时间触发,默认 0ms)。默认 linger.ms=0 意味着不等待,有消息就发——这在低延迟场景下合理。但如果你愿意用少量延迟换吞吐,把 linger.ms 设为 5-10ms,Kafka 就会在这几毫秒内把同一分区的消息攒成一批,网络效率大幅提升。Kafka 按分区(Partition)独立批处理,是因为同一分区的消息写入同一个日志段,物理上连续,合并收益最大;不同分区的消息写入不同位置,无法合并。
3.2 参数调优的直觉
maxSize 决定吞吐上限,maxWait 决定延迟上限。调优时先确定你能容忍的最大延迟,把它设为 maxWait;然后根据流量估算 maxSize,让大部分批次在 maxWait 内能凑满。如果流量波动大,宁可让 maxSize 偏大一些——没凑满时有 maxWait 兜底,不会无限等;凑满时大批次吞吐更好。
四、变体与对比
| 模式 | 关注点 | 批处理角色 | 适用场景 |
|---|---|---|---|
| 批处理 | 提升吞吐量 | 核心模式 | 写入密集型操作 |
| 背压 | 控制流量速率 | 配合使用 | 消费者跟不上生产者 |
| 环形缓冲区 | 高效缓冲区 | 作为内部队列 | 生产者-消费者解耦 |
| 指数退避重试 | 处理失败 | 批次中单条重试 | 批次部分失败 |
批处理和背压经常一起出现:批处理提高了消费速率,背压在消费速率仍不够时限制生产速率。当批次中某一条失败时,可以对该条单独进行指数退避重试,而不需要重试整个批次。
4.1 批次部分失败的处理
批量操作有一个绕不开的问题:批次中部分项成功、部分项失败怎么办?这取决于下游系统的语义。
原子性语义:整个批次要么全部成功,要么全部回滚。数据库事务就是这种模式——一条 INSERT 违反唯一约束,整个批次回滚。好处是调用方不用处理中间状态,坏处是一条坏数据拖垮整批好数据。你需要在上游做好数据校验,或者把可能冲突的记录分到不同批次。
逐项错误处理:批次中每条记录独立处理,失败的不影响成功的。Elasticsearch 的 Bulk API 就是这种模式——返回结果中每条记录有自己的状态码,调用方逐条检查。更灵活,但调用方逻辑更复杂:需要遍历结果,对失败的条目决定是重试、丢弃还是记录到死信队列。
两种语义各有用武之地。金融转账这类强一致性场景必须用原子性;日志写入、指标上报这类最终一致性场景用逐项处理更高效。关键是在设计批处理接口时就想清楚:你的 process 函数是返回一个整体错误,还是返回每个项各自的结果?
4.2 批次大小的上限
批次不是越大越好。过大的批次会带来新问题:内存压力增大(一批 10 万条记录可能占几百 MB)、下游超时风险(数据库执行一条巨型 SQL 可能触发语句超时)、失败代价升高(大批次重试的成本远高于小批次)。生产环境中,maxSize 通常设为几十到几百,而不是几千。Kafka 默认 16KB 的 batch.size 也是出于类似考虑——更大的批次虽然吞吐更高,但延迟和内存开销也随之增长。
五、多语言实现
5.1 Go 实现
package batch
import ( "sync" "time")
// BatchProcessor 批处理器type BatchProcessor[T any, R any] struct { queue []batchEntry[T, R] process func([]T) []R maxSize int maxWait time.Duration mu sync.Mutex timer *time.Timer}
type batchEntry[T any, R any] struct { item T ch chan R}
func New[T any, R any]( process func([]T) []R, maxSize int, maxWait time.Duration,) *BatchProcessor[T, R] { return &BatchProcessor[T, R]{ process: process, maxSize: maxSize, maxWait: maxWait, }}
// Add 添加一项到待处理队列func (bp *BatchProcessor[T, R]) Add(item T) R { bp.mu.Lock() defer bp.mu.Unlock()
ch := make(chan R, 1) bp.queue = append(bp.queue, batchEntry[T, R]{item, ch})
// 大小触发:队列满则立即刷新 if len(bp.queue) >= bp.maxSize { bp.flush() } else if bp.timer == nil { // 时间触发:启动倒计时 bp.timer = time.AfterFunc(bp.maxWait, func() { bp.mu.Lock() bp.timer = nil bp.flush() bp.mu.Unlock() }) }
return <-ch}
// flush 刷新:取出队列中所有项,批量处理func (bp *BatchProcessor[T, R]) flush() { if len(bp.queue) == 0 { return } items := make([]T, len(bp.queue)) for i, e := range bp.queue { items[i] = e.item } results := bp.process(items) for i, e := range bp.queue { e.ch <- results[i] } bp.queue = bp.queue[:0]}Go 版本用泛型支持任意输入/输出类型。Add 返回一个 channel,调用方阻塞等待结果。大小和时间双重触发:队列满了立即刷,没满就启动定时器等一会儿再刷。flush 把队列清空、批量处理、逐个回传结果。
这里有几个值得注意的设计细节。Channel 的缓冲区大小为 1:make(chan R, 1) 确保写入结果时不会阻塞——flush 把结果写入 channel 后继续处理下一个条目,不需要等调用方读取。调用方在 <-ch 处阻塞等待,直到 flush 完成并回传结果。这种 channel-based 的结果投递模式是 Go 并发编程的惯用手法,比回调函数更清晰,比共享变量更安全。
定时器的生命周期管理也很关键。timer 只在队列为空后的第一个 Add 时创建,避免重复设置定时器。flush 执行时将 timer 置为 nil,下次 Add 时才会重新创建。如果大小触发先于时间触发执行,AfterFunc 创建的定时器虽然会到期,但此时 timer 已经被置为 nil,回调函数中的 flush 会发现队列为空直接返回,不会重复处理。
5.2 TypeScript 实现
class BatchProcessor<T, R> { private queue: Array<{ item: T; resolve: (r: R) => void }> = []; private timer: ReturnType<typeof setTimeout> | null = null;
constructor( private processBatch: (items: T[]) => Promise<R[]>, private maxSize: number = 10, private maxWaitMs: number = 50, ) {}
async add(item: T): Promise<R> { return new Promise<R>((resolve) => { this.queue.push({ item, resolve });
// 大小触发:队列满则立即刷新 if (this.queue.length >= this.maxSize) { this.flush(); } else if (!this.timer) { // 时间触发:启动倒计时 this.timer = setTimeout(() => this.flush(), this.maxWaitMs); } }); }
private async flush(): Promise<void> { if (this.timer) { clearTimeout(this.timer); this.timer = null; }
// 取出当前队列的所有项 const batch = this.queue.splice(0); if (batch.length === 0) return;
// 批量处理 const results = await this.processBatch(batch.map((b) => b.item)); batch.forEach((b, i) => b.resolve(results[i]!)); }}TypeScript 版本用 Promise 实现异步批处理。每个 add 返回一个 Promise,调用方 await 等待结果。flush 先清空队列再处理,确保后续 add 的新项目进入新批次。这种模式在 GraphQL 的 DataLoader 中非常常见。
Promise-based 的结果投递和 Go 的 channel 思路一致,但更贴合 JavaScript 的异步模型。add 创建 Promise 时拿到 resolve 函数,存入队列;flush 处理完成后逐个调用 resolve,等待中的 await 就会收到结果。不需要 channel,不需要共享状态,Promise 本身就是结果投递的载体。
定时器管理上有一个容易忽略的细节:flush 开头必须 clearTimeout。因为大小触发和时间触发可能竞争——队列刚好在定时器到期前填满,此时 flush 被大小触发调用,如果不取消定时器,它到期后又会触发一次空刷。Go 版本通过 timer = nil 的检查避免了这个问题,TypeScript 版本则用 clearTimeout 主动取消。两种写法效果相同,但思路不同:Go 是”让定时器到期但忽略空队列”,TypeScript 是”直接取消定时器不让它到期”。
六、生产验证
| 项目 | 源码位置 | 用途 |
|---|---|---|
| Apache Kafka Producer | RecordAccumulator | Kafka 生产者按分区累积记录为批次。append() 添加记录,sender 线程排空就绪批次。这是 Kafka 实现百万消息/秒的关键 |
| Linux Block Layer | blk_attempt_req_merge | 块层将相邻 I/O 请求合并为批量操作,摊薄寻道时间。合并前检查两个请求是否有连续扇区和兼容标志 |
| React 18 自动批处理 | 批量状态更新 | 同一事件处理器中的多次 setState 被批处理为一次重渲染,React 18 默认对所有更新启用批处理 |
七、小结
何时使用:
- 数据库写入——批量 INSERT 替代 N 次单条 INSERT,吞吐量提升数倍。固定开销从 N 份降为 1 份,最直接也最普遍的收益
- API 批量调用——减少网络往返,GraphQL DataLoader、Elasticsearch Bulk API 的核心。DataLoader 把 N 次单独查询合并为一次
WHERE id IN (...),Elasticsearch 把 N 次索引请求合并为一次 Bulk 请求 - 消息队列——Kafka 按分区批处理、SQS 批量发送/接收。生产者攒批减少网络请求,Broker 顺序写入批量消息减少磁盘寻道
- UI 更新——React 批量
setState、浏览器批量 DOM 重排。5 次setState没有批处理就是 5 次渲染,有批处理就是 1 次
何时不用:
- 延迟敏感路径——批处理天然增加等待时间,实时系统不适用。交易下单、实时报价这类场景,50ms 的等待就不可接受
- 极低流量——很少超过 1 个项时,批处理增加复杂性却无收益。内部管理后台每天几十次操作,省下的毫秒不值得多维护一个队列和定时器
- 严格原子性要求——批次中某条失败时需要逐条错误处理,增加复杂度。业务要求”全成功或全失败”时,逐项错误处理不适用,原子性批处理又面临”一条坏数据拖垮整批”的风险
- 无界内存风险——没有大小限制时,流量高峰期队列可能无限增长。
maxSize不只是触发阈值,更是内存安全阀
八、参考资料
- Kafka Producer Internals - Kafka 生产者批处理配置,含
batch.size和linger.ms - DataLoader - GraphQL 批量加载库,批处理 + 缓存的经典实现
- Elasticsearch Bulk API - 批量索引文档的 API,含部分失败处理
- React Automatic Batching - React 18 自动批处理的官方说明
- Linux Block Layer Merging - Linux 块层请求合并机制文档
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






