网站怎么做数据转移,网站定制哪家安全,wordpress头部信息,wordpress主题js2019独角兽企业重金招聘Python工程师标准 一个countDown在多线程调度下使用不当的分享 1. 诡异的数据抖动 在一个需求开发过程中#xff0c;由于有多角色需要获取每个角色下的菜单#xff1b;结果出现了单角色下拉去菜单没问题#xff0c;多角色情况下只有一个… 2019独角兽企业重金招聘Python工程师标准 一个countDown在多线程调度下使用不当的分享 1. 诡异的数据抖动 在一个需求开发过程中由于有多角色需要获取每个角色下的菜单结果出现了单角色下拉去菜单没问题多角色情况下只有一个角色的菜单正常返回的问题。这个问题很忧伤没有菜单如何进功能页面 2. 怀疑是缓存 因为多角色下菜单采用了Redis缓存故而怀疑是其中一个角色下的菜单是缓存失效但是关闭掉缓存依然不起作用排除缓存影响。 3. debug发现问题 通过增加日志输出在关闭掉缓存的实时模式下依然存在菜单时而有时而没有的情况。证明应该是代码有问题。 在一个多线程调度调度服务类中发现一个问题即在debug到如下代码会出现后续代码未执行完全接口结果即被返回的情况。 //经过查询相关API不会出现时序问题可放心使用final CountDownLatch latch new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFutureT listenableFuture threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallbackT() {Overridepublic void onFailure(Throwable throwable) {//过早调用countDown BUGlatch.countDown();LogHelper.EXCEPTION.error(执行任务异常,throwable);if(futureCallback!null){futureCallback.onFailure(throwable);}}Overridepublic void onSuccess(T t) {//过早调用countDown BUGlatch.countDown();if(futureCallback!null){futureCallback.onSuccess(t);}}});}4. countDown调用时机不对 在调用远程接口返回后立即执行 lacth.countDown(); 会立刻造成主线程阻塞释放立即响应结果丢失部分数据。如下图所示在第二个任务获取数据处理完成后 就立即调用latch.countDown(), 致使后续的回调还未执行。主线程在收到 latch的释放阻塞后返回了不完整的数据结果。 改造后将countDown放在回调执行完成之后并放置在 try {} finally { }代码块之中保证一定得到执行防止抛异常后主线程阻塞。 如下所示 final CountDownLatch latch new CountDownLatch(callableList.size());for(Callable callable :callableList){ListenableFutureT listenableFuture threadPoolTaskExecutor.submitListenable(callable);listenableFuture.addCallback(new ListenableFutureCallbackT() {Overridepublic void onFailure(Throwable throwable) {try{LogHelper.DEFAULT.info(多线程回调,latchCountlatch.getCount());LogHelper.EXCEPTION.error(执行任务异常,throwable);if(futureCallback!null){futureCallback.onFailure(throwable);}}finally {latch.countDown();}}Overridepublic void onSuccess(T t) {try{LogHelper.DEFAULT.info(多线程回调成功latchCountlatch.getCount());if(futureCallback!null){futureCallback.onSuccess(t);}}finally {latch.countDown();}}});}5. 本次使用的多线程调度说明 使用Future阻塞模式不会出现以上问题使用future.get()的阻塞式获取不需要CountDownLatch工具类配合使用。而且本次其实也可以使用Future来实现同样的功能。但是Future没有提供 onFailure, onSucess 这样的回调接口考虑到易用性采用了ListenableFuture 本次采用的 spring的 ListenableFuture 方式回调来实现回调式聚合。在对CountDownLatch使用不当的情况下出现了该问题。解决该问题后功能运行正常。 本次提供的服务类 ThreadPoolExecutorService, 其主要的方法如下
/*** 线程池服务类** author David* since 2018/5/15*/
public interface ThreadPoolExecutorService {/*** 执行多线程任务处理并通过 futureCallback对结果进行回调处理* param callableList 异步线程可执行的任务 List* param futureCallback 异步回调* param timeout 超时时间毫秒* param T*/T void execute(ListCallableT callableList, ListenableFutureCallbackT futureCallback, long timeout);/*** 执行多线程任务处理并将结果聚合成一个List 进行返回** param callableList 异步线程可执行的任务 List* param failureCallback 失败的回调方法* param skipNull 是否忽略Null 结果如果忽略 null 不会添加到List 之中* param timeout 超时时间毫秒* param T*/T ListT executeAndMerge(ListCallableT callableList, FailureCallback failureCallback, boolean skipNull, long timeout);/*** 执行多线程任务处理并将结果List,聚合成一个List 进行返回** param callableList 异步线程可执行的任务 List* param failureCallback 失败的回调方法* param timeout 超时时间毫秒* param T*/T ListT executeAndMergeList(ListCallableListT callableList, FailureCallback failureCallback, long timeout); 6. Future 和 ListenableFuture的区别 spring 或者 guava 的 ListenableFutrue其使用方式和机理应该是类似的这里的说明是通用的。本次工具类使用的是spring自带的ListenableFutrue。 6.1 区别说明 ListenableFuture顾名思义就是可以监听的Future它是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算就必须使用另一个线程不断的查询计算状态。这样做代码复杂而且效率低下。使用ListenableFuture帮我们检测Future是否完成了如果完成就自动调用回调函数这样可以让主线程不必阻塞减少并发程序的复杂度。 6.4 spring ListenableFuture使用示例 一个简单的示例如果要获得 ListenableFuture则需要一个对Java线程池进行修饰过的线程池执行器。如下所示 bean idtaskExecutor classorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutorproperty namecorePoolSize value${threadpool.corePoolSize} /property namekeepAliveSeconds value${threadpool.keepAliveSeconds} /property namemaxPoolSize value${threadpool.maxPoolSize} /property namequeueCapacity value${threadpool.queueCapacity} /property namerejectedExecutionHandlerbean classjava.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy //property/beanguava的请参考以下方式进行修饰这里不进行详细说明 //Java线程池的类型是可选的【根据场景自行构建】 ListeningExecutorService executorService MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());得到修饰过的线程池执行器后即可提交可Callable任务得到 ListenableFuture 进行处理。 以下为计算1~5的3次方的结果并输出不需要聚合结果每个线程计算完毕后立刻输出结果 for(int i1; i5; i){final int num i;ListenableFutureInteger listenableFuture threadPoolTaskExecutor.submitListenable(new CallableInteger() {Overridepublic Integer call(){return (int)Math.pow(num,3);}});listenableFuture.addCallback(new ListenableFutureCallbackInteger() {Overridepublic void onFailure(Throwable throwable) {LogHelper.EXCEPTION.error(处理失败, throwable);}Overridepublic void onSuccess(Integer result) {System.out.println(num 的3次方为 result);}});} 6.2 是否可以添加多个callback 答案是肯定的因为ListenableFuture 的addCallback是添加到ListenableFutureCallbackRegistry 一个注册中心。而注册中心底层是支持多个回调的。在6.3会具体介绍回调方法注册中心的处理逻辑。 public void addCallback(ListenableFutureCallback? super T callback) {this.callbacks.addCallback(callback);} 6.3 addCallback是否需要考虑时序 guava 和 spring的 ListenableFuture 均做了时序兼容在listenableFuture执行的任意时刻调用 addCallback 均可准确的执行回调。这里就不得不说在调用addCallback的时候其实将回到方法注册到ListenableFutureCallbackRegistry回调注册中心。 6.3.1 ListenableFutureCallbackRegistry的特性有 a. 记录了Callable的3种调度状态 NEW(新建还未执行)SUCCESS(返回成功)FAILURE(抛异常失败) b. 有两个处理队列分别是: successCallbacks存储执行成功的回调方法 failureCallbacks: 存储执行失败的回调方法抛异常。 c. 存储响应结果 将CallableT 返回的结果也放在回调中心里面 d. mutex 对象保证线程安全 有一个成员变量private final Object mutex; 用来保证在添加回调任务或者设置结果集的时候注册中心是线程安全的。 其在添加回调任务的时候处理流程为 synchronized(this.mutex) {...}如上图所示在添加回调任务的时候会通过synchronized先获取 mutex 排它锁保证处理的线程安全。继而判断任务的状态 如果为NEW标识任务还未执行完毕这时候需要将任务先放入队列待任务执行完毕再根据状态调用回调方法 如果为SUCCESS标识任务已经执行成功不需要再放入队列而是在当前线程中直接调用 onSuccess方法 如果为FAILURE, 标识任务已经执行失败不需要再放入队列而是在当前线程中直接调用 onFailure方法 此外还有一个流程即在Callable任务调度完成返回结果后如果未抛异常对successCallbacks队列中的方法进行逐个回调如果抛出异常对failureCallbacks队列中的方法进行逐个回调。因流程较简单这里只是简单说明。 6.4 说明 本文只分析到 ListenableFuture 的使用方式 CountDownLatch的调用时机和ListenableFuture 的特性。 因时间和篇幅有限具体spring 或 guava在submitListenable 任务之后内部处理逻辑并没有阐述。感兴趣的同学可以具体到源码查看其内部实行逻辑。 转载于:https://my.oschina.net/davidzhang/blog/1839206