Spring 5 响应式编程研究

Spring 5.0 发布之时(2017-09-28) WebFlux 是它的一大亮点,即响应式 Web 编程。因为同一时代的 RxJava 2 和 Akka Actor 具备一定的流行度,Spring 5 也来赶这一趟时髦。于是多线程编程大致两种模式

  1. CompletableFuture, runAsync, supplyAsync, whenComplete...
  2. Obervable, observeOn, subscribe, subscribeOn...

以及 PlayFramework 的 Action 方法无论返回 Result 还是 CompletableStage<Result>, 内部都是异步的模式。

Akka Actor 比 CompletableFuture, RxJava,以及本文将要讨论的 Reactor 更高级的是 Akka System 可以分布式部署,Actor 分布在不同的进程,主机上。 

那时候业界已行成了一个 Reactive Stream 规范 org.reactivestreams(Publisher, Subscriber, Subscription, Processor), JDK 9 也奈不住寂寞,无法对 Reactive Stream 置若罔闻,在 2017-09-21 发布时加入了 java.util.concurrent.flow 包(Publisher, Subscriber, Processor, Subscription) 作为自己的 Reactive Stream 规范。

然而随着云计算的普及,基于消息系统解耦合的任务分解让代码变得更清晰,编码中甚至不用考虑多线程的行为,部署方式能解决任务执行的效率。

Spring 5 在 Reactive Streams 实现方面选择了 Project Reactor, 随之而来的是它的 Mono, Flux 这样的概念,这或许是众人对 Spring Reactive / WebFlux 望而生畏之处。本来 JDK 中的 Future, CompletableFuture 是很容易理解事物,代表着未来预期,成功或失败,也不用太关心谁来达成预期,这与 JavaScript 和 PlayFramework 的 Promise 大体相当。

而 WebFlux 是什么鬼,具体讲包括 Project Reactor 的 Mono(单个) 和 Flux(多个),可也没有 WebMono。首先,Mono 和 Flux 对我来说是两个较生僻的词, 前缀 mono- 表示单个, 比 monotone 单单调,其实与之对应的前缀是 poly-, 表示多,比如多边形 polygon。然而 Reactor 在表示多的时候取了 Flux 这个词,中文意思有流量,流出,变迁,大约要表述的动态中的多个元素。

还是 JDK 的 CompletableFuture 简单,一个就是 CompletableFuture<String>, 多个就用 CompletableFuture<List<String>, 而采用 Reactor 的 Mono/Flux 的话就有

  1. Mono<String> 表示一个 String
  2. Flux<String> 多个 String
  3. Flux<String>.collectList() 转换成了 Mono<List<String>>
  4. Mono<String>.repeat() 可转换为 Flux<String>

Mono 和 Flux 都实现了 org.reactivestreams.Publisher 接口, 虽然 Mono, Flux 在某种意义上也代表着未来预期,但我们应该以消息的概念来理解它们的含义,订阅了消息,之后将会有新消息发布过来,通过调用它的 subscribeOn, publishOn 等方法委托给别的线程去获得结果,单从字面上 Mono, Flux 不如 Future, CompletableFuture, Promise 形像。

说那么多了,不如来感受一下 Spring 的 WebFlux 编程方式,借助于 SpringBoot(2.7.11) 可快速创建一个 Spring Web 项目,只需用到依赖

它会使用 Netty 替代 Tomcat 作为应用服务器,如果仍要使用 Tomcat, 可用 spring-boot-starter-web 和  spring-webflux 这两个依赖组合。

编写一个涵盖 Flux<String>, Mono<String>, Mono<List<String>>, Flux<String>->Mono<List<String>>, 以及 CompletableFuture<String> 的 Controller 类

由于用到了 @Async 注解,需要在配置中加上 @EnableAsync 才能让 @Async 注解的函数在另一线程中执行

现在来依次测试

➜  curl -i http://localhost:8080/mono
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
Content-Length: 11
hello world
➜  curl -i http://localhost:8080/flux
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: text/plain;charset=UTF-8
JavaPython
➜  curl -i http://localhost:8080/flux-to-mono-list
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 17
["Java","Python"]
➜  curl -i http://localhost:8080/async1
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
Content-Length: 23
Hello CompletableFuture
➜  curl -i http://localhost:8080/async2
HTTP/1.1 200 OK
Content-Type: text/plain;charset=UTF-8
Content-Length: 23
Hello CompletableFuture

访问 /async1/async2 时在客户端要等 5 秒种才能得到结果,/async1 由线程 Thread[task-1,5,main] 执行,其他的 API 都是在 HTTP 线程中执行的,例如线程名 Thread[reactor-http-nio-2,5,main]

写作本文时对 Spring Web 有了一个新发现(这就是写下来的好处),即 Controller 方法可直接返回 CompletableFuture<T>,  原来 Spring Web 能够自动对该 CompletableFuture 值进行兑现再返回给客户端,而不是一个 CompletableFuture 的 JSON 表示格式。

Spring Web 处理 Controller 方法返回的 CompletableFuture<T> 相关的类有 HandlerMethodReturnValueHandler, DeferredResultProcessingInterceptor, DeferredResultMethodReturnValueHandler, TimeoutDeferredResultProcessingInterceptor

