分布式事务:方案选型与工程实践

一个电商下单操作要同时扣库存、扣余额、核销优惠券——任何一步失败,已执行的步骤都应该回滚。单机数据库用一个事务就搞定了,但当这三个操作分别在三个服务、三个数据库里执行时,问题就来了:本地事务的边界无法跨越网络。

这篇文章围绕一个贯穿始终的电商下单场景,从理论到六种工程方案再到生产级的对账机制,把分布式事务这件事从头理一遍。


从集中式到分布式

集中式系统的特点是一台主机承担所有计算和存储。早期银行系统、大型企业核心业务大多采用这种架构。优点是部署简单、无需考虑节点间协调,但单点故障意味着整个系统瘫痪,纵向扩展(加 CPU / 内存)有物理上限且成本指数增长。

分布式系统用多台普通计算机通过网络协作,对外表现得像一台机器。机器越多,资源越多,能处理的并发就越大。但分布式在解决容量问题的同时,也引入了几个根本性的新问题:

  • 网络不可靠:延迟、丢包、分区随时可能发生
  • 时钟不同步:不同机器的系统时钟存在偏差,无法依赖本地时间戳判定分布式事件的全局顺序
  • 节点故障:集群越大,某台机器宕机的概率越高
  • 数据一致性:请求只会落到其中一台机器上,处理不好就会产生不一致

Leslie Lamport(Paxos 发明者,2013 图灵奖得主)说过:"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable." —— 一台你甚至不知道其存在的机器的故障,就可能让你的机器不可用。


一致性问题与理论基础

从 ACID 到分布式

单机数据库通过事务机制保证 ACID:

特性 含义 保障手段
Atomicity 操作要么全部成功,要么全部回滚 undo log
Consistency 数据从一个合法状态转到另一个合法状态 由 A、I、D 共同保证
Isolation 并发事务互不干扰 锁 + MVCC
Durability 事务提交后数据不丢 redo log + WAL

在分布式系统中,数据分散在多台机器上,一个数据库事务无法覆盖跨网络的多个操作。用一段代码来说明问题的本质:

// 电商下单 —— 跨三个服务的操作
public OrderResult createOrder(OrderRequest request) {
    // 步骤1:扣减库存(库存服务)
    inventoryService.deduct(request.getSkuId(), request.getQuantity());

    // 步骤2:扣减余额(账户服务)
    accountService.deduct(request.getUserId(), request.getAmount());

    // 步骤3:创建订单(本地数据库)
    orderDao.insert(request.toOrder());

    return OrderResult.success();
}
// 如果步骤2成功但步骤3失败 —— 钱扣了但没生成订单

CAP 定理

2000 年 Eric Brewer 提出 CAP 猜想,2002 年被正式证明:一个分布式系统最多同时满足以下三项中的两项

属性 含义
Consistency 所有节点在同一时刻看到相同数据
Availability 每个请求都能在合理时间内收到非错误响应
Partition tolerance 网络分区发生时系统仍能运作

网络分区(P)是物理现实——光纤会被挖断、交换机会宕机。因此 CAP 的核心不是"三选二",而是在发生网络分区时,选一致性还是可用性

策略 取舍 典型系统
CP 保一致性,牺牲部分可用性 ZooKeeper、etcd、HBase
AP 保可用性,允许短暂不一致 Cassandra、DynamoDB、Eureka

BASE 理论

BASE 是 CAP 中 AP 策略的工程延伸:即使无法做到强一致性,也可以通过适当方式达到最终一致性。

缩写 含义
BA (Basically Available) 出现故障时允许损失部分非核心功能,核心功能可用
S (Soft State) 允许数据存在中间状态,不同节点的副本暂时不一致
E (Eventually Consistent) 经过一段时间后,所有副本最终达到一致

在实际系统中,ACID 和 BASE 不是非此即彼——核心链路(资金)用 ACID,非核心链路(通知、积分)用 BASE。

从理论到工程:CAP/BASE 回答了"数据可以不一致到什么程度",接下来的事务方案回答"工程上如何逼近我们选择的一致性级别"。2PC/3PC 追求强一致性,TCC、Saga、本地消息表、事务消息都是用不同手段逼近最终一致性——区别在于一致性、性能和实现复杂度之间各自做了什么取舍。


刚性事务:2PC

两阶段提交原理

2PC(Two-Phase Commit)是最经典的分布式事务协议,核心思想是先投票,再执行。它引入一个协调者(Coordinator)来统一决策,每个参与者只知道自己的事务结果,协调者负责汇总并做出全局决定。

2PC 两阶段流程

第一阶段:Prepare(投票) —— 协调者向所有参与者发送 Prepare 请求。每个参与者执行本地事务,写入 redo/undo 日志,但不提交,然后向协调者报告 Yes 或 No。

