Skip to content

分布式事务

分布式事务是指跨越多个服务或资源的事务,在微服务架构中尤为重要。由于微服务架构将业务拆分为多个独立服务,一个业务操作可能需要调用多个服务,这就带来了分布式事务的挑战。Spring Cloud 提供了多种方案来处理分布式事务问题。

分布式事务的挑战

在单体应用中,我们可以依赖数据库的 ACID 特性来保证事务的一致性。但在分布式系统中,由于以下原因,传统的事务管理变得困难:

  1. 多数据源:不同的微服务可能使用不同的数据库
  2. 网络不可靠:服务间的通信可能因网络问题而失败
  3. 部分失败:部分服务可能成功,部分服务可能失败
  4. 长事务:分布式事务往往耗时较长,长时间锁定资源会降低系统吞吐量

CAP 理论与分布式事务

CAP 理论指出,分布式系统无法同时满足以下三个特性:

  • 一致性 (Consistency):所有节点在同一时间看到的数据是一致的
  • 可用性 (Availability):系统能够继续提供服务,即使部分节点故障
  • 分区容错性 (Partition tolerance):系统能够容忍网络分区故障

在分布式事务的设计中,我们通常需要在一致性和可用性之间做出权衡。

分布式事务的模式

1. 二阶段提交 (2PC)

二阶段提交是一种强一致性的分布式事务协议,分为两个阶段:

  1. 准备阶段:协调者询问所有参与者是否可以提交事务
  2. 提交阶段:如果所有参与者都同意提交,协调者通知所有参与者提交事务;否则通知所有参与者回滚事务

优点

  • 保证强一致性
  • 相对简单直观

缺点

  • 同步阻塞,影响性能
  • 单点故障(协调者)
  • 参与者故障可能导致资源长时间锁定

2. 三阶段提交 (3PC)

三阶段提交在二阶段提交的基础上增加了一个"预提交"阶段,主要解决了 2PC 中的单点故障和阻塞问题。

3. TCC (Try-Confirm-Cancel)

TCC 是一种补偿型事务,分为三个阶段:

  1. Try:尝试执行业务,预留资源
  2. Confirm:确认执行业务,实际使用资源
  3. Cancel:取消执行业务,释放预留资源

优点

  • 相比 2PC 性能更好
  • 可以实现最终一致性
  • 业务侵入性强,能更好地控制资源

缺点

  • 开发成本高,需要实现三个接口
  • 业务侵入性强
  • 要考虑幂等性、空回滚、悬挂等复杂问题

4. Saga 模式

Saga 是一种长事务解决方案,将长事务拆分为多个本地事务,每个本地事务都有对应的补偿事务。

执行过程:

  1. 按顺序执行各本地事务
  2. 如果某个本地事务失败,则按照相反顺序执行已完成事务的补偿事务

优点

  • 不需要锁定资源
  • 适合长事务场景
  • 可以实现最终一致性

缺点

  • 补偿事务设计复杂
  • 事务隔离性较弱
  • 需要处理复杂的回滚逻辑

5. 本地消息表

本地消息表模式使用消息队列和本地事务来实现分布式事务:

  1. 在本地事务中,同时更新业务数据和消息表
  2. 定时任务扫描消息表,将消息发送至消息队列
  3. 消费者消费消息,执行本地事务
  4. 消费者确认消息消费结果

优点

  • 基于本地事务,实现简单
  • 不依赖第三方组件
  • 可靠性高

缺点

  • 需要额外的消息表
  • 消息表与业务耦合
  • 需要定时任务,增加系统复杂性

6. 可靠消息最终一致性

可靠消息最终一致性模式是对本地消息表模式的改进,将消息表抽象为独立的消息服务:

  1. 发送预消息到消息服务
  2. 执行本地事务
  3. 根据事务结果确认或取消消息
  4. 消费者消费消息,执行本地事务
  5. 消息服务根据消费结果,重试或删除消息

优点

  • 解耦了消息管理和业务逻辑
  • 易于集成到现有系统

缺点

  • 需要消息服务支持事务消息
  • 实现复杂度较高

7. 最大努力通知

