mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
3415 字
9 分钟
Event Sourcing 事件溯源(Event Sourcing)
2026-06-13

一、为什么需要事件溯源#

银行系统里,一个账户余额是 1000 元。你知道这 1000 元是怎么来的吗?可能是一笔 2000 元的存款减去一笔 1000 元的取款,也可能是十笔 100 元的存款,还可能是一笔转账加上利息。仅凭当前余额,你无法回答”这个状态是怎么来的”——因为传统系统只存当前状态,每次修改都覆盖旧值。

这带来的问题远不止”好奇心的遗憾”。设想一个场景:客户投诉说余额不对,声称自己从未取过那 1000 元。你翻遍数据库,只能看到当前余额是 1000 元,完全没有取款记录。是系统 bug 吗?是客户记错了吗?你无从判断。没有历史记录,调查无从下手,合规审计也无法完成。

再想一个场景:你上线了一个新的风控规则,想看看如果这条规则一直生效,过去一年的交易中会有多少笔被拦截。但你的系统只存了每笔交易的最终结果(成功或失败),没有记录决策过程中的中间状态。想回答这个”假设性问题”,只能从零开始重新导入原始数据——如果你还保留着原始数据的话。

传统持久化方式的核心问题是:状态变更的历史被覆盖后就丢失了。你只有当前这张快照,过去发生了什么、经历了哪些变更,一概不知。一旦需要回溯历史、审计追踪或重放事件,就会发现数据根本不够用。

二、现实类比#

会计的账本。会计师从来不只记一个”当前余额”——他们记录每一笔交易:1 月 3 日收入 5000 元,1 月 5 日支出 2000 元,1 月 10 日收入 3000 元……账户余额不是直接存储的,而是把所有交易加总后得到的。这就是复式记账法的精髓:每一笔变动都有记录,当前状态永远可以从历史推导出来。

如果税务局来查账,会计师不需要额外的审计日志——账本本身就是审计日志。如果对某个数字有疑问,顺着交易记录往回找,每一笔钱的来龙去脉都清清楚楚。这也意味着账本不能修改或删除历史记录,只能追加新的交易来纠正错误(比如用”红字冲销”来抵消一笔错误的收入)。

三、核心思想#

事件溯源的核心主张是:不存状态,存事件。状态不是持久化的对象,而是从事件流中推导出来的结果。用函数式编程的术语来说,状态就是对事件列表的 fold:

状态 = fold(applyEvent, 初始状态, 事件列表)

每次状态变更,不是直接修改当前状态,而是追加一个描述”发生了什么”的事件。需要当前状态时,从初始状态开始,依次回放所有事件,就能得到最新状态。

flowchart LR E1[事件 1\n账户已开户] --> E2[事件 2\n存入 2000] E2 --> E3[事件 3\n取出 1000] E3 --> E4[事件 4\n存入 500] subgraph 事件存储 E1 E2 E3 E4 end subgraph 状态重建 S0[余额: 0] --> S1[余额: 0\n→ 余额: 0] S1 --> S2[余额: 0\n→ 余额: 2000] S2 --> S3[余额: 2000\n→ 余额: 1000] S3 --> S4[余额: 1000\n→ 余额: 1500] end

三个核心概念:

  • 事件存储(Event Store):追加写入的日志,是系统的唯一事实来源。事件一旦写入就不可修改、不可删除——只能追加新事件来”纠正”旧事件
  • 聚合(Aggregate):领域实体,负责接收命令、决定是否产生事件、以及将事件应用到自身状态上。一个聚合的所有事件构成一条事件流
  • 投影(Projection):从事件流中构建的读取模型。同一个事件流可以投影出多种不同的视图,供不同场景使用

关键数据结构:

组件存储内容写入方式读取方式
事件存储不可变事件追加写入按流顺序读取
聚合状态可推导状态事件驱动更新从事件重放或快照加载
投影查询优化视图异步订阅事件直接查询

操作复杂度:

操作复杂度说明
追加事件O(1)顺序追加写入
重建状态O(n)n 为事件数量,需要回放所有事件
快照加载 + 增量回放O(k)k 为快照后的事件数量
投影查询O(1)查询预计算的读取模型