第二阶段:Commit / Rollback —— 如果所有参与者都返回 Yes,协调者发送 Commit,各参与者正式提交事务并释放锁;如果任一参与者返回 No 或超时,协调者发送 Rollback,各参与者利用 undo 日志回滚。

XA 规范与 Java 实现

X/Open 组织定义的 DTP 模型是 2PC 的工业标准实现,XA 是事务管理器(TM)与资源管理器(RM)之间的接口规范,定义了 xa_startxa_endxa_preparexa_commitxa_rollback 等接口。

// 使用 JTA 实现 2PC —— 跨两个数据库的转账
public class XATransferService {

    public void transfer(long userId, BigDecimal amount) throws Exception {
        UserTransaction utx = (UserTransaction)
            ctx.lookup("java:comp/UserTransaction");

        // XA 数据源:两个独立的数据库
        Connection connAccount = xaDataSourceA.getConnection(); // 账户库
        Connection connPoints  = xaDataSourceB.getConnection(); // 积分库

        try {
            utx.begin(); // 开启全局事务(TM 开始协调)

            // 操作 DB-A:扣减账户余额
            PreparedStatement psA = connAccount.prepareStatement(
                "UPDATE account SET balance = balance - ? WHERE user_id = ?");
            psA.setBigDecimal(1, amount);
            psA.setLong(2, userId);
            psA.executeUpdate();

            // 操作 DB-B:增加积分
            PreparedStatement psB = connPoints.prepareStatement(
                "UPDATE points SET total = total + ? WHERE user_id = ?");
            psB.setInt(1, amount.intValue());
            psB.setLong(2, userId);
            psB.executeUpdate();

            // 两阶段提交:TM 先向两个 RM 发 Prepare,
            // 都返回 Yes 后再发 Commit
            utx.commit();
        } catch (Exception e) {
            utx.rollback(); // 两个数据库一起回滚
            throw e;
        }
    }
}

2PC 的问题

同步阻塞:从 Prepare 阶段开始,参与者就持有锁资源,一直到 Commit 阶段完成才释放。在高并发场景下,这种长时间持锁严重影响吞吐量。

单点故障:协调者宕机后,参与者会一直阻塞。尤其在第二阶段,如果协调者发故障,所有参与者都处于锁定状态。新选举的协调者不知道上一个协调者宕机前做了什么决定。

数据不一致:协调者发 Commit 过程中发生局部网络异常或自身宕机,导致只有部分参与者收到 Commit 并提交,其余参与者仍在等待——数据不一致。

极端场景无解:协调者发出 Commit 后宕机,且唯一收到 Commit 的参与者也宕机。即使选出新协调者,这条事务的状态也无法确定。


刚性事务:3PC

3PC 对 2PC 的改进

3PC 针对 2PC 的阻塞问题做了两个核心改动:

  1. 参与者也引入超时机制:2PC 中只有协调者有超时,参与者等待协调者指令时会无限阻塞。3PC 的参与者在超时后可以自行决定,避免无限阻塞。
  2. 增加 CanCommit 阶段:在 Prepare 和 Commit 之间插入一个轻量级询问阶段,保证进入最终提交阶段之前各节点状态一致。

3PC 三阶段流程

三个阶段详解

阶段一:CanCommit(轻量询问) —— 协调者问参与者"能提交吗?"。参与者检查资源、权限等条件,返回 Yes 或 No。注意:此阶段不执行任何事务操作,这是与 2PC 准备阶段的关键区别。

阶段二:PreCommit(预执行) —— 所有参与者返回 Yes 后,协调者发送 PreCommit。参与者执行事务操作,写入 redo/undo 日志,但不提交。如果任一参与者返回 No 或超时,协调者发送 Abort。

阶段三:DoCommit(正式提交) —— 协调者收到所有 ACK 后,发送 DoCommit。参与者正式提交事务并释放锁。

3PC 协调者与参与者伪代码

// 3PC 协调者伪代码
public class ThreePhaseCoordinator {

    private List<Participant> participants;
    private static final long TIMEOUT_MS = 5000;

    public boolean execute() {
        // ===== Phase 1: CanCommit =====
        // 轻量询问,参与者不执行任何事务操作
        for (Participant p : participants) {
            CompletableFuture<Boolean> vote = p.canCommit();
            Boolean result = vote.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (result == null || !result) {
                abortAll();
                return false;
            }
        }

        // ===== Phase 2: PreCommit =====
        // 参与者执行事务但不提交,写入 redo/undo 日志
        for (Participant p : participants) {
            CompletableFuture<Boolean> ack = p.preCommit();
            Boolean result = ack.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (result == null || !result) {
                abortAll(); // 通知所有参与者中断
                return false;
            }
        }

        // ===== Phase 3: DoCommit =====
        // 正式提交
        for (Participant p : participants) {
            try {
                p.doCommit().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // 参与者超时 —— 参与者端会默认提交
                log.warn("participant {} doCommit timeout, "
                    + "participant will auto-commit", p.getId());
            }
        }
        return true;
    }