最大努力通知是一种弱一致性事务模式:

  1. 系统 A 执行本地事务
  2. 系统 A 通知系统 B
  3. 如果通知失败,则按照一定策略重试(例如,指数退避)

优点

  • 实现简单
  • 侵入性低

缺点

  • 一致性保证较弱
  • 可能需要人工干预解决长时间失败的情况

Spring Cloud 分布式事务解决方案

1. Spring Cloud Alibaba Seata

Seata 是阿里巴巴开源的分布式事务解决方案,提供了 AT、TCC、Saga 和 XA 四种事务模式。Spring Cloud Alibaba 集成了 Seata,使其易于在 Spring Cloud 应用中使用。

AT 模式

AT (Automatic Transaction) 模式是 Seata 的默认模式,它对业务无侵入,通过拦截 SQL 和自动生成反向 SQL 来实现分布式事务。

工作原理

  1. 开始全局事务:创建全局事务 ID
  2. 分支事务执行
    • 拦截 SQL,解析 SQL 获取数据变更前后的快照
    • 通过行锁防止写冲突
    • 将业务数据和回滚日志在同一个本地事务中提交
  3. 提交/回滚
    • 提交:异步删除回滚日志
    • 回滚:根据回滚日志生成反向 SQL 并执行

配置步骤

  1. 添加依赖:
xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
  1. 配置 Seata 事务组和服务器地址:
yaml
seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_tx_group
  service:
    vgroup-mapping:
      my_tx_group: default
    grouplist:
      default: 127.0.0.1:8091
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  1. 使用 @GlobalTransactional 注解标记全局事务:
java
@Service
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private AccountClient accountClient;
    
    @Autowired
    private InventoryClient inventoryClient;
    
    @GlobalTransactional
    public Order createOrder(OrderRequest request) {
        // 创建订单
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.INIT);
        orderRepository.save(order);
        
        // 扣减账户余额
        accountClient.debit(order.getUserId(), order.getAmount());
        
        // 扣减库存
        inventoryClient.deduct(order.getProductId(), 1);
        
        // 更新订单状态
        order.setStatus(OrderStatus.DONE);
        orderRepository.save(order);
        
        return order;
    }
}

TCC 模式

TCC 模式需要业务实现 Try、Confirm 和 Cancel 三个接口。

示例

  1. 定义 TCC 接口:
java
@LocalTCC
public interface AccountService {
    
    @TwoPhaseBusinessAction(name = "debit", commitMethod = "confirm", rollbackMethod = "cancel")
    boolean try(String userId, double amount);
    
    boolean confirm(String userId, double amount);
    
    boolean cancel(String userId, double amount);
}
  1. 实现 TCC 接口:
java
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    
    @Autowired
    private AccountRepository accountRepository;
    
    @Autowired
    private AccountFreezeRepository accountFreezeRepository;
    
    @Override
    @Transactional
    public boolean try(String userId, double amount) {
        log.info("Try to debit account, userId: {}, amount: {}", userId, amount);
        
        // 检查账户余额
        Account account = accountRepository.findById(userId)
            .orElseThrow(() -> new AccountNotFoundException(userId));
        
        if (account.getBalance() < amount) {
            throw new InsufficientBalanceException(userId);
        }
        
        // 冻结金额
        account.setFrozen(account.getFrozen() + amount);
        accountRepository.save(account);
        
        // 记录冻结信息
        AccountFreeze freeze = new AccountFreeze();
        freeze.setUserId(userId);
        freeze.setAmount(amount);
        freeze.setStatus(FreezeStatus.TRY);
        accountFreezeRepository.save(freeze);
        
        return true;
    }
    
    @Override
    @Transactional
    public boolean confirm(String userId, double amount) {
        log.info("Confirm debit account, userId: {}, amount: {}", userId, amount);
        
        // 查找冻结记录
        AccountFreeze freeze = accountFreezeRepository.findByUserIdAndStatus(userId, FreezeStatus.TRY);
        if (freeze == null) {
            return true; // 幂等处理
        }
        
        // 扣减实际余额
        Account account = accountRepository.findById(userId)
            .orElseThrow(() -> new AccountNotFoundException(userId));
        account.setBalance(account.getBalance() - freeze.getAmount());
        account.setFrozen(account.getFrozen() - freeze.getAmount());
        accountRepository.save(account);
        
        // 更新冻结状态
        freeze.setStatus(FreezeStatus.CONFIRMED);
        accountFreezeRepository.save(freeze);
        
        return true;
    }
    
    @Override
    @Transactional
    public boolean cancel(String userId, double amount) {
        log.info("Cancel debit account, userId: {}, amount: {}", userId, amount);
        
        // 查找冻结记录
        AccountFreeze freeze = accountFreezeRepository.findByUserIdAndStatus(userId, FreezeStatus.TRY);
        if (freeze == null) {
            return true; // 幂等处理
        }
        
        // 解冻金额
        Account account = accountRepository.findById(userId)
            .orElseThrow(() -> new AccountNotFoundException(userId));
        account.setFrozen(account.getFrozen() - freeze.getAmount());
        accountRepository.save(account);
        
        // 更新冻结状态
        freeze.setStatus(FreezeStatus.CANCELLED);
        accountFreezeRepository.save(freeze);
        
        return true;
    }
}

