栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 前沿技术 > 云计算 > Docker/k8s

Springboot之分布式事务框架Seata实现原理源码分析

Docker/k8s 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力



环境:springboot2.2.11 + seata1.3.0

1 准备环境

  1.    com.alibaba.cloud 
  2.   spring-cloud-starter-alibaba-seata    
  3.            io.seata 
  4.       seata-all      
  5.     
  6.    io.seata 
  7.   seata-all   1.3.0 
  8.  

开启全局事务

  1. seata:   service: 
  2.     disable-global-transaction: true   
2 代理数据源及注册代理Bean
  1. @Bean @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER}) 
  2. @ConditionalOnMissingBean(GlobalTransactionScanner.class) public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) { 
  3.   if (LOGGER.isInfoEnabled()) {     LOGGER.info("Automatically configure Seata"); 
  4.   }   return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(),failureHandler); 
  5. }  
  6. @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR) @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true) 
  7. @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class) public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) { 
  8.   return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying()); } 
2.1 创建代理Bean

Seata通过GlobalTransactionScanner来注册我们项目中所有带有@GlobalTransactional注解的方法类。

  1. public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean, ApplicationContextAware, DisposableBean 

AbstractAutoProxyCreator继承层次


Springboot之分布式事务框架Seata实现原理源码分析

从这里也知道GlobalTransactionScanner类其实是一个BeanPostProcessor处理器。

InstantiationAwareBeanPostProcessor类有如下3个方法是很有用的

postProcessBeforeInstantiation 实例化前执行

postProcessAfterInstantiation 实例化之后执行

postProcessProperties 属性填充时执行

当然在这里GlobalTransactionScanner类并没有覆盖这3个方法。

BeanPostProcessor相关的2个方法在父类中有实现

  1. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { 
  2.   return bean; } 
  3. @Override public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) { 
  4.   if (bean != null) {     Object cacheKey = getCacheKey(bean.getClass(), beanName); 
  5.     if (this.earlyProxyReferences.remove(cacheKey) != bean) {       return wrapIfNecessary(bean, beanName, cacheKey); 
  6.     }   } 
  7.   return bean; } 

在实例化Bean的时候会执行父类中

postProcessAfterInitialization方法。关键是该方法中的wrapIfNecessary方法,该方法在GlobalTransactionScanner类中被重写了。


Springboot之分布式事务框架Seata实现原理源码分析

existsAnnotation 方法判断当前的类方法上是否有@GlobalTransactional注解。如果不存在会直接返回当前Bean。

interceptor 判断当前拦截器是否为空,为空创建

GlobalTransactionalInterceptor该拦截器处理全局事务的地方。

  1. if (!AopUtils.isAopProxy(bean)) {   bean = super.wrapIfNecessary(bean, beanName, cacheKey); 
  2. } else {   AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); 
  3.   Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));   for (Advisor avr : advisor) { 
  4.     advised.addAdvisor(0, avr);   } 

该片段代码,判断当前的Bean是否是代理类(JDK或CGLIB),如果不是那么会先的执行下父类的wrapIfNecessary方法。

