mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
3658 字
10 分钟
批处理(Batch Processing)
2026-06-13

一、为什么需要批处理#

你的应用需要往数据库插入 1000 条记录。如果逐条执行,每条 INSERT 都是一次网络往返、一次 SQL 解析、一次事务提交——1000 次开销叠加,总耗时可能达到几十秒。而一次批量 INSERT 只要一次网络往返、一次解析、一次提交,耗时可能不到一秒。

逐条处理的问题不只是慢。频繁的小操作还会给下游系统带来巨大压力:每次请求都有固定的上下文切换、连接获取、协议封装等开销。当吞吐量是瓶颈时,这些固定开销就会被无限放大。

让我们把开销拆开来看。一次数据库写入的耗时大致分为两部分:固定开销有效负载。固定开销包括 TCP 握手(约 1-3ms)、SQL 解析与执行计划生成(约 0.5-2ms)、事务提交与 WAL 写入(约 1-5ms)。有效负载才是真正写入数据的时间,通常不到 0.1ms。也就是说,一条 INSERT 的总耗时中,固定开销可能占 95% 以上。逐条执行 1000 次时,这 95% 的固定开销被重复了 1000 次;批量执行时,它只发生一次,被 1000 条记录分摊。

网络请求中这个比例更夸张。一次 HTTP 请求的固定开销包括 DNS 解析、TLS 握手、TCP 慢启动,加起来可能 50-200ms,而实际传输一个小的 JSON 体可能只要 1ms。调用 100 次外部 API,逐次调用意味着 5-20 秒花在连接建立上,批量接口可能一次搞定。

批处理的思路是把多个小操作攒成一个大操作一起执行,把每次操作的固定开销分摊到整个批次上。代价是单个操作要等一会儿才能被执行——增加了延迟;收益是整体吞吐量大幅提升。这是延迟和吞吐量之间的经典权衡:你用少量延迟换取大量吞吐。在写入密集型场景中,这个交换几乎总是划算的——一条记录晚 50ms 写入数据库,用户根本感知不到;但吞吐量从 100 QPS 提升到 5000 QPS,系统能承载的流量就完全不同了。

二、现实类比#

装洗碗机。你不会洗一个盘子就开一次机——整天的碗碟攒起来,一次洗完。每个盘子分摊的水费、电费、时间都远低于单独清洗。洗碗机不会因为你只有一个盘子就提前启动,它有自己的”批次大小”或”最长等待时间”。

这个类比还揭示了批处理的两个关键参数:洗碗机有容量上限(对应 maxSize),也有你的耐心上限——你不会让一个脏碗在池子里等三天(对应 maxWait)。容量满了就启动,时间到了也启动,两者缺一不可。

三、核心思想#

批处理器维护一个待处理队列,按照两种策略触发刷新:

  1. 大小触发:队列中的项数达到 maxSize 时立即刷新
  2. 时间触发:距离上次刷新超过 maxWait 时强制刷新,避免项目在队列中无限等待
flowchart LR subgraph "无批处理" A1["op 1"] --> S1["发送"] A2["op 2"] --> S2["发送"] A3["op 3"] --> S3["发送"] end subgraph "有批处理" B1["op 1"] --> Q["队列"] B2["op 2"] --> Q B3["op 3"] --> Q Q --> S4["批量发送"] end

关键属性:

属性说明
吞吐量摊薄每项开销N 项接近 1 项的成本
延迟每项略增等待批次填满或计时器触发
刷写触发双重条件大小阈值 / 时间截止 / 显式刷写
空间O(批次大小)待处理项的有界缓冲区

触发策略对比:

触发条件行为适用场景
仅大小批次满才刷流量稳定且充足
仅时间定时刷写流量低但延迟敏感
大小 + 时间满或超时均刷生产环境标配
显式调用手动控制需要精确控制刷写时机

3.1 双重触发的必要性#

为什么必须同时有大小和时间两种触发?考虑两种极端情况:

高流量时,队列在几毫秒内就填满了,大小触发频繁生效,时间触发几乎不会启动。此时批处理接近最优——每批都是满的,固定开销被充分摊薄。吞吐量接近理论最大值。

低流量时,可能好几分钟才有一条记录入队。如果只有大小触发,这条记录就会一直等在队列里,直到凑满批次——用户可能等了几分钟还没看到写入结果。时间触发就是为这种情况设计的:不管队列里有多少项,到了 maxWait 就强制刷写。单条记录的延迟被控制在 maxWait 以内。

Kafka 的生产者配置把这两个参数分别叫 batch.size(大小触发,默认 16KB)和 linger.ms(时间触发,默认 0ms)。默认 linger.ms=0 意味着不等待,有消息就发——这在低延迟场景下合理。但如果你愿意用少量延迟换吞吐,把 linger.ms 设为 5-10ms,Kafka 就会在这几毫秒内把同一分区的消息攒成一批,网络效率大幅提升。Kafka 按分区(Partition)独立批处理,是因为同一分区的消息写入同一个日志段,物理上连续,合并收益最大;不同分区的消息写入不同位置,无法合并。