    private void abortAll() {
        for (Participant p : participants) {
            try { p.abort(); } catch (Exception ignored) {}
        }
    }
}
// 3PC 参与者伪代码
public class ThreePhaseParticipant {

    private static final long TIMEOUT_MS = 5000;
    private TransactionState state = TransactionState.INIT;

    // Phase 1: 只做资源检查,不执行事务
    public boolean canCommit() {
        boolean resourceReady = checkDiskSpace()
            && checkConnectionPool()
            && checkBusinessConstraints();
        state = resourceReady
            ? TransactionState.CAN_COMMIT_YES
            : TransactionState.ABORTED;
        return resourceReady;
    }

    // Phase 2: 执行事务但不提交
    public boolean preCommit() {
        try {
            beginLocalTransaction();
            executeBusinessLogic(); // 写入 redo/undo 日志
            state = TransactionState.PRE_COMMITTED;
            return true;
        } catch (Exception e) {
            rollbackLocalTransaction();
            state = TransactionState.ABORTED;
            return false;
        }
    }

    // Phase 3: 正式提交
    public void doCommit() {
        commitLocalTransaction();
        releaseResources();
        state = TransactionState.COMMITTED;
    }

    // 超时处理 —— 这是 3PC 的关键设计
    public void onPhase3Timeout() {
        if (state == TransactionState.PRE_COMMITTED) {
            // 超时默认提交:因为能进入 Phase 3,
            // 说明 Phase 1 所有人都投了 Yes,大概率应该提交
            doCommit();
            log.warn("Phase 3 timeout, auto-committed based on "
                + "probability reasoning");
        }
    }

    public void abort() {
        if (state == TransactionState.PRE_COMMITTED) {
            rollbackLocalTransaction();
        }
        releaseResources();
        state = TransactionState.ABORTED;
    }
}

超时默认提交的推理

3PC 最关键的设计决策是:参与者在第三阶段等待超时后,默认提交而非回滚

推理逻辑:当参与者进入第三阶段,说明它在第二阶段收到了 PreCommit 请求。协调者产生 PreCommit 的前提是——第一阶段所有参与者都返回了 Yes。换句话说,一旦参与者收到了 PreCommit,就意味着它知道所有参与者都同意了。所以当第三阶段超时时,成功提交的概率远大于需要回滚的概率。

这是一个基于概率的工程妥协,而不是一个保证正确的协议设计。

2PC vs 3PC 对比

维度 2PC 3PC
阶段数 2(Prepare + Commit) 3(CanCommit + PreCommit + DoCommit)
超时机制 仅协调者 协调者和参与者都有
阻塞风险 高(协调者宕机导致参与者永久阻塞) 低(参与者超时后默认提交)
第一阶段开销 重(参与者执行事务并持锁) 轻(仅检查资源条件,不持锁)
一致性 可能不一致(部分提交) 仍可能不一致(超时默认提交)
网络开销 2 轮通信 3 轮通信
工程使用 广泛(XA 规范) 理论意义大于实践,直接使用极少

无论 2PC 还是 3PC 都无法彻底解决分布式一致性问题。Google Chubby 的作者 Mike Burrows 说过:"there is only one consensus protocol, and that's Paxos"。但在工程实践中,我们更多使用的是下面的柔性事务方案。


柔性事务:TCC

原理

TCC(Try-Confirm-Cancel)将每个业务操作拆分为三步:

阶段 职责 电商场景示例
Try 资源预留 冻结 10 件库存、冻结 100 元余额
Confirm 确认执行(幂等) 将冻结的库存正式扣减
Cancel 取消释放(幂等) 将冻结的库存释放回去

所有服务的 Try 都成功后,统一执行 Confirm;任一 Try 失败,统一执行 Cancel 回滚已预留的资源。

TCC Try-Confirm-Cancel 流程

代码实现

// 库存服务的 TCC 实现 —— 注意字段设计:available + frozen
public class InventoryTccService {

    // Try:冻结库存,不真正扣减
    public boolean tryDeduct(String txId, String skuId, int quantity) {
        // 幂等检查:同一事务 ID 不重复冻结
        if (txLogDao.exists(txId, "TRY")) return true;

        int updated = jdbcTemplate.update(
            "UPDATE inventory SET available = available - ?, "
            + "frozen = frozen + ? "
            + "WHERE sku_id = ? AND available >= ?",
            quantity, quantity, skuId, quantity);

        if (updated > 0) {
            txLogDao.insert(txId, "TRY", skuId, quantity);
        }
        return updated > 0;
    }

