一、为什么需要观察者
你写了一个数据服务,当数据更新时需要刷新 UI、写日志、发通知。最直接的做法是在数据更新的函数末尾依次调用这三个函数。但很快需求来了:加一个缓存失效的处理。你得再改数据更新的代码。然后又要加一个埋点上报,再改一次。每次新增功能都要动核心逻辑,违反了开闭原则。
看一段典型的耦合代码:
func (s *DataService) Update(id string, data Data) error { // 核心逻辑:更新数据库 if err := s.db.Save(id, data); err != nil { return err }
// 下面全是"附带的"副作用,和核心逻辑无关 s.ui.Refresh(id) // 刷新 UI s.logger.Log("updated", id) // 写日志 s.notifier.Send(id) // 发通知 s.cache.Invalidate(id) // 失效缓存——后加的 s.tracker.Report(id) // 埋点上报——又后加的 return nil}每次加新功能都要改 Update 方法,而且 DataService 要引入越来越多不相关的依赖——它根本不需要知道 UI 怎么刷新、日志怎么写。更糟糕的是,这些副作用之间也可能互相影响:如果 notifier.Send 抛了异常,后面的 cache.Invalidate 就不会执行,缓存就脏了。
用观察者模式重构后:
func (s *DataService) Update(id string, data Data) error { if err := s.db.Save(id, data); err != nil { return err } // 只管发射事件,谁关心谁自己处理 s.emitter.Emit("data:updated", DataEvent{ID: id, Data: data}) return nil}
// 各处独立注册,互不干扰s.emitter.On("data:updated", func(data any) { s.ui.Refresh(data.(DataEvent).ID)})s.emitter.On("data:updated", func(data any) { s.logger.Log("updated", data.(DataEvent).ID)})核心逻辑只做一件事:数据更新后发射事件。谁需要响应、怎么响应,全部在各自模块里注册。新增功能不再动 Update 方法,只需要 On 一下就行。
更深层的问题是耦合。数据服务不应该知道 UI 怎么刷新、日志怎么写。它只需要做一件事:数据变了,告诉关心的人。观察者模式把「一变多应」的关系抽象出来:主题维护订阅者列表,状态变化时遍历列表逐个通知。发送方不知道谁在监听,监听方也不需要轮询检查变化。
这种解耦是观察者模式无处不在的原因:从 DOM 的 addEventListener 到 Redux 的 store.subscribe,从 Node.js 的 EventEmitter 到 React 的 useEffect 清理模式,核心思想完全一致。
不过,解耦也带来了一个容易忽视的问题:内存泄漏。主题持有观察者的引用,如果你注册了观察者却忘了取消订阅,垃圾回收器就无法回收它及闭包捕获的变量。在单页应用中这是最常见的内存泄漏来源——组件销毁了,但事件监听器还在。React 的 useEffect 要求返回清理函数,正是为了解决这个问题:
useEffect(() => { emitter.on("data", handler) return () => emitter.off("data", handler) // 组件卸载时取消订阅}, [])一个实用法则:每写一行 on,就立即写一行 off。很多库让 on 返回取消订阅函数,就是为了降低遗忘概率。
二、现实类比
报纸订阅。你订阅一次,每天早上报纸就送到门口。你不需要跑到报摊去查有没有新报纸——出版商主动推送给所有订户。不想看了?取消订阅,以后就不送了。
类比还可以再进一步:出版商不知道订户拿到报纸之后干什么——有人读头版,有人只看体育。出版商也不关心,它只管印和送。如果某个订户搬家了却忘了取消订阅,报纸还是会送到旧地址,这就对应了前面说的内存泄漏。
三、核心思想
观察者模式建立一对多的依赖关系:当主题状态改变时,所有注册的观察者自动收到通知。主题不知道观察者会做什么——只是调用它们的回调。
3.1 推模型与拉模型
观察者通知观察者的方式有两种风格:推模型和拉模型。
推模型中,主题主动把变化的数据传给观察者。好处是观察者不用再去查询;坏处是主题需要决定传什么,不同观察者关心的数据可能不一样,传少了不够用,传多了浪费。上面代码里 Emit("data:updated", DataEvent{...}) 就是推模型。
拉模型中,主题只发一个”我变了”的信号,观察者收到后自己去查询当前状态。好处是观察者按需获取;坏处是多了查询开销,而且两次查询之间状态可能又变了。经典 MVC 里 Model 通知 View 就偏拉模型——View 收到通知后重新读取 Model 数据来渲染。
Redux 的 subscribe 是拉模型——监听器需要自己调 store.getState() 获取最新状态;RxJS 的 next(value) 是推模型——值随通知直接推送。选择取决于场景:观察者只需要变化数据时推模型更直接,需要根据全局状态做复杂判断时拉模型更灵活。
3.2 错误隔离
一个关键问题:如果某个观察者抛出异常,其他观察者还能收到通知吗?
如果不做任何处理,答案是不能。大多数语言的实现是遍历监听器列表逐个调用,一旦某个回调抛异常,循环就中断了。这意味着一个写得不好的观察者可以阻断整条通知链——这和观察者模式”各观察者独立”的初衷矛盾。
生产级实现必须做错误隔离:对每个观察者的调用用 try-catch 包裹,确保单个观察者的异常不影响其他观察者。Node.js 的 EventEmitter 正是这样做的——如果监听器抛错,错误会被捕获并触发 uncaughtException,但不会阻断后续监听器的执行。上一节 Go 实现里提到的 recover 也是同样的道理。
3.3 同步与异步通知
同步通知:主题遍历观察者列表逐个调用,全部完成后才继续执行。优点是时序可预测——观察者收到通知时主题的状态还没再变。缺点是某个观察者执行慢会阻塞后续所有观察者。大多数实现(Node.js 的 EventEmitter、Go 回调)默认同步。
异步通知:回调放入事件队列,主题不等待就继续。优点是不阻塞主题;缺点是时序不可控——通知发出后主题状态可能已变,观察者拿到的可能是”旧”通知。
选择的关键:观察者是否依赖主题当前的状态快照? 如果是,同步更安全;如果只是做日志等无副作用操作,异步更高效。
关键操作复杂度:
| 属性 | 值 |
|---|---|
| 订阅 | O(1)——添加到监听器集合 |
| 取消订阅 | O(1)——从监听器集合移除 |
| 发射/通知 | O(n)——调用 n 个监听器 |
| 空间 | O(监听器数) 每事件类型 |
需要注意的一个陷阱:观察者的通知顺序通常不可靠。如果观察者 B 依赖观察者 A 的结果,那你就有了一个隐式耦合,而这正是观察者模式想要消除的。每个观察者应该是独立的。
四、变体与对比
| 模式 | 与观察者的关系 | 何时用这个而非观察者 |
|---|---|---|
| 事件循环 | 事件循环将事件分发给注册的观察者 | 需要跨线程/异步调度时,事件循环是观察者的运行基础设施 |
| 脏标记 | 观察者触发即时通知;脏标记延迟昂贵的反应 | 变化频繁但反应开销大时,脏标记把多次变更合并成一次更新 |
| 中间件/管道链 | 中间件观察并转换流经管道的数据 | 需要对数据做链式变换时,管道比观察者更可控 |
| Actor 模型 | 两者都解耦生产者和消费者——观察者通过回调,Actor 通过消息传递 | 需要并发安全时,Actor 的消息队列天然串行化,避免竞态 |
| Pub-Sub | Pub-Sub 是观察者的进阶变体,引入消息代理解耦发送方和接收方 | 发送方和接收方完全不在同一进程时,需要跨服务的事件路由 |
4.1 Pub-Sub:观察者的进阶变体
观察者模式中,主题和观察者直接通信——主题持有观察者的引用。Pub-Sub(发布-订阅)模式在两者之间加了一个消息代理(Message Broker),发送方和接收方互不知道对方的存在。发送方只管往代理发布消息,接收方只管从代理订阅感兴趣的主题。
这个中间层带来三个关键变化:
- 完全解耦:发布者不知道谁在订阅,甚至不知道有没有人订阅。Kafka、RabbitMQ 就是 Pub-Sub 的工业实现——生产者和消费者可以部署在不同机器上,用不同语言编写。
- 消息持久化:观察者离线就错过通知,Pub-Sub 的代理可以持久化消息,消费者上线后再消费。分布式系统几乎都用 Pub-Sub 而非裸观察者,这是主要原因。
- 过滤与路由:代理根据主题、内容过滤,只推送给匹配的订阅者。观察者模式要实现同样效果,需要在主题里写过滤逻辑,又把耦合引回来了。
Pub-Sub 的代价是复杂度:维护消息代理,处理消息重复、顺序、延迟等分布式问题。单进程内用观察者就够了,跨进程、跨服务才需要 Pub-Sub。
观察者 vs 轮询:观察者是推送模型,状态变化时主动通知;轮询是拉取模型,消费者定期检查。推送实时性好但可能产生通知风暴,拉取可控但延迟高。
五、多语言实现
5.1 Go 实现
type Listener func(data any)
type EventEmitter struct { listeners map[string][]Listener}
func NewEmitter() *EventEmitter { return &EventEmitter{listeners: make(map[string][]Listener)}}
// On 注册监听器,返回取消订阅的函数func (e *EventEmitter) On(event string, listener Listener) func() { e.listeners[event] = append(e.listeners[event], listener) return func() { // 从切片中移除该监听器 ls := e.listeners[event] for i, l := range ls { if &l == &listener { e.listeners[event] = append(ls[:i], ls[i+1:]...) return } } }}
// Emit 通知所有订阅了该事件的监听器func (e *EventEmitter) Emit(event string, data any) { for _, listener := range e.listeners[event] { // 错误隔离:防止单个监听器的 panic 中断整个通知链 func() { defer func() { if r := recover(); r != nil { log.Printf("监听器异常: %v", r) } }() listener(data) }() }}Go 实现用切片存储监听器,On 返回闭包用于取消订阅。两个设计选择值得说明:
切片而非 map:切片保持注册顺序,map 删除 O(1) 但切片删除需遍历。取消订阅频繁时可用 map 或带索引的 slice。Go 没有泛型 Set,这里用最简单的切片。
Emit 中的错误隔离:匿名函数 + defer recover 包裹每个监听器调用。没有这个保护,一个监听器的 panic 会打断整个 Emit,后续监听器全部收不到通知。这个模式在 Go 的 HTTP 中间件里也常见——每个 handler 都要 recover,不能让一个请求的 panic 搞挂整个服务。
5.2 TypeScript 实现
type Listener<T> = (data: T) => void;
class EventEmitter<Events extends Record<string, unknown>> { private listeners = new Map<keyof Events, Set<Listener<any>>>();
// 订阅事件,返回取消订阅函数 on<K extends keyof Events>(event: K, listener: Listener<Events[K]>): () => void { if (!this.listeners.has(event)) { this.listeners.set(event, new Set()); } this.listeners.get(event)!.add(listener); return () => this.off(event, listener); }
off<K extends keyof Events>(event: K, listener: Events[K]): void { this.listeners.get(event)?.delete(listener); }
// 通知所有订阅者,带错误隔离 emit<K extends keyof Events>(event: K, data: Events[K]): void { this.listeners.get(event)?.forEach((listener) => { try { listener(data); } catch (err) { console.error(`监听器异常 [${String(event)}]:`, err); } }); }}TypeScript 实现用 Set 存储监听器,天然去重且删除高效。泛型参数 Events 保证事件名和数据类型的对应关系——编译期就能发现传错事件名的问题。
emit 做了错误隔离:try-catch 在每个迭代内部而非 forEach 外面——保证单个监听器异常不会中断遍历。这也是 Node.js EventEmitter 的做法。
on 返回取消订阅函数是实用的模式——调用者不需要保存 listener 引用再调 off,直接调用返回值即可。React 的 useEffect 清理函数、RxJS 的 Subscription.unsubscribe(),设计思路完全一致。
六、生产验证
- Node.js —— events.js#L456-L520
EventEmitter.prototype.emit遍历注册的监听器逐个调用。Node 在 emit 内部用try-catch包裹监听器调用,异常转发到uncaughtException但不中断后续监听器。Node 还设有maxListeners默认阈值(10),超过会打印警告——往往意味着忘了取消订阅导致监听器泄漏。 - Redux —— createStore.ts#L211-L280
subscribe()添加监听器,dispatch()执行 reducer 后调用所有监听器。Redux 做了精巧的快照保护:dispatch 前复制监听器数组,通知过程中的subscribe/unsubscribe不影响当前轮——新订阅者下一轮才收到通知,取消订阅者当前轮仍会收到。这避免了通知过程中列表被修改导致的跳过或重复调用。Redux 还禁止在监听器内部调用dispatch,防止递归通知导致状态不一致。 - Chromium —— EventTarget 是 Blink 中 DOM
addEventListener的实现。Chromium 同样做了快照保护:事件分派时先复制监听器列表,增删不影响当前轮次。Chromium 还区分捕获和冒泡阶段——这是观察者模式在 DOM 树上的分层应用,事件从根节点传播到目标节点再传回来,每一层都可以有观察者拦截。
三个项目的共同点值得注意:都做了错误隔离或快照保护。这不是巧合——没有这些保护的观察者实现在生产环境中一定会出问题。
七、小结
何时使用:
- 事件驱动系统——UI 事件、网络事件、消息队列。浏览器每次点击经过
addEventListener分发,Node.js 每个连接由EventEmitter驱动 - 模块解耦——插件、中间件、扩展点。Webpack 插件系统基于事件钩子(tapable),插件监听编译器生命周期事件来扩展功能
- 状态管理——Redux store、MobX observables、Vue reactivity。Vue 3 响应式系统本质上是精细化的观察者——每个响应式属性维护依赖集合,
set时触发更新 - 日志与监控——发射事件不关心谁在收集。Winston 等日志库内部用事件传递记录,传输层各自监听
何时不用:
- 同步管线——处理顺序和完成很重要时,直接调用函数比回调更可控。比如
parse → validate → transform → save,每步依赖上一步输出,用观察者反而引入不必要的间接性 - 事件风暴——太多事件交织难以调试。高频滚动事件每秒触发几十次,每个观察者再触发更新,性能很快崩溃——应该用 debounce 或 throttle 聚合
- 循环依赖——A 观察 B,B 观察 A,导致无限循环。应重构依赖关系或引入中间状态打破循环
- 强顺序保证——通知顺序不确定,需要严格顺序时用管道或中间件。如果必须保证”A 先于 B”,那 A 和 B 之间有数据依赖,应用显式函数调用
八、参考资料
- Node.js EventEmitter 文档 - Node.js 事件驱动架构的核心模块
- Redux createStore 源码 - 观察者模式在状态管理中的经典应用
- RxJS 官方文档 - 基于观察者模式的响应式流库
- Observer Pattern - Wikipedia - GoF 设计模式中观察者的经典定义
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






