国产精品嫩草99av在线_一区在线视频观看_欧美高清一区_欧美 日韩 国产 一区_99精品欧美一区二区三区_久久大香伊蕉在人线观看热2_一色屋精品视频在线观看网站_在线亚洲国产精品网站_亚洲区一区二区三区_你懂的视频一区二区

當前位置:首頁 > 科技  > 軟件

事務提交之后異步執行工具類封裝

來源: 責編: 時間:2023-09-18 21:40:27 381觀看
導讀一、背景許多時候,我們期望在事務提交之后異步執行某些邏輯,調用外部系統,發送MQ,推送ES等等;當事務回滾時,異步操作也不執行,這些異步操作需要等待事務完成后才執行;比如出入庫的事務執行完畢后,異步發送MQ給報表系統、ES等等

Gdk28資訊網——每日最新資訊28at.com

一、背景

許多時候,我們期望在事務提交之后異步執行某些邏輯,調用外部系統,發送MQ,推送ES等等;當事務回滾時,異步操作也不執行,這些異步操作需要等待事務完成后才執行;比如出入庫的事務執行完畢后,異步發送MQ給報表系統、ES等等。Gdk28資訊網——每日最新資訊28at.com

二、猜想

我們在項目中大多都是使用聲明式事務(@Transactional注解) ,spring會基于動態代理機制對我們的業務方法進行增強,控制connection,從而達到事務的目的。那么我們能否在此找尋一些蛛絲馬跡。我們來看下spring事務的相關核心類(裝配流程不詳細敘述)。Gdk28資訊網——每日最新資訊28at.com

