Appearance
Seata分布式事务
Seata(Simple Extensible Autonomous Transaction Architecture)是阿里巴巴开源的分布式事务解决方案,专为微服务架构设计。Spring Cloud Alibaba Seata 提供了 Spring Cloud 标准的分布式事务实现。
分布式事务问题
在微服务架构中,一个业务操作通常需要调用多个微服务来完成。这些服务可能使用不同的数据库,如何保证这些操作的原子性就成为了一个挑战。
分布式事务面临的主要问题包括:
- 原子性难保证:多个服务的操作要么全部成功,要么全部失败
- 数据一致性:所有节点的数据最终一致
- 故障恢复:某个节点故障后,如何恢复事务状态
- 性能开销:分布式事务会带来额外的性能开销
Seata 架构和原理
Seata 的核心组件
Seata 由以下三个角色组成:
- TC (Transaction Coordinator) - 事务协调者:维护全局事务的运行状态,负责协调并决定全局事务的提交或回滚。
- TM (Transaction Manager) - 事务管理器:定义全局事务的范围,开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器:管理分支事务,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务的提交或回滚。
Seata 的工作流程
- TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
- XID 在微服务调用链路的上下文中传播。
- RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
- TM 向 TC 发起针对 XID 的全局提交或回滚决议。
- TC 调度 XID 下管辖的全部分支事务完成提交或回滚。
Seata 的事务模式
Seata 提供了四种事务模式:
1. AT 模式
AT 模式是 Seata 的默认模式,基于两阶段提交和回滚日志实现,适用于关系型数据库。
工作原理:
- 一阶段:业务数据和回滚日志在同一个本地事务中提交
- 二阶段:
- 提交:异步删除回滚日志
- 回滚:根据回滚日志生成补偿操作,完成数据回滚
优点:
- 对业务无侵入,仅需要添加注解
- 自动生成回滚操作,无需手动编写
适用场景:
- 基于关系型数据库的微服务
- CRUD 操作为主的业务场景
2. TCC 模式
TCC (Try-Confirm-Cancel) 模式需要用户实现三个操作接口,完全由业务逻辑控制。
工作原理:
- Try:资源的检查和预留
- Confirm:真正执行业务,使用预留的资源
- Cancel:释放预留的资源
优点:
- 完全根据业务逻辑实现,灵活性高
- 不依赖于数据库,可支持多种资源类型
适用场景:
- 对性能要求较高的场景
- 非关系型数据库操作
- 复杂业务逻辑,仅靠 SQL 无法实现的场景
3. Saga 模式
Saga 模式是长事务解决方案,基于正向服务和补偿服务实现。
工作原理:
- 每个服务都有对应的补偿服务
- 如果正向服务链路执行成功,则全局事务完成
- 如果中间某个服务失败,则按照相反顺序执行已完成服务的补偿服务
优点:
- 适合长事务
- 无需锁定资源,并发度高
适用场景:
- 业务流程长、业务流程多的场景
- 需要保证最终一致性的场景
4. XA 模式
XA 模式直接使用 XA 协议实现分布式事务,依赖于数据库对 XA 协议的支持。
工作原理:
- 一阶段:执行 SQL 但不提交
- 二阶段:TC 发起全局提交或回滚,RM 执行提交或回滚
优点:
- 强一致性
- 对业务无侵入
适用场景:
- 需要强一致性的场景
- 对性能要求不高的场景
部署 Seata 服务器
方式一:下载安装包
从 Seata GitHub Releases 下载最新版本的安装包
解压后配置数据库:
在
script/server/db
目录下找到对应数据库的 SQL 脚本,创建 Seata 的配置和会话表修改配置文件:
编辑
conf/application.yml
文件,配置数据库连接和注册中心等信息启动服务器:
bash# Linux/Mac sh bin/seata-server.sh # Windows bin\seata-server.bat
方式二:使用 Docker
bash
docker run --name seata-server -p 8091:8091 -e SEATA_CONFIG_NAME=file:/root/seata-config/registry -v /path/to/config:/root/seata-config seataio/seata-server:latest
整合 Spring Cloud Alibaba 应用
1. 添加依赖
xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
2. 配置 Seata
在 application.yml
中添加 Seata 配置:
yaml
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: my_tx_group # 事务分组名称
service:
vgroup-mapping:
my_tx_group: default # 映射到 TC 服务器的集群名
grouplist:
default: 127.0.0.1:8091 # TC 服务器地址
registry:
type: nacos # 注册中心类型,根据实际情况选择
nacos:
server-addr: 127.0.0.1:8848
namespace: public
group: SEATA_GROUP
config:
type: nacos # 配置中心类型,根据实际情况选择
nacos:
server-addr: 127.0.0.1:8848
namespace: public
group: SEATA_GROUP
3. 配置数据源代理
在 Spring Boot 应用中配置数据源代理,使 Seata 能够管理数据库操作:
java
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
// 使用 Seata 代理数据源
return new DataSourceProxy(druidDataSource);
}
}
注意:在 Seata 1.2.0 及以上版本,客户端不再需要显式配置 DataSourceProxy,由 Seata 自动处理。
4. 使用 AT 模式
在业务服务的入口方法上添加 @GlobalTransactional
注解:
java
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountClient accountClient;
@Autowired
private InventoryClient inventoryClient;
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
@Override
public Order createOrder(OrderDTO orderDTO) {
// 创建订单
Order order = new Order();
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setAmount(orderDTO.getAmount());
order.setStatus(OrderStatus.INIT);
orderMapper.insert(order);
// 扣减库存
inventoryClient.deduct(orderDTO.getProductId(), orderDTO.getCount());
// 扣减余额
accountClient.debit(orderDTO.getUserId(), orderDTO.getAmount());
// 更新订单状态
order.setStatus(OrderStatus.SUCCESS);
orderMapper.updateById(order);
return order;
}
}
5. 使用 TCC 模式
定义 TCC 接口和实现类:
java
@LocalTCC
public interface AccountService {
@TwoPhaseBusinessAction(name = "debitAccount", commitMethod = "confirm", rollbackMethod = "cancel")
boolean prepare(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "amount") double amount);
boolean confirm(BusinessActionContext context);
boolean cancel(BusinessActionContext context);
}
@Service
public class AccountServiceImpl implements AccountService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private AccountFreezeMapper freezeMapper;
@Override
@Transactional
public boolean prepare(String userId, double amount) {
// 检查用户余额
Account account = accountMapper.selectById(userId);
if (account.getBalance() < amount) {
throw new InsufficientBalanceException();
}
// 冻结资金
account.setFrozen(account.getFrozen() + amount);
accountMapper.updateById(account);
// 记录冻结记录
AccountFreeze freeze = new AccountFreeze();
freeze.setUserId(userId);
freeze.setAmount(amount);
freeze.setTxId(RootContext.getXID());
freezeMapper.insert(freeze);
return true;
}
@Override
@Transactional
public boolean confirm(BusinessActionContext context) {
String userId = context.getActionContext("userId").toString();
Double amount = (Double) context.getActionContext("amount");
String txId = context.getXid();
// 查找冻结记录
AccountFreeze freeze = freezeMapper.selectByTxId(txId);
if (freeze == null) {
return true; // 幂等处理
}
// 扣减实际金额
Account account = accountMapper.selectById(userId);
account.setBalance(account.getBalance() - amount);
account.setFrozen(account.getFrozen() - amount);
accountMapper.updateById(account);
// 删除冻结记录
freezeMapper.deleteById(freeze.getId());
return true;
}
@Override
@Transactional
public boolean cancel(BusinessActionContext context) {
String userId = context.getActionContext("userId").toString();
Double amount = (Double) context.getActionContext("amount");
String txId = context.getXid();
// 查找冻结记录
AccountFreeze freeze = freezeMapper.selectByTxId(txId);
if (freeze == null) {
return true; // 幂等处理
}
// 解冻资金
Account account = accountMapper.selectById(userId);
account.setFrozen(account.getFrozen() - amount);
accountMapper.updateById(account);
// 删除冻结记录
freezeMapper.deleteById(freeze.getId());
return true;
}
}
Seata 的高级特性
1. 全局锁
Seata 的 AT 模式使用全局锁机制来解决写冲突问题:
- 在一阶段,RM 通过 SELECT FOR UPDATE 语句获取本地锁,并向 TC 注册全局锁
- 在二阶段提交时,RM 释放本地锁和全局锁
- 在二阶段回滚时,RM 通过全局锁确保回滚操作的安全执行
2. 隔离级别
Seata 的默认隔离级别是读未提交(Read Uncommitted),可以通过配置参数 client.rm.lock.retryPolicyBranchRollbackOnConflict
来调整。
3. 事务超时
可以通过 @GlobalTransactional
注解的 timeoutMills
参数设置事务超时时间:
java
@GlobalTransactional(timeoutMills = 60000) // 设置60秒超时
public void businessMethod() {
// 业务逻辑
}
4. 配置中心和注册中心
Seata 支持多种配置中心和注册中心,如 Nacos、Eureka、ZooKeeper、Consul 等:
yaml
seata:
registry:
type: nacos # 可选:nacos, eureka, zookeeper, consul 等
# 其他配置...
config:
type: nacos # 可选:nacos, apollo, zookeeper, consul 等
# 其他配置...
常见问题与解决方案
1. 分支事务提交失败
可能原因:
- 网络问题导致 RM 和 TC 通信失败
- TC 服务器异常
解决方案:
- 检查网络连接
- 查看 TC 服务器日志
- 配置适当的重试机制
2. 全局事务锁冲突
可能原因:
- 多个全局事务同时修改同一行数据
解决方案:
- 优化业务逻辑,减少锁冲突
- 设置适当的锁等待时间和重试次数
3. 性能问题
可能原因:
- 使用了不适合的事务模式
- 事务粒度过大,包含过多操作
解决方案:
- 选择合适的事务模式,如 TCC 比 AT 性能更好
- 减小事务范围,只包含必要的操作
- 使用异步确认和回滚机制
4. 数据不一致
可能原因:
- 回滚失败
- 未正确配置所有参与事务的资源
解决方案:
- 实现全面的监控,及时发现不一致
- 设置合理的重试策略
- 必要时实现补偿机制
最佳实践
选择合适的事务模式:
- 简单 CRUD 场景:优先使用 AT 模式
- 性能要求高的场景:考虑 TCC 模式
- 长事务场景:考虑 Saga 模式
- 强一致性需求:考虑 XA 模式
合理控制事务粒度:
- 避免事务过大,减少锁冲突
- 尽可能减少参与全局事务的微服务数量
实现幂等性:
- 确保所有事务操作都是幂等的,防止重复执行
异常处理:
- 全面捕获并处理异常
- 实现回滚补偿机制
- 设置合理的超时时间
监控与告警:
- 监控事务成功率、执行时间等指标
- 对异常事务设置告警
性能优化:
- 使用连接池优化数据库连接
- 考虑使用批处理提高性能
- 对不需要强一致性的操作使用异步处理
总结
Seata 作为一款开源的分布式事务解决方案,提供了 AT、TCC、Saga 和 XA 四种事务模式,能够满足不同场景下的分布式事务需求。通过与 Spring Cloud Alibaba 的无缝集成,开发者可以方便地在微服务架构中实现分布式事务,保证数据的一致性。
选择合适的事务模式、合理控制事务粒度、实现幂等性和异常处理等最佳实践,可以帮助开发者构建更可靠、高效的分布式事务系统。