1226 字
3 分钟
背压(Backpressure)
一、为什么需要背压
你的数据管道跑得好好的,上游每秒生产 1000 条消息,下游每秒消费 1000 条消息,完美平衡。突然,下游的数据库开始变慢——每秒只能处理 200 条了。上游不知道,继续往消息队列里灌数据。队列越来越长,内存越涨越高,最终 OOM 崩溃。整个管道瘫痪。
这就是没有背压的后果。背压是一种流控机制:当下游处理不过来时,反向通知上游减速或停止。没有背压,快速生产者会压垮慢速消费者,导致无限内存增长、消息丢失或系统崩溃。
背压不只是消息队列的问题。HTTP 服务调用下游 API、日志采集器往中心写数据、WebSocket 推送实时事件——任何存在速度不匹配的管道都需要背压。关键要素只有两个:有界缓冲区 + 满时的处理策略。
二、现实类比
服务员告诉厨房「慢点做,桌子都满了」。与其让菜堆在窗口变凉,服务员发信号让厨房降低出菜速度,等食客吃完再加速。消费者控制生产者的节奏——这就是背压。
三、核心思想
背压的核心是在生产者和消费者之间设置有界缓冲区,缓冲区满时按策略处理:
flowchart LR
P[生产者] -->|emit data| B[有界缓冲区<br/>capacity=5]
B -->|process data| C[消费者]
B -->|WAIT 缓冲区满| P
C -->|request n| B
| 策略 | 工作方式 | 适用场景 |
|---|---|---|
| 阻塞 | 生产者等待直到缓冲区有空间 | 金融交易、用户操作——数据不能丢 |
| 丢弃 | 缓冲满时丢弃最新/最旧项 | 监控指标、传感器——新鲜度优先 |
| 信号 | 消费者发送 request(n) 精确拉取 | Reactive Streams——精确流控 |
| 限流 | 限制生产者速率 | API 网关——保护下游 |
| 属性 | 值 |
|---|---|
| 信号开销 | O(1)——布尔标志或计数器检查 |
| 缓冲区边界 | 固定容量——防止内存无限增长 |
| 吞吐量 | 动态适应消费者速度 |
| 延迟权衡 | 负载下增加——生产者等待而非丢弃 |
四、变体与对比
| 模式 | 关系 | 区别 |
|---|---|---|
| 限流器 | 都控制流速 | 限流以固定速率限制(每秒 N 次);背压根据消费者实际能力动态调整 |
| 信号量 | 信号量可以实现背压 | 信号量是并发原语;背压是流控思想 |
| 环形缓冲区 | 有界环形缓冲区是实现背压的常见机制 | 环形缓冲区是数据结构;背压是流控策略 |
| 批处理 | 批处理平滑突发输入 | 批处理是优化手段;背压是保护机制 |
Note
背压和限流不是同一个东西。限流说的是「最多每秒 100 个请求」,即使消费者能处理 200 个。背压说的是「以消费者当前能处理的速度发送,不管那个速度是多少」。限流是策略,背压是反馈机制。
五、多语言实现
Go:有界 channel 天然提供背压
Go 的带缓冲 channel 在满时自动阻塞发送者,这正是「阻塞」式背压:
func producer(ch chan<- int) { for i := 0; ; i++ { ch <- i // 缓冲区满时自动阻塞 }}
func consumer(ch <-chan int) { for v := range ch { fmt.Println(v) // 按消费者的节奏处理 time.Sleep(100 * time.Millisecond) // 模拟慢消费 }}
func Run() { ch := make(chan int, 10) // 有界缓冲区 go producer(ch) consumer(ch)}TypeScript:有界异步队列
class BoundedQueue<T> { private buffer: T[] = []; private pushWaiters: Array<() => void> = []; private pullWaiters: Array<(value: T) => void> = [];
constructor(private capacity: number) {}
async push(item: T): Promise<void> { // 有消费者在等,直接传递 if (this.pullWaiters.length > 0) { this.pullWaiters.shift()!(item); return; } // 缓冲区满,等待消费者腾出空间 if (this.buffer.length >= this.capacity) { await new Promise<void>((r) => this.pushWaiters.push(r)); } this.buffer.push(item); }
async pull(): Promise<T> { if (this.buffer.length > 0) { const item = this.buffer.shift()!; // 唤醒一个等待的生产者 if (this.pushWaiters.length > 0) this.pushWaiters.shift()!(); return item; } // 缓冲区空,等待生产者 return new Promise<T>((r) => this.pullWaiters.push(r)); }}六、生产验证
- Node.js Streams — writable.js#L548-L585:
writeOrBuffer()检查缓冲区是否超过highWaterMark,超过则设置kNeedDrain标志,write()返回false,通知生产者暂停直到drain事件触发 - Reactive Streams — Subscription.java#L14-L37:
request(long n)是消费者显式从生产者请求 n 项的接口,是 RxJava Flowable、Project Reactor 和 Akka Streams 的基础 - gRPC — HTTP/2 中的流控窗口实现了背压,防止发送方压垮接收方
七、小结
何时使用:
- 流处理——防止快速数据源压垮处理器
- 微服务——保护下游服务免受过载
- I/O 管道——磁盘读取快于网络写入(或反之)
- 事件驱动系统——生产者触发事件快于处理器能处理的速度
何时不用:
- 允许丢失——丢数据可以接受(指标、采样)时,直接丢弃无需阻塞
- 同速系统——生产者和消费者以相同速度运行时,背压增加不必要的复杂度
- 发射后不管——生产者不需要等待时,用无界队列加监控
- 实时约束——阻塞生产者可能违反延迟 SLA
八、参考资料
- Reactive Streams 规范 - 背压感知的异步流处理标准
- Node.js Stream 背压 - Node.js 官方背压指南
- gRPC 流控 - HTTP/2 流控窗口机制
- RxJava Flowable 文档 - 背压感知的响应式流
- Kafka 生产者流控 -
buffer.memory和max.block.ms配置说明
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时
相关文章 智能推荐