如果当前Bean是代理类对象,那么会获取当前代理类的AdvisedSupport(内部维护了切面类的集合)对象。这里可以看看JDK和CGLIB两种方式创建的代理类对象是否都具有AdvisedSupport对象。

  1. public static AdvisedSupport getAdvisedSupport(Object proxy) throws Exception {   Field h; 
  2.   if (AopUtils.isJdkDynamicProxy(proxy)) {     h = proxy.getClass().getSuperclass().getDeclaredField("h"); 
  3.   } else {     h = proxy.getClass().getDeclaredField("CGLIB$CALLBACK_0"); 
  4.   }   h.setAccessible(true); 
  5.   Object dynamicAdvisedInterceptor = h.get(proxy);   Field advised = dynamicAdvisedInterceptor.getClass().getDeclaredField("advised"); 
  6.   advised.setAccessible(true);   return (AdvisedSupport)advised.get(dynamicAdvisedInterceptor); 

jdk创建代理对象时使用的InvocationHandler

  1. final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializable {  
  2.      private static final Log logger = LogFactory.getLog(JdkDynamicAopProxy.class); 
  3.     
  4.   private final AdvisedSupport advised; }     

cglib 获取CGLIB$CALLBACK_0字段,该字段是MethodInterceptor对象

  1. public class PersonDAOImpl$$EnhancerBySpringCGLIB$$d4658dad extends PersonDAOImpl implements SpringProxy, Advised, Factory{   private boolean CGLIB$BOUND; 
  2.   public static Object CGLIB$FACTORY_DATA;   private static final ThreadLocal CGLIB$THREAD_CALLBACKS; 
  3.   private static final Callback[] CGLIB$STATIC_CALLBACKS;   private MethodInterceptor CGLIB$CALLBACK_0; 
  4. }     

 接下来就是将Seata的拦截器添加到AdvisedSupport中。

  1. Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); for (Advisor avr : advisor) { 
  2.   advised.addAdvisor(0, avr); } 
  3. protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) throws BeansException {   return new Object[]{interceptor}; 

到此就将Seata的方法拦截器包装成Advisor切面添加到了当前的AdvisedSupport管理的切面集合中。

2.2 创建代理数据源

对数据源上的方法调用进行代理处理通过DataSourceProxy

  1. public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {   private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class); 
  2.   private final String[] excludes;   private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice()); 
  3.    public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) { 
  4.     this.excludes = excludes;     setProxyTargetClass(!useJdkProxy); 
  5.   }  
  6.   @Override   protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) throws BeansException { 
  7.     if (LOGGER.isInfoEnabled()) {       LOGGER.info("Auto proxy of [{}]", beanName); 
  8.     }     return new Object[]{advisor}; 
  9.   }  
  10.   @Override   protected boolean shouldSkip(Class beanClass, String beanName) { 
  11.     return SeataProxy.class.isAssignableFrom(beanClass) || !DataSource.class.isAssignableFrom(beanClass) ||                   Arrays.asList(excludes).contains(beanClass.getName());     } 

shouldSkip 该方法确定了如果当前beanClass是SeataProxy的子类并且beanClass不是DataSource的子类或者当前的bean名称不再excludes集合中就会进行代理。简单说就是代理当前系统的默认数据源对象。

getAdvicesAndAdvisorsForBean 方法直接返回DefaultIntroductionAdvisor切面,通知类是SeataAutoDataSourceProxyAdvice

  1. public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {  
  2.   @Override   public Object invoke(MethodInvocation invocation) throws Throwable { 
  3.     DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());     Method method = invocation.getMethod(); 
  4.     Object[] args = invocation.getArguments();     Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes()); 
  5.     if (m != null) {       return m.invoke(dataSourceProxy, args); 
  6.     } else {       return invocation.proceed(); 
  7.     }   } 
  8.    @Override 
  9.   public Class[] getInterfaces() {     return new Class[]{SeataProxy.class}; 
  10.   }  
3 全局事务拦截器

在需要进行发起全局事务的方法是被代理的 具体执行的拦截器是

GlobalTransactionalInterceptor


Springboot之分布式事务框架Seata实现原理源码分析

handleGlobalTransaction方法

通过事务模版执行,TransactionalExecutor类进行收集当前@GlobalTransactional注解上配置的相关信息封装到TransactionInfo中。

