Appearance
分布式事务
分布式事务是指跨越多个服务或资源的事务,在微服务架构中尤为重要。由于微服务架构将业务拆分为多个独立服务,一个业务操作可能需要调用多个服务,这就带来了分布式事务的挑战。Spring Cloud 提供了多种方案来处理分布式事务问题。
分布式事务的挑战
在单体应用中,我们可以依赖数据库的 ACID 特性来保证事务的一致性。但在分布式系统中,由于以下原因,传统的事务管理变得困难:
- 多数据源:不同的微服务可能使用不同的数据库
- 网络不可靠:服务间的通信可能因网络问题而失败
- 部分失败:部分服务可能成功,部分服务可能失败
- 长事务:分布式事务往往耗时较长,长时间锁定资源会降低系统吞吐量
CAP 理论与分布式事务
CAP 理论指出,分布式系统无法同时满足以下三个特性:
- 一致性 (Consistency):所有节点在同一时间看到的数据是一致的
- 可用性 (Availability):系统能够继续提供服务,即使部分节点故障
- 分区容错性 (Partition tolerance):系统能够容忍网络分区故障
在分布式事务的设计中,我们通常需要在一致性和可用性之间做出权衡。
分布式事务的模式
1. 二阶段提交 (2PC)
二阶段提交是一种强一致性的分布式事务协议,分为两个阶段:
- 准备阶段:协调者询问所有参与者是否可以提交事务
- 提交阶段:如果所有参与者都同意提交,协调者通知所有参与者提交事务;否则通知所有参与者回滚事务
优点:
- 保证强一致性
- 相对简单直观
缺点:
- 同步阻塞,影响性能
- 单点故障(协调者)
- 参与者故障可能导致资源长时间锁定
2. 三阶段提交 (3PC)
三阶段提交在二阶段提交的基础上增加了一个"预提交"阶段,主要解决了 2PC 中的单点故障和阻塞问题。
3. TCC (Try-Confirm-Cancel)
TCC 是一种补偿型事务,分为三个阶段:
- Try:尝试执行业务,预留资源
- Confirm:确认执行业务,实际使用资源
- Cancel:取消执行业务,释放预留资源
优点:
- 相比 2PC 性能更好
- 可以实现最终一致性
- 业务侵入性强,能更好地控制资源
缺点:
- 开发成本高,需要实现三个接口
- 业务侵入性强
- 要考虑幂等性、空回滚、悬挂等复杂问题
4. Saga 模式
Saga 是一种长事务解决方案,将长事务拆分为多个本地事务,每个本地事务都有对应的补偿事务。
执行过程:
- 按顺序执行各本地事务
- 如果某个本地事务失败,则按照相反顺序执行已完成事务的补偿事务
优点:
- 不需要锁定资源
- 适合长事务场景
- 可以实现最终一致性
缺点:
- 补偿事务设计复杂
- 事务隔离性较弱
- 需要处理复杂的回滚逻辑
5. 本地消息表
本地消息表模式使用消息队列和本地事务来实现分布式事务:
- 在本地事务中,同时更新业务数据和消息表
- 定时任务扫描消息表,将消息发送至消息队列
- 消费者消费消息,执行本地事务
- 消费者确认消息消费结果
优点:
- 基于本地事务,实现简单
- 不依赖第三方组件
- 可靠性高
缺点:
- 需要额外的消息表
- 消息表与业务耦合
- 需要定时任务,增加系统复杂性
6. 可靠消息最终一致性
可靠消息最终一致性模式是对本地消息表模式的改进,将消息表抽象为独立的消息服务:
- 发送预消息到消息服务
- 执行本地事务
- 根据事务结果确认或取消消息
- 消费者消费消息,执行本地事务
- 消息服务根据消费结果,重试或删除消息
优点:
- 解耦了消息管理和业务逻辑
- 易于集成到现有系统
缺点:
- 需要消息服务支持事务消息
- 实现复杂度较高
7. 最大努力通知
最大努力通知是一种弱一致性事务模式:
- 系统 A 执行本地事务
- 系统 A 通知系统 B
- 如果通知失败,则按照一定策略重试(例如,指数退避)
优点:
- 实现简单
- 侵入性低
缺点:
- 一致性保证较弱
- 可能需要人工干预解决长时间失败的情况
Spring Cloud 分布式事务解决方案
1. Spring Cloud Alibaba Seata
Seata 是阿里巴巴开源的分布式事务解决方案,提供了 AT、TCC、Saga 和 XA 四种事务模式。Spring Cloud Alibaba 集成了 Seata,使其易于在 Spring Cloud 应用中使用。
AT 模式
AT (Automatic Transaction) 模式是 Seata 的默认模式,它对业务无侵入,通过拦截 SQL 和自动生成反向 SQL 来实现分布式事务。
工作原理:
- 开始全局事务:创建全局事务 ID
- 分支事务执行:
- 拦截 SQL,解析 SQL 获取数据变更前后的快照
- 通过行锁防止写冲突
- 将业务数据和回滚日志在同一个本地事务中提交
- 提交/回滚:
- 提交:异步删除回滚日志
- 回滚:根据回滚日志生成反向 SQL 并执行
配置步骤:
- 添加依赖:
xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
- 配置 Seata 事务组和服务器地址:
yaml
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: my_tx_group
service:
vgroup-mapping:
my_tx_group: default
grouplist:
default: 127.0.0.1:8091
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: seata
group: SEATA_GROUP
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: seata
group: SEATA_GROUP
- 使用
@GlobalTransactional
注解标记全局事务:
java
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AccountClient accountClient;
@Autowired
private InventoryClient inventoryClient;
@GlobalTransactional
public Order createOrder(OrderRequest request) {
// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.INIT);
orderRepository.save(order);
// 扣减账户余额
accountClient.debit(order.getUserId(), order.getAmount());
// 扣减库存
inventoryClient.deduct(order.getProductId(), 1);
// 更新订单状态
order.setStatus(OrderStatus.DONE);
orderRepository.save(order);
return order;
}
}
TCC 模式
TCC 模式需要业务实现 Try、Confirm 和 Cancel 三个接口。
示例:
- 定义 TCC 接口:
java
@LocalTCC
public interface AccountService {
@TwoPhaseBusinessAction(name = "debit", commitMethod = "confirm", rollbackMethod = "cancel")
boolean try(String userId, double amount);
boolean confirm(String userId, double amount);
boolean cancel(String userId, double amount);
}
- 实现 TCC 接口:
java
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountRepository accountRepository;
@Autowired
private AccountFreezeRepository accountFreezeRepository;
@Override
@Transactional
public boolean try(String userId, double amount) {
log.info("Try to debit account, userId: {}, amount: {}", userId, amount);
// 检查账户余额
Account account = accountRepository.findById(userId)
.orElseThrow(() -> new AccountNotFoundException(userId));
if (account.getBalance() < amount) {
throw new InsufficientBalanceException(userId);
}
// 冻结金额
account.setFrozen(account.getFrozen() + amount);
accountRepository.save(account);
// 记录冻结信息
AccountFreeze freeze = new AccountFreeze();
freeze.setUserId(userId);
freeze.setAmount(amount);
freeze.setStatus(FreezeStatus.TRY);
accountFreezeRepository.save(freeze);
return true;
}
@Override
@Transactional
public boolean confirm(String userId, double amount) {
log.info("Confirm debit account, userId: {}, amount: {}", userId, amount);
// 查找冻结记录
AccountFreeze freeze = accountFreezeRepository.findByUserIdAndStatus(userId, FreezeStatus.TRY);
if (freeze == null) {
return true; // 幂等处理
}
// 扣减实际余额
Account account = accountRepository.findById(userId)
.orElseThrow(() -> new AccountNotFoundException(userId));
account.setBalance(account.getBalance() - freeze.getAmount());
account.setFrozen(account.getFrozen() - freeze.getAmount());
accountRepository.save(account);
// 更新冻结状态
freeze.setStatus(FreezeStatus.CONFIRMED);
accountFreezeRepository.save(freeze);
return true;
}
@Override
@Transactional
public boolean cancel(String userId, double amount) {
log.info("Cancel debit account, userId: {}, amount: {}", userId, amount);
// 查找冻结记录
AccountFreeze freeze = accountFreezeRepository.findByUserIdAndStatus(userId, FreezeStatus.TRY);
if (freeze == null) {
return true; // 幂等处理
}
// 解冻金额
Account account = accountRepository.findById(userId)
.orElseThrow(() -> new AccountNotFoundException(userId));
account.setFrozen(account.getFrozen() - freeze.getAmount());
accountRepository.save(account);
// 更新冻结状态
freeze.setStatus(FreezeStatus.CANCELLED);
accountFreezeRepository.save(freeze);
return true;
}
}
Saga 模式
Seata 的 Saga 模式通过状态机引擎来编排服务调用和补偿,适合长事务场景。
配置步骤:
- 定义状态机 JSON 配置文件:
json
{
"name": "createOrderSaga",
"comment": "创建订单 Saga 流程",
"version": "1.0.0",
"states": [
{
"name": "CreateOrder",
"type": "ServiceTask",
"serviceType": "spring",
"serviceName": "orderService",
"serviceMethod": "createOrder",
"compensateMethod": "cancelOrder",
"paramTypes": ["com.example.order.OrderRequest"],
"paramValues": ["$.[request]"],
"outputDataKey": "order",
"next": "DebitAccount"
},
{
"name": "DebitAccount",
"type": "ServiceTask",
"serviceType": "spring",
"serviceName": "accountService",
"serviceMethod": "debit",
"compensateMethod": "credit",
"paramTypes": ["java.lang.String", "double"],
"paramValues": ["$.[order].userId", "$.[order].amount"],
"next": "DeductInventory"
},
{
"name": "DeductInventory",
"type": "ServiceTask",
"serviceType": "spring",
"serviceName": "inventoryService",
"serviceMethod": "deduct",
"compensateMethod": "revert",
"paramTypes": ["java.lang.String", "int"],
"paramValues": ["$.[order].productId", "1"],
"next": "UpdateOrderStatus"
},
{
"name": "UpdateOrderStatus",
"type": "ServiceTask",
"serviceType": "spring",
"serviceName": "orderService",
"serviceMethod": "updateStatus",
"paramTypes": ["java.lang.String", "com.example.order.OrderStatus"],
"paramValues": ["$.[order].id", "DONE"],
"end": true
}
]
}
- 注册状态机引擎:
java
@Configuration
public class SagaConfig {
@Bean
public StateMachineEngine stateMachineEngine() {
// 创建状态机引擎
return new ProcessCtrlStateMachineEngine();
}
}
- 启动 Saga 流程:
java
@Service
@Slf4j
public class OrderServiceFacade {
@Autowired
private StateMachineEngine stateMachineEngine;
public void createOrder(OrderRequest request) {
// 准备上下文
Map<String, Object> startParams = new HashMap<>();
startParams.put("request", request);
// 启动状态机
StateMachineInstance instance = stateMachineEngine.startWithBusinessKey(
"createOrderSaga",
null,
request.getOrderId(),
startParams
);
if (instance.getException() != null) {
log.error("Saga execution failed", instance.getException());
throw new RuntimeException("Failed to create order", instance.getException());
}
}
}
2. Atomikos (XA 事务)
Atomikos 是一个开源的事务管理器,支持 XA 事务,可以和 Spring Boot 集成使用。
配置步骤:
- 添加依赖:
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
- 配置多数据源:
java
@Configuration
public class DataSourceConfig {
@Bean(initMethod = "init", destroyMethod = "close")
public AtomikosDataSourceBean orderDataSource() {
AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
dataSource.setUniqueResourceName("orderDB");
dataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
Properties properties = new Properties();
properties.setProperty("URL", "jdbc:mysql://localhost:3306/order_db");
properties.setProperty("user", "root");
properties.setProperty("password", "root");
dataSource.setXaProperties(properties);
return dataSource;
}
@Bean(initMethod = "init", destroyMethod = "close")
public AtomikosDataSourceBean accountDataSource() {
AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
dataSource.setUniqueResourceName("accountDB");
dataSource.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
Properties properties = new Properties();
properties.setProperty("URL", "jdbc:mysql://localhost:3306/account_db");
properties.setProperty("user", "root");
properties.setProperty("password", "root");
dataSource.setXaProperties(properties);
return dataSource;
}
}
- 使用
@Transactional
注解标记分布式事务:
java
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private AccountRepository accountRepository;
@Transactional
public Order createOrder(OrderRequest request) {
// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.INIT);
orderRepository.save(order);
// 扣减账户余额
Account account = accountRepository.findById(request.getUserId())
.orElseThrow(() -> new AccountNotFoundException(request.getUserId()));
if (account.getBalance() < request.getAmount()) {
throw new InsufficientBalanceException(request.getUserId());
}
account.setBalance(account.getBalance() - request.getAmount());
accountRepository.save(account);
// 更新订单状态
order.setStatus(OrderStatus.DONE);
orderRepository.save(order);
return order;
}
}
3. Spring Cloud Stream + 消息驱动方式
Spring Cloud Stream 可以配合消息队列实现可靠消息最终一致性模式。
实现步骤:
- 添加依赖:
xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- 配置绑定器:
yaml
spring:
cloud:
stream:
bindings:
order-output:
destination: orders
content-type: application/json
order-input:
destination: orders
group: order-service-group
content-type: application/json
- 实现事务性消息发送:
java
@Service
@Slf4j
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private StreamBridge streamBridge;
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public Order createOrder(OrderRequest request) {
// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.INIT);
orderRepository.save(order);
// 事务性发送消息
rabbitTemplate.setChannelTransacted(true);
streamBridge.send("order-output", order);
return order;
}
}
- 实现消息消费者:
java
@Service
@Slf4j
public class AccountService {
@Autowired
private AccountRepository accountRepository;
@Bean
public Consumer<Order> orderConsumer() {
return order -> {
log.info("Processing order: {}", order.getId());
try {
processOrder(order);
log.info("Order processed successfully: {}", order.getId());
} catch (Exception e) {
log.error("Failed to process order: {}", order.getId(), e);
// 处理异常,可能需要重试或人工干预
}
};
}
@Transactional
public void processOrder(Order order) {
Account account = accountRepository.findById(order.getUserId())
.orElseThrow(() -> new AccountNotFoundException(order.getUserId()));
if (account.getBalance() < order.getAmount()) {
throw new InsufficientBalanceException(order.getUserId());
}
account.setBalance(account.getBalance() - order.getAmount());
accountRepository.save(account);
}
}
4. Spring Cloud Task + Spring Batch
Spring Cloud Task 和 Spring Batch 可以配合使用,实现分布式批处理和任务调度,适用于需要大规模数据处理的场景。
配置步骤:
- 添加依赖:
xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
- 配置 TaskConfigurer 和 BatchConfigurer:
java
@Configuration
@EnableTask
@EnableBatchProcessing
public class BatchTaskConfig {
@Autowired
private DataSource dataSource;
@Bean
public TaskConfigurer taskConfigurer() {
return new SimpleTaskConfigurer(dataSource);
}
@Bean
public BatchConfigurer batchConfigurer() {
return new DefaultBatchConfigurer(dataSource);
}
}
- 创建批处理任务:
java
@Configuration
public class BatchJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job processOrderJob(Step processOrderStep) {
return jobBuilderFactory.get("processOrderJob")
.start(processOrderStep)
.build();
}
@Bean
public Step processOrderStep(ItemReader<Order> reader,
ItemProcessor<Order, Order> processor,
ItemWriter<Order> writer) {
return stepBuilderFactory.get("processOrderStep")
.<Order, Order>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
@StepScope
public JdbcCursorItemReader<Order> orderItemReader() {
JdbcCursorItemReader<Order> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT * FROM orders WHERE status = 'PENDING'");
reader.setRowMapper(new OrderRowMapper());
return reader;
}
@Bean
@StepScope
public ItemProcessor<Order, Order> orderProcessor() {
return order -> {
// 处理订单逻辑
order.setStatus(OrderStatus.PROCESSING);
return order;
};
}
@Bean
@StepScope
public JdbcBatchItemWriter<Order> orderItemWriter() {
JdbcBatchItemWriter<Order> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("UPDATE orders SET status = ? WHERE id = ?");
writer.setItemPreparedStatementSetter((order, ps) -> {
ps.setString(1, order.getStatus().name());
ps.setLong(2, order.getId());
});
return writer;
}
}
- 启动批处理任务:
java
@Component
@Slf4j
public class OrderBatchTaskRunner implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job processOrderJob;
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("run.date", new Date())
.toJobParameters();
try {
JobExecution execution = jobLauncher.run(processOrderJob, jobParameters);
log.info("Job execution status: {}", execution.getStatus());
} catch (Exception e) {
log.error("Failed to run batch job", e);
}
}
}
5. 总结与最佳实践
在设计分布式事务时,我们应优先考虑业务解耦和领域划分,尽量避免分布式事务;当不可避免需要分布式事务时,应优先考虑最终一致性方案;只有在业务真正需要强一致性的场景下,才考虑使用 2PC 或 XA 事务等强一致性方案。同时,要重视事务的可监控性和可恢复性,为系统的可靠运行提供保障。
选择合适的分布式事务方案
方案 | 一致性 | 性能 | 侵入性 | 适用场景 |
---|---|---|---|---|
Seata AT | 强一致性 | 中 | 低 | 简单的 CRUD 场景 |
Seata TCC | 强一致性 | 高 | 高 | 复杂业务逻辑,对性能要求高 |
Seata Saga | 最终一致性 | 高 | 中 | 长事务,有明确的补偿机制 |
Seata XA | 强一致性 | 低 | 低 | 对一致性要求极高的场景 |
消息驱动 | 最终一致性 | 高 | 中 | 异步处理,允许短暂不一致 |
本地消息表 | 最终一致性 | 中 | 高 | 自行实现可靠消息传递机制 |
XA 事务 | 强一致性 | 低 | 低 | 传统企业应用,对性能要求不高 |
分布式事务实施建议
设计阶段:
- 识别事务边界,明确一致性要求
- 考虑业务可补偿性,设计幂等接口
- 评估性能和可用性需求
开发阶段:
- 实现幂等操作,确保重试安全
- 添加充分的日志,便于问题排查
- 实现可靠的监控和告警机制
测试阶段:
- 进行混沌测试,模拟各种故障场景
- 测试高并发下的性能和一致性
- 验证恢复机制的有效性
运维阶段:
- 监控事务执行情况和性能指标
- 建立问题排查和恢复流程
- 定期演练故障恢复
案例:电商订单系统
下面通过一个电商订单系统的例子,展示如何在实际项目中应用分布式事务。
1. 系统架构
- 订单服务:负责创建和管理订单
- 库存服务:负责管理商品库存
- 支付服务:负责处理支付相关业务
- 账户服务:负责管理用户账户余额
- 物流服务:负责处理物流配送信息
2. 使用 Seata AT 模式实现下单流程
java
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private InventoryClient inventoryClient;
@Autowired
private AccountClient accountClient;
@Override
@GlobalTransactional
public Order createOrder(OrderCreateDTO orderCreate) {
log.info("开始创建订单,用户ID:{},商品ID:{},数量:{}",
orderCreate.getUserId(), orderCreate.getProductId(), orderCreate.getCount());
// 1. 创建订单
Order order = new Order();
order.setUserId(orderCreate.getUserId());
order.setProductId(orderCreate.getProductId());
order.setCount(orderCreate.getCount());
order.setAmount(orderCreate.getAmount());
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
// 2. 扣减库存
boolean inventoryResult = inventoryClient.deduct(orderCreate.getProductId(), orderCreate.getCount());
if (!inventoryResult) {
log.error("扣减库存失败,商品ID:{}", orderCreate.getProductId());
throw new BusinessException("库存不足");
}
// 3. 扣减余额
boolean accountResult = accountClient.debit(orderCreate.getUserId(), orderCreate.getAmount());
if (!accountResult) {
log.error("扣减余额失败,用户ID:{}", orderCreate.getUserId());
throw new BusinessException("余额不足");
}
// 4. 更新订单状态
order.setStatus(OrderStatus.PAID);
orderRepository.save(order);
log.info("订单创建成功,订单ID:{}", order.getId());
return order;
}
}
3. 使用 Saga 模式实现订单履行流程
java
@Service
@Slf4j
public class OrderFulfillmentService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private StateMachineEngine stateMachineEngine;
public void fulfillOrder(String orderId) {
log.info("开始订单履行流程,订单ID:{}", orderId);
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
if (order.getStatus() != OrderStatus.PAID) {
throw new IllegalStateException("订单状态不正确:" + order.getStatus());
}
// 准备上下文
Map<String, Object> context = new HashMap<>();
context.put("orderId", orderId);
// 启动 Saga 状态机
StateMachineInstance instance = stateMachineEngine.startWithBusinessKey(
"orderFulfillmentSaga",
null,
orderId,
context
);
if (instance.getException() != null) {
log.error("订单履行失败", instance.getException());
throw new RuntimeException("订单履行失败", instance.getException());
}
log.info("订单履行流程启动成功,状态机实例ID:{}", instance.getId());
}
}
订单履行 Saga 状态机定义:
json
{
"name": "orderFulfillmentSaga",
"comment": "订单履行 Saga 流程",
"version": "1.0.0",
"states": [
{
"name": "AllocateInventory",
"type": "ServiceTask",
"serviceType": "spring",
"serviceName": "inventoryService",
"serviceMethod": "allocate",
"compensateMethod": "releaseAllocation",
"paramTypes": ["java.lang.String"],
"paramValues": ["$.[orderId]"],
"next": "ArrangeShipment"
},
{
"name": "ArrangeShipment",
"type": "ServiceTask",
"serviceType": "spring",
"serviceName": "logisticsService",
"serviceMethod": "arrangeShipment",
"compensateMethod": "cancelShipment",
"paramTypes": ["java.lang.String"],
"paramValues": ["$.[orderId]"],
"next": "UpdateOrderStatus"
},
{
"name": "UpdateOrderStatus",
"type": "ServiceTask",
"serviceType": "spring",
"serviceName": "orderService",
"serviceMethod": "updateStatus",
"paramTypes": ["java.lang.String", "com.example.order.OrderStatus"],
"paramValues": ["$.[orderId]", "SHIPPING"],
"end": true
}
]
}
4. 使用可靠消息最终一致性模式实现异步通知
java
@Service
@Slf4j
public class OrderNotificationService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageHistoryRepository messageHistoryRepository;
@Transactional
public void sendOrderCreatedNotification(Order order) {
log.info("发送订单创建通知,订单ID:{}", order.getId());
// 创建消息
OrderCreatedMessage message = new OrderCreatedMessage();
message.setOrderId(order.getId());
message.setUserId(order.getUserId());
message.setAmount(order.getAmount());
message.setCreatedAt(new Date());
// 消息ID
String messageId = UUID.randomUUID().toString();
// 记录消息历史
MessageHistory history = new MessageHistory();
history.setMessageId(messageId);
history.setType("ORDER_CREATED");
history.setPayload(objectMapper.writeValueAsString(message));
history.setStatus(MessageStatus.SENDING);
messageHistoryRepository.save(history);
// 设置消息属性
MessageProperties properties = new MessageProperties();
properties.setMessageId(messageId);
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
// 事务性发送消息
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.send("order.events", "order.created",
new Message(objectMapper.writeValueAsBytes(message), properties));
// 更新消息状态
history.setStatus(MessageStatus.SENT);
messageHistoryRepository.save(history);
log.info("订单创建通知已发送,消息ID:{}", messageId);
}
}
5. 使用定时任务补偿机制
java
@Component
@Slf4j
public class MessageRetryTask {
@Autowired
private MessageHistoryRepository messageHistoryRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 60000) // 每分钟执行一次
public void retryFailedMessages() {
log.info("开始重试失败消息");
// 查找状态为 FAILED 的消息
List<MessageHistory> failedMessages = messageHistoryRepository.findByStatus(MessageStatus.FAILED);
for (MessageHistory message : failedMessages) {
try {
log.info("重试发送消息,消息ID:{}", message.getMessageId());
// 设置消息属性
MessageProperties properties = new MessageProperties();
properties.setMessageId(message.getMessageId());
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
// 重新发送消息
rabbitTemplate.send("order.events", message.getType().toLowerCase(),
new Message(message.getPayload().getBytes(), properties));
// 更新消息状态
message.setStatus(MessageStatus.SENT);
message.setRetryCount(message.getRetryCount() + 1);
message.setLastRetryTime(new Date());
messageHistoryRepository.save(message);
log.info("消息重试成功,消息ID:{}", message.getMessageId());
} catch (Exception e) {
log.error("消息重试失败,消息ID:{}", message.getMessageId(), e);
message.setRetryCount(message.getRetryCount() + 1);
message.setLastRetryTime(new Date());
// 如果重试次数超过阈值,标记为需要人工干预
if (message.getRetryCount() >= 5) {
message.setStatus(MessageStatus.NEED_MANUAL);
}
messageHistoryRepository.save(message);
}
}
log.info("失败消息重试完成");
}
}
分布式事务方案选择指南
不同的分布式事务方案适用于不同的场景,以下是选择指南:
场景 | 推荐方案 | 优点 |
---|---|---|
跨多个服务,对一致性要求高 | Seata AT | 对业务无侵入,易于集成 |
复杂业务流程,需要精细控制 | Seata TCC | 细粒度控制,性能好 |
长事务,涉及多个步骤 | Saga | 无需长时间锁定资源 |
异步业务流程 | 消息最终一致性 | 系统解耦,高可用 |
批处理任务 | Spring Batch + Task | 易于管理和监控 |
简单场景,低一致性要求 | 最大努力通知 | 实现简单,性能高 |
总结
分布式事务是微服务架构中的一个挑战性问题,没有一种万能的解决方案。在实际应用中,我们需要根据业务场景、一致性需求和性能要求选择合适的分布式事务模式。Spring Cloud 及其生态系统提供了多种分布式事务解决方案,如 Seata、Atomikos 以及基于消息的方案等,为构建可靠的分布式系统提供了有力支持。
在设计分布式事务时,我们应优先考虑业务解耦和领域划分,尽量避免分布式事务;当不可避免需要分布式事务时,应优先考虑最终一致性方案;只有在业务真正需要强一致性的场景下,才考虑使用 2PC 或 XA 事务等强一致性方案。同时,要重视事务的可监控性和可恢复性,为系统的可靠运行提供保障。