Spring 5.0 发布之时(2017-09-28) WebFlux 是它的一大亮点,即响应式 Web 编程。因为同一时代的 RxJava 2 和 Akka Actor 具备一定的流行度,Spring 5 也来赶这一趟时髦。于是多线程编程大致两种模式
- CompletableFuture, runAsync, supplyAsync, whenComplete...
- Obervable, observeOn, subscribe, subscribeOn...
以及 PlayFramework 的 Action 方法无论返回 Result 还是 CompletableStage<Result>, 内部都是异步的模式。
Akka Actor 比 CompletableFuture, RxJava,以及本文将要讨论的 Reactor 更高级的是 Akka System 可以分布式部署,Actor 分布在不同的进程,主机上。
那时候业界已行成了一个 Reactive Stream 规范 org.reactivestreams(Publisher, Subscriber, Subscription, Processor), JDK 9 也奈不住寂寞,无法对 Reactive Stream 置若罔闻,在 2017-09-21 发布时加入了 java.util.concurrent.flow 包(Publisher, Subscriber, Processor, Subscription) 作为自己的 Reactive Stream 规范。
然而随着云计算的普及,基于消息系统解耦合的任务分解让代码变得更清晰,编码中甚至不用考虑多线程的行为,部署方式能解决任务执行的效率。
Spring 5 在 Reactive Streams 实现方面选择了 Project Reactor, 随之而来的是它的 Mono, Flux 这样的概念,这或许是众人对 Spring Reactive / WebFlux 望而生畏之处。本来 JDK 中的 Future, CompletableFuture 是很容易理解事物,代表着未来预期,成功或失败,也不用太关心谁来达成预期,这与 JavaScript 和 PlayFramework 的 Promise 大体相当。
而 WebFlux 是什么鬼,具体讲包括 Project Reactor 的 Mono(单个) 和 Flux(多个),可也没有 WebMono。首先,Mono 和 Flux 对我来说是两个较生僻的词, 前缀 mono- 表示单个, 比 monotone 单单调,其实与之对应的前缀是 poly-
, 表示多,比如多边形 polygon。然而 Reactor 在表示多的时候取了 Flux
这个词,中文意思有流量,流出,变迁,大约要表述的动态中的多个元素。
还是 JDK 的 CompletableFuture 简单,一个就是 CompletableFuture<String>, 多个就用 CompletableFuture<List<String>, 而采用 Reactor 的 Mono/Flux 的话就有
- Mono<String> 表示一个 String
- Flux<String> 多个 String
- Flux<String>.collectList() 转换成了 Mono<List<String>>
- Mono<String>.repeat() 可转换为 Flux<String>
Mono 和 Flux 都实现了 org.reactivestreams.Publisher 接口, 虽然 Mono, Flux 在某种意义上也代表着未来预期,但我们应该以消息的概念来理解它们的含义,订阅了消息,之后将会有新消息发布过来,通过调用它的 subscribeOn, publishOn 等方法委托给别的线程去获得结果,单从字面上 Mono, Flux 不如 Future, CompletableFuture, Promise 形像。
说那么多了,不如来感受一下 Spring 的 WebFlux 编程方式,借助于 SpringBoot(2.7.11) 可快速创建一个 Spring Web 项目,只需用到依赖
1 2 3 4 |
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> |
它会使用 Netty 替代 Tomcat 作为应用服务器,如果仍要使用 Tomcat, 可用 spring-boot-starter-web 和 spring-webflux 这两个依赖组合。
编写一个涵盖 Flux<String>, Mono<String>, Mono<List<String>>, Flux<String>->Mono<List<String>>, 以及 CompletableFuture<String> 的 Controller 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
@RestController public class ReactiveController { @GetMapping("/mono") public Mono<String> index() { System.out.println(Thread.currentThread()); return Mono.fromSupplier(() -> "hello world"); } @GetMapping("/flux") public Flux<String> list() { System.out.println(Thread.currentThread()); return Flux.fromArray(new String[]{"Java", "Python"}); } @GetMapping("/mono-list") public Mono<List<String>> monoList() { System.out.println(Thread.currentThread()); return Mono.fromSupplier(() -> Arrays.asList("Java", "Python")); } @GetMapping("/flux-to-mono-list") public Mono<List<String>> fluxToMonoList() { System.out.println(Thread.currentThread()); return Flux.fromArray(new String[]{"Java", "Python"}).collectList(); } @Async @GetMapping("/async1") public CompletableFuture<String> async1() { System.out.println(Thread.currentThread()); return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello CompletableFuture"; }); } @GetMapping("/async2") public CompletableFuture<String> async2() { System.out.println(Thread.currentThread()); return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "Hello CompletableFuture"; }); } } |
由于用到了 @Async 注解,需要在配置中加上 @EnableAsync 才能让 @Async 注解的函数在另一线程中执行
现在来依次测试
➜ curl -i http://localhost:8080/mono
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
Content-Length: 11
hello world
➜ curl -i http://localhost:8080/flux
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/plain;charset=UTF-8
JavaPython
➜ curl -i http://localhost:8080/flux-to-mono-list
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 17
["Java","Python"]
➜ curl -i http://localhost:8080/async1
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
Content-Length: 23
Hello CompletableFuture
➜ curl -i http://localhost:8080/async2
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
Content-Length: 23
Hello CompletableFuture
访问 /async1
或 /async2
时在客户端要等 5 秒种才能得到结果,/async1
由线程 Thread[task-1,5,main]
执行,其他的 API 都是在 HTTP 线程中执行的,例如线程名 Thread[reactor-http-nio-2,5,main]
写作本文时对 Spring Web 有了一个新发现(这就是写下来的好处),即 Controller 方法可直接返回 CompletableFuture<T>, 原来 Spring Web 能够自动对该 CompletableFuture 值进行兑现再返回给客户端,而不是一个 CompletableFuture 的 JSON 表示格式。
Spring Web 处理 Controller 方法返回的 CompletableFuture<T> 相关的类有 HandlerMethodReturnValueHandler, DeferredResultProcessingInterceptor, DeferredResultMethodReturnValueHandler, TimeoutDeferredResultProcessingInterceptor
Controller 方法返回的 CompletableFuture, 最终由 Spring Web 框架调用 CompletableFuture.handle() 方法进行兑现。同时在 Service 方法中也只要直接返回 CompletableFuture, 线程池自己安排,或者用 @Async 标注在 Service 方法上
1 2 3 4 5 6 7 |
// Service 方法 @Async("customThreadPool") public CompletableFuture<List<User>> retrieveUsers() { List<User> users = ....... return CompletableFuture.completableFuture(users); } |
话题重新回到 Spring WebFlux, 在 Controller 方法中代之以 CompletableFuture 的是返回 Mono 或 Flux。这里存在一个选择上的综合症,多个值时到底是 Flux<Object> 还是 Mono<List<Object>> 呢?Reactor 区分 Flux 和 Mono 是因为它们都有自己专属的 API,理论上可用 Mono<List<Object>> 一撸到底,List 本身就用 JDK 的 Stream API,但无法应用 Flux 的窗口(Windows) 和背压(Backpressure, 流量控制) 等特性。也不像 CompletableFuture 是 JDK 自带的 API, 而 Mono, Flux 是外来物,让 Service 及后端的方法返回 Mono 或 Flux 心理上会产生排异性。Mono/Flux 和 CompletableFuture 变成了两种编程方式的选择,它们之间还可以相互转换,Project Reactor 还提供了 Mono/Flux 与 JDK 9 flow API 的适配。
说到底,使用 WebFlux 的便利就是能用上 Reactor 的编程模式,它的强大的 API, subscribe, publish, onSubscribe, onNext, onError, Source, Sink, Retry, Flux 对集合,流,窗口,流量的控制(Backpressure)。单纯的异步编程使用 CompletableFuture 安排线程池来执行任务,对 CompletableFuture 的串联,组合,事件处理就足够了,再配合上 Java8 的 Stream API 处理集合。
使用 WebFlux 同时还需要第三方的组件支持 Reactor, 如 Spring Web 的 WebClient。
来一个简单的 Mono 实例,Controller 方法 index()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
@GetMapping("/mono") public Mono<Map<String, Integer>> index() { return Mono.fromSupplier(() -> { System.out.println("supplier: " + Thread.currentThread()); return "helloworld"; }).subscribeOn(Schedulers.newSingle("supplier")) .flatMap(v -> { System.out.println("flatMap: " + Thread.currentThread()); return WebClient.builder().build().get().uri("http://yanbin.blog?a="+v).retrieve().toEntity(String.class); }).map(v -> { System.out.println("map: " + Thread.currentThread()); Map<String, Integer> result = new HashMap<>(); result.put("statusCode", v.getStatusCodeValue()); return result; }).publishOn(Schedulers.newSingle("publisher")) .doOnNext(s -> { System.out.println("onNext: " + Thread.currentThread()); }); } |
启动服务后请求 http://localhost:8080
➜ curl -i http://localhost:8080/mono
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 18
{"statusCode":301}
在控制台的输出为
supplier: Thread[supplier-1,5,main]
flatMap: Thread[supplier-1,5,main]
map: Thread[reactor-http-nio-4,5,main]
onNext: Thread[publisher-2,5,main]
应用服务器是 Netty, 特别关注一下上面每一步骤所使用的线程,supplier 和 flatMap 用了 subscribeOn 的 Schedulers.newSingle("supplier") 线程池, doOnNext 使用了 publishOn 的 Schedulers.newSingle("publisher") 线程池。
如果去除上面的 subscribeOn 和 publishOn 的代码,那么所有操作都在 Netty 的 HTTP 线程中执行,打印出的是
supplier: Thread[reactor-http-nio-2,5,main]
flatMap: Thread[reactor-http-nio-2,5,main]
map: Thread[reactor-http-nio-2,5,main]
onNext: Thread[reactor-http-nio-2,5,main]
再来一个 Flux 的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@GetMapping("/flux") public Flux<Integer> list() { return Flux.range(1, 10) .parallel(2) .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(5))) .flatMap(v -> { System.out.println("flatMap: " + Thread.currentThread()); return WebClient.builder().build().get().uri("http://yanbin.blog?id=" + v).retrieve().toEntity(String.class); }) .map(ResponseEntity::getStatusCodeValue) .sequential() .take(8); } |
访问 http://localhost:8080/flux
➜ curl -i http://localhost:8080/flux
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json
[301,301,301,301,301,301,301,301]
控制台的输出
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
运行机制说明:
- parallel(2) 控制了同时只能执行两个任务,虽然配置的线程池大小是 5,实际只用了两个线程 thread-3,5 和 thread-4,5
- 最后 take(8) 只取前 8 个结果,但实际执行了 10 个元素的 flatMap, 并非 lazy map
Project Reactor 和 RxJava2 的编程模型基本是一样的,与 CompletableFuture 相比,任务实际由哪个线程池来执行较难以理解, subscribeOn 既管前又管后。
总结
本文说那么多,其实对多线程编程有两种编程风格的选择,CompletableFuture 还是 Reactive
- CompletableFuture 在调用 runAsync, supplyAsync 中自行选择线程池,onComplete, exceptionally 回调处理结果。多个 CompletableFuture 可串联,合并; 它的集合结果用 JDK 的 Stream API 来处理
- Reactive 编程风格目前主要有两种选择,RxJava 和 Project Reactor。它应以处理消息的方式来理解如何编程,用类似 subscribeOn 和 publishOn 来指派线程池。Reactor 的 Flux 也提供了许多像 JDK Stream API 一样的集合处理 API
- Reactor 提供了在 Mono/Flux 与 CompletableFuture 之间转换的 API
- Java 9 开始引入了 java.util.concurrent.flow 包,其中只定义了 reactive stream 的相关接口。Reactor 提供了对 Java flow API 的适配
- Akka System 允许在分布式环境部署 Akka Actor, 并有 Actor 出错重试机制。Reactor 也能重试
- Reactor 相对于 RxJava 来讲功能上更完备
- 在当前云计算流行的时代,Reactive 编程逐渐变得不那么重要
本人几年前曾细记述过几篇有关响应式编程的日志
其他链接:
本文链接 https://yanbin.blog/spring-5-mono-flux-reactor-programming/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
学到了