微信微网站制作,湖南郴州市房价多少一平米,wordpress 首页缓存,网广州建网站站制作1、异步任务的交互
异步任务交互指 将异步任务获取结果的速度相比较#xff0c;按一定的规则( 先到先用 )进行下一步处理。
1.1 applyToEither
applyToEither() 把两个异步任务做比较#xff0c;异步任务先到结果的#xff0c;就对先到的结果进行下一步的操作。
Complet…1、异步任务的交互
异步任务交互指 将异步任务获取结果的速度相比较按一定的规则( 先到先用 )进行下一步处理。
1.1 applyToEither
applyToEither() 把两个异步任务做比较异步任务先到结果的就对先到的结果进行下一步的操作。
CompletableFutureR applyToEither(CompletableFutureT other, FunctionT,R func)
演示案例使用最先完成的异步任务的结果
public class ApplyToEitherDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 开启异步任务1CompletableFutureInteger future1 CompletableFuture.supplyAsync(() - {int x new Random().nextInt(3);CommonUtils.sleepSecond(x);CommonUtils.printThreadLog(任务1耗时: x 秒);return x;});// 开启异步任务2CompletableFutureInteger future2 CompletableFuture.supplyAsync(() - {int y new Random().nextInt(3);CommonUtils.sleepSecond(y);CommonUtils.printThreadLog(任务2耗时: y 秒);return y;});
// 哪些异步任务的结果先到达就使用哪个异步任务的结果CompletableFutureInteger future future1.applyToEither(future2, (result - {CommonUtils.printThreadLog(最先到达的结果: result);return result;}));
// 主线程休眠4秒等待所有异步任务完成CommonUtils.sleepSecond(4);Integer ret future.get();CommonUtils.printThreadLog(ret ret);}
}
速记心法任务1、任务2就像两辆公交哪路公交先到就乘坐(使用)哪路公交。
以下是applyToEither 和其对应的异步回调版本
CompletableFutureR applyToEither(CompletableFutureT other, FunctionT,R func)
CompletableFutureR applyToEitherAsync(CompletableFutureT other, FunctionT,R func)
CompletableFutureR applyToEitherAsync(CompletableFutureT other, FunctionT,R func,Executor executor)
1.2 acceptEither
acceptEither() 把两个异步任务做比较异步任务先到结果的就对先到的结果进行下一步操作 ( 消费使用 )。
CompletableFutureVoid acceptEither(CompletableFutureT other, ConsumerT action)
CompletableFutureVoid acceptEitherAsync(CompletableFutureT other, ConsumerT action)
CompletableFutureVoid acceptEitherAsync(CompletableFutureT other, ConsumerT action,Executor executor)
演示案例使用最先完成的异步任务的结果
public class AcceptEitherDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 异步任务交互CommonUtils.printThreadLog(main start);// 开启异步任务1CompletableFutureInteger future1 CompletableFuture.supplyAsync(() - {int x new Random().nextInt(3);CommonUtils.sleepSecond(x);CommonUtils.printThreadLog(任务1耗时: x 秒);return x;});
// 开启异步任务2CompletableFutureInteger future2 CompletableFuture.supplyAsync(() - {int y new Random().nextInt(3);CommonUtils.sleepSecond(y);CommonUtils.printThreadLog(任务2耗时: y 秒);return y;});
// 哪些异步任务的结果先到达就使用哪个异步任务的结果future1.acceptEither(future2,result - {CommonUtils.printThreadLog(最先到达的结果: result);});
// 主线程休眠4秒等待所有异步任务完成CommonUtils.sleepSecond(4);CommonUtils.printThreadLog(main end);}
}
1.3 runAfterEither
如果不关心最先到达的结果只想在有一个异步任务先完成时得到完成的通知可以使用 runAfterEither() 以下是它的相关方法
CompletableFutureVoid runAfterEither(CompletableFutureT other, Runnable action)
CompletableFutureVoid runAfterEitherAsync(CompletableFutureT other, Runnable action)
CompletableFutureVoid runAfterEitherAsync(CompletableFutureT other, Runnable action, Executor executor)
提示 异步任务交互的三个方法和之前学习的异步的回调方法 thenApply、thenAccept、thenRun 有异曲同工之妙。 2、get() 和 join() 区别
get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。
那么该如何选用呢请看如下案例
public class GetOrJoinDemo {public static void main(String[] args) {CompletableFutureString future CompletableFuture.supplyAsync(() - {return hello;});
String ret null;// 抛出检查时异常必须处理try {ret future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(ret ret);
// 抛出运行时异常可以不处理ret future.join();System.out.println(ret ret);}
}
使用时我们发现get() 抛出检查时异常 需要程序必须处理而join() 方法抛出运行时异常程序可以不处理。所以join() 更适合用在流式编程中。
3、ParallelStream VS CompletableFuture
CompletableFuture 虽然提高了任务并行处理的能力如果它和 Stream API 结合使用能否进一步多个任务的并行处理能力呢
同时对于 Stream API 本身就提供了并行流ParallelStream它们有什么不同呢
我们将通过一个耗时的任务来体现它们的不同更重要地是我们能进一步加强 CompletableFuture 和 Stream API 的结合使用同时搞清楚CompletableFuture 在流式操作的优势
需求创建10个MyTask耗时的任务统计它们执行完的总耗时
定义一个MyTask类来模拟耗时的长任务
public class MyTask {private int duration;
public MyTask(int duration) {this.duration duration;}
// 模拟耗时的长任务public int doWork() {CommonUtils.printThreadLog(doWork);CommonUtils.sleepSecond(duration);return duration;}
}
同时我们创建10个任务每个持续1秒。
IntStream intStream IntStream.range(0, 10);
ListMyTask tasks intStream.mapToObj(item - {return new MyTask(1);
}).collect(Collectors.toList());
3.1 并行流的局限
我们先使用串行执行让所有的任务都在主线程 main 中执行。
public class SequenceDemo {public static void main(String[] args) {// 方案一在主线程中使用串行执行// step 1: 创建10个MyTask对象,每个任务持续1s,存入list集合便于启动Stream操作IntStream intStream IntStream.range(0, 10);ListMyTask tasks intStream.mapToObj(item - {return new MyTask(1);}).collect(Collectors.toList());// step 2: 执行tasks集合中的每个任务统计总耗时long start System.currentTimeMillis();ListInteger result tasks.stream().map(myTask - {return myTask.doWork();}).collect(Collectors.toList());long end System.currentTimeMillis();double costTime (end - start) / 1000.0;System.out.printf(processed %d tasks cost %.2f second,tasks.size(),costTime);}
}
它花费了10秒, 因为每个任务在主线程一个接一个的执行。
因为涉及 Stream API而且存在耗时的长任务所以我们可以使用 parallelStream()
public class ParallelDemo {public static void main(String[] args) {// 方案二使用并行流// step 1: 创建10个MyTask对象每个任务持续1s存入List集合IntStream intStream IntStream.range(0, 10);ListMyTask tasks intStream.mapToObj(item - {return new MyTask(1);}).collect(Collectors.toList());// step 2: 执行10个MyTask,统计总耗时long start System.currentTimeMillis();ListInteger results tasks.parallelStream().map(myTask - {return myTask.doWork();}).collect(Collectors.toList());long end System.currentTimeMillis();double costTime (end - start) / 1000.0;System.out.printf(processed %d tasks %.2f second,tasks.size(),costTime);}
}
它花费了2秒多因为此次并行执行使用了8个线程 (7个是ForkJoinPool线程池中的, 一个是 main 线程)需要注意是运行结果由自己电脑CPU的核数决定。
3.2 CompletableFuture 在流式操作的优势
让我们看看使用CompletableFuture是否执行的更有效率
public class CompletableFutureDemo {public static void main(String[] args) {// 需求创建10MyTask耗时的任务统计它们执行完的总耗时// 方案三使用CompletableFuture// step 1: 创建10个MyTask对象每个任务持续1s存入List集合IntStream intStream IntStream.range(0, 10);ListMyTask tasks intStream.mapToObj(item - {return new MyTask(1);}).collect(Collectors.toList());// step 2: 根据MyTask对象构建10个耗时的异步任务long start System.currentTimeMillis();ListCompletableFutureInteger futures tasks.stream().map(myTask - {return CompletableFuture.supplyAsync(() - {return myTask.doWork();});}).collect(Collectors.toList());// step 3: 当所有任务完成时获取每个异步任务的执行结果存入List集合中ListInteger results futures.stream().map(future - {return future.join();}).collect(Collectors.toList());long end System.currentTimeMillis();double costTime (end - start) / 1000.0;System.out.printf(processed %d tasks cost %.2f second,tasks.size(),costTime);}
}
运行发现两者使用的时间大致一样。能否进一步优化呢
CompletableFutures 比 ParallelStream 优点之一是你可以指定Executor去处理任务。你能选择更合适数量的线程。我们可以选择大于Runtime.getRuntime().availableProcessors() 数量的线程如下所示
public class CompletableFutureDemo2 {public static void main(String[] args) {// 需求创建10MyTask耗时的任务统计它们执行完的总耗时// 方案三使用CompletableFuture// step 1: 创建10个MyTask对象每个任务持续1s存入List集合IntStream intStream IntStream.range(0, 10);ListMyTask tasks intStream.mapToObj(item - {return new MyTask(1);}).collect(Collectors.toList());// 准备线程池final int N_CPU Runtime.getRuntime().availableProcessors();// 设置线程池的数量最少是10个最大是16个ExecutorService executor Executors.newFixedThreadPool(Math.min(tasks.size(), N_CPU * 2));// step 2: 根据MyTask对象构建10个耗时的异步任务long start System.currentTimeMillis();ListCompletableFutureInteger futures tasks.stream().map(myTask - {return CompletableFuture.supplyAsync(() - {return myTask.doWork();},executor);}).collect(Collectors.toList());// step 3: 当所有任务完成时获取每个异步任务的执行结果存入List集合中ListInteger results futures.stream().map(future - {return future.join();}).collect(Collectors.toList());long end System.currentTimeMillis();double costTime (end - start) / 1000.0;System.out.printf(processed %d tasks cost %.2f second,tasks.size(),costTime);// 关闭线程池executor.shutdown();}
}
测试代码时电脑配置是4核8线程而我们创建的线程池中线程数最少也是10个所以每个线程负责一个任务( 耗时1s )总体来说处理10个任务总共需要约1秒。
3.3 合理配置线程池中的线程数
正如我们看到的CompletableFuture 可以更好地控制线程池中线程的数量而 ParallelStream 不能。
问题1如何选用 CompletableFuture 和 ParallelStream
如果你的任务是IO密集型的你应该使用CompletableFuture
如果你的任务是CPU密集型的使用比处理器更多的线程是没有意义的所以选择ParallelStream 因为它不需要创建线程池更容易使用。
问题2IO密集型任务和CPU密集型任务的区别
CPU密集型也叫计算密集型此时系统运行时大部分的状况是CPU占用率近乎100%I/O在很短的时间就可以完成而CPU还有许多运算要处理CPU 使用率很高。比如说要计算123… 10万亿、天文计算、圆周率后几十位等 都是属于CPU密集型程序。
CPU密集型任务的特点大量计算CPU占用率一般都很高I/O时间很短
IO密集型指大部分的状况是CPU在等I/O (硬盘/内存) 的读写操作但CPU的使用率不高。
简单的说就是需要大量的输入输出例如读写文件、传输文件、网络请求。
IO密集型任务的特点大量网络请求文件操作CPU运算少很多时候CPU在等待资源才能进一步操作。
问题3既然要控制线程池中线程的数量多少合适呢
如果是CPU密集型任务就需要尽量压榨CPU参考值可以设为 Ncpu1
如果是IO密集型任务参考值可以设置为 2 * Ncpu其中Ncpu 表示 核心数。
注意的是以上给的是参考值详细配置超出本次课程的范围选不赘述。