1182 字
3 分钟
工作窃取(Work Stealing)
一、为什么需要工作窃取
你有一个 8 核的机器,跑了 8 个工作线程处理任务。任务分配很均匀——每个线程 100 个。但现实不是理想世界:有些任务 1ms 就完成了,有些要跑 100ms。很快,几个线程闲得发慌,另外几个忙得要死。空闲线程在等任务,忙碌线程在排长队——负载不均衡。
传统的解决方案是全局任务队列,所有线程从同一个队列取任务。但全局队列是竞争热点——8 个线程同时争抢一把锁,吞吐量上不去。
工作窃取换了一个思路:每个线程有自己的本地队列,空闲线程从忙碌线程的队列里偷任务。没有中央调度器,没有全局锁,负载自然均衡。Go 的 goroutine 调度器、Java 的 ForkJoinPool、Rust 的 Tokio 运行时——它们都用这套机制实现高效的并行调度。
二、现实类比
超市的收银团队。当一个收银员处理完自己的队列,就走到最忙的收银台,从队尾接走几个顾客。工作自然从超载的通道流向空闲的通道——不需要经理来指挥谁去帮谁。
三、核心思想
每个工作线程拥有一个本地双端队列(deque)。工作线程从自己队列的顶部 push/pop 任务(LIFO,利于缓存局部性)。当队列为空时,从其他线程的队列底部窃取任务(FIFO,保证窃取的是最粗粒度的任务)。
flowchart LR
subgraph W0["Worker 0 (忙碌)"]
D0["Task D ← pop<br/>Task C<br/>Task B<br/>Task A"]
end
subgraph W1["Worker 1 (空闲)"]
E1["(empty)"]
end
subgraph W2["Worker 2 (忙碌)"]
D2["Task G ← pop<br/>Task F"]
end
W1 -->|STEAL from bottom| W0
| 属性 | 值 |
|---|---|
| 自身 Push/Pop | O(1)——无需同步(单线程操作) |
| 窃取 | O(1)——对受害者队列底部做 CAS |
| 负载均衡 | 自动、去中心化 |
| 缓存局部性 | 高——自身任务 LIFO,窃取任务 FIFO |
四、变体与对比
| 模式 | 关系 | 区别 |
|---|---|---|
| 协作调度 | 互补关系 | 协作调度解决线程内的让出问题;工作窃取解决线程间的负载均衡 |
| 全局任务队列 | 工作窃取的替代方案 | 全局队列是竞争热点;工作窃取用本地队列避免争抢 |
| 静态分区 | 工作窃取的替代方案 | 静态分区简单但无法应对不均匀负载;工作窃取动态均衡 |
| 对象池 | 工作线程使用线程本地对象池 | 对象池避免分配争用;工作窃取避免调度争用 |
Note
为什么自身用 LIFO 而窃取用 FIFO?LIFO 弹出最近推入的任务,它很可能还在 CPU 缓存中——缓存局部性好。FIFO 窃取最旧的任务,它通常是分治中最早产生的、粒度最粗的子任务——窃取一个大任务比窃取多个小任务更划算,因为摊销了窃取开销。
五、多语言实现
Go:工作窃取调度器
type WorkStealingScheduler struct { queues [][]int}
func NewScheduler(workerCount int) *WorkStealingScheduler { queues := make([][]int, workerCount) for i := range queues { queues[i] = []int{} } return &WorkStealingScheduler{queues: queues}}
func (s *WorkStealingScheduler) Submit(task, workerIdx int) { s.queues[workerIdx] = append(s.queues[workerIdx], task)}
func (s *WorkStealingScheduler) Run(process func(int) int) []int { var results []int for { anyWork := false for w := 0; w < len(s.queues); w++ { if len(s.queues[w]) > 0 { anyWork = true // LIFO:从自己队列尾部取 last := len(s.queues[w]) - 1 task := s.queues[w][last] s.queues[w] = s.queues[w][:last] results = append(results, process(task)) } else { // 尝试从其他 worker 窃取 for other := 0; other < len(s.queues); other++ { if other != w && len(s.queues[other]) > 1 { anyWork = true // FIFO:从受害者队列头部窃取 stolen := s.queues[other][0] s.queues[other] = s.queues[other][1:] results = append(results, process(stolen)) break } } } } if !anyWork { break } } return results}TypeScript:工作窃取调度器
class WorkStealingScheduler { private queues: number[][];
constructor(workerCount: number) { this.queues = Array.from({ length: workerCount }, () => []); }
submit(task: number, workerIdx: number): void { this.queues[workerIdx].push(task); }
run(process: (task: number) => number): number[] { const results: number[] = []; let anyWork = true;
while (anyWork) { anyWork = false; for (let w = 0; w < this.queues.length; w++) { if (this.queues[w].length > 0) { anyWork = true; const task = this.queues[w].pop()!; // LIFO results.push(process(task)); } else { // 窃取:从其他 worker 的队列头部取 for (let other = 0; other < this.queues.length; other++) { if (other !== w && this.queues[other].length > 1) { anyWork = true; const stolen = this.queues[other].shift()!; // FIFO results.push(process(stolen)); break; } } } } } return results; }}六、生产验证
- Go Runtime — proc.go#L3836-L3903:
stealWork是 goroutine 调度器的窃取循环,随机顺序迭代所有 P,调用runqsteal从受害者 P 的本地运行队列中 CAS 抢占一半 goroutine - Tokio (Rust) — worker.rs#L1136-L1175:
Core::steal_work从随机索引开始迭代远程 worker,对窃取队列调用steal_into,仅在不到一半 worker 正在搜索时才尝试窃取 - Java ForkJoinPool —
scan方法实现随机化工作窃取,是 Java 并行流(parallel stream)的底层调度器
七、小结
何时使用:
- 并行运行时——goroutine 调度器(Go)、任务调度器(Tokio、ForkJoinPool)
- 分治算法——子任务大小不均匀的递归任务分解
- 不规则工作负载——任务持续时间不可预测
- NUMA 感知调度——本地工作耗尽后才从远端窃取
何时不用:
- 单线程——没有其他 worker 可窃取
- 均匀任务——静态分区更简单且同样有效
- 极短任务——窃取的 CAS 开销超过任务执行时间
- 严格有序——工作窃取会打乱 FIFO 顺序
八、参考资料
- Go 调度器设计 - Go 运行时工作窃取调度器设计文档
- Tokio 调度器 - Tokio 工作窃取调度器设计文章
- Java ForkJoinPool - Java 工作窃取线程池文档
- Cilk 工作窃取论文 - MIT Cilk 项目,工作窃取的先驱
- 无锁双端队列 - Chase and Lev, 2005, 无锁工作窃取双端队列算法
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时
相关文章 智能推荐