Saga 模式

Seata 的 Saga 模式通过状态机引擎来编排服务调用和补偿,适合长事务场景。

配置步骤

  1. 定义状态机 JSON 配置文件:
json
{
  "name": "createOrderSaga",
  "comment": "创建订单 Saga 流程",
  "version": "1.0.0",
  "states": [
    {
      "name": "CreateOrder",
      "type": "ServiceTask",
      "serviceType": "spring",
      "serviceName": "orderService",
      "serviceMethod": "createOrder",
      "compensateMethod": "cancelOrder",
      "paramTypes": ["com.example.order.OrderRequest"],
      "paramValues": ["$.[request]"],
      "outputDataKey": "order",
      "next": "DebitAccount"
    },
    {
      "name": "DebitAccount",
      "type": "ServiceTask",
      "serviceType": "spring",
      "serviceName": "accountService",
      "serviceMethod": "debit",
      "compensateMethod": "credit",
      "paramTypes": ["java.lang.String", "double"],
      "paramValues": ["$.[order].userId", "$.[order].amount"],
      "next": "DeductInventory"
    },
    {
      "name": "DeductInventory",
      "type": "ServiceTask",
      "serviceType": "spring",
      "serviceName": "inventoryService",
      "serviceMethod": "deduct",
      "compensateMethod": "revert",
      "paramTypes": ["java.lang.String", "int"],
      "paramValues": ["$.[order].productId", "1"],
      "next": "UpdateOrderStatus"
    },
    {
      "name": "UpdateOrderStatus",
      "type": "ServiceTask",
      "serviceType": "spring",
      "serviceName": "orderService",
      "serviceMethod": "updateStatus",
      "paramTypes": ["java.lang.String", "com.example.order.OrderStatus"],
      "paramValues": ["$.[order].id", "DONE"],
      "end": true
    }
  ]
}
  1. 注册状态机引擎:
java
@Configuration
public class SagaConfig {
    
    @Bean
    public StateMachineEngine stateMachineEngine() {
        // 创建状态机引擎
        return new ProcessCtrlStateMachineEngine();
    }
}
  1. 启动 Saga 流程:
java
@Service
@Slf4j
public class OrderServiceFacade {
    
    @Autowired
    private StateMachineEngine stateMachineEngine;
    
    public void createOrder(OrderRequest request) {
        // 准备上下文
        Map<String, Object> startParams = new HashMap<>();
        startParams.put("request", request);
        
        // 启动状态机
        StateMachineInstance instance = stateMachineEngine.startWithBusinessKey(
            "createOrderSaga", 
            null, 
            request.getOrderId(), 
            startParams
        );
        
        if (instance.getException() != null) {
            log.error("Saga execution failed", instance.getException());
            throw new RuntimeException("Failed to create order", instance.getException());
        }
    }
}

2. Atomikos (XA 事务)

Atomikos 是一个开源的事务管理器,支持 XA 事务,可以和 Spring Boot 集成使用。