3.1 快照优化#

当事件数量增长到成千上万甚至更多时,每次都从第一个事件开始回放变得越来越慢。一个运行了三年的账户可能有几十万条事件,重建状态需要遍历每一条——这在请求链路上是不可接受的。

解决方案是定期做快照(Snapshot):在某个时间点把聚合的完整状态保存下来。下次重建状态时,先加载最近的快照,然后只回放快照之后的事件。这和 WAL 中的 checkpoint 思路一致——用空间换时间,用一次完整的持久化换取后续的快速恢复。

快照策略需要权衡频率:太频繁,写入快照的 I/O 开销大;太稀疏,回放事件的时间长。常见做法是每 N 个事件做一次快照,或者在事件数超过阈值时触发。快照本身不需要和事件存储保持强一致——如果快照损坏了,大不了从第一个事件重新回放,正确性不会受影响。

3.2 事件溯源 vs WAL#

事件溯源和预写日志(WAL)看起来很像——都是追加写入的日志,都通过重放来恢复状态。但它们的定位完全不同:

WAL 是基础设施层面的恢复机制。它对应用透明:数据库用 WAL 来保证崩溃后能恢复到一致状态,但应用层完全不知道 WAL 的存在。WAL 记录的是”要做什么”(操作意图),检查点之后旧日志可以被截断覆盖。WAL 是数据库的内部实现细节。

事件溯源是领域层面的建模方式。事件是业务语义的一等公民:AccountOpenedMoneyDepositedMoneyWithdrawn——这些事件本身就是业务语言。事件一旦产生就不可删除,它们是永久的业务记录,用于审计、合规、分析。事件溯源改变了你建模和思考领域逻辑的方式。

简单说:WAL 告诉数据库”怎么做”,事件溯源告诉业务”发生了什么”。WAL 的日志是可丢弃的基础设施数据,事件溯源的事件是不可丢弃的业务资产。

四、变体与对比#

模式存储内容写入方式事件可删除典型用途
事件溯源业务事件追加不可删除审计追踪、时态查询
WAL操作意图追加checkpoint 后截断崩溃恢复
CQRS + ES事件 + 读写分离视图追加 + 异步投影事件不可删除高读写比、复杂查询
状态持久化当前状态原地更新无事件可删简单 CRUD

CQRS(命令查询职责分离)经常和事件溯源搭配使用,但两者是独立模式。事件溯源只管”怎么存”,CQRS 管”怎么读”。没有事件溯源也可以用 CQRS——只是写入模型和读取模型各自优化,数据源仍是传统数据库。两者结合时,写入端产生事件,读取端通过投影构建优化的查询视图,各司其职。

状态持久化是最传统的方式:直接存当前状态,更新就是覆盖旧值。简单直接,但历史信息全部丢失。如果你的系统只需要当前状态,不需要回溯历史,状态持久化就是最合理的选择——别为了”可能有用”而引入事件溯源的复杂性。

五、多语言实现#

5.1 Go 实现#