    // Confirm:将冻结的库存正式扣减(幂等)
    public boolean confirm(String txId, String skuId, int quantity) {
        if (txLogDao.exists(txId, "CONFIRM")) return true; // 幂等

        jdbcTemplate.update(
            "UPDATE inventory SET frozen = frozen - ? "
            + "WHERE sku_id = ? AND frozen >= ?",
            quantity, skuId, quantity);

        txLogDao.insert(txId, "CONFIRM", skuId, quantity);
        return true;
    }

    // Cancel:释放冻结的库存(幂等)
    public boolean cancel(String txId, String skuId, int quantity) {
        if (txLogDao.exists(txId, "CANCEL")) return true; // 幂等

        // 空回滚检查:如果 Try 没有执行过,直接返回
        if (!txLogDao.exists(txId, "TRY")) {
            txLogDao.insert(txId, "CANCEL", skuId, 0);
            return true;
        }

        jdbcTemplate.update(
            "UPDATE inventory SET available = available + ?, "
            + "frozen = frozen - ? "
            + "WHERE sku_id = ? AND frozen >= ?",
            quantity, quantity, skuId, quantity);

        txLogDao.insert(txId, "CANCEL", skuId, quantity);
        return true;
    }
}

幂等性设计要点

TCC 的三个核心难题都体现在上面的代码中:

幂等性:网络重试可能导致 Confirm / Cancel 被重复调用。通过事务日志表(txId + phase)去重,同一事务的同一阶段只执行一次。

空回滚:Try 因网络超时未到达参与者,TCC 框架可能直接调 Cancel。Cancel 需要检查 Try 是否执行过,如果没有则记录日志后直接返回。

防悬挂:Cancel 执行后,迟到的 Try 不能再执行。Try 执行前应检查 Cancel 日志是否已存在,如果存在则拒绝执行。


柔性事务:Saga

原理

Saga 将一个长事务拆分为一系列本地事务,每个本地事务都有对应的补偿操作。正向执行 T1 -> T2 -> T3,如果 T3 失败,反向执行补偿 C2 -> C1。

Saga 有两种实现方式:

Saga 编排 vs 协调对比

编排式(Choreography):每个服务完成本地事务后发布事件,下游服务监听事件执行自己的操作。去中心化,服务之间松耦合,但流程难以追踪,跨服务的事务状态散落在各处。

协调式(Orchestration):引入一个 Saga 协调器(通常基于状态机),集中控制每个步骤的执行和补偿。流程清晰,便于监控和调试,但协调器是单点逻辑。

协调式 Saga 代码实现

// Saga 协调器 —— 基于状态机的实现
public class OrderSagaOrchestrator {

    private final List<SagaStep> steps = new ArrayList<>();

    public OrderSagaOrchestrator() {
        // 定义每个步骤的正向操作和补偿操作
        steps.add(new SagaStep("创建订单",
            req -> orderService.create(req),
            req -> orderService.cancel(req)));
        steps.add(new SagaStep("扣减库存",
            req -> inventoryService.deduct(req),
            req -> inventoryService.restore(req)));
        steps.add(new SagaStep("执行支付",
            req -> paymentService.charge(req),
            req -> paymentService.refund(req)));
    }

    public SagaResult execute(OrderRequest request) {
        List<SagaStep> completedSteps = new ArrayList<>();

        for (SagaStep step : steps) {
            try {
                step.forward(request);
                completedSteps.add(step);
                // 持久化状态,便于宕机恢复
                sagaLog.record(request.getSagaId(),
                    step.getName(), StepStatus.COMPLETED);
            } catch (Exception e) {
                sagaLog.record(request.getSagaId(),
                    step.getName(), StepStatus.FAILED);
                // 反向补偿已完成的步骤
                compensate(completedSteps, request);
                return SagaResult.rollback(step.getName(), e);
            }
        }
        return SagaResult.success();
    }

    private void compensate(List<SagaStep> completed,
                            OrderRequest request) {
        // 从后往前补偿
        Collections.reverse(completed);
        for (SagaStep step : completed) {
            try {
                step.compensate(request);
                sagaLog.record(request.getSagaId(),
                    step.getName(), StepStatus.COMPENSATED);
            } catch (Exception e) {
                // 补偿失败 —— 记录日志,交给对账或人工处理
                sagaLog.record(request.getSagaId(),
                    step.getName(), StepStatus.COMPENSATE_FAILED);
                log.error("compensation failed for step: {}",
                    step.getName(), e);
            }
        }
    }
}