配置步骤

  1. 添加依赖:
xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
  1. 配置多数据源:
java
@Configuration
public class DataSourceConfig {
    
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean orderDataSource() {
        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        dataSource.setUniqueResourceName("orderDB");
        dataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        
        Properties properties = new Properties();
        properties.setProperty("URL", "jdbc:mysql://localhost:3306/order_db");
        properties.setProperty("user", "root");
        properties.setProperty("password", "root");
        
        dataSource.setXaProperties(properties);
        return dataSource;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    public AtomikosDataSourceBean accountDataSource() {
        AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
        dataSource.setUniqueResourceName("accountDB");
        dataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
        
        Properties properties = new Properties();
        properties.setProperty("URL", "jdbc:mysql://localhost:3306/account_db");
        properties.setProperty("user", "root");
        properties.setProperty("password", "root");
        
        dataSource.setXaProperties(properties);
        return dataSource;
    }
}
  1. 使用 @Transactional 注解标记分布式事务:
java
@Service
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private AccountRepository accountRepository;
    
    @Transactional
    public Order createOrder(OrderRequest request) {
        // 创建订单
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.INIT);
        orderRepository.save(order);
        
        // 扣减账户余额
        Account account = accountRepository.findById(request.getUserId())
            .orElseThrow(() -> new AccountNotFoundException(request.getUserId()));
        
        if (account.getBalance() < request.getAmount()) {
            throw new InsufficientBalanceException(request.getUserId());
        }
        
        account.setBalance(account.getBalance() - request.getAmount());
        accountRepository.save(account);
        
        // 更新订单状态
        order.setStatus(OrderStatus.DONE);
        orderRepository.save(order);
        
        return order;
    }
}

3. Spring Cloud Stream + 消息驱动方式

Spring Cloud Stream 可以配合消息队列实现可靠消息最终一致性模式。

实现步骤

  1. 添加依赖:
xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. 配置绑定器:
yaml
spring:
  cloud:
    stream:
      bindings:
        order-output:
          destination: orders
          content-type: application/json
        order-input:
          destination: orders
          group: order-service-group
          content-type: application/json
  1. 实现事务性消息发送:
java
@Service
@Slf4j
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private StreamBridge streamBridge;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Transactional
    public Order createOrder(OrderRequest request) {
        // 创建订单
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.INIT);
        orderRepository.save(order);
        
        // 事务性发送消息
        rabbitTemplate.setChannelTransacted(true);
        streamBridge.send("order-output", order);
        
        return order;
    }
}
  1. 实现消息消费者:
java
@Service
@Slf4j
public class AccountService {
    
    @Autowired
    private AccountRepository accountRepository;
    
    @Bean
    public Consumer<Order> orderConsumer() {
        return order -> {
            log.info("Processing order: {}", order.getId());
            
            try {
                processOrder(order);
                log.info("Order processed successfully: {}", order.getId());
            } catch (Exception e) {
                log.error("Failed to process order: {}", order.getId(), e);
                // 处理异常,可能需要重试或人工干预
            }
        };
    }
    
    @Transactional
    public void processOrder(Order order) {
        Account account = accountRepository.findById(order.getUserId())
            .orElseThrow(() -> new AccountNotFoundException(order.getUserId()));
        
        if (account.getBalance() < order.getAmount()) {
            throw new InsufficientBalanceException(order.getUserId());
        }
        
        account.setBalance(account.getBalance() - order.getAmount());
        accountRepository.save(account);
    }
}

4. Spring Cloud Task + Spring Batch

Spring Cloud Task 和 Spring Batch 可以配合使用,实现分布式批处理和任务调度,适用于需要大规模数据处理的场景。

配置步骤

  1. 添加依赖:
xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
</dependency>
  1. 配置 TaskConfigurer 和 BatchConfigurer:
java
@Configuration
@EnableTask
@EnableBatchProcessing
public class BatchTaskConfig {

    @Autowired
    private DataSource dataSource;

    @Bean
    public TaskConfigurer taskConfigurer() {
        return new SimpleTaskConfigurer(dataSource);
    }