package eventsourcing
import (
"fmt"
"time"
)
// ---- 事件定义 ----
type EventType string
const (
EventAccountOpened EventType = "AccountOpened"
EventMoneyDeposited EventType = "MoneyDeposited"
EventMoneyWithdrawn EventType = "MoneyWithdrawn"
)
type Event struct {
Type EventType
Timestamp time.Time
Data map[string]interface{}
}
// ---- 聚合:银行账户 ----
type BankAccount struct {
ID string
Owner string
Balance float64
Version int // 用于乐观并发控制
}
// ApplyEvent 将单个事件应用到聚合上
func (a *BankAccount) ApplyEvent(event Event) {
switch event.Type {
case EventAccountOpened:
a.ID = event.Data["id"].(string)
a.Owner = event.Data["owner"].(string)
a.Balance = 0
case EventMoneyDeposited:
a.Balance += event.Data["amount"].(float64)
case EventMoneyWithdrawn:
a.Balance -= event.Data["amount"].(float64)
}
a.Version++
}
// ReplayFromEvents 从事件流重建聚合状态
func ReplayFromEvents(events []Event) *BankAccount {
account := &BankAccount{}
for _, event := range events {
account.ApplyEvent(event)
}
return account
}
// ---- 事件存储 ----
type EventStore struct {
streams map[string][]Event // 聚合 ID → 事件流
}
func NewEventStore() *EventStore {
return &EventStore{
streams: make(map[string][]Event),
}
}
// Append 向指定聚合的事件流追加事件
func (s *EventStore) Append(aggregateID string, events []Event) error {
// 实际实现应检查乐观并发版本号
s.streams[aggregateID] = append(s.streams[aggregateID], events...)
return nil
}
// LoadEvents 加载指定聚合的全部事件
func (s *EventStore) LoadEvents(aggregateID string) ([]Event, error) {
events, ok := s.streams[aggregateID]
if !ok {
return nil, fmt.Errorf("aggregate %s not found", aggregateID)
}
return events, nil
}
// ---- 快照机制 ----
type Snapshot struct {
AggregateID string
Version int
State *BankAccount
}
type SnapshotStore struct {
snapshots map[string]Snapshot
}
func NewSnapshotStore() *SnapshotStore {
return &SnapshotStore{
snapshots: make(map[string]Snapshot),
}
}
// SaveSnapshot 保存聚合在某个版本的快照
func (ss *SnapshotStore) SaveSnapshot(snap Snapshot) {
ss.snapshots[snap.AggregateID] = snap
}
// LoadSnapshot 加载最近的快照
func (ss *SnapshotStore) LoadSnapshot(aggregateID string) (Snapshot, bool) {
snap, ok := ss.snapshots[aggregateID]
return snap, ok
}
// LoadWithSnapshot 用快照 + 增量事件重建状态
func LoadWithSnapshot(
snapStore *SnapshotStore,
eventStore *EventStore,
aggregateID string,
) (*BankAccount, error) {
// 先尝试加载快照
if snap, ok := snapStore.LoadSnapshot(aggregateID); ok {
account := snap.State
// 只回放快照之后的事件
allEvents, err := eventStore.LoadEvents(aggregateID)
if err != nil {
return nil, err
}
for _, event := range allEvents[snap.Version:] {
account.ApplyEvent(event)
}
return account, nil
}
// 没有快照,从头回放
events, err := eventStore.LoadEvents(aggregateID)
if err != nil {
return nil, err
}
return ReplayFromEvents(events), nil
}

Go 实现展示了事件溯源的三个关键部分:BankAccount 聚合通过 ApplyEvent 方法将事件应用到自身状态;EventStore 以追加写入的方式管理事件流;SnapshotStore 用快照优化长事件流的重建性能。LoadWithSnapshot 是实际生产中的典型读取路径——先尝试快照,再增量回放,避免每次都遍历全部历史事件。

有一个设计细节值得注意:Version 字段。它用于乐观并发控制——当两个请求同时想往同一个聚合追加事件时,版本号能检测到冲突。实际的事件存储(如 EventStoreDB)会在追加时检查期望版本号,如果不匹配就拒绝写入,强制客户端重新加载最新状态后重试。

5.2 TypeScript 实现#

