[Seata Source Code Study] Part 6: Global Transaction Commit and Rollback
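Global transaction commit

After the TM has made its RPC calls to the RMs, and if no exception was thrown, it sends a global-commit request to the TC.

io.seata.tm.api.TransactionalTemplate#execute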
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo
        // TransactionInfo wraps the attributes of the @GlobalTransactional annotation
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        // GlobalTransactionContext creates a new transaction or returns the current one:
        // GlobalTransactionContext.getCurrent -> RootContext.getXID -> ContextCore.get
        // ContextCore is an interface with two implementations, FastThreadLocalContextCore and
        // ThreadLocalContextCore; both store the XID in a ThreadLocal
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        // Holds the XID of a suspended transaction
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                // NOT_SUPPORTED: run without a transaction by calling methodInvocation.proceed().
                // If this interceptor is not the last one in the chain, the next interceptor's invoke
                // runs; otherwise the target method is executed directly.
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    // Chain of responsibility: continue with the interceptor chain
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s",
                                tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            // No transaction in the current context: this caller starts it, so it is the TM (Launcher)
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            // Save the current global lock configuration (kept in a ThreadLocal)
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
                // Runs the before/after-begin hooks.
                // Participant role: RootContext must already hold an XID, otherwise an exception is thrown.
                // Launcher role (the TM): RootContext must not hold an XID, otherwise an exception is thrown;
                // TmNettyRemotingClient then sends a synchronous GlobalBeginRequest to the TC and the
                // returned XID is bound to RootContext.
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    // Continue with the interceptor chain
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    // Check whether the exception matches the rollback rules (by default Throwable and subclasses),
                    // run the before/after-rollback hooks and, as Launcher (the TM), send a synchronous
                    // GlobalRollbackRequest to the TC through TmNettyRemotingClient.
                    // The status returned by the TC is recorded.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                // Run the before/after-commit hooks and, as Launcher (the TM), send a synchronous
                // GlobalCommitRequest to the TC through TmNettyRemotingClient.
                // The status returned by the TC is recorded.
                commitTransaction(tx);

                return rs;
            } finally {
                // 5. clear
                // Restore the previous global lock configuration
                resumeGlobalLockConfig(previousConfig);
                // Run the after-completion hooks
                triggerAfterCompletion();
                // Remove the hooks bound to the current thread
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            // The XID held by suspendedResourcesHolder is bound to RootContext again
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

Running the hooks before and after commit
Seata reserves two hook methods, one before and one after the global commit, which can be extended to fit the needs of a production system.
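To make the ordering concrete, here is a minimal sketch of hooks wrapped around a commit. It is not Seata's API: `CommitHook` and `HookedCommitSketch` are hypothetical names invented for this illustration, and the sketch only models the before, commit, after sequence described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a transaction hook; not Seata's actual interface. */
interface CommitHook {
    void beforeCommit();
    void afterCommit();
}

final class HookedCommitSketch {
    private final List<CommitHook> hooks = new ArrayList<>();
    final List<String> trace = new ArrayList<>();

    void register(CommitHook hook) {
        hooks.add(hook);
    }

    /** Runs every before-hook, then the commit itself, then every after-hook. */
    void commit(Runnable globalCommit) {
        for (CommitHook hook : hooks) {
            hook.beforeCommit();
        }
        globalCommit.run(); // if this throws, the after-commit hooks are skipped
        trace.add("commit");
        for (CommitHook hook : hooks) {
            hook.afterCommit();
        }
    }

    static List<String> demo() {
        HookedCommitSketch tx = new HookedCommitSketch();
        tx.register(new CommitHook() {
            @Override
            public void beforeCommit() {
                tx.trace.add("before");
            }

            @Override
            public void afterCommit() {
                tx.trace.add("after");
            }
        });
        tx.commit(() -> { });
        return tx.trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A hook of this shape is a natural place for auditing or metrics, since it runs on the TM side around the request to the TC.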
io.seata.tm.api.TransactionalTemplate#commitTransaction

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // Hook before the global commit
            triggerBeforeCommit();
            // Global commit
            tx.commit();
            // Hook after the global commit
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }

The TM commits the global transaction
io.seata.tm.api.DefaultGlobalTransaction#commit

    public void commit() throws TransactionException {
        // Only the TM (Launcher) may commit the global transaction
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        // The XID must not be null
        assertXIDNotNull();
        // A failed global commit is retried, 5 times by default
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    retry--;
                    // The TransactionManager performs the commit
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (xid.equals(RootContext.getXID())) {
                // Suspend: unbind the current XID and wrap it in a SuspendedResourcesHolder
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }

A failed global commit is retried 5 times by default. Seata delegates every transaction operation to the TransactionManager interface; the TC, TM and RM each have their own implementation class, as shown in the figure below.

[Figure: implementation classes of the TransactionManager interface]

io.seata.tm.DefaultTransactionManager#commit

    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        // Bind the global transaction XID
        globalCommit.setXid(xid);
        // Send the GlobalCommitRequest to the TC
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

The TC handles the global commit
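When the TC receives the message, ServerHandler calls processMessage, which uses the message type to look up the matching RemotingProcessor in the pair table. A GlobalCommitRequest is matched to ServerOnRequestProcessor, which in turn passes the message to a TransactionMessageHandler.

TransactionMessageHandler is also an interface; the TC uses the DefaultCoordinator implementation. The message is finally cast to AbstractTransactionRequestToTC, and the handle method of the concrete message subclass is invoked.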
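The two steps above (a lookup by message type, then the request calling back into the handler overload for its own type) can be sketched as follows. This is a simplified model under stated assumptions, not Seata's code: every class name and the numeric type codes 1 and 2 are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class DispatchSketch {
    /** Hypothetical handler with one overload per request type (the role DefaultCoordinator plays). */
    interface InboundHandler {
        String handle(CommitRequest request);
        String handle(RollbackRequest request);
    }

    /** Each request picks the handler overload for its own type: double dispatch. */
    interface Request {
        int typeCode();
        String handle(InboundHandler handler);
    }

    static final class CommitRequest implements Request {
        @Override
        public int typeCode() {
            return 1; // hypothetical type code
        }

        @Override
        public String handle(InboundHandler handler) {
            return handler.handle(this);
        }
    }

    static final class RollbackRequest implements Request {
        @Override
        public int typeCode() {
            return 2; // hypothetical type code
        }

        @Override
        public String handle(InboundHandler handler) {
            return handler.handle(this);
        }
    }

    static String dispatch(Request message) {
        InboundHandler coordinator = new InboundHandler() {
            @Override
            public String handle(CommitRequest request) {
                return "global commit";
            }

            @Override
            public String handle(RollbackRequest request) {
                return "global rollback";
            }
        };
        // Processor table: message type -> processor. Both request types share one processor here.
        Function<Request, String> onRequestProcessor = request -> request.handle(coordinator);
        Map<Integer, Function<Request, String>> processors = new HashMap<>();
        processors.put(1, onRequestProcessor);
        processors.put(2, onRequestProcessor);

        Function<Request, String> processor = processors.get(message.typeCode());
        if (processor == null) {
            throw new IllegalArgumentException("no processor for type " + message.typeCode());
        }
        return processor.apply(message);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new CommitRequest()));
        System.out.println(dispatch(new RollbackRequest()));
    }
}
```

The benefit of this shape is that the processor needs no instanceof chain: adding a request type means adding one class and one handler overload.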
io.seata.core.protocol.transaction.GlobalCommitRequest#handle

    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }

io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalCommitRequest, io.seata.core.rpc.RpcContext)

    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                    // The global commit really starts here
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                        e);
                }
            }

            @Override
            public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
                super.onException(request, response, rex);
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }

io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
        throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        response.setGlobalStatus(core.commit(request.getXid()));
    }

io.seata.server.coordinator.DefaultCore#commit

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        // Look up the GlobalSession in the store by XID
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        // Not found: the global transaction has already finished
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                // Highlight: Firstly, close the session, then no more branch can be registered.
                // In AT mode this releases the global lock
                globalSession.closeAndClean();
                // AT mode can commit asynchronously
                if (globalSession.canBeCommittedAsync()) {
                    // Change the status to AsyncCommitting and add the session to the
                    // ASYNC_COMMITTING_SESSION_MANAGER held by SessionHolder
                    globalSession.asyncCommit();
                    MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                    return false;
                } else {
                    globalSession.changeGlobalStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });

        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            // If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

DefaultCore first looks up the GlobalSession in the store by XID. In AT mode it then releases the global lock and marks the global transaction as GlobalStatus.AsyncCommitting. Note that the TM is nevertheless told Committed at this point, because the last branch of the method maps AsyncCommitting to Committed.
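The decision made in DefaultCore#commit can be reduced to a small state machine. The sketch below is a simplified model, not Seata's implementation: `Session`, `asyncCapable` and the `Status` enum are hypothetical stand-ins, and the synchronous second phase is assumed to succeed.

```java
final class CommitDecisionSketch {
    enum Status { Begin, Committing, AsyncCommitting, Committed, Finished }

    /** Hypothetical, heavily reduced stand-in for a global session. */
    static final class Session {
        Status status = Status.Begin;
        final boolean asyncCapable; // true when every branch can be committed asynchronously (AT mode)

        Session(boolean asyncCapable) {
            this.asyncCapable = asyncCapable;
        }
    }

    /** Returns the status reported back to the TM. */
    static Status commit(Session session) {
        if (session == null) {
            return Status.Finished; // unknown XID: the transaction is already over
        }
        boolean shouldCommit = false;
        synchronized (session) { // only the status change happens under the lock
            if (session.status == Status.Begin) {
                if (session.asyncCapable) {
                    session.status = Status.AsyncCommitting; // left for the background task
                } else {
                    session.status = Status.Committing;
                    shouldCommit = true;
                }
            }
        }
        if (shouldCommit) {
            // The synchronous second phase would run here; this sketch assumes it succeeds.
            session.status = Status.Committed;
            return session.status;
        }
        // An asynchronously committing session is reported as Committed to the caller.
        return session.status == Status.AsyncCommitting ? Status.Committed : session.status;
    }

    public static void main(String[] args) {
        Session at = new Session(true);
        System.out.println(commit(at) + " / stored status: " + at.status);
    }
}
```

The design choice worth noticing is that an AT commit returns to the TM before any RM has been contacted: once phase one has succeeded everywhere, phase two only cleans up undo logs, so it can safely happen later.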
When the Seata server starts it initializes DefaultCoordinator, which starts a periodic task that runs handleAsyncCommitting every second to process asynchronous global commits.

    asyncCommitting.scheduleAtFixedRate(
        () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
        ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    protected void handleAsyncCommitting() {
        // Condition used to query sessions from the store
        SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
        // Find every global transaction whose status is GlobalStatus.AsyncCommitting
        Collection<GlobalSession> asyncCommittingSessions =
            SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
            return;
        }
        SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
            try {
                asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                // Delegates to DefaultCore.doGlobalCommit
                core.doGlobalCommit(asyncCommittingSession, true);
            } catch (TransactionException ex) {
                LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
            }
        });
    }

io.seata.server.coordinator.DefaultCore#doGlobalCommit
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

        // Saga mode is delegated to the Saga core
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            // Iterate over all branch transactions
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    // Strategy pattern: each branch type is handled by its own Core
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession, branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        // Branch committed: remove it from the global transaction
                        case PhaseTwo_Committed:
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            // not at branch
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error(
                                    "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            // If has branch and not all remaining branches can be committed asynchronously,
            // do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                // contains not AT branch
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        // executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }

When the global transaction commits, the TC iterates over all branch transactions associated with it and sends each RM a BranchCommitRequest. Once an RM has committed its branch, the branch is removed from the global transaction. When every branch has committed successfully, the GlobalSession record is deleted from the database.

    public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchCommitRequest request = new BranchCommitRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchCommitSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchCommitRequest,
                String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(),
                    branchSession.getBranchId()), e);
        }
    }

The RM commits the branch transaction
After the TC sends the BranchCommitRequest, the RM receives it and hands it to RmBranchCommitProcessor.
io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor

    // Branch commit
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);

io.seata.core.rpc.processor.client.RmBranchCommitProcessor#process

    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        // Address of the TC
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        // Handle the branch commit
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

io.seata.core.rpc.processor.client.RmBranchCommitProcessor#handleBranchCommit

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }

io.seata.rm.AbstractRMHandler#onRequest

    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM) request;
        transactionRequest.setRMInboundMessageHandler(this);
        return transactionRequest.handle(context);
    }

io.seata.core.protocol.transaction.BranchCommitRequest#handle

    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this);
    }

io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchCommitRequest)

    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        // The branch commit really starts here
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

io.seata.rm.AbstractRMHandler#doBranchCommit
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        // In AT mode this deletes the undo log from the store
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }

To summarize the commit path: when a global transaction commits, the TC releases the global lock and, in AT mode, marks the global transaction as AsyncCommitting. It then iterates over every branch transaction bound to the global transaction and sends each RM a BranchCommitRequest. The RM deletes its undo log and returns its status to the TC. Once every branch has deleted its undo log successfully, the TC marks the global transaction as Committed and deletes the global transaction record from the store.
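The second phase described above can be simulated in a few lines. This is a toy model under stated assumptions, not Seata's code: the in-memory `undoLog` set stands in for the undo_log table, `unreachable` models an RM that fails with a retryable error, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class PhaseTwoCommitSketch {
    /** Hypothetical in-memory stand-in for the undo_log table, keyed by "xid:branchId". */
    final Set<String> undoLog = new HashSet<>();
    /** Branches whose RM cannot be reached for now (a retryable failure). */
    final Set<Long> unreachable = new HashSet<>();

    /** RM side: committing an AT branch only requires deleting its undo log. */
    boolean branchCommit(String xid, long branchId) {
        if (unreachable.contains(branchId)) {
            return false;
        }
        undoLog.remove(xid + ":" + branchId);
        return true;
    }

    /** TC side: one pass over the remaining branches; returns true once none are left. */
    boolean doGlobalCommit(String xid, List<Long> branches) {
        Iterator<Long> it = branches.iterator();
        while (it.hasNext()) {
            long branchId = it.next();
            if (branchCommit(xid, branchId)) {
                it.remove(); // a committed branch is dropped from the global session
            }
        }
        return branches.isEmpty();
    }

    static boolean demo() {
        PhaseTwoCommitSketch tc = new PhaseTwoCommitSketch();
        List<Long> branches = new ArrayList<>(Arrays.asList(1L, 2L));
        tc.undoLog.add("xid-1:1");
        tc.undoLog.add("xid-1:2");
        tc.unreachable.add(2L);
        boolean firstPass = tc.doGlobalCommit("xid-1", branches);  // branch 2 stays pending
        tc.unreachable.clear();
        boolean secondPass = tc.doGlobalCommit("xid-1", branches); // the retry pass finishes it
        return !firstPass && secondPass && tc.undoLog.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The second call to doGlobalCommit plays the role of the periodic task: a branch that could not be committed simply stays in the session until a later pass succeeds.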
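Global transaction rollback

The TM sends the global rollback request

If an exception occurs after the TM's RPC calls to the RMs, or the TM's own business method throws, the completeTransactionAfterThrowing method is executed.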
io.seata.tm.api.TransactionalTemplate#completeTransactionAfterThrowing

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        // roll back
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                // Global rollback
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // not roll back on this exception, so commit
            commitTransaction(tx);
        }
    }

The TM then sends a GlobalRollbackRequest to the TC.
io.seata.tm.DefaultTransactionManager#rollback

    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        return response.getGlobalStatus();
    }

The TC handles the global rollback request
io.seata.server.AbstractTCInboundHandler#handle(io.seata.core.protocol.transaction.GlobalRollbackRequest, io.seata.core.rpc.RpcContext)

    public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
        GlobalRollbackResponse response = new GlobalRollbackResponse();
        response.setGlobalStatus(GlobalStatus.Rollbacking);
        exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
            @Override
            public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
                throws TransactionException {
                try {
                    doGlobalRollback(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }

            @Override
            public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                               TransactionException tex) {
                super.onTransactionException(request, response, tex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
                super.onException(request, response, rex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }

io.seata.server.coordinator.DefaultCore#rollback

    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
            globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                // Mark the global transaction as Rollbacking
                globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
                return true;
            }
            return false;
        });
        if (!shouldRollBack) {
            return globalSession.getStatus();
        }

        boolean rollbackSuccess = doGlobalRollback(globalSession, false);
        return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
    }

io.seata.server.coordinator.DefaultCore#doGlobalRollback

    public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start rollback event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);

        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
        } else {
            // Iterate over the branch transactions in reverse order
            Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
                BranchStatus currentBranchStatus = branchSession.getStatus();
                if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    // Roll back the branch transaction
                    BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession, branchStatus)) {
                        LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Rollbacked;
                    }
                    switch (branchStatus) {
                        // Rolled back: delete the branch transaction from the store
                        case PhaseTwo_Rollbacked:
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return CONTINUE;
                        // Unretryable failure: mark the global transaction as rollback-failed and release the global lock
                        case PhaseTwo_RollbackFailed_Unretryable:
                            SessionHelper.endRollbackFailed(globalSession, retrying);
                            LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex,
                        "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                        new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
        }

        // In db mode, lock and branch data residual problems may occur.
        // Therefore, execution needs to be delayed here and cannot be executed synchronously.
        if (success) {
            // Rollback succeeded: release the global lock, mark the global transaction as Rollbacked,
            // then delete the global transaction record
            SessionHelper.endRollbacked(globalSession, retrying);
            LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
        }
        return success;
    }

io.seata.server.coordinator.AbstractCore#branchRollback

    public BranchStatus branchRollback(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchRollbackRequest request = new BranchRollbackRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchRollbackSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchRollbackRequest,
                String.format("Send branch rollback failed, xid = %s branchId = %s",
                    branchSession.getXid(), branchSession.getBranchId()), e);
        }
    }

Global rollback works much like global commit: the TC iterates over all branch transactions (in reverse order) and rolls each one back. When a branch has been rolled back, its record is deleted from the store. Once every branch has been rolled back, the global lock is released and the global transaction record is deleted.
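The control flow of the rollback loop, stripped of logging and storage, looks roughly like this. It is a simplified sketch, not Seata's code: `BranchResult`, `Outcome` and the `rm` callback are hypothetical, and the callback stands in for the network call to the RM.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class GlobalRollbackSketch {
    enum BranchResult { ROLLBACKED, FAILED_RETRYABLE, FAILED_UNRETRYABLE }
    enum Outcome { ROLLBACKED, RETRY_LATER, FAILED }

    /**
     * Rolls branches back in reverse registration order and stops at the first failure.
     * Branches that were rolled back are removed; the rest stay for a later retry.
     */
    static Outcome doGlobalRollback(List<Long> branches, Function<Long, BranchResult> rm, List<Long> visited) {
        for (int i = branches.size() - 1; i >= 0; i--) {
            Long branchId = branches.get(i);
            visited.add(branchId);
            switch (rm.apply(branchId)) {
                case ROLLBACKED:
                    branches.remove(i); // removal by index is safe while walking backwards
                    break;
                case FAILED_UNRETRYABLE:
                    return Outcome.FAILED;
                default:
                    return Outcome.RETRY_LATER; // queued for the retry task
            }
        }
        return Outcome.ROLLBACKED;
    }

    public static void main(String[] args) {
        List<Long> branches = new ArrayList<>(Arrays.asList(1L, 2L, 3L));
        List<Long> visited = new ArrayList<>();
        Outcome outcome = doGlobalRollback(branches, id -> BranchResult.ROLLBACKED, visited);
        System.out.println(outcome + " visited=" + visited);
    }
}
```

Unlike the commit loop, a retryable failure stops the iteration immediately. Reverse order matters because a later branch may have modified rows that an earlier branch also touched.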
Next, let's look at how a branch transaction is rolled back.

The RM handles the branch rollback
After the TC sends a BranchRollbackRequest to the RM, the RM hands the message to RmBranchRollbackProcessor.

    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);

io.seata.core.rpc.processor.client.RmBranchRollbackProcessor#handleBranchRollback

    private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {
        BranchRollbackResponse resultMessage;
        resultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch rollback result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
        }
    }

io.seata.rm.AbstractRMHandler#handle(io.seata.core.protocol.transaction.BranchRollbackRequest)

    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

io.seata.rm.AbstractRMHandler#doBranchRollback

    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        // AT mode uses DataSourceManager
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

io.seata.rm.datasource.DataSourceManager#branchRollback

    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
        }
        try {
            // Roll back using the undo log
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }

io.seata.rm.datasource.undo.AbstractUndoLogManager#undo
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;

        for (; ; ) {
            try {
                conn = dataSourceProxy.getPlainConnection();

                // The entire undo process should run in a local transaction.
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }

                // Find UNDO LOG
                // Look up the undo log by global transaction xid and branch id
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                // Whether an undo log exists
                boolean exists = false;
                while (rs.next()) {
                    exists = true;

                    // It is possible that the server repeatedly sends a rollback request to roll back
                    // the same branch transaction to multiple processes,
                    // ensuring that only the undo_log in the normal state is processed.
                    int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                        }
                        return;
                    }

                    String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                    // Parse the undo log
                    Map<String, String> context = parseContext(contextString);
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    // Deserialize into a BranchUndoLog object
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            // Execute the undo log
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                // If undo_log exists, it means that the branch transaction has completed the first phase,
                // we can directly roll back and clean the undo_log
                // Otherwise, it indicates that there is an exception in the branch transaction,
                // causing undo_log not to be written to the database.
                // For example, the business processing timeout, the global transaction is the initiator rolls back.
                // To ensure data consistency, we can insert an undo_log with GlobalFinished state
                // to prevent the local transaction of the first phase of other programs from being correctly submitted.
                // See https://github.com/seata/seata/issues/489

                if (exists) {
                    // Delete the undo log once it has been applied
                    deleteUndoLog(xid, branchId, conn);
                    // Commit the local transaction
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                } else {
                    insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                            State.GlobalFinished.name());
                    }
                }

                return;
            } catch (SQLIntegrityConstraintViolationException e) {
                // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                }
            } catch (Throwable e) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException rollbackEx) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                    }
                }
                throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
                    .format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                        branchId, e.getMessage()), e);
            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }
                        conn.close();
                    }
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
                }
            }
        }
    }

The undo log is read from the undo_log table in the business database by xid and branch id, then parsed, deserialized and executed.
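Why the SQL undo logs are reversed before they are applied is easiest to see with a tiny example. The sketch below is an in-memory illustration only: `UndoEntry` and the map-based "table" are hypothetical, and no JDBC is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UndoReplaySketch {
    /** Hypothetical undo record: the value of one row before and after a statement. */
    static final class UndoEntry {
        final String rowKey;
        final int before;
        final int after;

        UndoEntry(String rowKey, int before, int after) {
            this.rowKey = rowKey;
            this.before = before;
            this.after = after;
        }
    }

    /** Applies the before images newest-first, so the oldest before image wins. */
    static void undo(Map<String, Integer> table, List<UndoEntry> log) {
        List<UndoEntry> reversed = new ArrayList<>(log);
        if (reversed.size() > 1) {
            Collections.reverse(reversed);
        }
        for (UndoEntry entry : reversed) {
            table.put(entry.rowKey, entry.before);
        }
    }

    static int demo() {
        // One branch updated the same row twice: 100 -> 80 -> 50.
        Map<String, Integer> table = new HashMap<>();
        table.put("account:1", 50);
        List<UndoEntry> log = Arrays.asList(
            new UndoEntry("account:1", 100, 80),
            new UndoEntry("account:1", 80, 50));
        undo(table, log);
        return table.get("account:1");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Applied in recording order, the same log would leave the row at 80, which is why the reversal is needed whenever a branch contains more than one statement.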
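io.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn

    public void executeOn(Connection conn) throws SQLException {
        // Compare the before image, the after image and the current data:
        // before image == after image: nothing changed, no rollback needed
        // after image != current data: cannot roll back, because the data is dirty
        if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
            return;
        }
        PreparedStatement undoPST = null;
        try {
            // Build the undo SQL template
            String undoSQL = buildUndoSQL();
            undoPST = conn.prepareStatement(undoSQL);
            // The before image: row data as it was before the change
            TableRecords undoRows = getUndoRows();
            // Iterate over every row of the before image
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                        undoValues.add(field);
                    }
                }
                // Fill in the parameters to obtain the actual undo SQL
                undoPrepare(undoPST, undoValues, pkValueList);
                // Execute the undo SQL
                undoPST.executeUpdate();
            }
        } catch (Exception ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        } finally {
            // important for oracle
            IOUtil.close(undoPST);
        }
    }

Before the undo SQL is executed, the before and after images are compared with the current data. If the before image equals the after image, nothing changed and no rollback is needed. If the after image differs from the current data, the rollback cannot proceed because the data is dirty. If the after image equals the current data and differs from the before image, the undo SQL is built and executed, which completes the branch rollback.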
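The three-way comparison can be written as a small pure function. This is a sketch under assumptions, not Seata's dataValidationAndGoOn: the `Decision` enum is hypothetical, images are compared with plain equals, and the extra case "current data already equals the before image, so skip" is my reading of the real method rather than something stated in the text above.

```java
import java.util.Objects;

final class UndoValidationSketch {
    enum Decision { SKIP, UNDO, DIRTY }

    /** Decides what to do with one undo record given the three row snapshots. */
    static Decision validate(Object beforeImage, Object afterImage, Object currentData) {
        if (Objects.equals(beforeImage, afterImage)) {
            return Decision.SKIP; // the statement changed nothing
        }
        if (Objects.equals(afterImage, currentData)) {
            return Decision.UNDO; // nobody touched the row since phase one
        }
        if (Objects.equals(beforeImage, currentData)) {
            return Decision.SKIP; // assumption: the row already holds the original value
        }
        return Decision.DIRTY; // changed by something else, cannot be rolled back safely
    }

    public static void main(String[] args) {
        System.out.println(validate(100, 80, 80));
        System.out.println(validate(100, 80, 70));
    }
}
```

In the DIRTY case an automatic rollback would overwrite somebody else's change, so failing the branch rollback and leaving the decision to a retry or to an operator is the safer choice.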
Summary diagram