    @Bean
    public BatchConfigurer batchConfigurer() {
        return new DefaultBatchConfigurer(dataSource);
    }
}
  1. 创建批处理任务:
java
@Configuration
public class BatchJobConfig {
    
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job processOrderJob(Step processOrderStep) {
        return jobBuilderFactory.get("processOrderJob")
                .start(processOrderStep)
                .build();
    }
    
    @Bean
    public Step processOrderStep(ItemReader<Order> reader, 
                                 ItemProcessor<Order, Order> processor,
                                 ItemWriter<Order> writer) {
        return stepBuilderFactory.get("processOrderStep")
                .<Order, Order>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
    
    @Bean
    @StepScope
    public JdbcCursorItemReader<Order> orderItemReader() {
        JdbcCursorItemReader<Order> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT * FROM orders WHERE status = 'PENDING'");
        reader.setRowMapper(new OrderRowMapper());
        return reader;
    }
    
    @Bean
    @StepScope
    public ItemProcessor<Order, Order> orderProcessor() {
        return order -> {
            // 处理订单逻辑
            order.setStatus(OrderStatus.PROCESSING);
            return order;
        };
    }
    
    @Bean
    @StepScope
    public JdbcBatchItemWriter<Order> orderItemWriter() {
        JdbcBatchItemWriter<Order> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("UPDATE orders SET status = ? WHERE id = ?");
        writer.setItemPreparedStatementSetter((order, ps) -> {
            ps.setString(1, order.getStatus().name());
            ps.setLong(2, order.getId());
        });
        return writer;
    }
}
  1. 启动批处理任务:
java
@Component
@Slf4j
public class OrderBatchTaskRunner implements CommandLineRunner {
    
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private Job processOrderJob;
    
    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addDate("run.date", new Date())
                .toJobParameters();
        
        try {
            JobExecution execution = jobLauncher.run(processOrderJob, jobParameters);
            log.info("Job execution status: {}", execution.getStatus());
        } catch (Exception e) {
            log.error("Failed to run batch job", e);
        }
    }
}

5. 总结与最佳实践

在设计分布式事务时,我们应优先考虑业务解耦和领域划分,尽量避免分布式事务;当不可避免需要分布式事务时,应优先考虑最终一致性方案;只有在业务真正需要强一致性的场景下,才考虑使用 2PC 或 XA 事务等强一致性方案。同时,要重视事务的可监控性和可恢复性,为系统的可靠运行提供保障。

选择合适的分布式事务方案

方案一致性性能侵入性适用场景
Seata AT强一致性简单的 CRUD 场景
Seata TCC强一致性复杂业务逻辑,对性能要求高
Seata Saga最终一致性长事务,有明确的补偿机制
Seata XA强一致性对一致性要求极高的场景
消息驱动最终一致性异步处理,允许短暂不一致
本地消息表最终一致性自行实现可靠消息传递机制
XA 事务强一致性传统企业应用,对性能要求不高

分布式事务实施建议

  1. 设计阶段

    • 识别事务边界,明确一致性要求
    • 考虑业务可补偿性,设计幂等接口
    • 评估性能和可用性需求
  2. 开发阶段

    • 实现幂等操作,确保重试安全
    • 添加充分的日志,便于问题排查
    • 实现可靠的监控和告警机制
  3. 测试阶段

    • 进行混沌测试,模拟各种故障场景
    • 测试高并发下的性能和一致性
    • 验证恢复机制的有效性
  4. 运维阶段

    • 监控事务执行情况和性能指标
    • 建立问题排查和恢复流程
    • 定期演练故障恢复

案例:电商订单系统

下面通过一个电商订单系统的例子,展示如何在实际项目中应用分布式事务。

1. 系统架构

  • 订单服务:负责创建和管理订单
  • 库存服务:负责管理商品库存
  • 支付服务:负责处理支付相关业务
  • 账户服务:负责管理用户账户余额
  • 物流服务:负责处理物流配送信息

2. 使用 Seata AT 模式实现下单流程