// ---- 事件定义 ----
type EventType = "TaskCreated" | "TaskCompleted" | "TaskReopened";
interface Event {
type: EventType;
timestamp: Date;
data: Record<string, unknown>;
}
// ---- 聚合:任务 ----
class Task {
id = "";
title = "";
completed = false;
version = 0;
// 将事件应用到自身状态
applyEvent(event: Event): void {
switch (event.type) {
case "TaskCreated":
this.id = event.data.id as string;
this.title = event.data.title as string;
this.completed = false;
break;
case "TaskCompleted":
this.completed = true;
break;
case "TaskReopened":
this.completed = false;
break;
}
this.version++;
}
}
// ---- 事件存储 ----
class EventStore {
private streams = new Map<string, Event[]>();
// 追加事件到指定聚合的流
append(aggregateId: string, events: Event[]): void {
const existing = this.streams.get(aggregateId) ?? [];
this.streams.set(aggregateId, [...existing, ...events]);
}
// 加载指定聚合的全部事件
loadEvents(aggregateId: string): Event[] {
return this.streams.get(aggregateId) ?? [];
}
}
// ---- 事件回放与状态推导 ----
function replayFromEvents(events: Event[]): Task {
const task = new Task();
for (const event of events) {
task.applyEvent(event);
}
return task;
}
// ---- 使用示例 ----
const store = new EventStore();
const taskId = "task-001";
// 记录任务生命周期的事件
store.append(taskId, [
{
type: "TaskCreated",
timestamp: new Date("2026-01-10"),
data: { id: taskId, title: "实现事件溯源" },
},
{
type: "TaskCompleted",
timestamp: new Date("2026-01-15"),
data: {},
},
{
type: "TaskReopened",
timestamp: new Date("2026-01-16"),
data: { reason: "发现回归 bug" },
},
]);
// 从事件流重建状态——任何时候都可以得到一致的状态
const task = replayFromEvents(store.loadEvents(taskId));
console.log(task.title); // "实现事件溯源"
console.log(task.completed); // false(被重新打开了)
console.log(task.version); // 3(经历了 3 个事件)

TypeScript 版本用 Task 聚合演示了事件溯源在状态机场景下的应用。一个任务经历”创建 → 完成 → 重新打开”的生命周期,每个状态转换都是一个不可变事件。replayFromEvents 从事件流推导出当前状态——无论何时调用,结果都是确定的。

这个实现暴露了事件溯源的一个天然优势:时态查询几乎免费。如果你想看”1 月 15 日这个任务是什么状态”,只需要回放到第 2 个事件就知道了。传统系统要回答同样的问题,要么额外维护一张历史表,要么根本无法回答。

六、生产验证#

项目源码位置用途
EventStoreDBEventRecord 及存储层专用事件存储数据库。追加写入优化、乐观并发控制、内置投影引擎。金融和电商领域广泛使用
Apache KafkaLog segment 追加写入分布式事件流平台,常作为事件存储使用。KRaft 模式下不再依赖 ZooKeeper。事件保留策略支持按时间或大小过期
Apache Flink状态后端 + Checkpoint 机制流处理引擎,通过 Savepoint 实现事件回放和状态迁移。支持从任意时间点重新消费 Kafka 事件流

七、小结#

何时使用:

  • 审计追踪是硬性要求——金融、医疗、法律等领域的系统,监管要求每笔操作都可追溯。事件溯源的不可变事件流天然满足这一要求,不需要额外维护审计日志
  • 需要时态查询——“这个实体在某个时间点是什么状态”是核心业务问题。传统方式需要额外设计历史表,事件溯源则天然支持回放到任意时间点
  • 复杂领域逻辑——聚合的状态转换规则复杂,用事件来表达业务语义比用字段更新更清晰。事件本身就是领域语言,新人读事件流就能理解业务流程
  • 需要事件回放——修复 bug 后想重新计算状态,或者上线新功能后想用历史数据验证。只需要清空投影、回放事件流即可,无需数据迁移

何时不用:

  • 简单 CRUD——如果你的系统只是增删改查,没有复杂的业务规则,事件溯源引入的复杂性远大于收益。一个用户管理后台,何必用事件溯源?
  • 没有审计需求——如果没有人会问”这个状态是怎么来的”,维护不可变事件流就是浪费存储和计算
  • 团队不熟悉——事件溯源的学习曲线陡峭,调试困难(你不能直接看数据库就知道当前状态),CQRS 的引入更增加了系统复杂度。团队没有准备好,强行上只会制造混乱
  • 最终一致性不可接受——事件溯源 + CQRS 的读取模型是异步投影,存在延迟。如果业务要求写入后立刻读到最新状态,要么接受复杂的双写逻辑,要么选择其他方案

八、参考资料#

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Event Sourcing 事件溯源(Event Sourcing)
https://blog.souloss.com/posts/programming/system-patterns/system-patterns-event-sourcing/
作者
Tsukimi
发布于
2026-06-13
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时