补偿事务设计原则

  1. 补偿操作必须幂等:和 TCC 的 Cancel 一样,网络重试可能导致重复调用
  2. 补偿是"语义回滚"而非"物理撤销":比如订单已创建,补偿不是 DELETE,而是将状态改为 CANCELLED
  3. 补偿可能失败:需要有重试机制和最终的人工兜底
  4. Saga 没有隔离性:中间状态对外可见。用户可能看到订单状态短暂变为"已创建"然后变为"已取消"

柔性事务:本地消息表

原理与架构

通过本地数据库事务保证"业务操作"和"消息写入"的原子性,再通过异步消息驱动下游操作,最终达到一致。

本地消息表 + 轮询架构

核心思路:业务表和消息表在同一个数据库里,用一个本地事务同时写入。然后定时任务扫描消息表中未发送的消息,投递到 MQ。下游消费并执行业务操作后回调确认。

代码实现

// 消息表 DDL
// CREATE TABLE local_message (
//     id          BIGINT AUTO_INCREMENT PRIMARY KEY,
//     msg_id      VARCHAR(64)  UNIQUE NOT NULL,
//     topic       VARCHAR(128) NOT NULL,
//     body        TEXT         NOT NULL,
//     status      VARCHAR(16)  NOT NULL DEFAULT 'PENDING',
//     retry_count INT          NOT NULL DEFAULT 0,
//     created_at  DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
//     updated_at  DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP
// );

// 上游服务:业务操作 + 消息写入在同一个本地事务中
@Transactional
public void createOrder(OrderRequest request) {
    // 1. 执行业务操作
    Order order = request.toOrder();
    orderDao.insert(order);

    // 2. 写入消息表(同一个数据库,同一个事务)
    localMessageDao.insert(new LocalMessage(
        UUID.randomUUID().toString(),
        "ORDER_CREATED",
        JsonUtils.toJson(order),
        "PENDING"
    ));
    // 事务提交后,两条记录要么都写入,要么都不写入
}

// 定时任务:扫描并发送未处理的消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
    List<LocalMessage> messages =
        localMessageDao.queryByStatus("PENDING", 100);

    for (LocalMessage msg : messages) {
        try {
            mqProducer.send(msg.getTopic(), msg.getBody());
            localMessageDao.updateStatus(msg.getId(), "SENT");
        } catch (Exception e) {
            // 递增重试计数,超过阈值标记为 FAILED 等待人工处理
            int retryCount = msg.getRetryCount() + 1;
            if (retryCount > MAX_RETRY) {
                localMessageDao.updateStatus(msg.getId(), "FAILED");
            } else {
                localMessageDao.incrementRetry(msg.getId());
            }
        }
    }
}

// 下游消费者:幂等执行
@RocketMQMessageListener(topic = "ORDER_CREATED")
public void onMessage(MessageExt message) {
    String msgId = message.getMsgId();
    // 幂等检查
    if (consumeLogDao.exists(msgId)) return;

    Order order = JsonUtils.fromJson(
        new String(message.getBody()), Order.class);
    // 执行下游业务(如通知仓储发货)
    warehouseService.prepareShipment(order);

    consumeLogDao.insert(msgId);
}

本地消息表与 Saga 的本质关系

很多人把本地消息表和 Saga 当作两个平级的方案来比较,但实际上它们的关系是:

本地消息表是 Saga 编排模式的一种具体实现技术。(Choreography) 两者的对应关系如下:

概念层次 Saga (模式) 本地消息表 (技术)
定位 设计模式/架构模式 实现技术/工程手段
核心思想 将长事务拆分为多个本地事务 + 补偿 用本地事务保证"业务 + 消息"原子性
服务间通信 事件/消息 (choreography) 或指令 (orchestration) 事件/消息 (通过消息表 + MQ)
事件日志 Saga 需要某种事件日志来追踪进度 消息表本身就是事件日志
实现方式 可以用编排或协调 本质上是编排模式的一种实现

Saga 是一个模式(Pattern),它告诉你"把长事务拆成多步,每步配一个补偿操作"。但 Saga 没有告诉你事件怎么可靠传递。本地消息表解决的正是这个问题——通过数据库事务保证事件(消息)一定被记录,再通过轮询保证事件一定被发送。

换个角度说:如果你用本地消息表实现了多个服务之间的事件驱动协作,并且每个服务都有对应的补偿逻辑,那你其实就是在实现一个 Saga。


柔性事务:事务消息

RocketMQ 半消息机制

RocketMQ 原生支持事务消息,相当于将本地消息表的能力下沉到消息中间件——不再需要自己维护消息表和定时轮询。

RocketMQ 事务消息半消息流程

核心流程:

  1. Producer 发送半消息(Half Message)到 RocketMQ,此时消息对 Consumer 不可见
  2. RocketMQ 返回半消息发送成功
  3. Producer 执行本地事务(如写数据库)
  4. 根据本地事务结果:成功发 Commit(消息变为可见),失败发 Rollback(删除半消息)
  5. 如果 RocketMQ 迟迟没收到 Commit/Rollback,会主动回查 Producer 的本地事务状态

