企业网站 实名认证,赚钱软件哪个赚钱多又快,企业宣传网站,网页设计师培训在哪里Seata 源码篇之AT模式启动流程 - 02 自动配置两个关键点 初始化初始化TM初始化RM初始化TC 全局事务执行流程TM 发起全局事务GlobalTransactional 注解处理全局事务的开启 TM 和 RM 执行分支事务IntroductionDelegatingIntroductionInterceptorDelegatePerTargetObjectIntroduct… Seata 源码篇之AT模式启动流程 - 02 自动配置两个关键点 初始化初始化TM初始化RM初始化TC 全局事务执行流程TM 发起全局事务GlobalTransactional 注解处理全局事务的开启 TM 和 RM 执行分支事务IntroductionDelegatingIntroductionInterceptorDelegatePerTargetObjectIntroductionInterceptorIntroduction 切面 数据源自动代理创建器 小结 上一篇 文章我们介绍了Seata AT模式解决分布式事务的核心思想本文我们来看看具体通过源码来看看AT模式运行的具体流程。
本文基于Seata源码版本如下所示: 拉取日期: 2023-09-14 下面是本文的流程:
Seata与Spring结合使用的自动配置类TMRMTC等组件初始化流程全局事务执行流程
本文核心是梳理一遍AT模式启动的整体流程所以不会过多侧重细节的分析更多在全局的掌控和理解上。 自动配置
Seata的AT模式目标是做到业务无侵入只需要一个简单的注解即可完成声明式分布式事务管理为了做到这一点就需要与Spring AOP模块结合使用。
AOP 通常会和 IOC 结合使用 其中 AOP 工作流程通常分为三步这里结合Spring提供的IOC进行叙述:
选择切入哪些对象 : 借助Spring在每个Bean创建流程中的回调埋点依次获取到每个创建完毕的Bean然后利用JointPoint进行切入过滤这里回调埋点指的就是BeanPostProcessorSpring为AOP拦截判断专门提供了一个现成的Bean后置处理器我们称之为自动代理创建器。选择在哪些时机点处织入逻辑 这里时机点指的是方法调用前后属性读取前后构造方法调用前后Spring中只支持方法调研前后织入拦截逻辑织入拦截逻辑 : 可以是通过更改字节码的方式或者是像Spring这样为每个Bean创建一个代理对象代理对象会拦截目标对象所有方法调用并在每个方法调用前搜集到所有能应用在当前方法上的拦截器然后将所有拦截器组成一个调用链等到前置拦截调用链执行完毕后再执行目标方法 AspectJ在Spring中对应AdvisorPointCut还是PointCutJoinPoint没有单独定义一个类因为Spring只支持方法调用前后拦截Advice还是Advice但是最后会通过适配器统一转化MethodInterceptor。 在Seata的项目工程中提供了一个seata-spring-boot-starter自动配置包该自动配置包负责向Spring IOC中注入一些核心的Bean完成上述流程 : 此处我们先重点关注SeataAutoConfiguration类该自动配置类往容器中注入了一个十分重要的Bean: GlobalTransactionScanner见名知意该类负责完成GlobalTransactional等注解的扫描和Bean代理等工作 : BeanDependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})ConditionalOnMissingBean(GlobalTransactionScanner.class)public static GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,ConfigurableListableBeanFactory beanFactory,Autowired(required false) ListScannerChecker scannerCheckers) {... // 1.获取IOC引用 GlobalTransactionScanner.setBeanFactory(beanFactory);// 2.从各个来源添加bean过滤器: SPI,IOC,属性配置 GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));GlobalTransactionScanner.addScannerCheckers(scannerCheckers);GlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());GlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());...// 3.完成Seata目标Bean扫描器的创建return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);}下面我们会看到GlobalTransactionScanner其实就是Seata在Spring已有基础设施上实现的一个自动代理创建器罢了主要是重写是否对Bean进行代理的判断逻辑只对Seata感兴趣的目标Bean进行代理。
下面我们先来看一下GlobalTransactionScanner的继承关系: GlobalTransactionScanner在Spring已有的AbstractAutoProxyCreator抽象自动代理创建器基础上重写了其判断Bean是否代理的核心方法: Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// 1. 过滤掉Seata不感兴趣的Beanif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {// 2. bean如果已经代理过了,则跳过代理if (PROXYED_SET.contains(beanName)) {return bean;}interceptor null;// 3. Seata提供的Advice,通过Bean所开启的模式不同,会返回AT模式对应的增强器或者TCC模式对应的增强器ProxyInvocationHandler proxyInvocationHandler DefaultInterfaceParser.get().parserInterfaceToProxy(bean);// 4. 返回空,说明当前bean无需代理if (proxyInvocationHandler null) {return bean;}// 5. Advice最终都需要被适配为MethodInterceptor,要么第三方提供的就是MethodInterceptor类型的拦截器,否则需要额外向Spring中注册一个适配器,用于将自定义的Advice转化为MethodInterceptorinterceptor new AdapterSpringSeataInterceptor(proxyInvocationHandler);LOGGER.info(Bean [{}] with name [{}] would use interceptor [{}], bean.getClass().getName(), beanName, interceptor.toString());// 6. 如果当前Bean还没有被代理过,那么直接调用父类现成逻辑进行代理if (!AopUtils.isAopProxy(bean)) {bean super.wrapIfNecessary(bean, beanName, cacheKey);} else {// 7. 如果当前Bean已经被代理过了,那就只需要在当前代理对象已有的拦截器链中再加入Seata提供的interceptor即可// 7.1 AdvisedSupport是Spring中所有代理对象都会继承的接口,同时代理对象内部持有一个AdvisedSupport对象实例,该对象持有当前被代理对象的所有上下文信息AdvisedSupport advised SpringProxyUtils.getAdvisedSupport(bean);// 7.2 构建Seata需要额外应用到当前代理对象上的增强器链,Seata重写了getAdvicesAndAdvisorsForBean方法,该方法重写后只会返回Seata提供的interceptorAdvisor[] advisor buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;// 7.3 将Seata提供的增强器插入到当前Bean已有增强器链指定位置for (Advisor avr : advisor) {pos findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}Spring AOP模块比较好理解如果有对这块不太清楚的小伙伴可以参考我之前写的AOP源码剖析系列文章: Spring源码剖析专栏 专栏前半部分系列文章是笔者探索Spring源码初期所写所以可能存在部分理解错误大家可以理性看待后期有空可能会对该专栏前半部门文章进行重写。 两个关键点
Seata需要对哪些Bean进行代理这个过滤由doCheckers和DefaultInterfaceParser共同负责完成 private boolean doCheckers(Object bean, String beanName) {// 1. 已经代理过的Bean跳过代理 / 排除集合中包含的bean不进行代理 / 工厂Bean不进行代理if (PROXYED_SET.contains(beanName) || EXCLUDE_BEAN_NAME_SET.contains(beanName)|| FactoryBean.class.isAssignableFrom(bean.getClass())) {return false;}// 2. 借助Bean扫描器过滤掉那些Seata不感兴趣的Beanif (!SCANNER_CHECKER_SET.isEmpty()) {for (ScannerChecker checker : SCANNER_CHECKER_SET) {try {if (!checker.check(bean, beanName, beanFactory)) {// failed check, do not scan this beanreturn false;}} catch (Exception e) {LOGGER.error(Do check failed: beanName{}, checker{},beanName, checker.getClass().getSimpleName(), e);}}}return true;}doCheckers属于Seata的前置过滤负责过滤掉一些内部BeanDefaultInterfaceParser作为后置过滤 负责过滤掉那些没有标注GlobalTransactional或者TwoPhaseBusinessAction注解的Bean。 关于ScannerChecker的实现大家感兴趣可以自己看看它有哪些实现反正代码都很短也很容易理解。 为什么Seata需要区别对待已经代理过的Bean和没有被代理过的Bean呢
这里可以简单看看父类中的wrapIfNecessary方法实现: protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// 一些Spring内部Bean无需代理 / 当前代理创建器会缓存其已经代理过的对象,避免重复代理. . .// 由子类实现,返回能应用到当前Bean上所有的增强器// 常用实现: 从IOC得到所有类型为Advisor的Bean,依次判断是否能应用到当前Bean上Object[] specificInterceptors getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);// 如果返回空,说明当前Bean无需代理,因为没有Advisor能应用到当前Bean上// 如果不为空,则对当前Bean进行代理if (specificInterceptors ! DO_NOT_PROXY) {...// 为当前Bean创建代理对象Object proxy createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));...// 返回代理对象return proxy;}...// 无需代理,返回原样Beanreturn bean;}AbstractAutoProxyCreator类的wrapIfNecessary方法问题在于会产生重复代理问题 Seata通过对已经代理过的bean单独处理避免了重复代理问题的产生。同时Seata实现了getAdvicesAndAdvisorsForBean方法目前只返回AdapterSpringSeataInterceptor。 Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)throws BeansException {return new Object[]{interceptor};}buildAdvisors方法中会为当前AdapterSpringSeataInterceptor构建一个默认的DefaultPointcutAdvisor其持有的PointCut在类过滤器和方法过滤器中都返回True。 初始化
GlobalTransactionScanner 类还继承了 InitializingBean 接口实现了该接口的Bean会在Bean初始化阶段回调其实现的afterPropertiesSet方法。GlobalTransactionScanner 在该方法中完成相关组件的初始化工作 : Overridepublic void afterPropertiesSet() {...// 每个Client都需要去初始化 RM 和 TMif (initialized.compareAndSet(false, true)) {initClient();}}private void initClient() {...//init TMTMClient.init(applicationId, txServiceGroup, accessKey, secretKey);...//init RMRMClient.init(applicationId, txServiceGroup);...registerSpringShutdownHook();}初始化TM
TM - 事务管理器定义全局事务的范围开始全局事务、提交或回滚全局事务TM 的初始化方法被包含在 seata-tm 和 seata-core 包中初始化逻辑主要是构建由Netty实现的RPC客户端 并调用其init方法 。
下面来看一下TMClient的init方法: public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {// 1. 创建TM对应的Netty客户端TmNettyRemotingClient tmNettyRemotingClient TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);// 2. 初始化Netty客户端相关资源tmNettyRemotingClient.init();}继续往下追踪AbstractNettyRemotingClient的init方法看看初始化哪些Netty客户端资源: Overridepublic void init() {// 注册消息类型处理器,当从Netty接收到不同类型msg后,会分发给不同的消息处理器处理registerProcessor();if (initialized.compareAndSet(false, true)) {// 负责启动Netty客户端和相关线程池,用于定时发送心跳消息和处理消息super.init();// 尝试与TC建立连接if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {getClientChannelManager().reconnect(transactionServiceGroup);}}}RPC模块属于Seata设计的比较优雅的一个模块此处细节暂时不用深究后续会专门出一篇文章好好唠唠其设计哲学。 初始化RM
RM - 资源管理器管理分支事务处理的资源与TC通信以注册分支事务和报告分支事务的状态并驱动分支事务提交或回滚RM 的初始化方法同样在 seata-rm 和 seata-core 包中其核心初始化方式与TM是共用的都是通过AbstractNettyRemotingClient的init方法。
下面来看看RMClient的init方法 public static void init(String applicationId, String transactionServiceGroup) {// 1. 创建RM对应的Netty客户端RmNettyRemotingClient rmNettyRemotingClient RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());// 2. 初始化Netty客户端相关资源rmNettyRemotingClient.init();}RMClient初始化方法和TM初始化方法都是调用父类AbstractNettyRemotingClient的init方法所以此处就不再过多赘述。 初始化TC
TC - 事务协调者维护全局和分支事务的状态驱动全局事务提交或回滚TC是以中间件的形式存在的它不同于TM和RM是以SDK的形式寄生在业务服务中的它是一个独立的Java服务在seata-server包下。
TC 是一个SpringBoot项目由SpringBootApplication标记的main方法启动
SpringBootApplication(scanBasePackages {io.seata})
public class ServerApplication {public static void main(String[] args) throws IOException {// run the spring-boot applicationSpringApplication.run(ServerApplication.class, args);}
}TC服务端对外会提供一个Web界面用于查询运行情况所以需要启动一个SpringBoot Web项目工程而启动Netty则是用于与TM和RM进行通信的。 TC 启动基于Netty的RPC服务端它是通过实现CommandLineRunner的run方法实现初始化 : Overridepublic void run(String... args) {...Server.start(args);... }Server的start方法负责完成TC核心的Netty客户端启动流程 : public static void start(String[] args) {...// 1. 创建Netty服务端NettyRemotingServer nettyRemotingServer new NettyRemotingServer(workingThreads);XID.setPort(nettyRemotingServer.getListenPort());// 2. UUIDGenerator: 用于生成全局唯一的全局事务ID和分支事务ID,采用雪花算法实现 UUIDGenerator.init(parameterParser.getServerNode());// 3. SessionHodler负责事务日志状态的持久化存储SessionHolder.init();// 4. 锁管理器工厂初始化LockerManagerFactory.init();// 5. 事务协调器的核心,通过RpcServer与远程TM,RM通信来实现分支事务的提交,回滚等。DefaultCoordinator coordinator DefaultCoordinator.getInstance(nettyRemotingServer);coordinator.init();nettyRemotingServer.setHandler(coordinator);...// 6. 启动Netty服务端nettyRemotingServer.init();}Netty服务端的启动逻辑也是注册一系列消息处理器然后启动Netty Server这里就不过多赘述了。 全局事务执行流程
TM 发起全局事务
TM 发起全局事务的流程分为三个步骤 :
拦截标注了GlobalTransactional注解的Bean的方法执行TM 向 TC 发送全局事务开启申请然后获得 TC 返回的全局事务 XID全局事务开启绑定 XID
上面说过Seata采用AOP实现对Bean方法执行的拦截拦截逻辑体现在AdapterSpringSeataInterceptor的invoke方法中: Overridepublic Object invoke(Nonnull MethodInvocation invocation) throws Throwable {// 简单封装了一层,便于获取当前代理对象的某些信息AdapterInvocationWrapper adapterInvocationWrapper new AdapterInvocationWrapper(invocation);// 核心拦截逻辑还是汇聚在proxyInvocationHandler中Object result proxyInvocationHandler.invoke(adapterInvocationWrapper);return result;}GlobalTransactionScanner的wrapIfNecessary方法中会通过DefaultInterfaceParser方法解析当前Bean判断其开启了 TCC 还是 AT ( 或XA ) 模式 然后返回对应不同的proxyInvocationHandler如下图所示: 本文我们重点关注AT模式因此我们来看看GlobalTransactionalInterceptorHandler类的invoke方法实现: Overrideprotected Object doInvoke(InvocationWrapper invocation) throws Throwable {// 1. 获取目标对象类型Class? targetClass invocation.getTarget().getClass();// 2. 获取当前拦截的目标方法调用Method specificMethod ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);if (specificMethod ! null !specificMethod.getDeclaringClass().equals(Object.class)) {// 3. 获取当前方法上标注的目标注解final GlobalTransactional globalTransactionalAnnotation getAnnotation(specificMethod, targetClass, GlobalTransactional.class);final GlobalLock globalLockAnnotation getAnnotation(specificMethod, targetClass, GlobalLock.class);boolean localDisable disable || (ATOMIC_DEGRADE_CHECK.get() degradeNum degradeCheckAllowTimes);if (!localDisable) {// 4. 优先处理GlobalTransactional注解情况if (globalTransactionalAnnotation ! null || this.aspectTransactional ! null) {AspectTransactional transactional;if (globalTransactionalAnnotation ! null) {// 4.1 填充全局事务上下文信息(超时时间,传播级别,重试次数,回滚异常)transactional new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),globalTransactionalAnnotation.rollbackForClassName(),globalTransactionalAnnotation.noRollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.propagation(),globalTransactionalAnnotation.lockRetryInterval(),globalTransactionalAnnotation.lockRetryTimes(),globalTransactionalAnnotation.lockStrategyMode());} else {transactional this.aspectTransactional;}// 4.2 处理标注了GlobalTransactional注解的全局事务return handleGlobalTransaction(invocation, transactional);} else if (globalLockAnnotation ! null) {// 5. 处理GlobalLock注解return handleGlobalLock(invocation, globalLockAnnotation);}}}// 6. 执行本地事务return invocation.proceed();}GlobalTransactional 注解处理
如果当前拦截的方法上标注了GlobalTransactional注解则会由handleGlobalTransaction方法进行处理: Object handleGlobalTransaction(final InvocationWrapper methodInvocation,final AspectTransactional aspectTransactional) throws Throwable {boolean succeed true;try {// transactionalTemplate 使用了典型的模版方法加回调的设计模式// TransactionalExecutor 负责提供当前事务相关处理接口return transactionalTemplate.execute(new TransactionalExecutor() {// 执行当前分支事务的本地事务Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}...// 获取当前事务的上下文信息 -- 这里事务上下文信息由aspectTransactional 转换而来Overridepublic TransactionInfo getTransactionInfo() {// reset the value of timeoutint timeout aspectTransactional.getTimeoutMills();if (timeout 0 || timeout DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout defaultGlobalTransactionTimeout;}TransactionInfo transactionInfo new TransactionInfo();transactionInfo.setTimeOut(timeout);transactionInfo.setName(name());transactionInfo.setPropagation(aspectTransactional.getPropagation());transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());transactionInfo.setLockStrategyMode(aspectTransactional.getLockStrategyMode());SetRollbackRule rollbackRules new LinkedHashSet();for (Class? rbRule : aspectTransactional.getRollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : aspectTransactional.getRollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class? rbRule : aspectTransactional.getNoRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {....}...}Seata这里使用了典型的模版方法加回调接口的设计模式包括TransactionInfo以及TransactionalTemplate的命名方式其实都是借鉴了Spring事务模块的实现。
下面来看看TransactionalTemplate所设定的模版流程具体实现: public Object execute(TransactionalExecutor business) throws Throwable {// 1. 获取当前全局事务的上下文信息TransactionInfo txInfo business.getTransactionInfo();...// 1.1 判断当前是否已经存在一个全局事务GlobalTransaction tx GlobalTransactionContext.getCurrent();// 1.2 根据不同的全局事务传播行为进行处理Propagation propagation txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder null;try {switch (propagation) {// 如果当前存在全局事务,则将当前全局事务挂起,然后直接无事务执行,执行完毕后,再恢复先前挂起的全局事务case NOT_SUPPORTED:if (existingTransaction(tx)) {suspendedResourcesHolder tx.suspend(false);}return business.execute();// 如果当前存在全局事务,则将当前全局事务挂起,然后自己新创建一个全局事务执行,执行完毕后,再恢复先前挂起的全局事务case REQUIRES_NEW:if (existingTransaction(tx)) {suspendedResourcesHolder tx.suspend(false);}tx GlobalTransactionContext.createNew();break;// 如果当前存在全局事务,则加入当前全局事务,否则无事务执行case SUPPORTS:if (notExistingTransaction(tx)) {return business.execute();}break;// 如果当前存在全局事务,则加入当前全局事务,否则新创建一个全局事务case REQUIRED:tx GlobalTransactionContext.getCurrentOrCreate();break;// 如果当前存在全局事务,则抛出异常,否则无事务执行case NEVER:if (existingTransaction(tx)) {throw new TransactionException(String.format(Existing transaction found for transaction marked with propagation never, xid %s, tx.getXid()));} else {return business.execute();}// 如果当前不存在全局事务,则抛出异常,否则加入当前全局事务case MANDATORY:if (notExistingTransaction(tx)) {throw new TransactionException(No existing transaction found for transaction marked with propagation mandatory);}break;default:throw new TransactionException(Not Supported Propagation: propagation);}// 1.3 将当前全局锁配置设置到本地线程缓存中,然后返回先前的配置GlobalLockConfig previousConfig replaceGlobalLockConfig(txInfo);if (tx.getGlobalTransactionRole() GlobalTransactionRole.Participant) {LOGGER.info(join into a existing global transaction,xid{}, tx.getXid());}try {// 2. 如果当前线程是全局事务的发起者,即TM,则给TC发送一个开启全局事务的请求,否则只是简单回调相关钩子方法beginTransaction(txInfo, tx);Object rs;try {// 3. 执行当前分支事务对应的本地事务rs business.execute();} catch (Throwable ex) {// 4. 分支事务执行发生异常,判断对应异常是否需要回滚,如果需要则回滚当前全局事务completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 5. 当前分支事务执行正常,发送提交分支事务的请求commitTransaction(tx, txInfo);return rs;} finally {// 6. 资源清理和恢复,同时触发钩子回调resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// 7. 如果存在被挂起的全局事务,则进行恢复if (suspendedResourcesHolder ! null) {tx.resume(suspendedResourcesHolder);}}}Seata将整个全局事务的执行流程划分为了两个阶段:
根据传播行为决定是新建加入还是挂起当前全局事务如果当前分支事务是全局事务首位发起者那么当前分支事务作为TM其他后面加入的分支事务作为RM然后执行自己的本地事务最后根据执行结果上报状态。 全局事务的开启
下面我们先来看看全局事务是如何开启的 : private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {// 前后钩子方法调用triggerBeforeBegin();// 向TC发送开启全局事务的请求tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();...}DefaultGlobalTransaction 负责完成全局事务的开启: Overridepublic void begin(int timeout, String name) throws TransactionException {this.createTime System.currentTimeMillis();// 如果当前分支事务不是首个发起全局事务的,即是RM,则啥也不干,直接返回if (role ! GlobalTransactionRole.Launcher) {...return;}// 如果当前分支事务是首个发起全局事务的,即是TMassertXIDNull();String currentXid RootContext.getXID();if (currentXid ! null) {throw new IllegalStateException(Global transaction already exists, cant begin a new global transaction, currentXid currentXid);}// 向 TC 发送全局事务开启请求xid transactionManager.begin(null, null, name, timeout);status GlobalStatus.Begin;// 绑定XID到本地线程空间RootContext.bind(xid);...}DefaultTransactionManager 负责帮助TM开启本次全局事务 Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {// 向 TC 发起全局事务开启请求GlobalBeginRequest request new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);// 同步等待TC返回结果GlobalBeginResponse response (GlobalBeginResponse) syncCall(request);if (response.getResultCode() ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}// 从结果中取出本次全局事务IDreturn response.getXid();}private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);...}DefaultTransactionManager 最终是通过调用TmNettyRemotingClient的sendSyncRequest方法向TC发出全局事务开启请求的然后TC做出响应返回一个新的全局唯一的全局事务ID。 TM 和 RM 执行分支事务
分支事务执行流程总共可以分为以下九步:
数据源代理对象注入 ( Seata 自动配置类中完成 )解析SQL得到SQL的类型 ( UPDTAE )表名条件 ( where name “大忽悠” ) 等相关的信息根据解析得到的条件信息生成查询语句定位数据获得 before-image执行对应的业务代码中的SQL根据before-image的结果通过主键定位数据获取after-image把前后镜像数据以及业务SQL相关的信息组成一条回滚日志记录插入到UNDO_LOG表中提交前向TC注册当前分支事务业务数据相关的SQL更新和生成UNDO LOG的SQL更新语句作为一个本地事务原子提交将本地事务提交的结果上报给TC
分支事务执行的拦截逻辑就不是体现在 AdapterSpringSeataInterceptor 中了该拦截器模版方法中只是简单调用业务方法我们这里需要重新回过头来看一下数据源代理的注入过程。 数据源代理注入的过程逻辑存在于SeataDataSourceAutoConfiguration类中我们一起来看一下: Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)public static SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());}Seata这里还是老套路向IOC中注入一个数据源自动代理创建器只针对类型为DataSource的Bean进行代理但是不同的一点在于SeataAutoDataSourceProxyCreator提供的切面类型为DefaultIntroductionAdvisor下面我们先来简单了解一下IntroductionAdvisor的前世今生。 Introduction
Spring中的增强器体系分为两类:
一类以 Advice 为首的 per-class 类型该类型Advice的实例可以在目标对象类的所有实例间共享该类型拦截器通常只提供方法拦截功能不会为目标对象类保存任何状态或者添加新的特性。另一类以 Introduction 为首的 per-instance 类型该类型Advice不会在目标类所有对象实例之间共享而是会为不同的实例对象保存它们各自的状态以及相关逻辑。
Introduction 类型的 Adivce 在 AspectJ 中被称为 Inter - Type Declaration 在 JBoss AOP 中称 Mix-in ; Introduction 不是根据横切逻辑在Joinpoint处的执行时机来区分的而是根据它可以完成的功能而区别于其他Advice类型。
Introduction 不是根据横切逻辑在Joinpoint处的执行时机来区分的而是根据它可以完成的功能而区别于其他Advice类型。Introduction 可以为原有的对象添加新的特性或者行为 , 这一点在Spring中的含义就是为代理对象额外添加需要实现的接口这样我们就可以在获得代理对象后将代理对象转化为其他顶层接口类型进行调用了。 IntroductionInterceptor继承了MethodInterceptor以及DynamicIntroductionAdvice。通过DynamicIntroductionAdvice我们可以界定当前的IntroductionInterceptor为哪些接口类提供相应的拦截功能:
public interface DynamicIntroductionAdvice extends Advice {// 当代理对象方法调用被拦截时,通过调用增强器该方法判断方法所属接口是否是增强器提供的// 如果是,则将方法调用请求转发给增强器内部持有的实例对象boolean implementsInterface(Class? intf);
}通过MethodInterceptorIntroductionInterceptor就可以处理新添加的接口上的方法调用了。毕竟原来的目标对象不会处理自己认为没有的东西啊。另外通常情况下对于IntroductionInterceptor来说如果是新增加的接口上的方法调用不必去调用MethodInterceptor的proceed方法。毕竟当前位置已经是“航程”的终点了当前被拦截的方法实际上就是整个调用链中要最终执行的唯一方法。
因为Introduction较之于其他Advice有些特殊所以我们有必要从总体上看一下Spring中对Introduction的支持结构 : Introduction型的Advice有两条分支:
以DynamicIntroductionAdvice为 首的动态分支以IntroductionInfo为首的静态可配置分支
从上面DynamicIntroductionAdvice 的定义中可以看出使用DynamicIntroductionAdvice我们可以到运行时再去判定当前Introduction 可应用到的目标接口类型而不用预先就设定。而IntroductionInfo类型则完全相反其定义如下
public interface IntroductionInfo {Class?[] getInterfaces();
}IntroductionInfo 接口的实现类必须返回预定的目标接口类型这样在对IntroductionInfo型的Introduction进行织入的时候实际上就不需要指定目标接口类型了因为它自身就带有这些必要的信息。
要对目标对象进行拦截并添加Introduction逻辑我们可以直接扩展IntroductionInterceptor然后在子类的invoke方法中实现所有的拦截逻辑。不过除非特殊状况下需要去直接扩展IntroductionInterceptor大多数时候直接使用Spring提供的两个现成的实现类就可以了。 DelegatingIntroductionInterceptor
从名字也可以看的出来DelegatingIntroductionInterceptor不会自己实现将要添加到目标对象上的新的逻辑行为而是委派delegate给其他实现类。
下面可以简单看看拦截器核心的invoke方法实现: Overridepublic Object invoke(MethodInvocation mi) throws Throwable {// 1. 当前被调用的目标方法是否是Introduction增强器提供接口中包含的if (isMethodOnIntroducedInterface(mi)) {// 1.1 调用接口实现对象的指定方法,获取返回值Object retVal AopUtils.invokeJoinpointUsingReflection(this.delegate, mi.getMethod(), mi.getArguments());// 1.2 感觉类似一个trick,如果实现返回的是delegate对象,那么这里会替换为返回目标对象if (retVal this.delegate mi instanceof ProxyMethodInvocation) {Object proxy ((ProxyMethodInvocation) mi).getProxy();// 1.3 再确保当前方法返回值兼容目标对象类型时,该trick才会生效if (mi.getMethod().getReturnType().isInstance(proxy)) {retVal proxy;}}return retVal;}// 2. 当前被调用的方法不是Introduction增强器提供接口中包含的,走正常流程return doProceed(mi);}懂了原理后下面给出一个具体使用案例说明:
public class TestMain {public static void main(String[] args) {// 1. 准备被代理的目标对象People peo new People() {Overridepublic void eat() {System.out.println(吃饭);}Overridepublic void drink() {System.out.println(喝水);}};// 2. 准备代理工厂ProxyFactory pf new ProxyFactory();// 3. 准备introduction增强器,增强器持有需要额外添加的接口Developer和Developer接口的实现类DelegatingIntroductionInterceptor diinew DelegatingIntroductionInterceptor((Developer) () - System.out.println(编码));// 4. 添加增强器和代理对象需要继承的接口pf.addAdvice(dii);pf.addInterface(People.class);// 5. 设置被代理对象pf.setTarget(peo);// 6. 代理对象继承了People接口,Developer接口和Spring AOP内部接口peo (People)pf.getProxy();peo.drink();peo.eat();// 7. 强制转换为Developer接口,实际方法调用会被introduction增强器拦截,调用请求转发给了增强器内部持有的Developer接口实现类Developer developer(Developer) peo;developer.code();}public interface People {void eat();void drink();}public interface Developer{void code();}
}这里需要注意一点:
spring 是否采用jdk代理大部分时候都取决于我们要代理的目标对象是否存在实现的接口如果不存在则走cglib代理否则走jdk动态代理但是这里introduction增强器会将其额外添加的接口设置到interfaces集合中所以这里即使目标对象没有实现接口还是会走jdk动态代理这就导致了我们得到的代理对象无法转换为目标对象类型 感觉Spring AOP模块这里的判断逻辑应该存在漏洞算是一个小bug。 虽然DelegatingIntroductionInterceptor是Introduction型Advice的一个实现但你可能料想不到的是它 其实是个“伪军”因为它的实现根本就没有兑现Introduction作为per-instance型Advice的承诺。
实际上 DelegatingIntroductionInterceptor会使用它所持有的同一个“delegate”接口实例供同一目标类的所有实例共享使用。你想啊就持有一个接口实现类的实例对象它往哪里去放对应各个目标对象实例的状态啊所以如果要真的想严格达到Introduction型Advice所宣称的那样的效果我们不能使用DelegatingIntroductionInterceptor而是要使用它的兄弟DelegatePerTargetObjectIntroductionInterceptor。 DelegatePerTargetObjectIntroductionInterceptor
与DelegatingIntroductionInterceptor不同DelegatePerTargetObjectIntroductionInterceptor会在内部持有一个目标对象与相应Introduction逻辑实现类之间的映射关系。当每个目标对象上的新定义的接口方法被调用的时候DelegatePerTargetObjectIntroductionInterceptor会拦截这些调用然后以目标对象实例作为键到它持有的那个映射关系中取得对应当前目标对象实例的Introduction实现类实例。剩下的当然就是让当前目标对象实例吃自己家锅里的饭了 Overridepublic Object invoke(MethodInvocation mi) throws Throwable {// 1. 当前被调用的目标方法是否是Introduction增强器提供接口中包含的if (isMethodOnIntroducedInterface(mi)) {// 1.1 获取增强器内部持有的接口实现对象 Object delegate getIntroductionDelegateFor(mi.getThis());// 1.2 调用实现对象对应的方法,并获取其返回值Object retVal AopUtils.invokeJoinpointUsingReflection(delegate, mi.getMethod(), mi.getArguments());// 1.3 感觉类似一个trick,如果实现返回的是delegate对象,那么这里会替换为返回目标对象if (retVal delegate mi instanceof ProxyMethodInvocation) {retVal ((ProxyMethodInvocation) mi).getProxy();}return retVal;}// 2. 当前被调用的方法不是Introduction增强器提供接口中包含的,走正常流程return doProceed(mi);}如果根据当前目标对象实例没有找到对应的Introduction实现类实例DelegatePerTargetobjectIntroductionInterceptor将会为其创建一个新的然后添加到映射关系中 private Object getIntroductionDelegateFor(Nullable Object targetObject) {synchronized (this.delegateMap) {if (this.delegateMap.containsKey(targetObject)) {return this.delegateMap.get(targetObject);}else {Object delegate createNewDelegate();this.delegateMap.put(targetObject, delegate);return delegate;}}}使用DelegatePerTargetObjectIntroductionInterceptor与使用DelegatingIntroductionInterceptor没有太大的差别唯一的区别可能就在于构造方式上。现在我们不是自己构造delegate接口实例而只需要告知DelegatePerTargetObjectIntroductionInterceptor相应的delegate接 口类型和对应实现类的类型。剩下的工作留给DelegatePerTargetObjectIntroductionInterceptor就可以了如以下代码所示
public class TestMain {public static void main(String[] args) {// 1. 准备被代理的目标对象People peo new People() {Overridepublic void eat() {System.out.println(吃饭);}Overridepublic void drink() {System.out.println(喝水);}};// 2. 准备代理工厂ProxyFactory pf new ProxyFactory();// 3. 准备introduction增强器DelegatePerTargetObjectIntroductionInterceptor diinew DelegatePerTargetObjectIntroductionInterceptor(DeveloperImpl.class,Developer.class);// 4. 添加增强器和代理对象需要继承的接口pf.addAdvice(dii);pf.addInterface(People.class);// 5. 设置被代理对象pf.setTarget(peo);// 6. 代理对象继承了People接口,Developer接口和Spring AOP内部接口peo (People)pf.getProxy();peo.drink();peo.eat();// 7. 强制转换为Developer接口,实际方法调用会被introduction增强器拦截,调用请求转发给了增强器内部持有的Developer接口实现类Developer developer(Developer) peo;developer.code();}public interface People {void eat();void drink();}public interface Developer{void code();}public static class DeveloperImpl implements Developer{Overridepublic void code() {System.out.println(编码);}}
}注意点和上面说的一样 Introduction 切面
在Spring中AOP规范中提到的Aspect切面对应这里的Advisor切面通常由PointCut和Advice组成PointCut负责完成目标Bean的过滤Advice负责承载拦截逻辑的实现Spring中的Advisor也分为如下两个流派: IntroductionAdvisor与Pointcutadvisor最本质上的区别就是IntroductionAdvisor只能应 用于类级别的拦截只能使用Introduction型的Advice而不能像PointcutAdvisor那样可以使用任 何类型的Pointcut以及差不多任何类型的Advice。也就是说IntroductionAdvisor纯粹就是为 Introduction而生的。
IntroductionAdvisor的类层次比较简单只有一个默认实现DefaultIntroductionAdvisorSeata 中使用到的IntroductionAdvisor就是该类型。 数据源自动代理创建器
在了解完DefaultIntroductionAdvisor后再来看SeataAutoDataSourceProxyCreator的实现就比较容易了SeataAutoDataSourceProxyCreator首先重写了getAdvicesAndAdvisorsForBean返回类型为DefaultIntroductionAdvisor的增强器:
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {... private final Object[] advisors;public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {...this.advisors buildAdvisors(dataSourceProxyMode);}private Object[] buildAdvisors(String dataSourceProxyMode) {Advice advice new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode);return new Object[]{new DefaultIntroductionAdvisor(advice)};}Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class? beanClass, String beanName, TargetSource customTargetSource) {return advisors;}...} DefaultIntroductionAdvisor只支持类级别过滤并且其ClassFilter默认返回true这意味着该切面会切中自动代理创建器过滤下来的所有Bean并将自身提供的advice放入所有被切中目标对象的代理对象的拦截器链中。 从上面源码可知数据源自动代理创建器会为其感兴趣的所有Bean进行代理并往代理对象的拦截器链中添加一个SeataAutoDataSourceProxyAdvice拦截器。
下面我们进入数据源自动代理创建器的WrapIfNecessary方法看看判断是否代理的逻辑具体是如何实现的: Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// 1. 只针对类型为DataSource的Bean进行代理if (!(bean instanceof DataSource)) {return bean;}// 2. 不对类型为SeataDataSourceProxy的数据源再次进行代理if (!(bean instanceof SeataDataSourceProxy)) {// 3. 调用抽象自动代理创建器的方法对当前数据源进行代理Object enhancer super.wrapIfNecessary(bean, beanName, cacheKey);// 4. 如果当前Bean无需代理,直接返回原本的Beanif (bean enhancer) {return bean;}// 5. 当前数据源对应的代理对象已经添加了SeataAutoDataSourceProxyAdvice// 但是真正负责对数据源进行代理的是SeataDataSourceProxy,这一点在SeataAutoDataSourceProxyAdvice的invoke方法中会体现出来DataSource origin (DataSource) bean;// 建立好原始数据源和其代理数据源的映射关系SeataDataSourceProxy proxy buildProxy(origin, dataSourceProxyMode);DataSourceProxyHolder.put(origin, proxy);return enhancer;}// 3. 如果手动向容器中注入了数据源代理,依旧取出其中原始数据源,进行动态代理,然后返回// 这里是为了将自动和手动设置代理数据源的逻辑统一起来,不然可能会产生重复代理的问题SeataDataSourceProxy proxy (SeataDataSourceProxy) bean;DataSource origin proxy.getTargetDataSource();Object originEnhancer super.wrapIfNecessary(origin, beanName, cacheKey);if (origin originEnhancer) {return origin;}DataSourceProxyHolder.put(origin, proxy);return bean}// 根据不同的代理模式创建不同的数据源代理对象SeataDataSourceProxy buildProxy(DataSource origin, String proxyMode) {if (BranchType.AT.name().equalsIgnoreCase(proxyMode)) {return new DataSourceProxy(origin);}if (BranchType.XA.name().equalsIgnoreCase(proxyMode)) {return new DataSourceProxyXA(origin);}throw new IllegalArgumentException(Unknown dataSourceProxyMode: proxyMode);}当代理对象方法执行时SeataAutoDataSourceProxyAdvice会拦截目标方法执行并应用数据源代理: Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {// 1. 检查当前是否存在全局事务,或者是否需要获取全局锁if (!inExpectedContext()) {return invocation.proceed();}// 2. 获取当前调用的是数据源的哪个方法Method method invocation.getMethod();String name method.getName();Class?[] parameterTypes method.getParameterTypes();Method declared;try {declared DataSource.class.getDeclaredMethod(name, parameterTypes);} catch (NoSuchMethodException e) {return invocation.proceed();}// 3. 取出当前数据源对应的SeataDataSourceProxy,然后调用代理数据源对应的方法DataSource origin (DataSource) invocation.getThis();SeataDataSourceProxy proxy DataSourceProxyHolder.get(origin);Object[] args invocation.getArguments();return declared.invoke(proxy, args);}当我们的数据源被代理后代理数据源方法调用会走AOP拦截逻辑也就是被SeataAutoDataSourceProxyAdvice的invoke方法拦截拦截后会取出对应的SeataDataSourceProxy 因为SeataDataSourceProxy实现了DataSource接口所以这里调用目标对象什么方法就同样调用SeataDataSourceProxy对应的方法。
SeataDataSourceProxy内部持有目标DataSource其内部还会执行connectionstatement等对象的代理逻辑层层代理目标是拦截sql语句的执行。 小结
由于篇幅有限所以本文将分成上下两部分下部分主要针对数据源代理这块分支事务提交和回滚三部分进行讲述。
笔者实力有限对Seata目前了解也不是特别深入本文也是笔者初次研究Seata源码所记录的研究心得如果有大佬发现行文错误欢迎评论区指出。