现实中有这样的用法,创建一批在线程池中运行的 CompletableFuture 实例,然后等待它们全部执行完再继续后面的操作。比如说 AWS 的 Lambda, 单单提交任务到线程池,不等待所有任务全部完成便退出主线程的话,AWS 便认为 Lambda 执行完毕,无视线程池中正在执行的任务而强行结束该 Lambda 实例。
以往我们通常的作法如下
1 2 3 4 5 6 7 8 9 10 |
ExecutorService threadPool = Executors.newFixedThreadPool(10); List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10000) .mapToObj(n -> CompletableFuture.runAsync(() -> { System.out.println("done " + n); }, threadPool)).collect(toList()); futures.forEach(CompletableFuture::join); System.out.println("all done"); |
如果所有的任务均无异常,上面的代码能得到预想的结果,只要上面打印出 all done
的话真的就是表明所有的任务都完成了。但是在循环 join futures
中的每一个 CompletableFuture 时,只要碰到任意一个任务有异常时,便立即抛出给外部线程,不在乎是否还有其他任务正在执行。此时,如果外部未予捕获,当然
System.out.println("all done");
这行代码也不会得到执行。这时候假如线程池未关闭的话(比如前面代码中的线程池中是非 Daemon 线程),那么其他的任务仍然会默默的执行,从此难以获知何时任务全部完成。
为易于理解,再作一下个 CompletableFuture::join 的测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
AtomicInteger count = new AtomicInteger(); ExecutorService threadPool = Executors.newFixedThreadPool(10); List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10000) .mapToObj(n -> CompletableFuture.runAsync(() -> { count.incrementAndGet(); throw new RuntimeException("ex: " + n); //每个任务都抛出异常 }, threadPool)).collect(toList()); try { futures.forEach(CompletableFuture::join); // 发生任何异常便立即抛出 } catch (Exception ex) { System.out.println("done count: " + count.get()); } // 回到主线程立即检查完成多少任务 System.out.println("all done? " + count.get()); System.out.println(count.get()); //其余任务仍在执行,等待所有任务真正执行完成检查完成多少任务 threadPool.shutdown(); threadPool.awaitTermination(1, TimeUnit.HOURS); System.out.println("all done. " + count.get()); |
执行后的输出基本类似(捕获到异常时多是还有任务正在执行,线程池还在的话别的任务还是不受影响的)
done count: 1706
all done? 1729
1734
all done. 10000
那么有没有更好的办法去检查提交的所有任务是否全部完成了(含抛出异常的任务)?
- 每次关掉线程池肯定不可取了,要线程池又有何用
- 检查线程池中完成的任务数?这是一个累加数
- 检查线程池中的任务队是否为空,普通线程池或许管用,对于 ForkJoinPool 又不同了,再则可能还在添加其他的任务
解铃还是系铃人,CompletableFuture 惹下的事情还得 CompletableFuture 的方法来解决,那就是可由
CompletableFuture<Void> allof = CompletableFuture.allof(CompletableFuture<?>... cfs);
得到的大总管来处理,headerFuture 也是一个 CompletableFuture, 仍然是调用它的 join()
方法,不再逐个调用了。对比以下两段代码,分别标以代码 1 与代码 2
代码 1
1 2 |
List<CompletableFuture<Void>> futures = ...; CompletableFuture.allof(futures.toArray(new CompletableFuture[]{}).join(); |
代码 2
1 2 |
List<CompletableFuture<Void>> futures = ...; futures.forEach(CompletableFuture::join); |
代码 1 中的任务在执行当中如果有异常的话会 Hold 住,然而总是会在所有任务完成后才抛出执行当中的异常,而代码 2 在 forEach 循环时,碰到任意一个任务有异常便立即抛出。
因此我们就可以用下面的方式来等待所有提交的任务真正完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
AtomicInteger count = new AtomicInteger(); ExecutorService threadPool = Executors.newFixedThreadPool(10); List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10000) .mapToObj(n -> CompletableFuture.runAsync(() -> { count.incrementAndGet(); throw new RuntimeException("ex: " + n); }, threadPool)).collect(toList()); CompletableFuture<Void> headerFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})); try { headerFuture.join(); } catch (Exception ex) { System.out.println("done count: " + count.get()); } System.out.println("all done. " + count.get()); |
执行后的输出必定如下
done count: 10000
all done. 10000
捕获到异常时,必定是所有的任务都已完成。除此之外调用
headerFuture.get();
也能得到类似效果,唯一不同的时,headerFuture.get() 方法还会抛出 InterruptedException 和 ExecutionException。
如果不关心异常的话,也能用 headerFuture.isDone() 来检查所有任务是否完成
1 2 3 |
while(! headerFuture.isDone()){ } System.out.println("all done. " + count.get()); |
isDone() 只检查状态,它与 get() 和 join() 还有的不同是,调用 isDone() 时任务有异常不会传播到外部线程,而 get() 和 join() 是能捕获到任务中执行的异常的(异常由任务线程传播到外部线程)。如果在检查到 isDone() 后,想查明是否发生过异常,还能进一步用下面的方法
1 2 3 4 5 6 |
while (!headerFuture.isDone()) { } // 任何任务发生异常则为 true, 否则 false System.out.println("exception found: " + headerFuture.isCompletedExceptionally()); |
本文链接 https://yanbin.blog/wait-all-completablefuture-done/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。