小学生家长网站建设需求,网站建设介绍ppt模板,wordpress注册弹窗代码,专业嵌入式软件开发日常工作中有用到线程池吗#xff1f;什么是线程池#xff1f;为什么要使用线程池#xff1f;
作为 JUC 包下的门面担当#xff0c;线程池是名副其实的 JUC 一哥#xff0c;不了解线程池#xff0c;那说明你对 JUC 包其他工具也了解的不咋样吧#xff0c;对 JUC 没深入…日常工作中有用到线程池吗什么是线程池为什么要使用线程池
作为 JUC 包下的门面担当线程池是名副其实的 JUC 一哥不了解线程池那说明你对 JUC 包其他工具也了解的不咋样吧对 JUC 没深入研究过那就是没掌握到 Java 的精髓给面试官这样一个印象那结果可想而知了。
可以这样说计算机发展到现在摩尔定律在现有工艺水平下已经遇到难易突破的物理瓶颈通过多核 CPU 并行计算来提升服务器的性能已经成为主流随之出现了多线程技术。
线程作为操作系统宝贵的资源对它的使用需要进行控制管理线程池就是采用池化思想类似连接池、常量池、对象池等管理线程的工具。
JUC 提供了 ThreadPoolExecutor 体系类来帮助我们更方便的管理线程、并行执行任务。
下图是 Java 线程池继承体系 顶级接口Executor提供了一种方式解耦任务的提交和执行只定义了一个 execute(Runnable command) 方法用来提交任务至于具体任务怎么执行则交给他的实现者去自定义实现。
ExecutorService 接口继承 Executor且扩展了生命周期管理的方法、返回 Futrue 的方法、批量提交任务的方法。
AbstractExecutorService 抽象类继承 ExecutorService 接口对 ExecutorService 相关方法提供了默认实现用 RunnableFuture 的实现类 FutureTask 包装 Runnable 任务交给 execute() 方法执行然后可以从该 FutureTask 阻塞获取执行结果并且对批量任务的提交做了编排。
ThreadPoolExecutor 继承 AbstractExecutorService采用池化思想管理一定数量的线程来调度执行提交的任务且定义了一套线程池的生命周期状态用一个 ctl 变量来同时保存当前池状态高3位和当前池线程数低29位。看过源码的小伙伴会发现ThreadPoolExecutor 类里的方法大量有同时需要获取或更新池状态和池当前线程数的场景放一个原子变量里可以很好的保证数据的一致性以及代码的简洁性说到 ctl 了可以顺便讲下几个状态之间的流转过程。 // 用此变量保存当前池状态高3位和当前线程数低29位private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS Integer.SIZE - 3;private static final int CAPACITY (1 COUNT_BITS) - 1;// runState is stored in the high-order bits// 可以接受新任务提交也会处理任务队列中的任务// 结果111跟29个0111 00000000000000000000000000000private static final int RUNNING -1 COUNT_BITS;// 不接受新任务提交但会处理任务队列中的任务// 结果000 00000000000000000000000000000private static final int SHUTDOWN 0 COUNT_BITS;// 不接受新任务不执行队列中的任务且会中断正在执行的任务// 结果001 00000000000000000000000000000private static final int STOP 1 COUNT_BITS;// 任务队列为空workerCount 0线程池的状态在转换为TIDYING状态时会执行钩子方法terminated()// 结果010 00000000000000000000000000000private static final int TIDYING 2 COUNT_BITS;// 调用terminated()钩子方法后进入TERMINATED状态// 结果010 00000000000000000000000000000private static final int TERMINATED 3 COUNT_BITS;// Packing and unpacking ctl// 低29位变为0得到了线程池的状态private static int runStateOf(int c) { return c ~CAPACITY; }// 高3位变为为0得到了线程池中的线程数private static int workerCountOf(int c) { return c CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }使用线程池可以带来以下好处 1、降低资源消耗。降低频繁创建、销毁线程带来的额外开销复用已创建线程 2、降低使用复杂度。将任务的提交和执行进行解耦我们只需要创建一个线程池然后往里面提交任务就行3、具体执行流程由线程池自己管理降低使用复杂度 4、提高线程可管理性。能安全有效的管理线程资源避免不加限制无限申请造成资源耗尽风险 5、提高响应速度。任务到达后直接复用已创建好的线程执行。 线程池的使用场景简单来说可以有 1、快速响应用户请求响应速度优先。比如一个用户请求需要通过 RPC 调用好几个服务去获取数据然后聚合返回此场景就可以用线程池并行调用响应时间取决于响应最慢的那个 RPC 接口的耗时又或者一个注册请求注册完之后要发送短信、邮件通知为了快速返回给用户可以将该通知操作丢到线程池里异步去执行然后直接返回客户端成功提高用户体验 2、单位时间处理更多请求吞吐量优先。比如接受 MQ 消息然后去调用第三方接口查询数据此场景并不追求快速响应主要利用有限的资源在单位时间内尽可能多的处理任务可以利用队列进行任务的缓冲。 基于以上使用场景可以套到自己项目中说下为了提升系统性能自己对负责的系统模块使用线程池做了哪些优化优化前后对比 Qps 提升多少、Rt 降低多少、服务器数量减少多少等等。
ThreadPoolExecutor 都有哪些核心参数
尽量的熟悉线程池执行流程。
不能仅仅掌握以下内容
包含核心线程数corePoolSize、最大线程数maximumPoolSize空闲线程超时时间keepAliveTime、时间单位unit、阻塞队列workQueue、拒绝策略handler、线程工厂ThreadFactory这7个参数。
还需要深入理解
会再主动描述下线程池的执行流程也就是 execute() 方法执行流程。
execute()方法执行逻辑如下
public void execute(Runnable command) {if (command null)throw new NullPointerException();int c ctl.get();if (workerCountOf(c) corePoolSize) {if (addWorker(command, true))return;c ctl.get();}if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();if (! isRunning(recheck) remove(command))reject(command);else if (workerCountOf(recheck) 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}可以总结出如下主要执行流程当然看上述代码会有一些异常分支判断可以自己梳理加到下述执行主流程里 1、判断线程池的状态如果不是RUNNING状态直接执行拒绝策略 2、如果当前线程数 核心线程池则新建一个线程来处理提交的任务 3、如果当前线程数 核心线程数且任务队列没满则将任务放入阻塞队列等待执行 4、如果 核心线程池 当前线程池数 最大线程数且任务队列已满则创建新的线程执行提交的任务 5、如果当前线程数 最大线程数且队列已满则执行拒绝策略拒绝该任务。 使用场景也要了解
执行流程主要用在 CPU 密集型场景下。
像 Tomcat、Dubbo 这类框架他们内部的线程池主要用来处理网络 IO 任务的所以他们都对 JUC 线程池的执行流程进行了调整来支持 IO 密集型场景使用。
他们提供了阻塞队列 TaskQueue该队列继承 LinkedBlockingQueue重写了 offer() 方法来实现执行流程的调整。
Overridepublic boolean offer(Runnable o) {// we cant do any checksif (parentnull) return super.offer(o);// we are maxed out on threads, simply queue the objectif (parent.getPoolSize() parent.getMaximumPoolSize()) return super.offer(o);// we have idle threads, just add it to the queueif (parent.getSubmittedCount()(parent.getPoolSize())) return super.offer(o);// if we have less threads than maximum force creation of a new threadif (parent.getPoolSize()parent.getMaximumPoolSize()) return false;// if we reached here, we need to add it to the queuereturn super.offer(o);}可以看到他在入队之前做了几个判断这里的 parent 就是所属的线程池对象。 1、如果 parent 为 null直接调用父类 offer 方法入队 2、如果当前线程数等于最大线程数则直接调用父类 offer()方法入队 3、如果当前未执行的任务数量小于等于当前线程数仔细思考下是不是说明有空闲的线程呢那么直接调用父类 offer() 入队后就马上有线程去执行它 4、如果当前线程数小于最大线程数量则直接返回 false然后回到 JUC 线程池的执行流程回想下是不是就去添加新线程去执行任务了呢 5、其他情况都直接入队。 可以看出当当前线程数大于核心线程数时JUC 原生线程池首先是把任务放到队列里等待执行而不是先创建线程执行。
如果 Tomcat 接收的请求数量大于核心线程数请求就会被放到队列中等待核心线程处理如果并发量很大就会在队列里堆积大量任务这样会降低请求的总体响应速度。
所以 Tomcat并没有使用 JUC 原生线程池利用 TaskQueue 的 offer() 方法巧妙的修改了 JUC 线程池的执行流程改写后 Tomcat 线程池执行流程如下 1、判断如果当前线程数小于核心线程池则新建一个线程来处理提交的任务 2、如果当前当前线程池数大于核心线程池小于最大线程数则创建新的线程执行提交的任务 3、如果当前线程数等于最大线程数则将任务放入任务队列等待执行 4、如果队列已满则执行拒绝策略。 而且 Tomcat 会做核心线程预热在创建好线程池后接着会去创建核心线程并启动服务启动后就可以直接接受客户端请求进行处理了避免了冷启动问题。
然后再说下线程池的 Worker 线程模型继承 AQS 实现了锁机制。线程启动后执行 runWorker() 方法runWorker() 方法中调用 getTask() 方法从阻塞队列中获取任务获取到任务后先执行 beforeExecute() 钩子函数再执行任务然后再执行 afterExecute() 钩子函数。若超时获取不到任务会调用 processWorkerExit() 方法执行 Worker 线程的清理工作。
runworker()、getTask()、addWorker()
刚说到了 Worker 继承 AQS 实现了锁机制那ThreadPoolExecutor 都用到了哪些锁为什么要用锁
1mainLock 锁
ThreadPoolExecutor 内部维护了 ReentrantLock 类型锁 mainLock在访问 workers 成员变量以及进行相关数据统计记账比如访问 largestPoolSize、completedTaskCount时需要获取该重入锁。
为什么要有 mainLock private final ReentrantLock mainLock new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/private final HashSetWorker workers new HashSetWorker();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;可以看到 workers 变量用的 HashSet 是线程不安全的是不能用于多线程环境的。largestPoolSize、completedTaskCount 也是没用 volatile 修饰所以需要在锁的保护下进行访问。
为什么不直接用个线程安全容器呢
其实 Doug 老爷子在 mainLock 变量的注释上解释了意思就是说事实证明相比于线程安全容器此处更适合用 lock主要原因之一就是串行化 interruptIdleWorkers() 方法避免了不必要的中断风暴。
怎么理解这个中断风暴呢
其实简单理解就是如果不加锁interruptIdleWorkers() 方法在多线程访问下就会发生这种情况。一个线程调用interruptIdleWorkers() 方法对 Worker 进行中断此时该 Worker 出于中断中状态此时又来一个线程去中断正在中断中的 Worker 线程这就是所谓的中断风暴。
那 largestPoolSize、completedTaskCount 变量加个 volatile 关键字修饰是不是就可以不用 mainLock 了
其他一些内部变量能用 volatile 的都加了 volatile 修饰了这两个没加主要就是为了保证这两个参数的准确性在获取这两个值时能保证获取到的一定是修改方法执行完成后的值。如果不加锁可能在修改方法还没执行完成时此时来获取该值获取到的就是修改前的值然后修改方法一提交就会造成获取到的数据不准确了。
2Worker 线程锁
刚也说了 Worker 线程继承 AQS实现了 Runnable 接口内部持有一个 Thread 变量一个 firstTask及 completedTasks 三个成员变量。
基于 AQS 的 acquire()、tryAcquire() 实现了 lock()、tryLock() 方法类上也有注释该锁主要是用来维护运行中线程的中断状态。在 runWorker() 方法中以及刚说的 interruptIdleWorkers() 方法中用到了。
这个维护运行中线程的中断状态怎么理解呢 protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }在runWorker() 方法中获取到任务开始执行前需要先调用 w.lock() 方法lock() 方法会调用 tryAcquire() 方法tryAcquire() 实现了一把非重入锁通过 CAS 实现加锁。 protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}final void runWorker(ThreadPoolExecutor.Worker w) {Thread wt Thread.currentThread();Runnable task w.firstTask;w.firstTask null;w.unlock(); // allow interruptsboolean completedAbruptly true;try {while (task ! null || (task getTask()) ! null) {w.lock(); // 需要先调用 w.lock() 方法// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown null;try {task.run();} catch (RuntimeException x) {thrown x; throw x;} catch (Error x) {thrown x; throw x;} catch (Throwable x) {thrown x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task null;w.completedTasks;w.unlock();}}completedAbruptly false;} finally {processWorkerExit(w, completedAbruptly);}
}interruptIdleWorkers() 方法会中断那些等待获取任务的线程会调用 w.tryLock() 方法来加锁如果一个线程已经在执行任务中那么 tryLock() 就获取锁失败就保证了不能中断运行中的线程了。
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {for (ThreadPoolExecutor.Worker w : workers) {Thread t w.thread;if (!t.isInterrupted() w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}在项目中是怎样使用线程池的Executors 了解吗
现在大多数公司都在遵循阿里巴巴 Java 开发规范该规范里明确说明不允许使用 Executors 创建线程池而是通过 ThreadPoolExecutor 显示指定参数去创建。
Executors 创建的线程池有发生 OOM 的风险。
Executors.newFixedThreadPool 和 Executors.SingleThreadPool 创建的线程池内部使用的是无界Integer.MAX_VALUE的 LinkedBlockingQueue 队列可能会堆积大量请求导致 OOM。
Executors.newCachedThreadPool 和 Executors.scheduledThreadPool 创建的线程池最大线程数是用的Integer.MAX_VALUE可能会创建大量线程导致 OOM。
自己在日常工作中也有封装类似的工具类但是都是内存安全的参数需要自己指定适当的值也有基于 LinkedBlockingQueue 实现了内存安全阻塞队列 MemorySafeLinkedBlockingQueue当系统内存达到设置的最大剩余阈值时就不在往队列里添加任务了避免发生 OOM。
package cn.com.codingce.juc.thread;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** 线程池工具类*/
public class ThreadPoolUtils {/*** 线程池*/private final ThreadPoolExecutor executor;/*** 线程工厂*/private CustomThreadFactory threadFactory;/*** 异步执行结果*/private final ListCompletableFutureVoid completableFutures;/*** 拒绝策略*/private final CustomAbortPolicy abortPolicy;/*** 失败数量*/private final AtomicInteger failedCount;public ThreadPoolUtils(int corePoolSize, int maximumPoolSize, int queueSize, String poolName) {this.failedCount new AtomicInteger(0);this.abortPolicy new CustomAbortPolicy();this.completableFutures new ArrayList();this.threadFactory new CustomThreadFactory(poolName);this.executor new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize), this.threadFactory, abortPolicy);}/*** 执行任务*/public void execute(Runnable runnable) {CompletableFutureVoid future CompletableFuture.runAsync(runnable, executor);// 设置好异常情况future.exceptionally(e - {failedCount.incrementAndGet();System.out.println(Task Failed... e);e.printStackTrace();return null;});// 任务结果列表completableFutures.add(future);}/*** 执行自定义runnable接口可省略只是加了个获取taskName*/public void execute(SimpleTask runnable) {CompletableFutureVoid future CompletableFuture.runAsync(runnable, executor);// 设置好异常情况future.exceptionally(e - {failedCount.incrementAndGet();System.out.println(Task [ runnable.taskName ] Failed... e);e.printStackTrace();return null;});// 任务结果列表completableFutures.add(future);}/*** 停止线程池*/public void shutdown() {executor.shutdown();System.out.println(************************停止线程池************************);System.out.println(** 活动线程数 executor.getActiveCount() \t\t\t\t\t\t\t\t\t\t**);System.out.println(** 等待任务数 executor.getQueue().size() \t\t\t\t\t\t\t\t\t\t**);System.out.println(** 完成任务数 executor.getCompletedTaskCount() \t\t\t\t\t\t\t\t\t\t**);System.out.println(** 全部任务数 executor.getTaskCount() \t\t\t\t\t\t\t\t\t\t**);System.out.println(** 拒绝任务数 abortPolicy.getRejectCount() \t\t\t\t\t\t\t\t\t\t**);System.out.println(** 成功任务数 (executor.getCompletedTaskCount() - failedCount.get()) \t\t\t\t\t\t\t\t\t\t**);System.out.println(** 异常任务数 failedCount.get() \t\t\t\t\t\t\t\t\t\t**);System.out.println(**********************************************************);}/*** 获取任务执行情况* 之所以遍历taskCount数的CompletableFuture是因为如果有拒绝的任务相应的CompletableFuture也会放进列表而这种CompletableFuture调用get方法是会永远阻塞的。*/public boolean getExecuteResult() {// 任务数不包含拒绝的任务long taskCount executor.getTaskCount();for (int i 0; i taskCount; i) {CompletableFutureVoid future completableFutures.get(i);try {// 获取结果这个是同步的目的是获取真实的任务完成情况future.get();} catch (InterruptedException | ExecutionException e) {System.out.println(java.util.concurrent.CompletableFuture.get() Failed ... e);return false;}// 出现异常falseif (future.isCompletedExceptionally()) {return false;}}return true;}/*** 线程工厂*/private static class CustomThreadFactory implements ThreadFactory {private final String poolName;private final AtomicInteger count;private CustomThreadFactory(String poolName) {this.poolName poolName;this.count new AtomicInteger(0);}Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);// 线程名利于排查thread.setName(poolName -[线程 count.incrementAndGet() ]);return thread;}}/*** 自定义拒绝策略*/private static class CustomAbortPolicy implements RejectedExecutionHandler {/*** 拒绝的任务数*/private final AtomicInteger rejectCount;private CustomAbortPolicy() {this.rejectCount new AtomicInteger(0);}private AtomicInteger getRejectCount() {return rejectCount;}/*** 这个方法如果不抛异常则执行此任务的线程会一直阻塞*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {System.out.println(Task r.toString() rejected from e.toString() 累计 rejectCount.incrementAndGet());}}/*** 只是加了个taskName可自行实现更加复杂的逻辑*/public abstract static class SimpleTask implements Runnable {/*** 任务名称*/private String taskName;public void setTaskName(String taskName) {this.taskName taskName;}}}测试:
package cn.com.codingce.juc.thread;import java.util.Random;/*** 线程池工具类*/
public class ThreadPoolUtilsMain {public static void main(String[] args) throws InterruptedException {test();}public static void test() throws InterruptedException {ThreadPoolUtils pool new ThreadPoolUtils(5, 5, 10, A业务线程池);// 14个正常任务for (int i 0; i 14; i) {pool.execute(new ThreadPoolUtils.SimpleTask() {Overridepublic void run() {try {// 模拟任务耗时Thread.sleep(600);} catch (InterruptedException e) {e.printStackTrace();}// 随机名称String taskName randomName();super.setTaskName(taskName);System.out.println(Thread.currentThread().getName() -执行【 taskName 】);}});}// 1个异常任务pool.execute(new ThreadPoolUtils.SimpleTask() {Overridepublic void run() {// 随机名称String taskName randomName();super.setTaskName(taskName);throw new RuntimeException(执行【 taskName 】 异常);}});// 5个多余用来拒绝的任务for (int i 0; i 5; i) {pool.execute(new ThreadPoolUtils.SimpleTask() {Overridepublic void run() {// 随机名称String taskName randomName();super.setTaskName(taskName);throw new RuntimeException(多余任务);}});}System.out.println(任务完成情况 pool.getExecuteResult());pool.shutdown();Thread.sleep(20000);}private static String randomName() {return 任务 (char) (new Random().nextInt(60) 65);}}输出
Task java.util.concurrent.CompletableFuture$AsyncRun378bf509 rejected from java.util.concurrent.ThreadPoolExecutor5fd0d5ae[Running, pool size 5, active threads 5, queued tasks 10, completed tasks 0] 累计1
Task java.util.concurrent.CompletableFuture$AsyncRun2d98a335 rejected from java.util.concurrent.ThreadPoolExecutor5fd0d5ae[Running, pool size 5, active threads 5, queued tasks 10, completed tasks 0] 累计2
Task java.util.concurrent.CompletableFuture$AsyncRun16b98e56 rejected from java.util.concurrent.ThreadPoolExecutor5fd0d5ae[Running, pool size 5, active threads 5, queued tasks 10, completed tasks 0] 累计3
Task java.util.concurrent.CompletableFuture$AsyncRun7ef20235 rejected from java.util.concurrent.ThreadPoolExecutor5fd0d5ae[Running, pool size 5, active threads 5, queued tasks 10, completed tasks 0] 累计4
Task java.util.concurrent.CompletableFuture$AsyncRun27d6c5e0 rejected from java.util.concurrent.ThreadPoolExecutor5fd0d5ae[Running, pool size 5, active threads 5, queued tasks 10, completed tasks 0] 累计5
A业务线程池-[线程1]-执行【任务{】
A业务线程池-[线程4]-执行【任务H】
A业务线程池-[线程2]-执行【任务r】
A业务线程池-[线程5]-执行【任务N】
A业务线程池-[线程3]-执行【任务O】
A业务线程池-[线程1]-执行【任务[】
A业务线程池-[线程5]-执行【任务n】
A业务线程池-[线程4]-执行【任务M】
A业务线程池-[线程3]-执行【任务V】
A业务线程池-[线程2]-执行【任务n】
Task [任务J] Failed...java.util.concurrent.CompletionException: java.lang.RuntimeException: 执行【任务J】异常
java.util.concurrent.CompletionException: java.lang.RuntimeException: 执行【任务J】异常at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1629)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 执行【任务J】异常at cn.com.codingce.juc.thread.ThreadPoolUtilsMain$2.run(ThreadPoolUtilsMain.java:43)at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)... 3 more
A业务线程池-[线程1]-执行【任务Q】
A业务线程池-[线程3]-执行【任务C】
A业务线程池-[线程4]-执行【任务q】
A业务线程池-[线程5]-执行【任务t】
java.util.concurrent.CompletableFuture.get() Failed ...java.util.concurrent.ExecutionException: java.lang.RuntimeException: 执行【任务J】异常
任务完成情况false
************************停止线程池************************
** 活动线程数0 **
** 等待任务数0 **
** 完成任务数15 **
** 全部任务数15 **
** 拒绝任务数5 **
** 成功任务数14 **
** 异常任务数1 **
**********************************************************我们一般都是在 Spring 环境中使用线程池的直接使用 JUC 原生 ThreadPoolExecutor 有个问题Spring 容器关闭的时候可能任务队列里的任务还没处理完有丢失任务的风险。
我们知道 Spring 中的 Bean 是有生命周期的如果 Bean 实现了 Spring 相应的生命周期接口InitializingBean、DisposableBean接口在 Bean 初始化、容器关闭的时候会调用相应的方法来做相应处理。
所以最好不要直接使用 ThreadPoolExecutor 在 Spring 环境中可以使用 Spring 提供的 ThreadPoolTaskExecutor。
也会按业务类型进行线程池隔离各任务执行互不影响避免共享一个线程池任务执行参差不齐相互影响高耗时任务会占满线程池资源导致低耗时任务没机会执行同时如果任务之间存在父子关系可能会导致死锁的发生进而引发 OOM。
使用线程池的常规操作是通过 Bean 定义多个业务隔离的线程池实例。我们是参考美团线程池实践那篇文章做了一个动态可监控线程池的轮子而且利用了 Spring 的一些特性将线程池实例都配置在配置中心里服务启动的时候会从配置中心拉取配置然后生成 BeanDefination 注册到 Spring 容器中在 Spring 容器刷新时会生成线程池实例注册到 Spring 容器中。这样我们业务代码就不用显式用 Bean 声明线程池了可以直接通过依赖注入的方式使用线程池而且也可以动态调整线程池的参数了。
EnableAsync
Configuration
public class ThreadPoolConfig {Beanpublic ThreadPoolTaskExecutor excelExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(100000);executor.setKeepAliveSeconds(60);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());executor.setThreadNamePrefix(excel-);return executor;}
}应用
Resource
ThreadPoolTaskExecutor excelExecutor;// ...
ListCompletableFutureListSimplePeOnlineLabelDto list new ArrayList();ListListString reportList ListUtil.split(reportIds, 200);
reportList.forEach(ls - {CompletableFutureListSimplePeOnlineLabelDto completableFuture CompletableFuture.supplyAsync(() - peOnlineDataService.getPeOnlineReptLabelData(ls),excelExecutor);list.add(completableFuture);
});for (CompletableFutureListSimplePeOnlineLabelDto listCompletableFuture : list) {log.info(读取数据);try {ListSimplePeOnlineLabelDto simplePeOnlineLabelDtoList listCompletableFuture.get();peOnlineData.addAll(simplePeOnlineLabelDtoList);}catch (Exception e){log.error(读取数据失败);}
}
// ...通过 ThreadPoolExecutor 来创建线程池核心参数设置多少合适呢
可能很多人都看到过《Java 并发编程实践》这本书里介绍的一个线程数计算公式 Ncpu CPU 核数 Ucpu 目标 CPU 利用率0 Ucpu 1 W / C 等待时间 / 计算时间 要程序跑到 CPU 的目标利用率需要的线程数为 Nthreads Ncpu * Ucpu * (1 W / C) 这公式太偏理论化了很难实际落地下来首先很难获取准确的等待时间和计算时间。再着一个服务中会运行着很多线程比如 Tomcat 有自己的线程池、Dubbo 有自己的线程池、GC 也有自己的后台线程我们引入的各种框架、中间件都有可能有自己的工作线程这些线程都会占用 CPU 资源所以通过此公式计算出来的误差一定很大。
所以说怎么确定线程池大小呢
其实没有固定答案需要通过压测不断的动态调整线程池参数观察 CPU 利用率、系统负载、GC、内存、RT、吞吐量等各种综合指标数据来找到一个相对比较合理的值。
所以不要再问设置多少线程合适了这个问题没有标准答案需要结合业务场景设置一系列数据指标排除可能的干扰因素注意链路依赖比如连接池限制、三方接口限流然后通过不断动态调整线程数测试找到一个相对合适的值。
线程池是如何监控的
因为线程池的运行相对而言是个黑盒它的运行我们感知不到该问题主要考察怎么感知线程池的运行情况。 对线程池 ThreadPoolExecutor 做了一些增强做了一个线程池管理框架。主要功能有监控告警、动态调参。主要利用了 ThreadPoolExecutor 类提供的一些 set、get方法以及一些钩子函数。 动态调参是基于配置中心实现的核心参数配置在配置中心可以随时调整、实时生效利用了线程池提供的 set 方法。
监控主要就是利用线程池提供的一些 get 方法来获取一些指标数据然后采集数据上报到监控系统进行大盘展示。也提供了 Endpoint 实时查看线程池指标数据。
同时定义了5中告警规则。 线程池活跃度告警。活跃度 activeCount / maximumPoolSize当活跃度达到配置的阈值时会进行事前告警 队列容量告警。容量使用率 queueSize / queueCapacity当队列容量达到配置的阈值时会进行事前告警 拒绝策略告警。当触发拒绝策略时会进行告警 任务执行超时告警。重写 ThreadPoolExecutor 的 afterExecute() 和 beforeExecute()根据当前时间和开始时间的差值算出任务执行时长超过配置的阈值会触发告警 任务排队超时告警。重写 ThreadPoolExecutor 的 beforeExecute()记录提交任务时时间根据当前时间和提交时间的差值算出任务排队时长超过配置的阈值会触发告警。 通过监控 告警可以让我们及时感知到我们业务线程池的执行负载情况第一时间做出调整防止事故的发生。
execute() 提交任务和 submit() 提交任务有啥不同
看到这个问题是不是大多数人都觉得这个我行。execute() 无返回值submit() 有返回值会返回一个 FutureTask然后可以调用 get() 方法阻塞获取返回值。
这个问题主要知道 FutureTask 的实现原理FutureTask 继承体系如下 调用 submit() 方法提交的任务Runnable or Callable会被包装成 FutureTask() 对象。FutureTask 类提供了 7 种任务状态和五个成员变量。 /** Possible state transitions:* NEW - COMPLETING - NORMAL* NEW - COMPLETING - EXCEPTIONAL* NEW - CANCELLED* NEW - INTERRUPTING - INTERRUPTED*/// 构造函数中 state 置为 NEW初始态private static final int NEW 0;// 瞬时态表示完成中private static final int COMPLETING 1;// 正常执行结束后的状态private static final int NORMAL 2;// 异常执行结束后的状态private static final int EXCEPTIONAL 3;// 调用 cancel 方法成功执行后的状态private static final int CANCELLED 4;// 瞬时态中断中private static final int INTERRUPTING 5;// 正常执行中断后的状态private static final int INTERRUPTED 6;// 任务状态以上 7 种private volatile int state;/** 通过 submit() 提交的任务执行完后置为 null*/private CallableV callable;/** 任务执行结果或者调用 get() 要抛出的异常*/private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程会在 run() 方法中通过 cas 赋值*/private volatile Thread runner;/** 调用get()后由等待线程组成的无锁并发栈通过 cas 实现无锁*/private volatile WaitNode waiters;创建 FutureTask 对象时 state 置为 NEWcallable 赋值为我们传入的任务。
run() 方法中会去执行 callable 任务。执行之前先判断任务处于 NEW 状态并且通过 cas 设置 runner 为当前线程成功。然后去调用 call() 执行任务执行成功后会调用 set() 方法将结果赋值给 outcome任务执行抛出异常后会将异常信息调用 setException() 赋值给 outcome。至于为什么要先将状态变为 COMPLETING再变为 NORMAL主要是为了保证在 NORMAL 态时已经完成了 outcome 赋值。finishCompletion() 会去唤醒通过 LockSupport.unpark()那些因调用 get() 而阻塞的线程waiters。 protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}}调用 get() 方法会阻塞获取结果或异常如果 state COMPLETING说明任务已经执行完成NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED则直接通过 report() 方法返回结果或抛出异常。如果state COMPLETING说明任务还在执行中或还没开始执行则调用 awaitDone() 方法进行阻塞等待。 public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit null)throw new NullPointerException();int s state;if (s COMPLETING (s awaitDone(true, unit.toNanos(timeout))) COMPLETING)throw new TimeoutException();return report(s);}awaitDone() 方法则通过 state 状态判断来决定直接返回还是将当前线程添加到 waiters 里然后调用LockSupport.park() 方法挂起当前线程。
还有个重要的 cancel() 方法因为 FutureTask 源码类注释的第一句就说了 FutureTask 是一个可取消的异步计算。代码也非常简单如果 state 不是 NEW 或者通过 CAS 赋值为 INTERRUPTING / CANCELLED 失败则直接返回。反之如果 mayInterruptIfRunning ture表示可能中断在运行中线程则中断线程state 变为 INTERRUPTED最后去唤醒等待的线程。 public boolean cancel(boolean mayInterruptIfRunning) {if (!(state NEW UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t runner;if (t ! null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}以上简单介绍了下 FutureTask 的执行流程篇幅有限源码解读的不是很仔细后面可以考虑单独出一篇文章好好分析下 FutureTask 的源码。
什么是阻塞队列阻塞队列有哪些
阻塞队列 BlockingQueue 继承 Queue是我们熟悉的基本数据结构队列的一种特殊类型。
当从阻塞队列中获取数据时如果队列为空则等待直到队列有元素存入。当向阻塞队列中存入元素时如果队列已满则等待直到队列中有元素被移除。提供 offer()、put()、take()、poll() 等常用方法。
JDK 提供的阻塞队列的实现有以下前 7 种
1ArrayBlockingQueue由数组实现的有界阻塞队列该队列按照 FIFO 对元素进行排序。维护两个整形变量标识队列头尾在数组中的位置在生产者放入和消费者获取数据共用一个锁对象意味着两者无法真正的并行运行性能较低
2LinkedBlockingQueue由链表组成的有界阻塞队列如果不指定大小默认使用 Integer.MAX_VALUE 作为队列大小该队列按照 FIFO 对元素进行排序对生产者和消费者分别维护了独立的锁来控制数据同步意味着该队列有着更高的并发性能
3SynchronousQueue不存储元素的阻塞队列无容量可以设置公平或非公平模式插入操作必须等待获取操作移除元素反之亦然;
4PriorityBlockingQueue支持优先级排序的无界阻塞队列默认情况下根据自然序排序也可以指定 Comparator;
5DelayQueue支持延时获取元素的无界阻塞队列创建元素时可以指定多久之后才能从队列中获取元素常用于缓存系统或定时任务调度系统
6LinkedTransferQueue一个由链表结构组成的无界阻塞队列与LinkedBlockingQueue相比多了transfer和tryTranfer方法该方法在有消费者等待接收元素时会立即将元素传递给消费者
7LinkedBlockingDeque一个由链表结构组成的双端阻塞队列可以从队列的两端插入和删除元素;
8VariableLinkedBlockingQueue说完以上 JDK 提供这几个阻塞队列后还可以说下 LinkedBlockingQueue 是我们使用最广泛的阻塞队列但是 LinkedBlockingQueue 一旦定义好后是不能修改容量 capacity 的。自己在使用线程池的过程中有动态去调整容量的需求所以参考 RabbitMq 里的 VariableLinkedBlockingQueue实现了一个可以调整容量的增强版 LinkedBlockingQueue实现容量的动态调整;
9MemorySafeLinkedBlockingQueue而且 LinkedBlockingQueue 默认是使用 Integer.MAX_VALUE 作为容量的也就是个无界队列可能会有发生 OOM 的风险所以自己实现了一个内存安全的 MemorySafeLinkedBlockingQueue可以配置最大剩余内存当内存达到该值的时候再往队列放任务就会失败很好的保证了不会发生令人头疼的 OOM 问题
10TaskQueue上面讲 Tomcat 线程池时说过该阻塞队列作为 LinkedBlockingQueue 的子类覆写了 offer()、poll()、take() 等方法来调整线程池的执行流程。
重点说下 8、9 这两个自定义阻塞队列来突出你对阻塞队列丰富的使用经验这两队列源码可以自行搜索 MemorySafeLinkedBlockingQueue VariableLinkedBlockingQueue 队列实现
线程池拒绝策略有哪些适用场景是怎么样的
当阻塞队列已满并且达到最大线程数时再提交任务会走拒绝策略流程JDK 提供了拒绝策略顶层接口 RejectedExecutionHandler所有拒绝策略都需要继承该接口JDK 内置了四种拒绝策略。
1AbortPolicy线程池默认的拒绝策略触发时会抛出 RejectedExecutionException 异常。如果是一些比较重要的业务可以使用该拒绝策略在系统不能进一步支持更大并发量的情况下通过抛出异常及时发现问题并进行处理
2CallerRunsPolicy在线程池没关闭的情况下由调用者线程去处理任务反之直接丢弃。此拒绝策略追求任务都能被执行不丢失比较适合并发量不大并且不允许丢失任务的场景场景性能较低
3DiscardPolicy丢弃任务不抛出异常一般无感知。建议一些无关紧要的任务可以使用此策略
4DiscardOldestPolicy丢弃队列中最老的任务然后重新提交被拒绝的任务。需要根据业务场景进行选择是否要用。
3、4 这两种拒绝策略都在会在无感知的情况下丢弃任务需要根据业务场景决定是否要使用。
也可以根据自己需要自定义拒绝策略比如 Dubbo 定义了拒绝策略 AbortPolicyWithReport在抛出异常前会先进行线程堆栈信息的打印。
在使用线程池的过程中遇到过哪些坑或者需要注意的地方
1OOM 问题 刚开始使用线程都是通过 Executors 创建的前面说了这种方式创建的线程池会有发生 OOM 的风险可以举例说明
2任务执行异常丢失问题 可以通过下述4种方式解决 在任务代码中增加 try、catch 异常处理 如果使用的 Future 方式则可通过 Future 对象的 get 方法接收抛出的异常 为工作线程设置 setUncaughtExceptionHandler在 uncaughtException 方法中处理异常 可以重写 afterExecute(Runnable r, Throwable t) 方法拿到异常 t。 3共享线程池问题 整个服务共享一个全局线程池导致任务相互影响耗时长的任务占满资源短耗时任务得不到执行。同时父子线程间会导致死锁的发生进而导致 OOM。
4跟 ThreadLocal 配合使用导致脏数据问题 我们知道 Tomcat 利用线程池来处理收到的请求会复用线程如果我们代码中用到了 ThreadLocal在请求处理完后没有去 remove那每个请求就有可能获取到之前请求遗留的脏值。
5ThreadLocal 在线程池场景下会失效可以考虑用阿里开源的 TTL 来解决。
6需要自定义线程工厂指定线程名称不然发生问题都不知道咋定位。