3.2 参数调优的直觉#

maxSize 决定吞吐上限,maxWait 决定延迟上限。调优时先确定你能容忍的最大延迟,把它设为 maxWait;然后根据流量估算 maxSize,让大部分批次在 maxWait 内能凑满。如果流量波动大,宁可让 maxSize 偏大一些——没凑满时有 maxWait 兜底,不会无限等;凑满时大批次吞吐更好。

四、变体与对比#

模式关注点批处理角色适用场景
批处理提升吞吐量核心模式写入密集型操作
背压控制流量速率配合使用消费者跟不上生产者
环形缓冲区高效缓冲区作为内部队列生产者-消费者解耦
指数退避重试处理失败批次中单条重试批次部分失败

批处理和背压经常一起出现:批处理提高了消费速率,背压在消费速率仍不够时限制生产速率。当批次中某一条失败时,可以对该条单独进行指数退避重试,而不需要重试整个批次。

4.1 批次部分失败的处理#

批量操作有一个绕不开的问题:批次中部分项成功、部分项失败怎么办?这取决于下游系统的语义。

原子性语义:整个批次要么全部成功,要么全部回滚。数据库事务就是这种模式——一条 INSERT 违反唯一约束,整个批次回滚。好处是调用方不用处理中间状态,坏处是一条坏数据拖垮整批好数据。你需要在上游做好数据校验,或者把可能冲突的记录分到不同批次。

逐项错误处理:批次中每条记录独立处理,失败的不影响成功的。Elasticsearch 的 Bulk API 就是这种模式——返回结果中每条记录有自己的状态码,调用方逐条检查。更灵活,但调用方逻辑更复杂:需要遍历结果,对失败的条目决定是重试、丢弃还是记录到死信队列。

两种语义各有用武之地。金融转账这类强一致性场景必须用原子性;日志写入、指标上报这类最终一致性场景用逐项处理更高效。关键是在设计批处理接口时就想清楚:你的 process 函数是返回一个整体错误,还是返回每个项各自的结果?

4.2 批次大小的上限#

批次不是越大越好。过大的批次会带来新问题:内存压力增大(一批 10 万条记录可能占几百 MB)、下游超时风险(数据库执行一条巨型 SQL 可能触发语句超时)、失败代价升高(大批次重试的成本远高于小批次)。生产环境中,maxSize 通常设为几十到几百,而不是几千。Kafka 默认 16KB 的 batch.size 也是出于类似考虑——更大的批次虽然吞吐更高,但延迟和内存开销也随之增长。

五、多语言实现#

5.1 Go 实现#

package batch
import (
"sync"
"time"
)
// BatchProcessor 批处理器
type BatchProcessor[T any, R any] struct {
queue []batchEntry[T, R]
process func([]T) []R
maxSize int
maxWait time.Duration
mu sync.Mutex
timer *time.Timer
}
type batchEntry[T any, R any] struct {
item T
ch chan R
}
func New[T any, R any](
process func([]T) []R,
maxSize int,
maxWait time.Duration,
) *BatchProcessor[T, R] {
return &BatchProcessor[T, R]{
process: process,
maxSize: maxSize,
maxWait: maxWait,
}
}
// Add 添加一项到待处理队列
func (bp *BatchProcessor[T, R]) Add(item T) R {
bp.mu.Lock()
defer bp.mu.Unlock()
ch := make(chan R, 1)
bp.queue = append(bp.queue, batchEntry[T, R]{item, ch})
// 大小触发:队列满则立即刷新
if len(bp.queue) >= bp.maxSize {
bp.flush()
} else if bp.timer == nil {
// 时间触发:启动倒计时
bp.timer = time.AfterFunc(bp.maxWait, func() {
bp.mu.Lock()
bp.timer = nil
bp.flush()
bp.mu.Unlock()
})
}
return <-ch
}
// flush 刷新:取出队列中所有项,批量处理
func (bp *BatchProcessor[T, R]) flush() {
if len(bp.queue) == 0 {
return
}
items := make([]T, len(bp.queue))
for i, e := range bp.queue {
items[i] = e.item
}
results := bp.process(items)
for i, e := range bp.queue {
e.ch <- results[i]
}
bp.queue = bp.queue[:0]
}

Go 版本用泛型支持任意输入/输出类型。Add 返回一个 channel,调用方阻塞等待结果。大小和时间双重触发:队列满了立即刷,没满就启动定时器等一会儿再刷。flush 把队列清空、批量处理、逐个回传结果。

这里有几个值得注意的设计细节。Channel 的缓冲区大小为 1make(chan R, 1) 确保写入结果时不会阻塞——flush 把结果写入 channel 后继续处理下一个条目,不需要等调用方读取。调用方在 <-ch 处阻塞等待,直到 flush 完成并回传结果。这种 channel-based 的结果投递模式是 Go 并发编程的惯用手法,比回调函数更清晰,比共享变量更安全。