TransactionInterceptor:Gdk28資訊網——每日最新資訊28at.com

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {  @Override  @Nullable  public Object invoke(MethodInvocation invocation) throws Throwable {     // Work out the target class: may be {@code null}.     // The TransactionAttributeSource should be passed the target class     // as well as the method, which may be from an interface.     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);     // Adapt to TransactionAspectSupport's invokeWithinTransaction...     return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);  }}

TransactionAspectSupport(重點關注事務提交之后做了哪些事情,有哪些擴展點)。Gdk28資訊網——每日最新資訊28at.com

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable {   // If the transaction attribute is null, the method is non-transactional.   TransactionAttributeSource tas = getTransactionAttributeSource();   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);   final TransactionManager tm = determineTransactionManager(txAttr);   if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {         if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {            throw new TransactionUsageException(                  "Unsupported annotated transaction on suspending function detected: " + method +                  ". Use TransactionalOperator.transactional extensions instead.");         }         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());         if (adapter == null) {            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +                  method.getReturnType());         }         return new ReactiveTransactionSupport(adapter);      });      return txSupport.invokeWithinTransaction(            method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);   }   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {      // 創建事務,此處也會創建connection      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);      Object retVal;      try {         // 執行目標方法         retVal = invocation.proceedWithInvocation();      }      catch (Throwable ex) {         // 目標方法異常時處理         completeTransactionAfterThrowing(txInfo, ex);         throw ex;      }      finally {		 // 重置TransactionInfo ThreadLocal         cleanupTransactionInfo(txInfo);      }      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {         // Set rollback-only in case of Vavr failure matching our rollback rules...         TransactionStatus status = txInfo.getTransactionStatus();         if (status != null && txAttr != null) {            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);         }      }	  // 業務方法成功執行,提交事務(重點關注此處),最終會調用AbstractPlatformTransactionManager#commit方法      commitTransactionAfterReturning(txInfo);      return retVal;   }   else {      final ThrowableHolder throwableHolder = new ThrowableHolder();      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.      try {         Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);            try {               Object retVal = invocation.proceedWithInvocation();               if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {                  // Set rollback-only in case of Vavr failure matching our rollback rules...                  retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);               }               return retVal;            }            catch (Throwable ex) {               if (txAttr.rollbackOn(ex)) {                  // A RuntimeException: will lead to a rollback.                  if (ex instanceof RuntimeException) {                     throw (RuntimeException) ex;                  }                  else {                     throw new ThrowableHolderException(ex);                  }               }               else {                  // A normal return value: will lead to a commit.                  throwableHolder.throwable = ex;                  return null;               }            }            finally {               cleanupTransactionInfo(txInfo);            }         });         // Check result state: It might indicate a Throwable to rethrow.         if (throwableHolder.throwable != null) {            throw throwableHolder.throwable;         }         return result;      }      catch (ThrowableHolderException ex) {         throw ex.getCause();      }      catch (TransactionSystemException ex2) {         if (throwableHolder.throwable != null) {            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);            ex2.initApplicationException(throwableHolder.throwable);         }         throw ex2;      }      catch (Throwable ex2) {         if (throwableHolder.throwable != null) {            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);         }         throw ex2;      }   }}}

AbstractPlatformTransactionManager:Gdk28資訊網——每日最新資訊28at.com

public final void commit(TransactionStatus status) throws TransactionException {   if (status.isCompleted()) {      throw new IllegalTransactionStateException(            "Transaction is already completed - do not call commit or rollback more than once per transaction");   }   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;   if (defStatus.isLocalRollbackOnly()) {      if (defStatus.isDebug()) {         logger.debug("Transactional code has requested rollback");      }      processRollback(defStatus, false);      return;   }   if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {      if (defStatus.isDebug()) {         logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");      }      processRollback(defStatus, true);      return;   }   // 事務提交處理   processCommit(defStatus);}private void processCommit(DefaultTransactionStatus status) throws TransactionException {   try {      boolean beforeCompletionInvoked = false;      try {         boolean unexpectedRollback = false;         prepareForCommit(status);         triggerBeforeCommit(status);         triggerBeforeCompletion(status);         beforeCompletionInvoked = true;         if (status.hasSavepoint()) {            if (status.isDebug()) {               logger.debug("Releasing transaction savepoint");            }            unexpectedRollback = status.isGlobalRollbackOnly();            status.releaseHeldSavepoint();         }         else if (status.isNewTransaction()) {            if (status.isDebug()) {               logger.debug("Initiating transaction commit");            }            unexpectedRollback = status.isGlobalRollbackOnly();            doCommit(status);         }         else if (isFailEarlyOnGlobalRollbackOnly()) {            unexpectedRollback = status.isGlobalRollbackOnly();         }         // Throw UnexpectedRollbackException if we have a global rollback-only         // marker but still didn't get a corresponding exception from commit.         if (unexpectedRollback) {            throw new UnexpectedRollbackException(                  "Transaction silently rolled back because it has been marked as rollback-only");         }      }      catch (UnexpectedRollbackException ex) {         // can only be caused by doCommit         triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);         throw ex;      }      catch (TransactionException ex) {         // can only be caused by doCommit         if (isRollbackOnCommitFailure()) {            doRollbackOnCommitException(status, ex);         }         else {            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);         }         throw ex;      }      catch (RuntimeException | Error ex) {         if (!beforeCompletionInvoked) {            triggerBeforeCompletion(status);         }         doRollbackOnCommitException(status, ex);         throw ex;      }      // Trigger afterCommit callbacks, with an exception thrown there      // propagated to callers but the transaction still considered as committed.      try {		 // 在事務提交后觸發(追蹤到這里就離真相不遠了)         triggerAfterCommit(status);      }      finally {         triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);      }   }   finally {      cleanupAfterCompletion(status);   }}

TransactionSynchronizationUtils:Gdk28資訊網——每日最新資訊28at.com

public abstract class TransactionSynchronizationUtils {  public static void triggerAfterCommit() {     // TransactionSynchronizationManager: 事務同步器管理     invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());  }  public static void invokeAfterCommit(@Nullable List<TransactionSynchronization> synchronizations) {     if (synchronizations != null) {        for (TransactionSynchronization synchronization : synchronizations) {		   // 調用TransactionSynchronization#afterCommit方法,默認實現為空,留給子類擴展		   // 那么我們想在事務提交之后做一些異步操作,實現此方法即可           synchronization.afterCommit();        }     }  }}

TransactionSynchronization:Gdk28資訊網——每日最新資訊28at.com

public interface TransactionSynchronization extends Flushable {   default void afterCommit() {}}

過程中我們發現TransactionSynchronizationManager、TransactionSynchronization、TransactionSynchronizationAdapter 等相關類涉及aop的整個流程,篇幅有限,在此不詳細展開,當然我們的一些擴展也是離不開這些基礎類的。Gdk28資訊網——每日最新資訊28at.com

三、實現

事務提交之后異步執行,我們需自定義synchronization.afterCommit,結合線程池一起使用,定義線程池TaskExecutor。Gdk28資訊網——每日最新資訊28at.com

@Beanpublic TaskExecutor taskExecutor() {    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();    taskExecutor.setCorePoolSize(******);    taskExecutor.setMaxPoolSize(******);    taskExecutor.setKeepAliveSeconds(******);    taskExecutor.setQueueCapacity(******);    taskExecutor.setThreadNamePrefix(******);    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());    taskExecutor.initialize();    return taskExecutor;}

定義AfterCommitExecutor接口。Gdk28資訊網——每日最新資訊28at.com

public interface AfterCommitExecutor extends Executor { }

定義AfterCommitExecutorImpl實現類,注意需繼承TransactionSynchronizationAdapter類。Gdk28資訊網——每日最新資訊28at.com

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.core.NamedThreadLocal;import org.springframework.core.task.TaskExecutor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.transaction.support.TransactionSynchronizationAdapter;import org.springframework.transaction.support.TransactionSynchronizationManager;import java.util.List;import java.util.ArrayList;@Componentpublic class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);    // 保存要運行的任務線程    private static final ThreadLocal<List<Runnable>> RUNNABLE_THREAD_LOCAL = new NamedThreadLocal<>("AfterCommitRunnable");    // 設置線程池    @Autowired    private TaskExecutor taskExecutor;    /**     * 異步執行     *     * @param runnable 異步線程     */    @Override    public void execute(Runnable runnable) {        LOGGER.info("Submitting new runnable {} to run after commit", runnable);        // 如果事務已經提交,馬上進行異步處理        if (!TransactionSynchronizationManager.isSynchronizationActive()) {            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);            runnable.run();            return;        }        // 同一個事務的合并到一起處理(注意:沒有初始化則初始化,并注冊)        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();        if (null == threadRunnableList) {            threadRunnableList = new ArrayList<>();            RUNNABLE_THREAD_LOCAL.set(threadRunnableList);            TransactionSynchronizationManager.registerSynchronization(this);        }        threadRunnableList.add(runnable);    }    /**     * 監聽到事務提交之后執行方法     */    @Override    public void afterCommit() {        List<Runnable> threadRunnableList = RUNNABLE_THREAD_LOCAL.get();        LOGGER.info("Transaction successfully committed, executing {} threadRunnable", threadRunnableList.size());        for (Runnable runnable : threadRunnableList) {            try {                taskExecutor.execute(runnable);            } catch (RuntimeException e) {                LOGGER.error("Failed to execute runnable " + runnable, e);            }        }    }    /**     * 事務提交/回滾執行     *     * @param status (STATUS_COMMITTED-0、STATUS_ROLLED_BACK-1、STATUS_UNKNOWN-2)     */    @Override    public void afterCompletion(int status) {        LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");        RUNNABLE_THREAD_LOCAL.remove();    }}

使用。Gdk28資訊網——每日最新資訊28at.com

工具類封裝好了,使用上那就很簡便了:注入AfterCommitExecutor,調用AfterCommitExecutor.execute(runnable)方法即可

四、總結

spring如此龐大,找準切入點,許多問題都是可以找到解決思路、或者方案;Gdk28資訊網——每日最新資訊28at.com

你對spring了解多少......Gdk28資訊網——每日最新資訊28at.com

本文鏈接:http://m.rrqrq.com/showinfo-26-10420-0.html事務提交之后異步執行工具類封裝

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 為什么說MyBatis默認的DefaultSqlSession是線程不安全?

下一篇: AIoTel下視頻編碼(一)--移動看家視頻水印溯源技術

標簽:
  • 熱門焦點
Top 国产精品嫩草99av在线_一区在线视频观看_欧美高清一区_欧美 日韩 国产 一区_99精品欧美一区二区三区_久久大香伊蕉在人线观看热2_一色屋精品视频在线观看网站_在线亚洲国产精品网站_亚洲区一区二区三区_你懂的视频一区二区
欧美在线短视频| 国产中文一区二区| 26uuu精品一区二区三区四区在线| 粉嫩蜜臀av国产精品网站| 欧美α欧美αv大片| 久久国产福利| 国产精品一区二区在线观看不卡 | 亚洲国产另类精品专区| 欧美一区午夜视频在线观看| 免费中文字幕日韩欧美| 亚洲精品一品区二品区三品区| 精品亚洲成a人在线观看| 亚洲图片欧美色图| 欧美精品v国产精品v日韩精品 | 粉嫩久久99精品久久久久久夜| 亚洲免费伊人电影| 国产午夜精品久久久久久免费视| 在线精品视频一区二区三四| 97精品超碰一区二区三区| 免播放器亚洲| 麻豆一区二区三| 三级欧美韩日大片在线看| 亚洲自拍与偷拍| 欧美va亚洲va| av不卡在线看| 黄色日韩在线| 亚洲香蕉视频| 国产主播一区二区| 岛国av在线一区| 国内欧美视频一区二区| 午夜精品久久久久久| 国产精品一二三区在线| 毛片一区二区三区| 日韩电影免费在线看| 中文字幕欧美激情| 久久一夜天堂av一区二区三区| 色av成人天堂桃色av| 美女任你摸久久| eeuss国产一区二区三区| 欧美成人国产| 亚洲美女色禁图| 欧美三级视频在线| 久久久天堂av| 亚洲欧洲另类国产综合| 欧美一级欧美三级| 欧美日韩欧美一区二区| 亚洲一区二区四区| 日韩欧美在线123| 91精品婷婷国产综合久久性色| 免费在线观看成人av| 欧美高清性hdvideosex| 国产精品久久久久久久久动漫 | 午夜精品婷婷| 在线亚洲伦理| 欧美大片拔萝卜| 日本亚洲欧美天堂免费| 国产成人综合亚洲91猫咪| 99久久精品国产一区| 国产精品视区| 国产欧美一区二区视频| 欧美一区二区免费观在线| 日韩欧美亚洲国产另类| 欧美激情综合在线| 午夜欧美精品| 亚洲裸体俱乐部裸体舞表演av| 香蕉成人久久| 亚洲第一区色| 久久精品成人| 在线国产电影不卡| 久久精品夜色噜噜亚洲aⅴ| 亚洲视频在线一区观看| 男人的天堂亚洲一区| 国产成人午夜精品5599| 亚洲一区欧美二区| 欧美sm极限捆绑bd| 亚洲在线视频网站| 日韩**一区毛片| 国产精品一二| 精品国产欧美一区二区| 国内外成人在线视频| 国产精品日本欧美一区二区三区| 欧美性色综合网| 久久精品国产一区二区| 欧美精品一区在线| 欧美一二三四区在线| 看片网站欧美日韩| 国产一级一区二区| 中文无字幕一区二区三区| 国产在线日韩欧美| 91久久精品一区二区三| 国产色综合一区| 丝袜美腿一区二区三区| 影音先锋久久久| 8x8x8国产精品| 中文字幕电影一区| 日韩av电影天堂| 韩国av一区| 国产亚洲欧洲一区高清在线观看| 欧美a一区二区| 日韩午夜免费视频| 一区二区三区精品| 日韩午夜在线电影| 久久久久九九视频| 久久er99热精品一区二区| 在线观看成人av电影| 粉嫩av一区二区三区| 8v天堂国产在线一区二区| 美女在线视频一区| 国产女主播一区二区三区| 一区二区三区中文字幕精品精品| 亚洲欧美在线视频观看| 欧美系列亚洲系列| 日韩美女天天操| 亚洲福利一区二区| 精品久久国产字幕高潮| 国产区日韩欧美| 欧美专区18| 欧美视频一区在线观看| 99久久精品免费看| 国产乱码字幕精品高清av| 亚洲精品国产一区二区三区四区在线 | 亚洲国产激情av| 亚洲另类自拍| 欧美视频福利| 欧美三级免费| 欧美在线三级电影| 一区二区三区日韩精品视频| 日本欧美韩国一区三区| 99久久国产综合精品色伊| 欧美色视频在线| 亚洲欧美视频在线观看视频| 国产黑丝在线一区二区三区| 在线观看不卡| 久久一二三四| 亚洲精品视频一区二区| 亚洲精品在线免费| 在线电影国产精品| 久久久久久久久久久黄色| 欧美一区在线视频| 欧美日韩高清不卡| 欧美精品tushy高清| 欧美专区一区二区三区| 欧美一区二区三区在线观看视频| 欧美日韩国产色站一区二区三区| 一区二区三区视频在线播放| 日韩视频一区二区三区在线播放免费观看| 久久av老司机精品网站导航| 日韩电影在线观看网站| 久久99精品国产91久久来源| 成人免费小视频| 亚洲国产精品综合小说图片区| 欧美激情一区三区| 成人久久18免费网站麻豆| 亚洲视频资源在线| 欧美日韩免费精品| 欧美电影精品一区二区| 又紧又大又爽精品一区二区| aⅴ色国产欧美| 欧美tk丨vk视频| 1024成人网色www| 国产农村妇女精品一二区 | 亚洲性色视频| 欧美videos中文字幕| 欧美1级日本1级| 一区二区在线观看免费 | 国产美女一区二区| 黄色亚洲在线| 久久久国际精品| 99riav久久精品riav| 色婷婷精品久久二区二区蜜臀av| 精品国产自在久精品国产| 美女脱光内衣内裤视频久久影院| 国产成人精品亚洲日本在线桃色 | 色悠悠久久综合| 精品日韩一区二区| 亚洲成人先锋电影| 亚洲精品国产成人久久av盗摄 | 欧美日韩在线播| 国产日产精品一区二区三区四区的观看方式 | 精品精品国产高清一毛片一天堂| 亚洲色图丝袜美腿| 国产精品一级在线| 91久久精品www人人做人人爽| 亚洲欧美日韩在线观看a三区| 亚洲美女在线一区| 美女黄色成人网| 亚洲美女免费视频| 国产女主播一区二区三区| 久久99久久99精品免视看婷婷 | **性色生活片久久毛片| 欧美精品一区二区视频| 欧美精品一区二区久久久| 亚洲日本精品国产第一区| 欧美a级一区二区| 欧美性大战久久久久久久蜜臀| 成人免费看视频| 国产精品欧美久久久久一区二区| 久久福利毛片| 成人亚洲一区二区一| 综合色天天鬼久久鬼色|