Controller 方法返回的 CompletableFuture, 最终由 Spring Web 框架调用 CompletableFuture.handle() 方法进行兑现。同时在 Service 方法中也只要直接返回 CompletableFuture, 线程池自己安排,或者用 @Async 标注在 Service 方法上

话题重新回到 Spring WebFlux, 在 Controller 方法中代之以 CompletableFuture 的是返回 Mono 或 Flux。这里存在一个选择上的综合症,多个值时到底是 Flux<Object> 还是 Mono<List<Object>> 呢?Reactor 区分 Flux 和  Mono 是因为它们都有自己专属的 API,理论上可用 Mono<List<Object>> 一撸到底,List 本身就用 JDK 的 Stream API,但无法应用 Flux 的窗口(Windows) 和背压(Backpressure, 流量控制) 等特性。也不像 CompletableFuture 是 JDK 自带的 API, 而 Mono, Flux 是外来物,让 Service 及后端的方法返回 Mono 或 Flux 心理上会产生排异性。Mono/Flux 和 CompletableFuture 变成了两种编程方式的选择,它们之间还可以相互转换,Project Reactor 还提供了 Mono/Flux 与 JDK 9 flow API 的适配。

说到底,使用 WebFlux 的便利就是能用上 Reactor 的编程模式,它的强大的 API, subscribe, publish, onSubscribe, onNext, onError, Source, Sink, Retry, Flux 对集合,流,窗口,流量的控制(Backpressure)。单纯的异步编程使用 CompletableFuture 安排线程池来执行任务,对 CompletableFuture 的串联,组合,事件处理就足够了,再配合上 Java8 的 Stream API 处理集合。

使用 WebFlux 同时还需要第三方的组件支持 Reactor, 如 Spring Web 的 WebClient。

来一个简单的  Mono 实例,Controller 方法 index()

启动服务后请求 http://localhost:8080

➜  curl -i http://localhost:8080/mono
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 18
{"statusCode":301}

在控制台的输出为

supplier: Thread[supplier-1,5,main]
flatMap: Thread[supplier-1,5,main]
map: Thread[reactor-http-nio-4,5,main]
onNext: Thread[publisher-2,5,main]

应用服务器是 Netty, 特别关注一下上面每一步骤所使用的线程,supplier 和 flatMap 用了 subscribeOn 的 Schedulers.newSingle("supplier") 线程池, doOnNext 使用了 publishOn 的 Schedulers.newSingle("publisher") 线程池。

如果去除上面的 subscribeOn 和  publishOn 的代码,那么所有操作都在 Netty 的 HTTP 线程中执行,打印出的是

supplier: Thread[reactor-http-nio-2,5,main]
flatMap: Thread[reactor-http-nio-2,5,main]
map: Thread[reactor-http-nio-2,5,main]
onNext: Thread[reactor-http-nio-2,5,main] 

再来一个 Flux 的例子

访问 http://localhost:8080/flux

➜ curl -i http://localhost:8080/flux
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: application/json
[301,301,301,301,301,301,301,301]

控制台的输出

flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-3,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-4,5,main]
flatMap: Thread[pool-4-thread-4,5,main]

运行机制说明:

  1. parallel(2) 控制了同时只能执行两个任务,虽然配置的线程池大小是 5,实际只用了两个线程 thread-3,5 和 thread-4,5
  2. 最后 take(8) 只取前 8 个结果,但实际执行了 10 个元素的 flatMap, 并非 lazy map

Project Reactor 和  RxJava2 的编程模型基本是一样的,与 CompletableFuture 相比,任务实际由哪个线程池来执行较难以理解, subscribeOn 既管前又管后。

总结

本文说那么多,其实对多线程编程有两种编程风格的选择,CompletableFuture 还是 Reactive

  1. CompletableFuture 在调用 runAsync, supplyAsync 中自行选择线程池,onComplete, exceptionally 回调处理结果。多个 CompletableFuture 可串联,合并; 它的集合结果用 JDK 的 Stream API 来处理
  2. Reactive 编程风格目前主要有两种选择,RxJava 和 Project Reactor。它应以处理消息的方式来理解如何编程,用类似 subscribeOn 和 publishOn 来指派线程池。Reactor 的 Flux 也提供了许多像 JDK Stream API 一样的集合处理 API
  3. Reactor 提供了在 Mono/Flux 与 CompletableFuture 之间转换的 API
  4. Java 9 开始引入了 java.util.concurrent.flow 包,其中只定义了 reactive stream 的相关接口。Reactor 提供了对 Java flow API 的适配
  5. Akka System 允许在分布式环境部署 Akka Actor, 并有 Actor 出错重试机制。Reactor 也能重试
  6. Reactor 相对于 RxJava 来讲功能上更完备
  7. 在当前云计算流行的时代,Reactive 编程逐渐变得不那么重要

本人几年前曾细记述过几篇有关响应式编程的日志

  1. 试手 RxJava 2.x 及对线程的初步理解
  2. Java 9 - 说说响应式流
  3. Akka Actor: 从最简单的例子开始

其他链接:

  1. Guide to Spring 5 WebFlux

 

本文链接 https://yanbin.blog/spring-5-mono-flux-reactor-programming/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

1 Comment
Inline Feedbacks
View all comments
geralt
geralt
8 months ago

学到了