等待所有的 CompletableFuture 完成

现实中有这样的用法,创建一批在线程池中运行的  CompletableFuture 实例,然后等待它们全部执行完再继续后面的操作。比如说 AWS 的 Lambda, 单单提交任务到线程池,不等待所有任务全部完成便退出主线程的话,AWS 便认为 Lambda 执行完毕,无视线程池中正在执行的任务而强行结束该 Lambda 实例。

以往我们通常的作法如下

如果所有的任务均无异常,上面的代码能得到预想的结果,只要上面打印出 all done 的话真的就是表明所有的任务都完成了。但是在循环 join futures 中的每一个 CompletableFuture 时,只要碰到任意一个任务有异常时,便立即抛出给外部线程,不在乎是否还有其他任务正在执行。此时,如果外部未予捕获,当然

System.out.println("all done");

这行代码也不会得到执行。这时候假如线程池未关闭的话(比如前面代码中的线程池中是非 Daemon 线程),那么其他的任务仍然会默默的执行,从此难以获知何时任务全部完成。

为易于理解,再作一下个 CompletableFuture::join 的测试

执行后的输出基本类似(捕获到异常时多是还有任务正在执行,线程池还在的话别的任务还是不受影响的)

done count: 1706
all done? 1729
1734
all done. 10000

那么有没有更好的办法去检查提交的所有任务是否全部完成了(含抛出异常的任务)?

  1. 每次关掉线程池肯定不可取了,要线程池又有何用
  2. 检查线程池中完成的任务数?这是一个累加数
  3. 检查线程池中的任务队是否为空,普通线程池或许管用,对于 ForkJoinPool 又不同了,再则可能还在添加其他的任务

解铃还是系铃人,CompletableFuture 惹下的事情还得 CompletableFuture 的方法来解决,那就是可由

CompletableFuture<Void> allof = CompletableFuture.allof(CompletableFuture<?>... cfs);

得到的大总管来处理,headerFuture 也是一个 CompletableFuture, 仍然是调用它的 join() 方法,不再逐个调用了。对比以下两段代码,分别标以代码 1 与代码 2

代码 1

代码 2

代码 1 中的任务在执行当中如果有异常的话会 Hold 住,然而总是会在所有任务完成后才抛出执行当中的异常,而代码 2 在 forEach 循环时,碰到任意一个任务有异常便立即抛出。

因此我们就可以用下面的方式来等待所有提交的任务真正完成

执行后的输出必定如下

done count: 10000
all done. 10000

捕获到异常时,必定是所有的任务都已完成。除此之外调用

headerFuture.get();

也能得到类似效果,唯一不同的时,headerFuture.get() 方法还会抛出 InterruptedException 和 ExecutionException。

如果不关心异常的话,也能用 headerFuture.isDone() 来检查所有任务是否完成

isDone()  只检查状态,它与 get()  和 join() 还有的不同是,调用 isDone()  时任务有异常不会传播到外部线程,而 get() 和 join() 是能捕获到任务中执行的异常的(异常由任务线程传播到外部线程)。如果在检查到 isDone() 后,想查明是否发生过异常,还能进一步用下面的方法

本文链接 https://yanbin.blog/wait-all-completablefuture-done/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments