mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1226 字
3 分钟
背压(Backpressure)
2026-06-13

一、为什么需要背压#

你的数据管道跑得好好的,上游每秒生产 1000 条消息,下游每秒消费 1000 条消息,完美平衡。突然,下游的数据库开始变慢——每秒只能处理 200 条了。上游不知道,继续往消息队列里灌数据。队列越来越长,内存越涨越高,最终 OOM 崩溃。整个管道瘫痪。

这就是没有背压的后果。背压是一种流控机制:当下游处理不过来时,反向通知上游减速或停止。没有背压,快速生产者会压垮慢速消费者,导致无限内存增长、消息丢失或系统崩溃。

背压不只是消息队列的问题。HTTP 服务调用下游 API、日志采集器往中心写数据、WebSocket 推送实时事件——任何存在速度不匹配的管道都需要背压。关键要素只有两个:有界缓冲区 + 满时的处理策略

二、现实类比#

服务员告诉厨房「慢点做,桌子都满了」。与其让菜堆在窗口变凉,服务员发信号让厨房降低出菜速度,等食客吃完再加速。消费者控制生产者的节奏——这就是背压。

三、核心思想#

背压的核心是在生产者和消费者之间设置有界缓冲区,缓冲区满时按策略处理:

flowchart LR P[生产者] -->|emit data| B[有界缓冲区<br/>capacity=5] B -->|process data| C[消费者] B -->|WAIT 缓冲区满| P C -->|request n| B
策略工作方式适用场景
阻塞生产者等待直到缓冲区有空间金融交易、用户操作——数据不能丢
丢弃缓冲满时丢弃最新/最旧项监控指标、传感器——新鲜度优先
信号消费者发送 request(n) 精确拉取Reactive Streams——精确流控
限流限制生产者速率API 网关——保护下游
属性
信号开销O(1)——布尔标志或计数器检查
缓冲区边界固定容量——防止内存无限增长
吞吐量动态适应消费者速度
延迟权衡负载下增加——生产者等待而非丢弃

四、变体与对比#

模式关系区别
限流器都控制流速限流以固定速率限制(每秒 N 次);背压根据消费者实际能力动态调整
信号量信号量可以实现背压信号量是并发原语;背压是流控思想
环形缓冲区有界环形缓冲区是实现背压的常见机制环形缓冲区是数据结构;背压是流控策略
批处理批处理平滑突发输入批处理是优化手段;背压是保护机制
Note

背压和限流不是同一个东西。限流说的是「最多每秒 100 个请求」,即使消费者能处理 200 个。背压说的是「以消费者当前能处理的速度发送,不管那个速度是多少」。限流是策略,背压是反馈机制。

五、多语言实现#

Go:有界 channel 天然提供背压#

Go 的带缓冲 channel 在满时自动阻塞发送者,这正是「阻塞」式背压:

func producer(ch chan<- int) {
for i := 0; ; i++ {
ch <- i // 缓冲区满时自动阻塞
}
}
func consumer(ch <-chan int) {
for v := range ch {
fmt.Println(v) // 按消费者的节奏处理
time.Sleep(100 * time.Millisecond) // 模拟慢消费
}
}
func Run() {
ch := make(chan int, 10) // 有界缓冲区
go producer(ch)
consumer(ch)
}

TypeScript:有界异步队列#

class BoundedQueue<T> {
private buffer: T[] = [];
private pushWaiters: Array<() => void> = [];
private pullWaiters: Array<(value: T) => void> = [];
constructor(private capacity: number) {}
async push(item: T): Promise<void> {
// 有消费者在等,直接传递
if (this.pullWaiters.length > 0) {
this.pullWaiters.shift()!(item);
return;
}
// 缓冲区满,等待消费者腾出空间
if (this.buffer.length >= this.capacity) {
await new Promise<void>((r) => this.pushWaiters.push(r));
}
this.buffer.push(item);
}
async pull(): Promise<T> {
if (this.buffer.length > 0) {
const item = this.buffer.shift()!;
// 唤醒一个等待的生产者
if (this.pushWaiters.length > 0) this.pushWaiters.shift()!();
return item;
}
// 缓冲区空,等待生产者
return new Promise<T>((r) => this.pullWaiters.push(r));
}
}

六、生产验证#

  • Node.js Streamswritable.js#L548-L585writeOrBuffer() 检查缓冲区是否超过 highWaterMark,超过则设置 kNeedDrain 标志,write() 返回 false,通知生产者暂停直到 drain 事件触发
  • Reactive StreamsSubscription.java#L14-L37request(long n) 是消费者显式从生产者请求 n 项的接口,是 RxJava Flowable、Project Reactor 和 Akka Streams 的基础
  • gRPC — HTTP/2 中的流控窗口实现了背压,防止发送方压垮接收方

七、小结#

何时使用:

  • 流处理——防止快速数据源压垮处理器
  • 微服务——保护下游服务免受过载
  • I/O 管道——磁盘读取快于网络写入(或反之)
  • 事件驱动系统——生产者触发事件快于处理器能处理的速度

何时不用:

  • 允许丢失——丢数据可以接受(指标、采样)时,直接丢弃无需阻塞
  • 同速系统——生产者和消费者以相同速度运行时,背压增加不必要的复杂度
  • 发射后不管——生产者不需要等待时,用无界队列加监控
  • 实时约束——阻塞生产者可能违反延迟 SLA

八、参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

背压(Backpressure)
https://blog.souloss.com/posts/programming/concurrency/concurrency-backpressure/
作者
Tsukimi
发布于
2026-06-13
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时