代码实现

// RocketMQ 事务消息 Producer
public class OrderTransactionProducer {

    private TransactionMQProducer producer;

    public void sendOrderMessage(OrderRequest request) {
        Message msg = new Message(
            "ORDER_TOPIC",
            JsonUtils.toJson(request).getBytes());
        // 设置业务 key,便于回查
        msg.setKeys(request.getOrderId());

        producer.sendMessageInTransaction(msg,
            new TransactionListener() {

            @Override
            public LocalTransactionState executeLocalTransaction(
                    Message msg, Object arg) {
                try {
                    // 执行本地事务
                    orderService.createOrder(request);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(
                    MessageExt msg) {
                // 回查:检查本地事务是否已执行成功
                Order order = orderDao.queryByOrderId(
                    request.getOrderId());
                if (order != null) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                // 返回 UNKNOW,RocketMQ 会稍后再次回查
                return LocalTransactionState.UNKNOW;
            }
        }, null);
    }
}

消息中间件对比:RocketMQ vs Kafka vs Pulsar

事务消息不是只有 RocketMQ 能做。但三种主流消息中间件的实现路径截然不同:

维度 RocketMQ Kafka Pulsar
事务消息支持 原生半消息 + 回查机制 Kafka Transactions(不是半消息) 原生事务消息(类似 Kafka)
实现原理 半消息暂存在内部 Topic,Commit 后转移到目标 Topic 原子写入多个 Topic/Partition,配合 exactly-once 语义 事务日志 + 原子批量提交
本地事务协调 内置:回查机制自动探测本地事务状态 不内置:需要配合 Outbox Pattern 或 CDC 自行保证 不内置:需要应用层自行保证
适用模式 直接替代本地消息表 Producer 端 exactly-once + Outbox Pattern Producer 端 exactly-once + Outbox Pattern
回查能力 有:Broker 主动回查 Producer
消费端语义 at-least-once(需消费者幂等) exactly-once(配合 consume-transform-produce) effectively-once(配合去重)

为什么 RocketMQ 的半消息机制是独特的?

RocketMQ 是唯一在 Broker 层面内置了"本地事务 + 消息发送"原子性保证的消息中间件。它的半消息对 Consumer 不可见,Broker 会主动回查 Producer 的事务状态——这意味着应用层不需要维护消息表、不需要写定时任务,中间件替你做了。

什么时候 Kafka 的方案更好?

Kafka Transactions 解决的是另一个问题:消息链路内部的 exactly-once(consume-transform-produce)。如果你的场景是流处理管道(读 Topic A -> 处理 -> 写 Topic B),Kafka Transactions 是更自然的选择。但如果你要保证"数据库写入 + 消息发送"的原子性,Kafka 本身不提供半消息机制,需要配合 Outbox Pattern:

// Kafka + Outbox Pattern 伪代码
@Transactional
public void createOrderWithOutbox(OrderRequest request) {
    // 同一个本地事务
    orderDao.insert(request.toOrder());
    outboxDao.insert(new OutboxEvent(
        "ORDER_CREATED", JsonUtils.toJson(request)));
}

// Debezium CDC 监听 outbox 表的变更,自动发送到 Kafka
// 或者用定时任务轮询 outbox 表(和本地消息表一样)

选择建议:如果团队已经在用 RocketMQ,直接用事务消息最省事。如果用 Kafka,走 Outbox + CDC(Debezium)是成熟路径。Pulsar 在事务消息方面还相对年轻,生产案例较少。


最大努力通知

最大努力通知是最简单的最终一致性方案:上游系统尽最大努力通知下游,失败则按递增间隔重试,超过最大次数后放弃,记录日志等待人工处理或下游主动查询。

典型场景是支付回调。支付宝、微信支付完成扣款后,会多次回调商户通知地址。如果商户系统一直没有返回成功,支付平台按递增间隔重试(1s、5s、30s、5min、30min),超过最大次数后停止。商户可以主动调用支付查询接口获取最终结果。

最大努力通知和事务消息的区别在于:事务消息保证上游事务和消息发送的原子性,最大努力通知不保证——它只负责"尽力通知",一致性的最终兜底依赖下游的主动查询。


对账机制

前面介绍的所有方案都在努力保证一致性,但在生产环境中,没有任何方案能做到 100% 不出问题。网络抖动、服务重启、消息丢失、Bug——总有意外发生。对账(Reconciliation)就是最后一道防线:定期比对多方数据,发现并修复不一致

为什么需要对账

用电商下单场景来说:

  • 订单服务记录"订单已创建,金额 100 元"
  • 支付服务记录"已扣款 100 元"
  • 第三方支付渠道(支付宝/微信)记录"已收款 100 元"

这三方的数据理论上应该完全一致。但如果中间环节出了问题(比如支付回调丢失、消息消费失败),就会出现不一致。对账就是把三方数据拿出来比一比,找出差异。

对账架构

对账架构 - 三方对账

三种对账策略

定时快照对账(T+1):每天凌晨,各系统导出前一天的数据快照(通常是 CSV/Parquet 文件),对账系统逐条比对。这是最常见也最成熟的方案,适合对实时性要求不高的场景。

事件驱动实时对账:每笔交易完成后,相关服务发送对账事件到对账系统。对账系统在收到所有参与方的事件后进行比对。延迟低(秒级),但实现复杂度高。

三方交叉对账:不仅比对内部系统之间的数据,还和外部第三方(支付渠道、银行)的清算文件比对。这是资金类业务的标准做法。

对账代码实现

// 对账引擎核心逻辑
public class ReconciliationEngine {

    // T+1 定时对账:比对订单和支付记录
    @Scheduled(cron = "0 30 1 * * ?") // 每天凌晨 1:30
    public void dailyReconciliation() {
        LocalDate yesterday = LocalDate.now().minusDays(1);

        // 1. 拉取各方数据
        Map<String, OrderRecord> orderRecords =
            orderService.getRecordsByDate(yesterday)
                .stream()
                .collect(Collectors.toMap(
                    OrderRecord::getOrderId, r -> r));

        Map<String, PaymentRecord> paymentRecords =
            paymentService.getRecordsByDate(yesterday)
                .stream()
                .collect(Collectors.toMap(
                    PaymentRecord::getOrderId, r -> r));

        // 第三方渠道的清算文件(通常是 SFTP 下载 CSV)
        Map<String, ChannelRecord> channelRecords =
            channelFileParser.parse(yesterday);

        // 2. 三方交叉比对
        Set<String> allOrderIds = new HashSet<>();
        allOrderIds.addAll(orderRecords.keySet());
        allOrderIds.addAll(paymentRecords.keySet());
        allOrderIds.addAll(channelRecords.keySet());

        List<DiscrepancyRecord> discrepancies = new ArrayList<>();

        for (String orderId : allOrderIds) {
            OrderRecord  order   = orderRecords.get(orderId);
            PaymentRecord payment = paymentRecords.get(orderId);
            ChannelRecord channel = channelRecords.get(orderId);

            // 检查各种不一致情况
            if (order != null && payment == null) {
                discrepancies.add(new DiscrepancyRecord(
                    orderId, DiscrepancyType.PAYMENT_MISSING,
                    "订单存在但无支付记录"));
            } else if (order == null && payment != null) {
                discrepancies.add(new DiscrepancyRecord(
                    orderId, DiscrepancyType.ORDER_MISSING,
                    "支付存在但无订单记录"));
            } else if (order != null && payment != null
                    && order.getAmount()
                        .compareTo(payment.getAmount()) != 0) {
                discrepancies.add(new DiscrepancyRecord(
                    orderId, DiscrepancyType.AMOUNT_MISMATCH,
                    String.format("订单金额 %s,支付金额 %s",
                        order.getAmount(),
                        payment.getAmount())));
            }

            // 和第三方渠道比对
            if (payment != null && channel == null) {
                discrepancies.add(new DiscrepancyRecord(
                    orderId, DiscrepancyType.CHANNEL_MISSING,
                    "内部有支付记录但渠道无记录"));
            }
        }

        // 3. 处理差异
        handleDiscrepancies(discrepancies);
    }

    private void handleDiscrepancies(
            List<DiscrepancyRecord> discrepancies) {
        for (DiscrepancyRecord d : discrepancies) {
            // 持久化差异记录
            discrepancyDao.insert(d);

            switch (d.getType()) {
                case PAYMENT_MISSING:
                    // 自动补偿:查询支付渠道确认实际状态
                    autoCompensate(d);
                    break;
                case AMOUNT_MISMATCH:
                case CHANNEL_MISSING:
                    // 金额不一致或渠道缺失 —— 告警,等人工处理
                    alertService.sendAlert(d);
                    break;
                default:
                    log.warn("unhandled discrepancy: {}", d);
            }
        }
    }

    // 自动补偿:查询真实状态并修复
    private void autoCompensate(DiscrepancyRecord d) {
        // 主动查询支付渠道的实际结果
        ChannelQueryResult result =
            channelClient.query(d.getOrderId());

        if (result.isPaid()) {
            // 渠道确认已支付 —— 补录支付记录
            paymentService.createFromChannelResult(result);
            d.setResolution("auto_fixed: payment record created");
        } else {
            // 渠道确认未支付 —— 关闭订单
            orderService.close(d.getOrderId(),
                "reconciliation: payment not confirmed");
            d.setResolution("auto_fixed: order closed");
        }
        discrepancyDao.update(d);
    }
}

实时对账的事件驱动实现

// 事件驱动对账:收到交易事件后实时比对
public class RealtimeReconciliationService {

    // 对账窗口:等待所有参与方的事件到齐
    private final Cache<String, ReconciliationWindow> windows =
        CacheBuilder.newBuilder()
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .removalListener(this::onWindowExpired)
            .build();

    // 收到订单事件
    public void onOrderEvent(OrderEvent event) {
        ReconciliationWindow window =
            getOrCreateWindow(event.getOrderId());
        window.setOrderEvent(event);
        checkAndReconcile(window);
    }

    // 收到支付事件
    public void onPaymentEvent(PaymentEvent event) {
        ReconciliationWindow window =
            getOrCreateWindow(event.getOrderId());
        window.setPaymentEvent(event);
        checkAndReconcile(window);
    }

    private void checkAndReconcile(ReconciliationWindow window) {
        if (!window.isComplete()) return; // 还有事件没到齐

        // 所有事件都到了,进行比对
        if (window.isConsistent()) {
            // 一致,记录对账成功
            reconciliationLog.recordSuccess(window.getOrderId());
        } else {
            // 不一致,立即告警
            alertService.sendRealtimeAlert(
                window.getOrderId(), window.getDiscrepancy());
        }
    }

    // 窗口超时:有些事件没有到齐
    private void onWindowExpired(
            RemovalNotification<String, ReconciliationWindow> n) {
        ReconciliationWindow window = n.getValue();
        if (!window.isComplete()) {
            // 超时未到齐 = 疑似不一致,记录并告警
            alertService.sendTimeoutAlert(
                window.getOrderId(), window.getMissingEvents());
        }
    }
}

方案选型

全面对比

方案 一致性级别 性能影响 实现复杂度 业务侵入 适用场景
2PC/XA 强一致 高(同步阻塞、长持锁) 低(数据库原生支持) 跨库事务,一致性要求极高
3PC 强一致(仍有缺陷) 高(多一轮通信) 理论研究,工程中极少直接使用
TCC 最终一致 高(三接口 + 幂等 + 空回滚 + 防悬挂) 资金交易、库存预扣、票务预订
Saga 最终一致 长事务、跨多服务的业务编排
本地消息表 最终一致 中(依赖轮询间隔) 实时性要求不高的异步场景
事务消息 最终一致 低(中间件原生支持) 消息驱动的异步业务
最大努力通知 最终一致(弱保证) 最低 最低 跨平台通知,如支付回调

决策路径

面对一个具体的分布式事务需求,可以沿着这个路径做选择:

  1. 能用单库事务解决吗? 如果能,就不要用分布式事务。分布式事务的复杂度远超想象。

  2. 一致性要求多高?

    • 强一致性 + 能接受性能损失 -> 2PC/XA
    • 强一致性 + 不能接受长阻塞 -> TCC(虽然是最终一致,但 Try 阶段的资源预留提供了类似强一致的隔离性)
  3. 涉及多步骤编排吗?

    • 是,且步骤多、流程长 -> Saga(协调式)
    • 是,且服务间已有事件驱动架构 -> Saga(编排式 / 本地消息表)
  4. 是异步通知场景吗?

    • 上游事务 + 异步通知下游 -> 事务消息(RocketMQ)或 本地消息表
    • 跨企业/跨平台通知 -> 最大努力通知

场景推荐

业务场景 推荐方案 理由
跨库转账(同一公司内部) 2PC/XA 金额一致性要求最高,且在数据库层面可控
电商下单(扣库存 + 扣余额) TCC 资源可预留(冻结),需要较高的隔离性
订单流程(创建 -> 支付 -> 发货 -> 通知) Saga(协调式) 步骤多、流程长,需要清晰的状态机
订单创建后异步通知仓储 事务消息 / 本地消息表 异步场景,保证消息可靠投递即可
接入第三方支付回调 最大努力通知 跨企业场景,通知方不保证送达

一条经验法则

无论选择哪种方案,有两件事是必须做的:

  1. 幂等性设计:网络重试无处不在,任何操作都可能被重复调用。用唯一业务 ID + 去重表是最可靠的做法。

  2. 对账兜底:不要相信任何方案能 100% 保证一致性。日终对账是生产系统的标配,它不是"可选项",而是"必选项"。

如果团队没有分布式事务经验,优先选本地消息表而不是 TCC。TCC 的空回滚、防悬挂、幂等三件事加在一起,对实现质量的要求很高。现实中很多"用了 TCC 但出了问题"的案例,根源是实现不完整而不是方案本身有缺陷。

加载导航中...

评论