Java 8 CompletableFuture 浅入
Java 1.5 有了 Future, 可谓是跨了一大步,继而 Java 1.8 新加入一个 Future 的实现 CompletableFuture, 从此线程与线程之间可以愉快的对话了。最初两个线程间的协调我采用过 Object 的
如果是简单的等待所有线程完成可使用 Java 1.5 的 CountDownLatch, 这里有一篇介绍 CountDownLatch 协调线程, 就是实现的 waitAll(threads) 功能。而 Java 8 的
顾名思义,CompletableFuture 代表着一个 Future 完成后该干点什么,具体大致有:
有时候可以把 Future 想像成与线程是一一对应的。
试想一下,如过不用
当然,上面的代码改用
请看用
CompletableFuture 的异常处理
如果在设置 CompletableFuture.complete(value) 之前出现了异常,那么
上面代码执行后可以在控制台看到异常输出
办法一是调用
这时候在
程序在捕获到异常到终止,异常类型是 ExecutionException, 而不是原始的 RuntimeExeption。
现实中我们不会这么去构造并管理
因此,只要调用方法
前面的代码都是显式的用
同样的输出结果。但如果把上面的
如果要深入了解
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
wait() 和 notify() , Thread 的 join() 方法,那可算是很低级的 API 了,是否很多 Java 程序都不知道它们的存在,或根本没用过它们。如果是简单的等待所有线程完成可使用 Java 1.5 的 CountDownLatch, 这里有一篇介绍 CountDownLatch 协调线程, 就是实现的 waitAll(threads) 功能。而 Java 8 的
CompletableFuture 的功能就多去,可简单使用它实现异步方法。虽说 CompletableFuture 实现了 Future 接口,但它多数方法源自于 CompletionStage, 所以还里氏代换,用 Future 来引用 CompletableFuture 实例就很牵强了; 这也是为什么 PlayFramework 自 2.5 开始直接暴露的类型是 CompletionStage 而非其他两个。顾名思义,CompletableFuture 代表着一个 Future 完成后该干点什么,具体大致有:
- Future 完成后执行动作,或求取下一个 Future 的值。then...
- 多个 Future 的协调; 同时完成该怎么,其中一个完成该如何。allOf, anyOf
有时候可以把 Future 想像成与线程是一一对应的。
CompletableFuture 有太多太多的方法,并伴有 async 与 非 async 两个版本。本文之标题所谓 浅入, 确不敢说是深入浅出,而且要达到对 CompletableFuture 的基本了解亦非本文的目的。看完之后只能知道何以谓之 Completable, 不触及线程间的交互。试想一下,如过不用
Future 或 CompletableFuture, 想要实现等待某个线程完成之后才做后续的事,可以预设一段时间 Thread.sleep(xxx) 停下来等待,这很不可靠,时间短了线程没完,长了浪费时间; 或者采用来自于 巩固 Java Future 的使用 最后一段代码的方式 1AtomicReference<String> reference = new AtomicReference<>();
2
3new Thread(() -> {
4 //do something that is time-consuming
5 reference.set("I'm done"); //任务完成完设置 reference 的值
6}).start();
7
8while(reference.get() == null) { //耐心的等待,直到 reference.get() 有值为止
9}
10
11System.out.println("Finally, " + reference.get());当然,上面的代码改用
Future 来写会简单些,但仍然是调用 get() 来阻塞当前线程来等待,还每次要捕获 InterruptedException, ExecutionException 或 TimeoutException 异常。要是换作 CompletableFuture 来表述的话就更为直观,并且通过回调函数来处理后续操作,让代码行文更为流畅。请看用
CompletableFuture 稍加润色的代码 1package cc.unmi;
2
3import java.util.concurrent.CompletableFuture;
4
5public class Main {
6
7 public static void main(String[] args) {
8 CompletableFuture<Double> futurePrice = getPriceAsync();
9
10 //do anything you want, 当前线程不被阻塞
11 System.out.println(111);
12
13 //线程任务完成的话,执行回调函数,不阻塞后续操作
14 futurePrice.whenComplete((aDouble, throwable) -> {
15 System.out.println(aDouble);
16 //do something else
17 });
18
19 System.out.println(222);
20 }
21
22 static CompletableFuture<Double> getPriceAsync() {
23 CompletableFuture<Double> futurePrice = new CompletableFuture<>();
24 new Thread(() -> {
25 try {
26 Thread.sleep(5000);
27 } catch (InterruptedException e) {
28 e.printStackTrace();
29 }
30 futurePrice.complete(23.55);
31 }).start();
32 return futurePrice;
33 }
34}getPriceAsync() 就是一个异步方法,调用后马上返回得到一个 futurePrice, 用 Thread.sleep(5000) 模拟成一个耗时操作,线程执行完才设置 futurePrice 为完成状态并赋予结果。CompletableFuture 的 whenComplete() 也是异步的,所以我们能看到输出结果如下111如果我们实际使用
222
23.55
CompletableFuture 时不调用 Future 接口的 get() 等方法,上面的引用类型可以改成 CompletationStage, 以免受 get() 等方法的干扰。CompletableFuture 的异常处理
如果在设置 CompletableFuture.complete(value) 之前出现了异常,那么
get() 或其他回调函数像 whenComplete() 都会无限期的等待下去。例如下面的代码: 1public static void main(string[] args) throws ExecutionException, InterruptedException {
2 CompletableFuture<Double> futurePrice = new CompletableFuture<>();
3 new Thread(() -> {
4 if(true) {
5 throw new RuntimeExeption("");
6 }
7 futurePrice.complete(23.5);
8 }).start();
9
10 System.out.println(futurePrice.get());
11}上面代码执行后可以在控制台看到异常输出
java.lang.RuntimExeption(), 但是异常并不会在线程间传播,所以 futurePrice.get() 一直在等待。
办法一是调用 get(timeout) 时给定一个超时时间,如果指定时间内还没有获得结果则得到 TimeoutException。另一种办法是要在线程中通过 completeExceptionally(ex) 来传播异常 1public static void main(String[] args) throws ExecutionException, InterruptedException {
2 CompletableFuture<Double> futurePrice = new CompletableFuture<>();
3 new Thread(() -> {
4 try {
5 if (true) {
6 throw new RuntimeException("Something wrong");
7 }
8 futurePrice.complete(23.5);
9 } catch (Exception ex) {
10 futurePrice.completeExceptionally(ex); //捕获的异常还会由 ExecutionException 包裹一下
11 }
12 }).start();
13
14 System.out.println(futurePrice.get());
15}这时候在
futurePrice.get() 马上就能收到 `java.util.concurrent.ExecutionException: java.lang.Exception: Something wrong` 异常
程序在捕获到异常到终止,异常类型是 ExecutionException, 而不是原始的 RuntimeExeption。现实中我们不会这么去构造并管理
CompletableFuture, 但上面对我们理解它还是有帮助的。我们多用 supplyAsync(...) 静态方法来获得 CompletableFuture 实例,因为它同时给我们处理了 completeExceptionally(ex) 的细节,所以上面的代码效果等同于下面的写法,执行后的效果也是一致的。 1public static void main(String[] args) throws ExecutionException, InterruptedException {
2 CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> {
3 if (true) {
4 throw new RuntimeException("Something wrong");
5 }
6 return 23.5;
7 }, runnable -> new Thread(runnable).start());
8
9 System.out.println(futurePrice.get());
10} 因此,只要调用方法
CompletableFuture.supplyAsync(job), 这个 job 便会在在分配到了线程后立即执行,无须等待后面的 thenAccept() 等操作。前面的代码都是显式的用
new Thread(...).start() 来启动线程,如今我会尽量避免用这种方式来启动线程,而是用 Java 1.5 的 ExecutorService, 所以再加以改造: 1package cc.unmi;
2
3import java.io.IOException;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.CompletionStage;
6
7public class Main {
8
9 public static void main(String[] args) throws IOException {
10 CompletionStage<Double> futurePrice = CompletableFuture.supplyAsync(() -> {
11 try {
12 Thread.sleep(5000);
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16 return 23.55;
17 });
18 System.out.println(111);
19 futurePrice.thenAccept(System.out::println);
20 System.out.println(222);
21
22 System.in.read();
23 }
24}同样的输出结果。但如果把上面的
System.in.read() 移除掉,将看不到 23.55 的输出程序就直接退出了,为什么了呢?因为 CompletableFuture.supplyAsync() 方法默认把任务提交到 ForkJoinPool 线程池中执行,而它的线程设置了 daemon 属性为 true, 所以它阻止不了主线程的退出,才用 System.in.read() 维持主线程的执行。如果换成别的线程池类型就可不需要代码 System.in.read(), 再变 1package cc.unmi;
2
3import java.io.IOException;
4import java.util.concurrent.*;
5
6public class Main {
7
8 public static void main(String[] args) throws IOException {
9 ExecutorService executor = Executors.newCachedThreadPool();
10 CompletionStage<Double> futurePrice = CompletableFuture.supplyAsync(() -> {
11 try {
12 Thread.sleep(5000);
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16 return 23.55;
17 }, executor);
18
19 System.out.println(111);
20 futurePrice.thenAccept(System.out::println);
21 System.out.println(222);
22
23 executor.shutdown();
24 }
25}executor.shutdown() 并不是立即关掉线程池,而是采取更温柔, 安全的方式,等线程池中没有正在执行的任务时才关闭,从而结束主程序。如果要深入了解
CompletableFuture 的用法更应该关注它的几个静态方法,以及 CompletionStage 接口中定义的所有法。
永久链接 https://yanbin.blog/java-8-completablefuture-brief-touch/, 来自 隔叶黄莺 Yanbin's Blog[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。