您的当前位置:首页>新品 > 正文

【时快讯】【Seata解析】TC处理全局事务和分支事务原理详解

来源:CSDN 时间:2023-04-07 11:14:59

本文接文章《Seata解析-TC处理全局事务和分支事务原理详解之全局事务开启和分支事务注册》继续介绍TC对请求的处理。本文将介绍分支状态报告请求和全局事务报告请求。

文章目录


(资料图)

一、分支状态报告请求二、全局事务报告请求1、提交全局事务2、回滚全局事务3、其他事务状态4、总结

一、分支状态报告请求

分支状态报告请求的消息类型是MessageType.TYPE_BRANCH_STATUS_REPORT,请求对象为BranchReportRequest,该请求由RM发起。该请求的作用是报告某个分支事务的状态,TC收到后更改对应BranchSession对象的状态。 处理该请求通过模板方法调用了DefaultCoordinator.doBranchReport:

protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext)        throws TransactionException {//调用DefaultCore的branchReport方法        core.branchReport(request.getBranchType(), request.getXid(), request.getBranchId(), request.getStatus(),            request.getApplicationData());    }

在doBranchReport方法中又调用了DefaultCore的branchReport方法,在branchReport方法中又调用了其他方法,最终调用到ATCore的branchReport方法:

public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,                             String applicationData) throws TransactionException {//根据XID得到全局事务对象GlobalSession        GlobalSession globalSession = assertGlobalSessionNotNull(xid, true);        //根据branchId得到分支事务对象branchSession        BranchSession branchSession = globalSession.getBranch(branchId);        if (branchSession == null) {throw new BranchTransactionException(BranchTransactionNotExist,                    String.format("Could not found branch session xid = %s branchId = %s", xid, branchId));        }        //添加监听器        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());        //更改分支事务状态,本请求的处理核心在下面这个方法中        globalSession.changeBranchStatus(branchSession, status);        if (LOGGER.isInfoEnabled()) {LOGGER.info("Report branch status successfully, xid = {}, branchId = {}", globalSession.getXid(),                branchSession.getBranchId());        }    }

branchReport方法在最后调用了GlobalSession的changeBranchStatus方法,这个方法也非常简单,就是直接更改了分支事务的状态:

public void changeBranchStatus(BranchSession branchSession, BranchStatus status)        throws TransactionException {//设置分支事务对象的状态        branchSession.setStatus(status);        //通知监听器,监听器的处理后续文章介绍        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onBranchStatusChange(this, branchSession, status);        }    }

分支状态报告请求的处理还是比较简单的,总起来说就是找到分支事务对象BranchSession,然后直接修改其状态。

二、全局事务报告请求

全局事务报告请求的消息类型是MessageType.TYPE_GLOBAL_REPORT,请求对象为GlobalReportRequest,该请求由TM发起。该请求的作用是报告某个全局事务的状态,TC收到后更改该全局事务的状态。 全局事务报告请求的处理通过模板方法调用了协调器对象DefaultCoordinator的doGlobalReport方法,之后在doGlobalReport方法里面进一步调用到DefaultCore的globalReport方法:

public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {//根据XID找到全局事务        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);        if (globalSession == null) {return globalStatus;        }        //添加监听器        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());        doGlobalReport(globalSession, xid, globalStatus);        return globalSession.getStatus();    }    @Override    public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {if (globalSession.isSaga()) {//getCore(BranchType.SAGA)方法返回的对象是SagaCore,现在使用的是AT模式,不知道为什么调用SagaCore对象            getCore(BranchType.SAGA).doGlobalReport(globalSession, xid, globalStatus);        }    }

globalReport和doGlobalReport两个方法也非常简单,就是找到全局事务对象,然后调用SagaCore的doGlobalReport方法:

public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {//全局事务提交        if (GlobalStatus.Committed.equals(globalStatus)) {removeAllBranches(globalSession);            SessionHelper.endCommitted(globalSession);            LOGGER.info("Global[{}] committed", globalSession.getXid());        }        //全局事务回滚,Finished未知,等分析TM的时候在介绍该状态        else if (GlobalStatus.Rollbacked.equals(globalStatus)                || GlobalStatus.Finished.equals(globalStatus)) {removeAllBranches(globalSession);            SessionHelper.endRollbacked(globalSession);            LOGGER.info("Global[{}] rollbacked", globalSession.getXid());        }        //其他状态        else {globalSession.changeStatus(globalStatus);            LOGGER.info("Global[{}] reporting is successfully done. status[{}]", globalSession.getXid(), globalSession.getStatus());            if (GlobalStatus.RollbackRetrying.equals(globalStatus)                    || GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus)                    || GlobalStatus.UnKnown.equals(globalStatus)) {globalSession.queueToRetryRollback();                LOGGER.info("Global[{}] will retry rollback", globalSession.getXid());            } else if (GlobalStatus.CommitRetrying.equals(globalStatus)) {globalSession.queueToRetryCommit();                LOGGER.info("Global[{}] will retry commit", globalSession.getXid());            }        }    }

可以看到上面分了三种情况处理全局事务状态,下面我们也分三种情况介绍处理逻辑。

1、提交全局事务

提交全局事务调用了两个方法:removeAllBranches和SessionHelper.endCommitted。 removeAllBranches方法将GlobalSession对象中的分支事务集合清空,同时释放分支事务对数据库记录加的锁,释放锁其实就是将上一篇文章介绍的BucketLockMap对象中的数据库记录和加锁的事务id删除。SessionHelper.endCommitted将修改GlobalSession对象中的状态为已提交(GlobalStatus.Committed):

private void removeAllBranches(GlobalSession globalSession) throws TransactionException {//得到所有的分支事务        ArrayListbranchSessions = globalSession.getSortedBranches();        for (BranchSession branchSession : branchSessions) {globalSession.removeBranch(branchSession);        }    }    //下面是GlobalSession中的方法    public void removeBranch(BranchSession branchSession) throws TransactionException {for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onRemoveBranch(this, branchSession);        }        //释放分支事务加的锁        branchSession.unlock();        //删除分支事务        remove(branchSession);    }    public boolean remove(BranchSession branchSession) {return branchSessions.remove(branchSession);    }

下面是SessionHelper.endCommitted方法:

public static void endCommitted(GlobalSession globalSession) throws TransactionException {globalSession.changeStatus(GlobalStatus.Committed);        globalSession.end();    }    //下面是GlobalSession的方法    public void end() throws TransactionException {// Clean locks first        clean();        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onEnd(this);        }    }    public void clean() throws TransactionException {//下面的代码也是释放分支事务的锁,因为removeAllBranches方法已经释放过,所以下面的方法是空执行        LockerManagerFactory.getLockManager().releaseGlobalSessionLock(this);    }

2、回滚全局事务

回滚全局事务的首先也是执行removeAllBranches方法,所以第一步也是释放分支事务加的锁。下面我们重点看一下SessionHelper.endRollbacked:

public static void endRollbacked(GlobalSession globalSession) throws TransactionException {GlobalStatus currentStatus = globalSession.getStatus();        //isTimeoutGlobalStatus检查当前状态是否是超时的状态        if (isTimeoutGlobalStatus(currentStatus)) {globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);        } else {globalSession.changeStatus(GlobalStatus.Rollbacked);        }        globalSession.end();    }    public static boolean isTimeoutGlobalStatus(GlobalStatus status) {return status == GlobalStatus.TimeoutRollbacked                || status == GlobalStatus.TimeoutRollbackFailed                || status == GlobalStatus.TimeoutRollbacking                || status == GlobalStatus.TimeoutRollbackRetrying;    }

endRollbacked方法首先设置事务的状态,然后调用了globalSession.end方法,globalSession.end方法与提交全局事务中调用的是同一个,都是用于释放分支事务锁的。 endRollbacked方法在修改全局事务状态前,会比较当前事务的状态,如果当前事务状态是超时,则设置事务状态为超时回滚成功,如果不是则设置事务状态为回滚成功。 至于全局事务的状态的流转,则在分析完RM和TM之后在介绍。

3、其他事务状态

如果请求中的事务状态不是前面两种,则根据请求直接修改GlobalSession的状态,之后是一个分支判断,下面重点看一下分支判断:

//事务回滚中、超时回滚重试中、未知状态if (GlobalStatus.RollbackRetrying.equals(globalStatus)                    || GlobalStatus.TimeoutRollbackRetrying.equals(globalStatus)                    || GlobalStatus.UnKnown.equals(globalStatus)) {globalSession.queueToRetryRollback();                LOGGER.info("Global[{}] will retry rollback", globalSession.getXid());            } //提交重试else if (GlobalStatus.CommitRetrying.equals(globalStatus)) {globalSession.queueToRetryCommit();                LOGGER.info("Global[{}] will retry commit", globalSession.getXid());            }

分支判断中分别调用了queueToRetryRollback和queueToRetryCommit。 queueToRetryRollback:

public void queueToRetryRollback() throws TransactionException {//添加监听器        this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());        //seata提供了回滚重试管理器,下面这行代码将本全局事务对象添加到回滚重试管理器中        //回滚重试管理器后面文章介绍        SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(this);        GlobalStatus currentStatus = this.getStatus();        //设置事务状态        if (SessionHelper.isTimeoutGlobalStatus(currentStatus)) {this.changeStatus(GlobalStatus.TimeoutRollbackRetrying);        } else {this.changeStatus(GlobalStatus.RollbackRetrying);        }    }

queueToRetryCommit:

public void queueToRetryCommit() throws TransactionException {//设置监听器        this.addSessionLifecycleListener(SessionHolder.getRetryCommittingSessionManager());        //将本全局事务对象添加到重试提交管理器中        //重试提交管理器后面文章介绍        SessionHolder.getRetryCommittingSessionManager().addGlobalSession(this);        //设置事务状态        this.changeStatus(GlobalStatus.CommitRetrying);    }

从queueToRetryRollback和queueToRetryCommit两个方法中看出,当事务需要重试提交或者重试回滚时,都将全局事务对象加入到对应管理器中,由管理器进行后续处理。关于管理器,后面的文章详细分析。

4、总结

总的来说,在处理全局事务报告请求时,分了如下几种情况: (1)事务提交,更改事务状态为已提交,释放分支事务加的数据库记录锁; (2)事务回滚,释放分支事务加的数据库记录锁,更改事务状态为回滚成功或者超时回滚成功; (3)事务回滚中、超时回滚重试中、未知状态,更改事务状态,将GlobalSession对象添加到回滚重试管理器; (4)提交重试,更改事务状态,将GlobalSession对象添加到重试提交管理器; (5)其他状态,更改事务状态。

标签:

最新新闻:

新闻放送
Top