分布式事务:方案选型与工程实践
一个电商下单操作要同时扣库存、扣余额、核销优惠券——任何一步失败,已执行的步骤都应该回滚。单机数据库用一个事务就搞定了,但当这三个操作分别在三个服务、三个数据库里执行时,问题就来了:本地事务的边界无法跨越网络。
这篇文章围绕一个贯穿始终的电商下单场景,从理论到六种工程方案再到生产级的对账机制,把分布式事务这件事从头理一遍。
从集中式到分布式
集中式系统的特点是一台主机承担所有计算和存储。早期银行系统、大型企业核心业务大多采用这种架构。优点是部署简单、无需考虑节点间协调,但单点故障意味着整个系统瘫痪,纵向扩展(加 CPU / 内存)有物理上限且成本指数增长。
分布式系统用多台普通计算机通过网络协作,对外表现得像一台机器。机器越多,资源越多,能处理的并发就越大。但分布式在解决容量问题的同时,也引入了几个根本性的新问题:
- 网络不可靠:延迟、丢包、分区随时可能发生
- 时钟不同步:不同机器的系统时钟存在偏差,无法依赖本地时间戳判定分布式事件的全局顺序
- 节点故障:集群越大,某台机器宕机的概率越高
- 数据一致性:请求只会落到其中一台机器上,处理不好就会产生不一致
Leslie Lamport(Paxos 发明者,2013 图灵奖得主)说过:"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable." —— 一台你甚至不知道其存在的机器的故障,就可能让你的机器不可用。
一致性问题与理论基础
从 ACID 到分布式
单机数据库通过事务机制保证 ACID:
| 特性 | 含义 | 保障手段 |
|---|---|---|
| Atomicity | 操作要么全部成功,要么全部回滚 | undo log |
| Consistency | 数据从一个合法状态转到另一个合法状态 | 由 A、I、D 共同保证 |
| Isolation | 并发事务互不干扰 | 锁 + MVCC |
| Durability | 事务提交后数据不丢 | redo log + WAL |
在分布式系统中,数据分散在多台机器上,一个数据库事务无法覆盖跨网络的多个操作。用一段代码来说明问题的本质:
// 电商下单 —— 跨三个服务的操作
public OrderResult createOrder(OrderRequest request) {
// 步骤1:扣减库存(库存服务)
inventoryService.deduct(request.getSkuId(), request.getQuantity());
// 步骤2:扣减余额(账户服务)
accountService.deduct(request.getUserId(), request.getAmount());
// 步骤3:创建订单(本地数据库)
orderDao.insert(request.toOrder());
return OrderResult.success();
}
// 如果步骤2成功但步骤3失败 —— 钱扣了但没生成订单
CAP 定理
2000 年 Eric Brewer 提出 CAP 猜想,2002 年被正式证明:一个分布式系统最多同时满足以下三项中的两项。
| 属性 | 含义 |
|---|---|
| Consistency | 所有节点在同一时刻看到相同数据 |
| Availability | 每个请求都能在合理时间内收到非错误响应 |
| Partition tolerance | 网络分区发生时系统仍能运作 |
网络分区(P)是物理现实——光纤会被挖断、交换机会宕机。因此 CAP 的核心不是"三选二",而是在发生网络分区时,选一致性还是可用性:
| 策略 | 取舍 | 典型系统 |
|---|---|---|
| CP | 保一致性,牺牲部分可用性 | ZooKeeper、etcd、HBase |
| AP | 保可用性,允许短暂不一致 | Cassandra、DynamoDB、Eureka |
BASE 理论
BASE 是 CAP 中 AP 策略的工程延伸:即使无法做到强一致性,也可以通过适当方式达到最终一致性。
| 缩写 | 含义 |
|---|---|
| BA (Basically Available) | 出现故障时允许损失部分非核心功能,核心功能可用 |
| S (Soft State) | 允许数据存在中间状态,不同节点的副本暂时不一致 |
| E (Eventually Consistent) | 经过一段时间后,所有副本最终达到一致 |
在实际系统中,ACID 和 BASE 不是非此即彼——核心链路(资金)用 ACID,非核心链路(通知、积分)用 BASE。
从理论到工程:CAP/BASE 回答了"数据可以不一致到什么程度",接下来的事务方案回答"工程上如何逼近我们选择的一致性级别"。2PC/3PC 追求强一致性,TCC、Saga、本地消息表、事务消息都是用不同手段逼近最终一致性——区别在于一致性、性能和实现复杂度之间各自做了什么取舍。
刚性事务:2PC
两阶段提交原理
2PC(Two-Phase Commit)是最经典的分布式事务协议,核心思想是先投票,再执行。它引入一个协调者(Coordinator)来统一决策,每个参与者只知道自己的事务结果,协调者负责汇总并做出全局决定。
第一阶段:Prepare(投票) —— 协调者向所有参与者发送 Prepare 请求。每个参与者执行本地事务,写入 redo/undo 日志,但不提交,然后向协调者报告 Yes 或 No。
第二阶段:Commit / Rollback —— 如果所有参与者都返回 Yes,协调者发送 Commit,各参与者正式提交事务并释放锁;如果任一参与者返回 No 或超时,协调者发送 Rollback,各参与者利用 undo 日志回滚。
XA 规范与 Java 实现
X/Open 组织定义的 DTP 模型是 2PC 的工业标准实现,XA 是事务管理器(TM)与资源管理器(RM)之间的接口规范,定义了 xa_start、xa_end、xa_prepare、xa_commit、xa_rollback 等接口。
// 使用 JTA 实现 2PC —— 跨两个数据库的转账
public class XATransferService {
public void transfer(long userId, BigDecimal amount) throws Exception {
UserTransaction utx = (UserTransaction)
ctx.lookup("java:comp/UserTransaction");
// XA 数据源:两个独立的数据库
Connection connAccount = xaDataSourceA.getConnection(); // 账户库
Connection connPoints = xaDataSourceB.getConnection(); // 积分库
try {
utx.begin(); // 开启全局事务(TM 开始协调)
// 操作 DB-A:扣减账户余额
PreparedStatement psA = connAccount.prepareStatement(
"UPDATE account SET balance = balance - ? WHERE user_id = ?");
psA.setBigDecimal(1, amount);
psA.setLong(2, userId);
psA.executeUpdate();
// 操作 DB-B:增加积分
PreparedStatement psB = connPoints.prepareStatement(
"UPDATE points SET total = total + ? WHERE user_id = ?");
psB.setInt(1, amount.intValue());
psB.setLong(2, userId);
psB.executeUpdate();
// 两阶段提交:TM 先向两个 RM 发 Prepare,
// 都返回 Yes 后再发 Commit
utx.commit();
} catch (Exception e) {
utx.rollback(); // 两个数据库一起回滚
throw e;
}
}
}
2PC 的问题
同步阻塞:从 Prepare 阶段开始,参与者就持有锁资源,一直到 Commit 阶段完成才释放。在高并发场景下,这种长时间持锁严重影响吞吐量。
单点故障:协调者宕机后,参与者会一直阻塞。尤其在第二阶段,如果协调者发故障,所有参与者都处于锁定状态。新选举的协调者不知道上一个协调者宕机前做了什么决定。
数据不一致:协调者发 Commit 过程中发生局部网络异常或自身宕机,导致只有部分参与者收到 Commit 并提交,其余参与者仍在等待——数据不一致。
极端场景无解:协调者发出 Commit 后宕机,且唯一收到 Commit 的参与者也宕机。即使选出新协调者,这条事务的状态也无法确定。
刚性事务:3PC
3PC 对 2PC 的改进
3PC 针对 2PC 的阻塞问题做了两个核心改动:
- 参与者也引入超时机制:2PC 中只有协调者有超时,参与者等待协调者指令时会无限阻塞。3PC 的参与者在超时后可以自行决定,避免无限阻塞。
- 增加 CanCommit 阶段:在 Prepare 和 Commit 之间插入一个轻量级询问阶段,保证进入最终提交阶段之前各节点状态一致。
三个阶段详解
阶段一:CanCommit(轻量询问) —— 协调者问参与者"能提交吗?"。参与者检查资源、权限等条件,返回 Yes 或 No。注意:此阶段不执行任何事务操作,这是与 2PC 准备阶段的关键区别。
阶段二:PreCommit(预执行) —— 所有参与者返回 Yes 后,协调者发送 PreCommit。参与者执行事务操作,写入 redo/undo 日志,但不提交。如果任一参与者返回 No 或超时,协调者发送 Abort。
阶段三:DoCommit(正式提交) —— 协调者收到所有 ACK 后,发送 DoCommit。参与者正式提交事务并释放锁。
3PC 协调者与参与者伪代码
// 3PC 协调者伪代码
public class ThreePhaseCoordinator {
private List<Participant> participants;
private static final long TIMEOUT_MS = 5000;
public boolean execute() {
// ===== Phase 1: CanCommit =====
// 轻量询问,参与者不执行任何事务操作
for (Participant p : participants) {
CompletableFuture<Boolean> vote = p.canCommit();
Boolean result = vote.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (result == null || !result) {
abortAll();
return false;
}
}
// ===== Phase 2: PreCommit =====
// 参与者执行事务但不提交,写入 redo/undo 日志
for (Participant p : participants) {
CompletableFuture<Boolean> ack = p.preCommit();
Boolean result = ack.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (result == null || !result) {
abortAll(); // 通知所有参与者中断
return false;
}
}
// ===== Phase 3: DoCommit =====
// 正式提交
for (Participant p : participants) {
try {
p.doCommit().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// 参与者超时 —— 参与者端会默认提交
log.warn("participant {} doCommit timeout, "
+ "participant will auto-commit", p.getId());
}
}
return true;
}
private void abortAll() {
for (Participant p : participants) {
try { p.abort(); } catch (Exception ignored) {}
}
}
}
// 3PC 参与者伪代码
public class ThreePhaseParticipant {
private static final long TIMEOUT_MS = 5000;
private TransactionState state = TransactionState.INIT;
// Phase 1: 只做资源检查,不执行事务
public boolean canCommit() {
boolean resourceReady = checkDiskSpace()
&& checkConnectionPool()
&& checkBusinessConstraints();
state = resourceReady
? TransactionState.CAN_COMMIT_YES
: TransactionState.ABORTED;
return resourceReady;
}
// Phase 2: 执行事务但不提交
public boolean preCommit() {
try {
beginLocalTransaction();
executeBusinessLogic(); // 写入 redo/undo 日志
state = TransactionState.PRE_COMMITTED;
return true;
} catch (Exception e) {
rollbackLocalTransaction();
state = TransactionState.ABORTED;
return false;
}
}
// Phase 3: 正式提交
public void doCommit() {
commitLocalTransaction();
releaseResources();
state = TransactionState.COMMITTED;
}
// 超时处理 —— 这是 3PC 的关键设计
public void onPhase3Timeout() {
if (state == TransactionState.PRE_COMMITTED) {
// 超时默认提交:因为能进入 Phase 3,
// 说明 Phase 1 所有人都投了 Yes,大概率应该提交
doCommit();
log.warn("Phase 3 timeout, auto-committed based on "
+ "probability reasoning");
}
}
public void abort() {
if (state == TransactionState.PRE_COMMITTED) {
rollbackLocalTransaction();
}
releaseResources();
state = TransactionState.ABORTED;
}
}
超时默认提交的推理
3PC 最关键的设计决策是:参与者在第三阶段等待超时后,默认提交而非回滚。
推理逻辑:当参与者进入第三阶段,说明它在第二阶段收到了 PreCommit 请求。协调者产生 PreCommit 的前提是——第一阶段所有参与者都返回了 Yes。换句话说,一旦参与者收到了 PreCommit,就意味着它知道所有参与者都同意了。所以当第三阶段超时时,成功提交的概率远大于需要回滚的概率。
这是一个基于概率的工程妥协,而不是一个保证正确的协议设计。
2PC vs 3PC 对比
| 维度 | 2PC | 3PC |
|---|---|---|
| 阶段数 | 2(Prepare + Commit) | 3(CanCommit + PreCommit + DoCommit) |
| 超时机制 | 仅协调者 | 协调者和参与者都有 |
| 阻塞风险 | 高(协调者宕机导致参与者永久阻塞) | 低(参与者超时后默认提交) |
| 第一阶段开销 | 重(参与者执行事务并持锁) | 轻(仅检查资源条件,不持锁) |
| 一致性 | 可能不一致(部分提交) | 仍可能不一致(超时默认提交) |
| 网络开销 | 2 轮通信 | 3 轮通信 |
| 工程使用 | 广泛(XA 规范) | 理论意义大于实践,直接使用极少 |
无论 2PC 还是 3PC 都无法彻底解决分布式一致性问题。Google Chubby 的作者 Mike Burrows 说过:"there is only one consensus protocol, and that's Paxos"。但在工程实践中,我们更多使用的是下面的柔性事务方案。
柔性事务:TCC
原理
TCC(Try-Confirm-Cancel)将每个业务操作拆分为三步:
| 阶段 | 职责 | 电商场景示例 |
|---|---|---|
| Try | 资源预留 | 冻结 10 件库存、冻结 100 元余额 |
| Confirm | 确认执行(幂等) | 将冻结的库存正式扣减 |
| Cancel | 取消释放(幂等) | 将冻结的库存释放回去 |
所有服务的 Try 都成功后,统一执行 Confirm;任一 Try 失败,统一执行 Cancel 回滚已预留的资源。
代码实现
// 库存服务的 TCC 实现 —— 注意字段设计:available + frozen
public class InventoryTccService {
// Try:冻结库存,不真正扣减
public boolean tryDeduct(String txId, String skuId, int quantity) {
// 幂等检查:同一事务 ID 不重复冻结
if (txLogDao.exists(txId, "TRY")) return true;
int updated = jdbcTemplate.update(
"UPDATE inventory SET available = available - ?, "
+ "frozen = frozen + ? "
+ "WHERE sku_id = ? AND available >= ?",
quantity, quantity, skuId, quantity);
if (updated > 0) {
txLogDao.insert(txId, "TRY", skuId, quantity);
}
return updated > 0;
}
// Confirm:将冻结的库存正式扣减(幂等)
public boolean confirm(String txId, String skuId, int quantity) {
if (txLogDao.exists(txId, "CONFIRM")) return true; // 幂等
jdbcTemplate.update(
"UPDATE inventory SET frozen = frozen - ? "
+ "WHERE sku_id = ? AND frozen >= ?",
quantity, skuId, quantity);
txLogDao.insert(txId, "CONFIRM", skuId, quantity);
return true;
}
// Cancel:释放冻结的库存(幂等)
public boolean cancel(String txId, String skuId, int quantity) {
if (txLogDao.exists(txId, "CANCEL")) return true; // 幂等
// 空回滚检查:如果 Try 没有执行过,直接返回
if (!txLogDao.exists(txId, "TRY")) {
txLogDao.insert(txId, "CANCEL", skuId, 0);
return true;
}
jdbcTemplate.update(
"UPDATE inventory SET available = available + ?, "
+ "frozen = frozen - ? "
+ "WHERE sku_id = ? AND frozen >= ?",
quantity, quantity, skuId, quantity);
txLogDao.insert(txId, "CANCEL", skuId, quantity);
return true;
}
}
幂等性设计要点
TCC 的三个核心难题都体现在上面的代码中:
幂等性:网络重试可能导致 Confirm / Cancel 被重复调用。通过事务日志表(txId + phase)去重,同一事务的同一阶段只执行一次。
空回滚:Try 因网络超时未到达参与者,TCC 框架可能直接调 Cancel。Cancel 需要检查 Try 是否执行过,如果没有则记录日志后直接返回。
防悬挂:Cancel 执行后,迟到的 Try 不能再执行。Try 执行前应检查 Cancel 日志是否已存在,如果存在则拒绝执行。
柔性事务:Saga
原理
Saga 将一个长事务拆分为一系列本地事务,每个本地事务都有对应的补偿操作。正向执行 T1 -> T2 -> T3,如果 T3 失败,反向执行补偿 C2 -> C1。
Saga 有两种实现方式:
编排式(Choreography):每个服务完成本地事务后发布事件,下游服务监听事件执行自己的操作。去中心化,服务之间松耦合,但流程难以追踪,跨服务的事务状态散落在各处。
协调式(Orchestration):引入一个 Saga 协调器(通常基于状态机),集中控制每个步骤的执行和补偿。流程清晰,便于监控和调试,但协调器是单点逻辑。
协调式 Saga 代码实现
// Saga 协调器 —— 基于状态机的实现
public class OrderSagaOrchestrator {
private final List<SagaStep> steps = new ArrayList<>();
public OrderSagaOrchestrator() {
// 定义每个步骤的正向操作和补偿操作
steps.add(new SagaStep("创建订单",
req -> orderService.create(req),
req -> orderService.cancel(req)));
steps.add(new SagaStep("扣减库存",
req -> inventoryService.deduct(req),
req -> inventoryService.restore(req)));
steps.add(new SagaStep("执行支付",
req -> paymentService.charge(req),
req -> paymentService.refund(req)));
}
public SagaResult execute(OrderRequest request) {
List<SagaStep> completedSteps = new ArrayList<>();
for (SagaStep step : steps) {
try {
step.forward(request);
completedSteps.add(step);
// 持久化状态,便于宕机恢复
sagaLog.record(request.getSagaId(),
step.getName(), StepStatus.COMPLETED);
} catch (Exception e) {
sagaLog.record(request.getSagaId(),
step.getName(), StepStatus.FAILED);
// 反向补偿已完成的步骤
compensate(completedSteps, request);
return SagaResult.rollback(step.getName(), e);
}
}
return SagaResult.success();
}
private void compensate(List<SagaStep> completed,
OrderRequest request) {
// 从后往前补偿
Collections.reverse(completed);
for (SagaStep step : completed) {
try {
step.compensate(request);
sagaLog.record(request.getSagaId(),
step.getName(), StepStatus.COMPENSATED);
} catch (Exception e) {
// 补偿失败 —— 记录日志,交给对账或人工处理
sagaLog.record(request.getSagaId(),
step.getName(), StepStatus.COMPENSATE_FAILED);
log.error("compensation failed for step: {}",
step.getName(), e);
}
}
}
}
补偿事务设计原则
- 补偿操作必须幂等:和 TCC 的 Cancel 一样,网络重试可能导致重复调用
- 补偿是"语义回滚"而非"物理撤销":比如订单已创建,补偿不是 DELETE,而是将状态改为 CANCELLED
- 补偿可能失败:需要有重试机制和最终的人工兜底
- Saga 没有隔离性:中间状态对外可见。用户可能看到订单状态短暂变为"已创建"然后变为"已取消"
柔性事务:本地消息表
原理与架构
通过本地数据库事务保证"业务操作"和"消息写入"的原子性,再通过异步消息驱动下游操作,最终达到一致。
核心思路:业务表和消息表在同一个数据库里,用一个本地事务同时写入。然后定时任务扫描消息表中未发送的消息,投递到 MQ。下游消费并执行业务操作后回调确认。
代码实现
// 消息表 DDL
// CREATE TABLE local_message (
// id BIGINT AUTO_INCREMENT PRIMARY KEY,
// msg_id VARCHAR(64) UNIQUE NOT NULL,
// topic VARCHAR(128) NOT NULL,
// body TEXT NOT NULL,
// status VARCHAR(16) NOT NULL DEFAULT 'PENDING',
// retry_count INT NOT NULL DEFAULT 0,
// created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
// updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
// );
// 上游服务:业务操作 + 消息写入在同一个本地事务中
@Transactional
public void createOrder(OrderRequest request) {
// 1. 执行业务操作
Order order = request.toOrder();
orderDao.insert(order);
// 2. 写入消息表(同一个数据库,同一个事务)
localMessageDao.insert(new LocalMessage(
UUID.randomUUID().toString(),
"ORDER_CREATED",
JsonUtils.toJson(order),
"PENDING"
));
// 事务提交后,两条记录要么都写入,要么都不写入
}
// 定时任务:扫描并发送未处理的消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<LocalMessage> messages =
localMessageDao.queryByStatus("PENDING", 100);
for (LocalMessage msg : messages) {
try {
mqProducer.send(msg.getTopic(), msg.getBody());
localMessageDao.updateStatus(msg.getId(), "SENT");
} catch (Exception e) {
// 递增重试计数,超过阈值标记为 FAILED 等待人工处理
int retryCount = msg.getRetryCount() + 1;
if (retryCount > MAX_RETRY) {
localMessageDao.updateStatus(msg.getId(), "FAILED");
} else {
localMessageDao.incrementRetry(msg.getId());
}
}
}
}
// 下游消费者:幂等执行
@RocketMQMessageListener(topic = "ORDER_CREATED")
public void onMessage(MessageExt message) {
String msgId = message.getMsgId();
// 幂等检查
if (consumeLogDao.exists(msgId)) return;
Order order = JsonUtils.fromJson(
new String(message.getBody()), Order.class);
// 执行下游业务(如通知仓储发货)
warehouseService.prepareShipment(order);
consumeLogDao.insert(msgId);
}
本地消息表与 Saga 的本质关系
很多人把本地消息表和 Saga 当作两个平级的方案来比较,但实际上它们的关系是:
本地消息表是 Saga 编排模式的一种具体实现技术。(Choreography) 两者的对应关系如下:
| 概念层次 | Saga (模式) | 本地消息表 (技术) |
|---|---|---|
| 定位 | 设计模式/架构模式 | 实现技术/工程手段 |
| 核心思想 | 将长事务拆分为多个本地事务 + 补偿 | 用本地事务保证"业务 + 消息"原子性 |
| 服务间通信 | 事件/消息 (choreography) 或指令 (orchestration) | 事件/消息 (通过消息表 + MQ) |
| 事件日志 | Saga 需要某种事件日志来追踪进度 | 消息表本身就是事件日志 |
| 实现方式 | 可以用编排或协调 | 本质上是编排模式的一种实现 |
Saga 是一个模式(Pattern),它告诉你"把长事务拆成多步,每步配一个补偿操作"。但 Saga 没有告诉你事件怎么可靠传递。本地消息表解决的正是这个问题——通过数据库事务保证事件(消息)一定被记录,再通过轮询保证事件一定被发送。
换个角度说:如果你用本地消息表实现了多个服务之间的事件驱动协作,并且每个服务都有对应的补偿逻辑,那你其实就是在实现一个 Saga。
柔性事务:事务消息
RocketMQ 半消息机制
RocketMQ 原生支持事务消息,相当于将本地消息表的能力下沉到消息中间件——不再需要自己维护消息表和定时轮询。
核心流程:
- Producer 发送半消息(Half Message)到 RocketMQ,此时消息对 Consumer 不可见
- RocketMQ 返回半消息发送成功
- Producer 执行本地事务(如写数据库)
- 根据本地事务结果:成功发 Commit(消息变为可见),失败发 Rollback(删除半消息)
- 如果 RocketMQ 迟迟没收到 Commit/Rollback,会主动回查 Producer 的本地事务状态
代码实现
// RocketMQ 事务消息 Producer
public class OrderTransactionProducer {
private TransactionMQProducer producer;
public void sendOrderMessage(OrderRequest request) {
Message msg = new Message(
"ORDER_TOPIC",
JsonUtils.toJson(request).getBytes());
// 设置业务 key,便于回查
msg.setKeys(request.getOrderId());
producer.sendMessageInTransaction(msg,
new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
try {
// 执行本地事务
orderService.createOrder(request);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(
MessageExt msg) {
// 回查:检查本地事务是否已执行成功
Order order = orderDao.queryByOrderId(
request.getOrderId());
if (order != null) {
return LocalTransactionState.COMMIT_MESSAGE;
}
// 返回 UNKNOW,RocketMQ 会稍后再次回查
return LocalTransactionState.UNKNOW;
}
}, null);
}
}
消息中间件对比:RocketMQ vs Kafka vs Pulsar
事务消息不是只有 RocketMQ 能做。但三种主流消息中间件的实现路径截然不同:
| 维度 | RocketMQ | Kafka | Pulsar |
|---|---|---|---|
| 事务消息支持 | 原生半消息 + 回查机制 | Kafka Transactions(不是半消息) | 原生事务消息(类似 Kafka) |
| 实现原理 | 半消息暂存在内部 Topic,Commit 后转移到目标 Topic | 原子写入多个 Topic/Partition,配合 exactly-once 语义 | 事务日志 + 原子批量提交 |
| 本地事务协调 | 内置:回查机制自动探测本地事务状态 | 不内置:需要配合 Outbox Pattern 或 CDC 自行保证 | 不内置:需要应用层自行保证 |
| 适用模式 | 直接替代本地消息表 | Producer 端 exactly-once + Outbox Pattern | Producer 端 exactly-once + Outbox Pattern |
| 回查能力 | 有:Broker 主动回查 Producer | 无 | 无 |
| 消费端语义 | at-least-once(需消费者幂等) | exactly-once(配合 consume-transform-produce) | effectively-once(配合去重) |
为什么 RocketMQ 的半消息机制是独特的?
RocketMQ 是唯一在 Broker 层面内置了"本地事务 + 消息发送"原子性保证的消息中间件。它的半消息对 Consumer 不可见,Broker 会主动回查 Producer 的事务状态——这意味着应用层不需要维护消息表、不需要写定时任务,中间件替你做了。
什么时候 Kafka 的方案更好?
Kafka Transactions 解决的是另一个问题:消息链路内部的 exactly-once(consume-transform-produce)。如果你的场景是流处理管道(读 Topic A -> 处理 -> 写 Topic B),Kafka Transactions 是更自然的选择。但如果你要保证"数据库写入 + 消息发送"的原子性,Kafka 本身不提供半消息机制,需要配合 Outbox Pattern:
// Kafka + Outbox Pattern 伪代码
@Transactional
public void createOrderWithOutbox(OrderRequest request) {
// 同一个本地事务
orderDao.insert(request.toOrder());
outboxDao.insert(new OutboxEvent(
"ORDER_CREATED", JsonUtils.toJson(request)));
}
// Debezium CDC 监听 outbox 表的变更,自动发送到 Kafka
// 或者用定时任务轮询 outbox 表(和本地消息表一样)
选择建议:如果团队已经在用 RocketMQ,直接用事务消息最省事。如果用 Kafka,走 Outbox + CDC(Debezium)是成熟路径。Pulsar 在事务消息方面还相对年轻,生产案例较少。
最大努力通知
最大努力通知是最简单的最终一致性方案:上游系统尽最大努力通知下游,失败则按递增间隔重试,超过最大次数后放弃,记录日志等待人工处理或下游主动查询。
典型场景是支付回调。支付宝、微信支付完成扣款后,会多次回调商户通知地址。如果商户系统一直没有返回成功,支付平台按递增间隔重试(1s、5s、30s、5min、30min),超过最大次数后停止。商户可以主动调用支付查询接口获取最终结果。
最大努力通知和事务消息的区别在于:事务消息保证上游事务和消息发送的原子性,最大努力通知不保证——它只负责"尽力通知",一致性的最终兜底依赖下游的主动查询。
对账机制
前面介绍的所有方案都在努力保证一致性,但在生产环境中,没有任何方案能做到 100% 不出问题。网络抖动、服务重启、消息丢失、Bug——总有意外发生。对账(Reconciliation)就是最后一道防线:定期比对多方数据,发现并修复不一致。
为什么需要对账
用电商下单场景来说:
- 订单服务记录"订单已创建,金额 100 元"
- 支付服务记录"已扣款 100 元"
- 第三方支付渠道(支付宝/微信)记录"已收款 100 元"
这三方的数据理论上应该完全一致。但如果中间环节出了问题(比如支付回调丢失、消息消费失败),就会出现不一致。对账就是把三方数据拿出来比一比,找出差异。
对账架构
三种对账策略
定时快照对账(T+1):每天凌晨,各系统导出前一天的数据快照(通常是 CSV/Parquet 文件),对账系统逐条比对。这是最常见也最成熟的方案,适合对实时性要求不高的场景。
事件驱动实时对账:每笔交易完成后,相关服务发送对账事件到对账系统。对账系统在收到所有参与方的事件后进行比对。延迟低(秒级),但实现复杂度高。
三方交叉对账:不仅比对内部系统之间的数据,还和外部第三方(支付渠道、银行)的清算文件比对。这是资金类业务的标准做法。
对账代码实现
// 对账引擎核心逻辑
public class ReconciliationEngine {
// T+1 定时对账:比对订单和支付记录
@Scheduled(cron = "0 30 1 * * ?") // 每天凌晨 1:30
public void dailyReconciliation() {
LocalDate yesterday = LocalDate.now().minusDays(1);
// 1. 拉取各方数据
Map<String, OrderRecord> orderRecords =
orderService.getRecordsByDate(yesterday)
.stream()
.collect(Collectors.toMap(
OrderRecord::getOrderId, r -> r));
Map<String, PaymentRecord> paymentRecords =
paymentService.getRecordsByDate(yesterday)
.stream()
.collect(Collectors.toMap(
PaymentRecord::getOrderId, r -> r));
// 第三方渠道的清算文件(通常是 SFTP 下载 CSV)
Map<String, ChannelRecord> channelRecords =
channelFileParser.parse(yesterday);
// 2. 三方交叉比对
Set<String> allOrderIds = new HashSet<>();
allOrderIds.addAll(orderRecords.keySet());
allOrderIds.addAll(paymentRecords.keySet());
allOrderIds.addAll(channelRecords.keySet());
List<DiscrepancyRecord> discrepancies = new ArrayList<>();
for (String orderId : allOrderIds) {
OrderRecord order = orderRecords.get(orderId);
PaymentRecord payment = paymentRecords.get(orderId);
ChannelRecord channel = channelRecords.get(orderId);
// 检查各种不一致情况
if (order != null && payment == null) {
discrepancies.add(new DiscrepancyRecord(
orderId, DiscrepancyType.PAYMENT_MISSING,
"订单存在但无支付记录"));
} else if (order == null && payment != null) {
discrepancies.add(new DiscrepancyRecord(
orderId, DiscrepancyType.ORDER_MISSING,
"支付存在但无订单记录"));
} else if (order != null && payment != null
&& order.getAmount()
.compareTo(payment.getAmount()) != 0) {
discrepancies.add(new DiscrepancyRecord(
orderId, DiscrepancyType.AMOUNT_MISMATCH,
String.format("订单金额 %s,支付金额 %s",
order.getAmount(),
payment.getAmount())));
}
// 和第三方渠道比对
if (payment != null && channel == null) {
discrepancies.add(new DiscrepancyRecord(
orderId, DiscrepancyType.CHANNEL_MISSING,
"内部有支付记录但渠道无记录"));
}
}
// 3. 处理差异
handleDiscrepancies(discrepancies);
}
private void handleDiscrepancies(
List<DiscrepancyRecord> discrepancies) {
for (DiscrepancyRecord d : discrepancies) {
// 持久化差异记录
discrepancyDao.insert(d);
switch (d.getType()) {
case PAYMENT_MISSING:
// 自动补偿:查询支付渠道确认实际状态
autoCompensate(d);
break;
case AMOUNT_MISMATCH:
case CHANNEL_MISSING:
// 金额不一致或渠道缺失 —— 告警,等人工处理
alertService.sendAlert(d);
break;
default:
log.warn("unhandled discrepancy: {}", d);
}
}
}
// 自动补偿:查询真实状态并修复
private void autoCompensate(DiscrepancyRecord d) {
// 主动查询支付渠道的实际结果
ChannelQueryResult result =
channelClient.query(d.getOrderId());
if (result.isPaid()) {
// 渠道确认已支付 —— 补录支付记录
paymentService.createFromChannelResult(result);
d.setResolution("auto_fixed: payment record created");
} else {
// 渠道确认未支付 —— 关闭订单
orderService.close(d.getOrderId(),
"reconciliation: payment not confirmed");
d.setResolution("auto_fixed: order closed");
}
discrepancyDao.update(d);
}
}
实时对账的事件驱动实现
// 事件驱动对账:收到交易事件后实时比对
public class RealtimeReconciliationService {
// 对账窗口:等待所有参与方的事件到齐
private final Cache<String, ReconciliationWindow> windows =
CacheBuilder.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.removalListener(this::onWindowExpired)
.build();
// 收到订单事件
public void onOrderEvent(OrderEvent event) {
ReconciliationWindow window =
getOrCreateWindow(event.getOrderId());
window.setOrderEvent(event);
checkAndReconcile(window);
}
// 收到支付事件
public void onPaymentEvent(PaymentEvent event) {
ReconciliationWindow window =
getOrCreateWindow(event.getOrderId());
window.setPaymentEvent(event);
checkAndReconcile(window);
}
private void checkAndReconcile(ReconciliationWindow window) {
if (!window.isComplete()) return; // 还有事件没到齐
// 所有事件都到了,进行比对
if (window.isConsistent()) {
// 一致,记录对账成功
reconciliationLog.recordSuccess(window.getOrderId());
} else {
// 不一致,立即告警
alertService.sendRealtimeAlert(
window.getOrderId(), window.getDiscrepancy());
}
}
// 窗口超时:有些事件没有到齐
private void onWindowExpired(
RemovalNotification<String, ReconciliationWindow> n) {
ReconciliationWindow window = n.getValue();
if (!window.isComplete()) {
// 超时未到齐 = 疑似不一致,记录并告警
alertService.sendTimeoutAlert(
window.getOrderId(), window.getMissingEvents());
}
}
}
方案选型
全面对比
| 方案 | 一致性级别 | 性能影响 | 实现复杂度 | 业务侵入 | 适用场景 |
|---|---|---|---|---|---|
| 2PC/XA | 强一致 | 高(同步阻塞、长持锁) | 低(数据库原生支持) | 低 | 跨库事务,一致性要求极高 |
| 3PC | 强一致(仍有缺陷) | 高(多一轮通信) | 中 | 低 | 理论研究,工程中极少直接使用 |
| TCC | 最终一致 | 低 | 高(三接口 + 幂等 + 空回滚 + 防悬挂) | 高 | 资金交易、库存预扣、票务预订 |
| Saga | 最终一致 | 低 | 中 | 中 | 长事务、跨多服务的业务编排 |
| 本地消息表 | 最终一致 | 中(依赖轮询间隔) | 低 | 低 | 实时性要求不高的异步场景 |
| 事务消息 | 最终一致 | 低 | 低(中间件原生支持) | 低 | 消息驱动的异步业务 |
| 最大努力通知 | 最终一致(弱保证) | 低 | 最低 | 最低 | 跨平台通知,如支付回调 |
决策路径
面对一个具体的分布式事务需求,可以沿着这个路径做选择:
能用单库事务解决吗? 如果能,就不要用分布式事务。分布式事务的复杂度远超想象。
一致性要求多高?
- 强一致性 + 能接受性能损失 -> 2PC/XA
- 强一致性 + 不能接受长阻塞 -> TCC(虽然是最终一致,但 Try 阶段的资源预留提供了类似强一致的隔离性)
涉及多步骤编排吗?
- 是,且步骤多、流程长 -> Saga(协调式)
- 是,且服务间已有事件驱动架构 -> Saga(编排式 / 本地消息表)
是异步通知场景吗?
- 上游事务 + 异步通知下游 -> 事务消息(RocketMQ)或 本地消息表
- 跨企业/跨平台通知 -> 最大努力通知
场景推荐
| 业务场景 | 推荐方案 | 理由 |
|---|---|---|
| 跨库转账(同一公司内部) | 2PC/XA | 金额一致性要求最高,且在数据库层面可控 |
| 电商下单(扣库存 + 扣余额) | TCC | 资源可预留(冻结),需要较高的隔离性 |
| 订单流程(创建 -> 支付 -> 发货 -> 通知) | Saga(协调式) | 步骤多、流程长,需要清晰的状态机 |
| 订单创建后异步通知仓储 | 事务消息 / 本地消息表 | 异步场景,保证消息可靠投递即可 |
| 接入第三方支付回调 | 最大努力通知 | 跨企业场景,通知方不保证送达 |
一条经验法则
无论选择哪种方案,有两件事是必须做的:
幂等性设计:网络重试无处不在,任何操作都可能被重复调用。用唯一业务 ID + 去重表是最可靠的做法。
对账兜底:不要相信任何方案能 100% 保证一致性。日终对账是生产系统的标配,它不是"可选项",而是"必选项"。
如果团队没有分布式事务经验,优先选本地消息表而不是 TCC。TCC 的空回滚、防悬挂、幂等三件事加在一起,对实现质量的要求很高。现实中很多"用了 TCC 但出了问题"的案例,根源是实现不完整而不是方案本身有缺陷。