java
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private InventoryClient inventoryClient;
    
    @Autowired
    private AccountClient accountClient;
    
    @Override
    @GlobalTransactional
    public Order createOrder(OrderCreateDTO orderCreate) {
        log.info("开始创建订单,用户ID:{},商品ID:{},数量:{}", 
            orderCreate.getUserId(), orderCreate.getProductId(), orderCreate.getCount());
        
        // 1. 创建订单
        Order order = new Order();
        order.setUserId(orderCreate.getUserId());
        order.setProductId(orderCreate.getProductId());
        order.setCount(orderCreate.getCount());
        order.setAmount(orderCreate.getAmount());
        order.setStatus(OrderStatus.CREATED);
        orderRepository.save(order);
        
        // 2. 扣减库存
        boolean inventoryResult = inventoryClient.deduct(orderCreate.getProductId(), orderCreate.getCount());
        if (!inventoryResult) {
            log.error("扣减库存失败,商品ID:{}", orderCreate.getProductId());
            throw new BusinessException("库存不足");
        }
        
        // 3. 扣减余额
        boolean accountResult = accountClient.debit(orderCreate.getUserId(), orderCreate.getAmount());
        if (!accountResult) {
            log.error("扣减余额失败,用户ID:{}", orderCreate.getUserId());
            throw new BusinessException("余额不足");
        }
        
        // 4. 更新订单状态
        order.setStatus(OrderStatus.PAID);
        orderRepository.save(order);
        
        log.info("订单创建成功,订单ID:{}", order.getId());
        return order;
    }
}

3. 使用 Saga 模式实现订单履行流程

java
@Service
@Slf4j
public class OrderFulfillmentService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private StateMachineEngine stateMachineEngine;
    
    public void fulfillOrder(String orderId) {
        log.info("开始订单履行流程,订单ID:{}", orderId);
        
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
        
        if (order.getStatus() != OrderStatus.PAID) {
            throw new IllegalStateException("订单状态不正确:" + order.getStatus());
        }
        
        // 准备上下文
        Map<String, Object> context = new HashMap<>();
        context.put("orderId", orderId);
        
        // 启动 Saga 状态机
        StateMachineInstance instance = stateMachineEngine.startWithBusinessKey(
            "orderFulfillmentSaga", 
            null, 
            orderId, 
            context
        );
        
        if (instance.getException() != null) {
            log.error("订单履行失败", instance.getException());
            throw new RuntimeException("订单履行失败", instance.getException());
        }
        
        log.info("订单履行流程启动成功,状态机实例ID:{}", instance.getId());
    }
}

订单履行 Saga 状态机定义:

json
{
  "name": "orderFulfillmentSaga",
  "comment": "订单履行 Saga 流程",
  "version": "1.0.0",
  "states": [
    {
      "name": "AllocateInventory",
      "type": "ServiceTask",
      "serviceType": "spring",
      "serviceName": "inventoryService",
      "serviceMethod": "allocate",
      "compensateMethod": "releaseAllocation",
      "paramTypes": ["java.lang.String"],
      "paramValues": ["$.[orderId]"],
      "next": "ArrangeShipment"
    },
    {
      "name": "ArrangeShipment",
      "type": "ServiceTask",
      "serviceType": "spring",
      "serviceName": "logisticsService",
      "serviceMethod": "arrangeShipment",
      "compensateMethod": "cancelShipment",
      "paramTypes": ["java.lang.String"],
      "paramValues": ["$.[orderId]"],
      "next": "UpdateOrderStatus"
    },
    {
      "name": "UpdateOrderStatus",
      "type": "ServiceTask",
      "serviceType": "spring",
      "serviceName": "orderService",
      "serviceMethod": "updateStatus",
      "paramTypes": ["java.lang.String", "com.example.order.OrderStatus"],
      "paramValues": ["$.[orderId]", "SHIPPING"],
      "end": true
    }
  ]
}

4. 使用可靠消息最终一致性模式实现异步通知