定时器的生命周期管理也很关键。timer 只在队列为空后的第一个 Add 时创建,避免重复设置定时器。flush 执行时将 timer 置为 nil,下次 Add 时才会重新创建。如果大小触发先于时间触发执行,AfterFunc 创建的定时器虽然会到期,但此时 timer 已经被置为 nil,回调函数中的 flush 会发现队列为空直接返回,不会重复处理。

5.2 TypeScript 实现#

class BatchProcessor<T, R> {
private queue: Array<{ item: T; resolve: (r: R) => void }> = [];
private timer: ReturnType<typeof setTimeout> | null = null;
constructor(
private processBatch: (items: T[]) => Promise<R[]>,
private maxSize: number = 10,
private maxWaitMs: number = 50,
) {}
async add(item: T): Promise<R> {
return new Promise<R>((resolve) => {
this.queue.push({ item, resolve });
// 大小触发:队列满则立即刷新
if (this.queue.length >= this.maxSize) {
this.flush();
} else if (!this.timer) {
// 时间触发:启动倒计时
this.timer = setTimeout(() => this.flush(), this.maxWaitMs);
}
});
}
private async flush(): Promise<void> {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
// 取出当前队列的所有项
const batch = this.queue.splice(0);
if (batch.length === 0) return;
// 批量处理
const results = await this.processBatch(batch.map((b) => b.item));
batch.forEach((b, i) => b.resolve(results[i]!));
}
}

TypeScript 版本用 Promise 实现异步批处理。每个 add 返回一个 Promise,调用方 await 等待结果。flush 先清空队列再处理,确保后续 add 的新项目进入新批次。这种模式在 GraphQL 的 DataLoader 中非常常见。

Promise-based 的结果投递和 Go 的 channel 思路一致,但更贴合 JavaScript 的异步模型。add 创建 Promise 时拿到 resolve 函数,存入队列;flush 处理完成后逐个调用 resolve,等待中的 await 就会收到结果。不需要 channel,不需要共享状态,Promise 本身就是结果投递的载体。

定时器管理上有一个容易忽略的细节:flush 开头必须 clearTimeout。因为大小触发和时间触发可能竞争——队列刚好在定时器到期前填满,此时 flush 被大小触发调用,如果不取消定时器,它到期后又会触发一次空刷。Go 版本通过 timer = nil 的检查避免了这个问题,TypeScript 版本则用 clearTimeout 主动取消。两种写法效果相同,但思路不同:Go 是”让定时器到期但忽略空队列”,TypeScript 是”直接取消定时器不让它到期”。

六、生产验证#

项目源码位置用途
Apache Kafka ProducerRecordAccumulatorKafka 生产者按分区累积记录为批次。append() 添加记录,sender 线程排空就绪批次。这是 Kafka 实现百万消息/秒的关键
Linux Block Layerblk_attempt_req_merge块层将相邻 I/O 请求合并为批量操作,摊薄寻道时间。合并前检查两个请求是否有连续扇区和兼容标志
React 18 自动批处理批量状态更新同一事件处理器中的多次 setState 被批处理为一次重渲染,React 18 默认对所有更新启用批处理

七、小结#

何时使用:

  • 数据库写入——批量 INSERT 替代 N 次单条 INSERT,吞吐量提升数倍。固定开销从 N 份降为 1 份,最直接也最普遍的收益
  • API 批量调用——减少网络往返,GraphQL DataLoader、Elasticsearch Bulk API 的核心。DataLoader 把 N 次单独查询合并为一次 WHERE id IN (...),Elasticsearch 把 N 次索引请求合并为一次 Bulk 请求
  • 消息队列——Kafka 按分区批处理、SQS 批量发送/接收。生产者攒批减少网络请求,Broker 顺序写入批量消息减少磁盘寻道
  • UI 更新——React 批量 setState、浏览器批量 DOM 重排。5 次 setState 没有批处理就是 5 次渲染,有批处理就是 1 次

何时不用:

  • 延迟敏感路径——批处理天然增加等待时间,实时系统不适用。交易下单、实时报价这类场景,50ms 的等待就不可接受
  • 极低流量——很少超过 1 个项时,批处理增加复杂性却无收益。内部管理后台每天几十次操作,省下的毫秒不值得多维护一个队列和定时器
  • 严格原子性要求——批次中某条失败时需要逐条错误处理,增加复杂度。业务要求”全成功或全失败”时,逐项错误处理不适用,原子性批处理又面临”一条坏数据拖垮整批”的风险
  • 无界内存风险——没有大小限制时,流量高峰期队列可能无限增长。maxSize 不只是触发阈值,更是内存安全阀

八、参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

批处理(Batch Processing)
https://blog.souloss.com/posts/programming/system-patterns/system-patterns-batch-processing/
作者
Tsukimi
发布于
2026-06-13
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时