TransactionalTemplate.execute方法

  1. // 该方法中根据不同的事务传播特性进行不同的处理。 public Object execute(TransactionalExecutor business) throws Throwable { 
  2.     // 1 get transactionInfo     TransactionInfo txInfo = business.getTransactionInfo(); 
  3.     if (txInfo == null) {         throw new ShouldNeverHappenException("transactionInfo does not exist"); 
  4.     }     // 1.1 get or create a transaction 
  5.     GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();  
  6.     // 1.2 Handle the Transaction propatation and the branchType     Propagation propagation = txInfo.getPropagation(); 
  7.     SuspendedResourcesHolder suspendedResourcesHolder = null;     try { 
  8.         switch (propagation) {             case NOT_SUPPORTED: 
  9.                 suspendedResourcesHolder = tx.suspend(true);                 return business.execute(); 
  10.             case REQUIRES_NEW:                 suspendedResourcesHolder = tx.suspend(true); 
  11.                 break;             case SUPPORTS: 
  12.                 if (!existingTransaction()) {                     return business.execute(); 
  13.                 }                 break; 
  14.             case REQUIRED:                 break; 
  15.             case NEVER:                 // 存在事务抛出异常 
  16.                 if (existingTransaction()) {                     throw new TransactionException( 
  17.                         String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"                                       ,RootContext.getXID())); 
  18.                 } else {                     // 直接执行业务代码 
  19.                     return business.execute();                 } 
  20.             case MANDATORY:                 if (!existingTransaction()) { 
  21.                     throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");                 } 
  22.                 break;             default: 
  23.                 throw new TransactionException("Not Supported Propagation:" + propagation);         } 
  24.          try { 
  25.              // 2. 开始事务 
  26.             beginTransaction(txInfo, tx);  
  27.             Object rs = null;             try { 
  28.                  // 执行我们的业务代码 
  29.                 rs = business.execute();  
  30.             } catch (Throwable ex) {  
  31.                 // 3.the needed business exception to rollback.                 completeTransactionAfterThrowing(txInfo, tx, ex); 
  32.                 throw ex;             } 
  33.              // 4. 一切正常提交事务。 
  34.             commitTransaction(tx);  
  35.             return rs;         } finally { 
  36.             //5. clear             triggerAfterCompletion(); 
  37.             cleanUp();         } 
  38.     } finally {         tx.resume(suspendedResourcesHolder); 
  39.     }  
3.1 获取全局事务
  1. GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); public static GlobalTransaction getCurrentOrCreate() { 
  2.   // 首次这里会返回null,执行createNew方法     GlobalTransaction tx = getCurrent(); 
  3.   if (tx == null) {     return createNew(); 
  4.   }   return tx; 
  5. } // 获取全局事务对象 
  6. private static GlobalTransaction getCurrent() {   String xid = RootContext.getXID(); 
  7.   if (xid == null) {     return null; 
  8.   }   return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant); 
  9. } private static GlobalTransaction createNew() { 
  10.   return new DefaultGlobalTransaction(); } 
3.2 开始全局事务
  1. beginTransaction(txInfo, tx); private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { 
  2.   try {     triggerBeforeBegin(); 
  3.     tx.begin(txInfo.getTimeOut(), txInfo.getName());     triggerAfterBegin(); 
  4.   } catch (TransactionException txe) {     throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); 
  5.   } } 
  6. // 开始全局事务,并且通过TC获取全局事务唯一ID  xid public void begin(int timeout, String name) throws TransactionException { 
  7.   // 全局事务的开启必须是Launcher     if (role != GlobalTransactionRole.Launcher) { 
  8.     assertXIDNotNull();     if (LOGGER.isDebugEnabled()) { 
  9.       LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);     } 
  10.     return;   } 
  11.   assertXIDNull();   if (RootContext.getXID() != null) { 
  12.     throw new IllegalStateException();   } 
  13.   xid = transactionManager.begin(null, null, name, timeout);   status = GlobalStatus.Begin; 
  14.   // 将当前获取到的xid绑定到当前thread上(ThreadLocal)     RootContext.bind(xid); 
  15.   if (LOGGER.isInfoEnabled()) {     LOGGER.info("Begin new global transaction [{}]", xid); 
  16.   } } 

将xid绑定到当前执行thread(ThreadLocal)在这里seata是通过SPI技术来实现的

  1. private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load(); public static void bind(String xid) { 
  2.   if (LOGGER.isDebugEnabled()) {     LOGGER.debug("bind {}", xid); 
  3.   }   CONTEXT_HOLDER.put(KEY_XID, xid); 
  4. } //通过SPI加载具体的ContextCore实现 
  5. public class ContextCoreLoader {  
  6.   private ContextCoreLoader() {   } 
  7.    private static class ContextCoreHolder { 
  8.     private static final ContextCore INSTANCE = Optional.ofNullable(EnhancedServiceLoader.load(ContextCore.class)).orElse(new ThreadLocalContextCore());   } 
  9.   public static ContextCore load() {     return ContextCoreHolder.INSTANCE; 
  10.   }  
  11. } // meta-INF/services/io.seata.core.context.ContextCore 文件内容 
  12. io.seata.core.context.ThreadLocalContextCore io.seata.core.context.FastThreadLocalContextCore 
 3.3 执行本地业务

执行本地事务时会向TC注册分支然后提交本地事务,接下来看看本地分支事务的注册及处理。

  1. rs = business.execute(); 
3.3.1 提交本地事务