java
@Service
@Slf4j
public class OrderNotificationService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private MessageHistoryRepository messageHistoryRepository;
    
    @Transactional
    public void sendOrderCreatedNotification(Order order) {
        log.info("发送订单创建通知,订单ID:{}", order.getId());
        
        // 创建消息
        OrderCreatedMessage message = new OrderCreatedMessage();
        message.setOrderId(order.getId());
        message.setUserId(order.getUserId());
        message.setAmount(order.getAmount());
        message.setCreatedAt(new Date());
        
        // 消息ID
        String messageId = UUID.randomUUID().toString();
        
        // 记录消息历史
        MessageHistory history = new MessageHistory();
        history.setMessageId(messageId);
        history.setType("ORDER_CREATED");
        history.setPayload(objectMapper.writeValueAsString(message));
        history.setStatus(MessageStatus.SENDING);
        messageHistoryRepository.save(history);
        
        // 设置消息属性
        MessageProperties properties = new MessageProperties();
        properties.setMessageId(messageId);
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        
        // 事务性发送消息
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.send("order.events", "order.created", 
            new Message(objectMapper.writeValueAsBytes(message), properties));
        
        // 更新消息状态
        history.setStatus(MessageStatus.SENT);
        messageHistoryRepository.save(history);
        
        log.info("订单创建通知已发送,消息ID:{}", messageId);
    }
}

5. 使用定时任务补偿机制

java
@Component
@Slf4j
public class MessageRetryTask {
    
    @Autowired
    private MessageHistoryRepository messageHistoryRepository;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Scheduled(fixedDelay = 60000) // 每分钟执行一次
    public void retryFailedMessages() {
        log.info("开始重试失败消息");
        
        // 查找状态为 FAILED 的消息
        List<MessageHistory> failedMessages = messageHistoryRepository.findByStatus(MessageStatus.FAILED);
        
        for (MessageHistory message : failedMessages) {
            try {
                log.info("重试发送消息,消息ID:{}", message.getMessageId());
                
                // 设置消息属性
                MessageProperties properties = new MessageProperties();
                properties.setMessageId(message.getMessageId());
                properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
                
                // 重新发送消息
                rabbitTemplate.send("order.events", message.getType().toLowerCase(), 
                    new Message(message.getPayload().getBytes(), properties));
                
                // 更新消息状态
                message.setStatus(MessageStatus.SENT);
                message.setRetryCount(message.getRetryCount() + 1);
                message.setLastRetryTime(new Date());
                messageHistoryRepository.save(message);
                
                log.info("消息重试成功,消息ID:{}", message.getMessageId());
            } catch (Exception e) {
                log.error("消息重试失败,消息ID:{}", message.getMessageId(), e);
                message.setRetryCount(message.getRetryCount() + 1);
                message.setLastRetryTime(new Date());
                
                // 如果重试次数超过阈值,标记为需要人工干预
                if (message.getRetryCount() >= 5) {
                    message.setStatus(MessageStatus.NEED_MANUAL);
                }
                
                messageHistoryRepository.save(message);
            }
        }
        
        log.info("失败消息重试完成");
    }
}

分布式事务方案选择指南

不同的分布式事务方案适用于不同的场景,以下是选择指南:

场景推荐方案优点
跨多个服务,对一致性要求高Seata AT对业务无侵入,易于集成
复杂业务流程,需要精细控制Seata TCC细粒度控制,性能好
长事务,涉及多个步骤Saga无需长时间锁定资源
异步业务流程消息最终一致性系统解耦,高可用
批处理任务Spring Batch + Task易于管理和监控
简单场景,低一致性要求最大努力通知实现简单,性能高

总结

分布式事务是微服务架构中的一个挑战性问题,没有一种万能的解决方案。在实际应用中,我们需要根据业务场景、一致性需求和性能要求选择合适的分布式事务模式。Spring Cloud 及其生态系统提供了多种分布式事务解决方案,如 Seata、Atomikos 以及基于消息的方案等,为构建可靠的分布式系统提供了有力支持。

在设计分布式事务时,我们应优先考虑业务解耦和领域划分,尽量避免分布式事务;当不可避免需要分布式事务时,应优先考虑最终一致性方案;只有在业务真正需要强一致性的场景下,才考虑使用 2PC 或 XA 事务等强一致性方案。同时,要重视事务的可监控性和可恢复性,为系统的可靠运行提供保障。