执行完业务代码后提交事务ConnectionProxy.commit()

  1. @Override public void commit() throws SQLException { 
  2.   try {     LOCK_RETRY_POLICY.execute(() -> { 
  3.       // 事务提交         doCommit(); 
  4.       return null;     }); 
  5.   } catch (SQLException e) {     throw e; 
  6.   } catch (Exception e) {     throw new SQLException(e); 
  7.   } } 
  8. private void doCommit() throws SQLException {   // 判断当前是否在全局事务中(xid != null)   
  9.   if (context.inGlobalTransaction()) {     // 处理全局事务   
  10.     processGlobalTransactionCommit();   } else if (context.isGlobalLockRequire()) { 
  11.     processLocalCommitWithGlobalLocks();   } else { 
  12.     targetConnection.commit();   } 

进入

processGlobalTransactionCommit方法

3.3.2 注册本次事务分支
  1. private void processGlobalTransactionCommit() throws SQLException {   try { 
  2.     register();   } catch (TransactionException e) { 
  3.     recognizeLockKeyConflictException(e, context.buildLockKeys());   } 
  4.   try {     UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); 
  5.     targetConnection.commit();   } catch (Throwable ex) { 
  6.     report(false);     throw new SQLException(ex); 
  7.   }   if (IS_REPORT_SUCCESS_ENABLE) { 
  8.     report(true);   } 
  9.   context.reset(); } 
  10.  // 注册本次事务执行RM,将返回的branchId保存到当前的上下文中 
  11. private void register() throws TransactionException {   if (!context.hasUndoLog() || context.getLockKeysBuffer().isEmpty()) { 
  12.     return;   } 
  13.   Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), null, context.buildLockKeys());   // 将注册RM返回的branchId绑定到当前的上下文中ConnectionContext   
  14.   context.setBranchId(branchId); } 

进入branchRegister方法

DefaultResourceManager.java

  1. @Override public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) 
  2.   throws TransactionException {   return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys); 
  1. @Override public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException { 
  2.   try {     BranchRegisterRequest request = new BranchRegisterRequest(); 
  3.     request.setXid(xid);     request.setLockKey(lockKeys); 
  4.     request.setResourceId(resourceId);     request.setBranchType(branchType); 
  5.     request.setApplicationData(applicationData);  
  6.     BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);     if (response.getResultCode() == ResultCode.Failed) { 
  7.       throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));     } 
  8.     return response.getBranchId();   } catch (TimeoutException toe) { 
  9.     throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);   } catch (RuntimeException rex) { 
  10.     throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);   } 
3.3.3 记录undo log日志

undo log主要记录了数据的逻辑变化,比如一条 INSERT 语句,对应一条DELETE 的 undo log ,对于每个 UPDATE 语句,对应一条相反的 UPDATE 的 undo log ,这样在发生错误时,就能回滚到事务之前的数据状态。

  1. UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // 记录undo log日志 
  2. public void flushUndoLogs(ConnectionProxy cp) throws SQLException {   ConnectionContext connectionContext = cp.getContext(); 
  3.   if (!connectionContext.hasUndoLog()) {     return; 
  4.   }  
  5.   String xid = connectionContext.getXid();   long branchId = connectionContext.getBranchId(); 
  6.    BranchUndoLog branchUndoLog = new BranchUndoLog(); 
  7.   branchUndoLog.setXid(xid);   branchUndoLog.setBranchId(branchId); 
  8.   branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());  
  9.   UndoLogParser parser = UndoLogParserFactory.getInstance();   byte[] undoLogContent = parser.encode(branchUndoLog); 
  10.    if (LOGGER.isDebugEnabled()) { 
  11.     LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));   } 
  12.    // 将undo log日志出入到undo_log表中   
  13.   insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent, cp.getTargetConnection()); } 
  14. // 这里会根据是Oracle或MySQL自动执行;我当前环境使用的MySQL,所以使用的是MySQLUndoLogManager @Override 
  15. protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx,                                        byte[] undoLogContent, Connection conn) throws SQLException { 
  16.     insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn); } 
  17. private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, State state, Connection conn) throws SQLException {     try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) { 
  18.         pst.setLong(1, branchId);         pst.setString(2, xid); 
  19.         pst.setString(3, rollbackCtx);         pst.setBlob(4, BlobUtils.bytes2Blob(undoLogContent)); 
  20.         pst.setInt(5, state.getValue());         pst.executeUpdate(); 
  21.     } catch (Exception e) {         if (!(e instanceof SQLException)) { 
  22.             e = new SQLException(e);         } 
  23.         throw (SQLException) e;     } 
3.3.4 本地事务提交

业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。

  1. targetConnection.commit(); 
3.3.5 重置当前上下文

将当前ConnectionProxy中的ConnectionContext重置

  1. context.reset(); // 重置 
  2. void reset(String xid) {   this.xid = xid; 
  3.   branchId = null;   this.isGlobalLockRequire = false; 
  4.   lockKeysBuffer.clear();   sqlUndoItemsBuffer.clear(); 

到此整个全局事务的第一阶段完成了(通过feign的调用也成功返回);接下来是第二阶段的提交。

3.4 全局事物提交
  1. // TransactionalTemplate.java commitTransaction(tx); 
  2. private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {   try { 
  3.     triggerBeforeCommit();     tx.commit(); 
  4.     triggerAfterCommit();   } catch (TransactionException txe) { 
  5.     // 4.1 Failed to commit     throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); 
  6.   } } 
  7. // DefaultGlobalTransaction.java public void commit() throws TransactionException { 
  8.   int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;   try { 
  9.     while (retry > 0) {       try { 
  10.         // 根据当前的全局唯一事务id xid提交事务。         status = transactionManager.commit(xid); 
  11.         break;       } catch (Throwable ex) { 
  12.         retry--;           if (retry == 0) { 
  13.             throw new TransactionException("Failed to report global commit", ex);           } 
  14.       }     } 
  15.   } finally {     if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) { 
  16.       suspend(true);     } 
  17.   } } 
  18. // DefaultTransactionManager.java @Override 
  19. public GlobalStatus commit(String xid) throws TransactionException {   GlobalCommitRequest globalCommit = new GlobalCommitRequest(); 
  20.   globalCommit.setXid(xid);   GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit); 
  21.   return response.getGlobalStatus(); } 
  22. // 通过Netty同步调用 private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException { 
  23.   try {     return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request); 
  24.   } catch (TimeoutException toe) {     throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe); 
  25.   } } 

到此第二阶段的事务就提交完成了。

3.5 全局事务回滚

参与全局事务的任何一个分支发生异常将对整个事务进行回滚。

3.5.1 全局事务发起端异常

代码片段

  1. try {   // Do Your Business 
  2.   rs = business.execute(); } catch (Throwable ex) { 
  3.   // 3.the needed business exception to rollback.   completeTransactionAfterThrowing(txInfo, tx, ex); 
  4.   throw ex; } 

当本地业务执行时发生异常后执行

completeTransactionAfterThrowing方法。

  1. private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {   //roll back 
  2.   if (txInfo != null && txInfo.rollbackOn(originalException)) {     try { 
  3.       // 全局事务回滚         rollbackTransaction(tx, originalException); 
  4.     } catch (TransactionException txe) {       // Failed to rollback 
  5.       throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);     } 
  6.   } else {     // not roll back on this exception, so commit 
  7.     commitTransaction(tx);   } 

进入rollbackTransaction方法。

  1. private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {   triggerBeforeRollback(); 
  2.   tx.rollback();   triggerAfterRollback(); 
  3.   // 3.1 Successfully rolled back   throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus()) ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException); 

 进入rollback方法。

  1. public void rollback() throws TransactionException {   // Participant(参与者),如果当前是参与者那么直接返回,全局事务的回滚必须是Launcher 
  2.   if (role == GlobalTransactionRole.Participant) {     return; 
  3.   }   assertXIDNotNull(); 
  4.   int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;   try { 
  5.     while (retry > 0) {       try { 
  6.         status = transactionManager.rollback(xid);         break; 
  7.       } catch (Throwable ex) {         retry--; 
  8.         if (retry == 0) {           throw new TransactionException("Failed to report global rollback", ex); 
  9.         }       } 
  10.     }   } finally { 
  11.     if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {       suspend(true); 
  12.     }   } 

进入

transactionManager.rollback方法

  1. public GlobalStatus rollback(String xid) throws TransactionException {   GlobalRollbackRequest globalRollback = new GlobalRollbackRequest(); 
  2.   globalRollback.setXid(xid);   GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback); 
  3.   return response.getGlobalStatus(); } 

到此全局事务就进行了回滚

3.5.2 全局事务参与者异常 4 XID的传递 4.1 RestTemplate
  1. @Configuration(proxyBeanMethods = false) public class SeataRestTemplateAutoConfiguration { 
  2.    // 该拦截器的作用就是为请求的Header中传递XID 
  3.   @Bean   public SeataRestTemplateInterceptor seataRestTemplateInterceptor() { 
  4.     return new SeataRestTemplateInterceptor();   } 
  5.   // 获取当前IOC容器中所有的的RestTemplate对象   @Autowired(required = false) 
  6.   private Collection restTemplates;  
  7.   @Autowired   private SeataRestTemplateInterceptor seataRestTemplateInterceptor; 
  8.    // 当前这个Bean(SeataRestTemplateAutoConfiguration)被创建且相关属性被注入后执行 
  9.   @PostConstruct   public void init() { 
  10.     if (this.restTemplates != null) {       // 为所有的RestTemplate设置一个拦截器。SeataRestTemplateInterceptor 
  11.       for (RestTemplate restTemplate : restTemplates) {         List interceptors = new ArrayList(restTemplate.getInterceptors()); 
  12.         interceptors.add(this.seataRestTemplateInterceptor);         restTemplate.setInterceptors(interceptors); 
  13.       }     } 
  14.   } } 
  1. public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {  
  2.   @Override   public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException { 
  3.     HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);  
  4.     String xid = RootContext.getXID();  
  5.     if (!StringUtils.isEmpty(xid)) {       requestWrapper.getHeaders().add(RootContext.KEY_XID, xid); 
  6.     }     return clientHttpRequestExecution.execute(requestWrapper, bytes); 
  7.   }  

该拦截器的作用就是为当前的请求header中放置TX_XID头信息。(是不是有点过分了,所有的RestTemplate都添加这个Header;应该像@LoadBalanced一样只有添加有该注解的才具有负载均衡的作用)。

到此RestTemplate调用方式传递XID值信息就这么简单。

4.2 Feign
  1. @Configuration(proxyBeanMethods = false) @ConditionalOnClass(Client.class) 
  2. @AutoConfigureBefore(FeignAutoConfiguration.class) public class SeataFeignClientAutoConfiguration { 
  3.   @Bean   @Scope("prototype") 
  4.   @ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")   @ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true") 
  5.   Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {     return SeataHystrixFeignBuilder.builder(beanFactory); 
  6.   } } 

每一个Feign客户端是一个FeignClientFactoryBean 工厂Bean。当在调用接口的时候会执行getObject方法

  1. @Override public Object getObject() throws Exception { 
  2.   return getTarget(); } 
  3.   T getTarget() { 
  4.   FeignContext context = this.applicationContext.getBean(FeignContext.class);   // 这个方法会从当前的IOC容器中获取Feign.Builder对象   
  5.   Feign.Builder builder = feign(context);  
  6.   if (!StringUtils.hasText(this.url)) {     if (!this.name.startsWith("http")) { 
  7.       this.url = "http://" + this.name;     } else { 
  8.       this.url = this.name;     } 
  9.     this.url += cleanPath();     // 负载均衡调用目标服务(通过服务发现调用)   
  10.     return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type, this.name, this.url));   } 
  11.   // 下面是通过配置的url直接调用目标服务。     if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) { 
  12.     this.url = "http://" + this.url;   } 
  13.   String url = this.url + cleanPath();   Client client = getOptional(context, Client.class); 
  14.   if (client != null) {     if (client instanceof LoadBalancerFeignClient) { 
  15.       client = ((LoadBalancerFeignClient) client).getDelegate();     } 
  16.     if (client instanceof FeignBlockingLoadBalancerClient) {       client = ((FeignBlockingLoadBalancerClient) client).getDelegate(); 
  17.     }     builder.client(client); 
  18.   }   Targeter targeter = get(context, Targeter.class); 
  19.   return (T) targeter.target(this, builder, context, new HardCodedTarget<>(this.type, this.name, url)); } 

feign(content)方法在调用链中最终执行如下方法从IOC容器中获取Feign.Builder

  1. public  T getInstance(String name, Class type) {   AnnotationConfigApplicationContext context = getContext(name); 
  2.   if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,type).length > 0) {     return context.getBean(type); 
  3.   }   return null; 

接下来进入

SeataHystrixFeignBuilder.builder(beanFactory);方法

  1. static Feign.Builder builder(BeanFactory beanFactory) {   return HystrixFeign.builder().retryer(Retryer.NEVER_RETRY).client(new SeataFeignClient(beanFactory)); 

SeataFeignClient类

  1. // Feign的执行就通过Client接口调用。 public class SeataFeignClient implements Client { 
  2.    private final Client delegate; 
  3.    private final BeanFactory beanFactory; 
  4.    private static final int MAP_SIZE = 16; 
  5.    SeataFeignClient(BeanFactory beanFactory) { 
  6.     this.beanFactory = beanFactory;     this.delegate = new Client.Default(null, null); 
  7.   }  
  8.   SeataFeignClient(BeanFactory beanFactory, Client delegate) {     this.delegate = delegate; 
  9.     this.beanFactory = beanFactory;   } 
  10.    @Override 
  11.   public Response execute(Request request, Request.Options options) throws IOException {     Request modifiedRequest = getModifyRequest(request); 
  12.     return this.delegate.execute(modifiedRequest, options);   } 
  13.    // 该方法给请求headers中添加TX_XID请求header信息。  
  14.   private Request getModifyRequest(Request request) {     String xid = RootContext.getXID(); 
  15.     if (StringUtils.isEmpty(xid)) {       return request; 
  16.     }  
  17.     Map> headers = new HashMap<>(MAP_SIZE);     headers.putAll(request.headers()); 
  18.      List seataXid = new ArrayList<>(); 
  19.     seataXid.add(xid);     headers.put(RootContext.KEY_XID, seataXid); 
  20.     return Request.create(request.method(), request.url(), headers, request.body(), request.charset());   } 
  21.  } 
5 参与者如何加入全局事务

在被调用端(通过Feign调用服务)接口服务上没有加入任何注解或是特殊的代码那它又是如何加入到整个全局事务中的呢?

在2.2中介绍了seata自动配置会为我们自动的创建数据源代理。就是通过这个代理数据源来完成的DataSourceProxy。

事务方法在执行时都会先拿到Connection对象,这里系统默认的DataSource已经被代理成DataSourceProxy。

5.1 参与者获取XID

SeataHandlerInterceptorConfiguration注册一个拦截器SeataHandlerInterceptor;SeataHandlerInterceptor拦截器对我们的所有请求进行拦截

  1. public class SeataHandlerInterceptorConfiguration implements WebMvcConfigurer {  
  2.   @Override   public void addInterceptors(InterceptorRegistry registry) { 
  3.     registry.addInterceptor(new SeataHandlerInterceptor()).addPathPatterns("/**");   } 
  4.  } 

拦截器从Header中获取TX_XID

  1. public class SeataHandlerInterceptor implements HandlerInterceptor {   @Override 
  2.   public boolean preHandle(HttpServletRequest request, HttpServletResponse response,Object handler) {     String xid = RootContext.getXID(); 
  3.     // 从Header中获取TX_XID信息。如果存在就绑定到RootContext上下文中。       String rpcXid = request.getHeader(RootContext.KEY_XID); 
  4.     if (StringUtils.isBlank(xid) && rpcXid != null) {       RootContext.bind(rpcXid); 
  5.     }     return true; 
  6.   }   @Override 
  7.   public void afterCompletion(HttpServletRequest request, HttpServletResponse response,Object handler, Exception e) {     if (StringUtils.isNotBlank(RootContext.getXID())) { 
  8.       String rpcXid = request.getHeader(RootContext.KEY_XID);       if (StringUtils.isEmpty(rpcXid)) { 
  9.         return;       } 
  10.       // 执行完后解除绑定         String unbindXid = RootContext.unbind(); 
  11.       if (!rpcXid.equalsIgnoreCase(unbindXid)) {         if (unbindXid != null) { 
  12.           RootContext.bind(unbindXid);         } 
  13.       }     } 
  14.   } } 

参与者通过拦截器的方式将xid拿到并且绑定到上下文中。

5.2 获取代理连接对象

一个数据库操作执行

DataSourceProxy.getConnection方法获取ConnectionProxy对象。

  1. @Override public ConnectionProxy getConnection() throws SQLException { 
  2.   Connection targetConnection = targetDataSource.getConnection();   return new ConnectionProxy(this, targetConnection); 

用ConnectionProxy代理默认数据源的的Connection对象。

在ConnectionProxy对象中有个非常重要的属性

  1. public class ConnectionProxy extends AbstractConnectionProxy {  
  2.   private ConnectionContext context = new ConnectionContext();      
  3. }     

在一个数据库操作做最后事务提交的时候会通过ConnectionContext对象来判断是否是全局事务xid是否为空。

5.3 绑定XID到ConnectionContext

由于事务在提交的时候需要从ConnectionContext中获取判断是否全局事务(xid是否为空);xid是在Statement执行时进行绑定的。

执行相关SQL语句是通过StatementProxy, PreparedStatementProxy,这两个对象都是通过ConnectionProxy获取。

  1. public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {  
  2.   @Override   public Map> getParameters() { 
  3.     return parameters;   } 
  4.    public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException { 
  5.     super(connectionProxy, targetStatement, targetSQL);   } 
  6.    @Override 
  7.   public boolean execute() throws SQLException {     return ExecuteTemplate.execute(this, (statement, args) -> statement.execute()); 
  8.   }  
  9.   @Override   public ResultSet executeQuery() throws SQLException { 
  10.     return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());   } 
  11.    @Override 
  12.   public int executeUpdate() throws SQLException {     return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate()); 
  13.   } } 

查看executeUpdate的执行

  1. public static  T execute(List sqlRecognizers,                                                      StatementProxy statementProxy, 
  2.                                                      StatementCallback statementCallback,                                                      Object... args) throws SQLException { 
  3.     if (!RootContext.requireGlobalLock() && !StringUtils.equals(BranchType.AT.name(), RootContext.getBranchType())) {         // Just work as original statement 
  4.         return statementCallback.execute(statementProxy.getTargetStatement(), args);     } 
  5.      String dbType = statementProxy.getConnectionProxy().getDbType(); 
  6.     if (CollectionUtils.isEmpty(sqlRecognizers)) {         sqlRecognizers = SQLVisitorFactory.get( 
  7.             statementProxy.getTargetSQL(),             dbType); 
  8.     }     Executor executor; 
  9.     if (CollectionUtils.isEmpty(sqlRecognizers)) {         executor = new PlainExecutor<>(statementProxy, statementCallback); 
  10.     } else {         if (sqlRecognizers.size() == 1) { 
  11.             SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);             switch (sqlRecognizer.getSQLType()) { 
  12.                 case INSERT:                     executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, 
  13.                                                           new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},                                                           new Object[]{statementProxy, statementCallback, sqlRecognizer}); 
  14.                     break;                 case UPDATE: 
  15.                     executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);                     break; 
  16.                 case DELETE:                     executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); 
  17.                     break;                 case SELECT_FOR_UPDATE: 
  18.                     executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);                     break; 
  19.                 default:                     executor = new PlainExecutor<>(statementProxy, statementCallback); 
  20.                     break;             } 
  21.         } else {             executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); 
  22.         }     } 
  23.     T rs;     try { 
  24.         rs = executor.execute(args);     } catch (Throwable ex) { 
  25.         if (!(ex instanceof SQLException)) {             // Turn other exception into SQLException 
  26.             ex = new SQLException(ex);         } 
  27.         throw (SQLException) ex;     } 
  28.     return rs; } 

这里的操作UpdateExecutor,DeleteExecutor,InsertExecutor(通过SPI获取实现MySQL或Oracle)他们都继承自baseTransactionalExecutor。

  1. rs = executor.execute(args); // 上面的execute执行baseTransactionalExecutor.execute方法。 
  2. public class baseTransactionalExecutor ... {     @Override 
  3.     public T execute(Object... args) throws Throwable {         if (RootContext.inGlobalTransaction()) { 
  4.             String xid = RootContext.getXID();             statementProxy.getConnectionProxy().bind(xid); 
  5.         }  
  6.         statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());         return doExecute(args); 
  7.     } } 
  1. statementProxy.getConnectionProxy().bind(xid) ; // 该行代码将xid绑定到ConnectionProxy对象中的ConnectionContext上。 
  2. public void bind(String xid) {   context.bind(xid); 

到这xid已经绑定到了ConnectionProxy中的ConnectionContext中。

 

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/796807.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号