最初看到 Java 9 的这个新特性没太在意,及至重新关注到 Spring 5/Springboot 2 的响应式编程的时候才真正重视起 Reactive Streams(响应式流或反应式流)。应用响应式流的编程也就叫做响应式编程(Reactive Programming),无论是翻译成反应式编程都有些令人摸不准头脑。与此对应的在 Web 设计方面有一个叫做响应式 Web 设计(Responsive web design),两个词都译作响应式,却有些差别,大概是 Reactive 被译为反应的原因之一。
通过这里对 Reactive Streams 的学习,主要目的是为了进一步掌握 Spring 5/Springboot 2 的响应式 MVC 作铺垫的,不至于猛然间见 Flux, Mono 而不知所措。
函数式响应式编程概念最早来自于九十年代末,这也激发了微软的 Erik Meijer 设计开发了 .NET 的 Rx(Reactive eXtension) 库,以及到后来 Netflix 的 RxJava 也与他有关系。Reactive Stream 更像是一种编程模式,致力于解决一个生产者产生一系列消息,一个或多个去消费它们的问题。两者的名词我们会用: producer-consumer(生产者-消费者), source/sink(水源/水槽, Akka Stream 用了这个概念), publisher-subscriber(发布者-订阅者)。
既然 Reactive Stream 和 Java 8 引入的 Stream 都叫做流,它们之间有什么关系呢?有一点关系,Java 8 的 Stream 主要关注在流的过滤,映射,合并,而 Reactive Stream 更进一层,侧重的是流的产生与消费,即流在生产与消费者之间的协调。
一流的公司制定规范,Reactive Stream 标准在 2013 年也开始有了一个雏形:异步的流处理,并支持非阻塞式的 backpressure(背压? 很拗口的翻译,就是生产者与消费者之者应有流量控制)。流量控制即消费者的速率慢于生产者的速率时,生产者需要把速率降下来,比如说流处理时能在推/拉模式之间自动切换。至于背后的异步,非阻塞的实现仍然得仰仗于多线程了。
在之后出现了Netflix 的 RxJava, 它的四个主要角色是: Observable
, Observer
, Subscriber
和 Subject
。到 2015 年,正式的 Reactive Stream 出台,发布在 http://www.reactive-streams.org/。从这里我们可以认识到规范有多粗暴,就是定义了四个接口,以及一句话说生产/消费者之间是异步的,并实现 backpresure,没有任何的实现参考。
在 Reactive Stream 规范正式出来后,RxJava 也向它靠拢,实现了 org.reactivestreams
中的以上四个接口,RxJava 2 更是重写了。见 Reactive Streams 1.0.0 is here, 看到该规范的拥趸还不少, 括号中为支持 Reactive Streams 的起始版本号。
- Akka Streams(1.0-RC2)
- MongoDB(1.0.0)
- Ratpack(0.9.16), 可用来创建非阻塞式 HTTP 应用
- Reactive Rabbit(1.0.0), RabiitMQ/AMQP 的驱动
- Reactor(Spring 5 的响应式 MVC 就是用的它)
- RxJava(1.0.0), Netflix 出品
- Slick(3.0.0), Scala 的函数式关系映射组件,用于操作数据库
- Vert.x 3.0(milestone-5a), Eclipse 出品,也能用于构建非阻塞式 HTTP 应用
绕了一圈,该让 Java 9 与 Reactive Streams 发生关系了。Java 9 想必看到 Reactive Streams 是个好东西,于是把它纳入到 JDK 中来,但方式是无法容忍 JDK 中再出现 org.reactivestreams
这样的包定义,采用的做法是完全拷贝那四个接口定义,全收在了 java.util.concurrent.Flow
类中,作为 Flow
的内部静态接口存在。
JDK 9 本身也没有用力去实现以上四个接口,有两个比较简陋的 SubmissionPublisher
和 ConsumerSubscriber
。再就是还处于孵化器阶段的 jdk.incubator.http
包中的一些 Publisher
, Subscriber
实现,这也是 Reactive Streams 最应大力发挥网络协议领域。
因为有 JDK 9 的不遵循包名的引入 Reactive Streams 规范,所以 reactivestreams.org 又发出一个库 org.reactivestreams:reactive-streams-flow-adapters:1.0.2, 用于在 org.reactivestreams 和 java.util.consurrent.Flow
间的 Publisher
, Subscriber
, Processor
, Subscription
之间的类型转换。
Spring 5 第一个 Release 版本在 2017-09-28 发布的,而 Java 9 是在 2017-07-27 正式发布的,就是说在 Spring 5 发布时已经有了 Java 的 Reactive Streams。不过 Spring 5 的第一个里程碑版还是在 2016-07-28, 所以那时选择了 Reactor, 所以要使用 Spring 5 的响应式编程就必须了解 Flux 和 Mono,或许下一个 Spring 版本也要适配 Java 9 的 Reactive Streams Flow API,即双向转换或替换 API。
对于 Spring 5 可能发生的与 Java 9 Reactive Streams 的适配或许有些像 PlayFramework 兼容 Java 8 之前用的是它自己的 API F.Option
和 F.Promise
, 后来从 Play 2.4 升级到 2.5 后完全采用了 Java 8 的 Optional
和 CompletionStage
APIs 作为替代。
Java 9 的 SubmissionPublisher 应用实例
前面提到过 Java 9 有两个简陋的 Publisher 和 Subscriber 实现,来看看 SubmissionPublisher
和 ConsumerSubscriber
的应用举例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package cc.unmi; import java.util.concurrent.CompletableFuture; import java.util.concurrent.SubmissionPublisher; import java.util.stream.IntStream; public class TestFlow { public static void main(String[] args) { CompletableFuture<Void> subTask; try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) { subTask = publisher.consume(System.out::print); IntStream.rangeClosed(1, 3).forEach(publisher::submit); } subTask.join(); } } |
运行,输出为
123
没什么意外,看起来和直接用 Stream API 差不多,效果与下面仅一行代码是一样的
1 |
IntStream.rangeClosed(1, 3).forEach(System.out::print); |
面实际上内部运作起来就完全是另一回事了,此间就有 Java Flow APIs 在运转。下面逐步来理解一下:
从 SubmissionPublisher 构造函数起
SubmissionPublisher
实现了 Flow.Publisher
接口,它有三个构造函数
SubmissionPublisher() //默认线程池为 ForkJoinPool.commonPool(), 缓冲区大小为 256
SubmissionPublisher(Executor executor, int maxBufferCapacity)
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler)
publisher.consume(consumer) 发生了什么
看 SubmissionPublisher
的 consumer(...)
方法
1 2 3 4 5 6 7 |
public CompletableFuture<Void> consume(Consumer<? super T> consumer) { if (consumer == null) throw new NullPointerException(); CompletableFuture<Void> status = new CompletableFuture<>(); subscribe(new ConsumerSubscriber<T>(status, consumer)); return status; } |
上面代码创建了一个 ConsumerSubscriber
实例,它实现了 Flow.Subscriber
接口,subscribe(...)
方法创建了 Subscription
实例
1 |
BufferedSubscription<T> subscription = new BufferedSubscription<T>(subscriber, executor, onNextHandler, maxBufferCapacity); |
并提交任务给线程池,该任务执行到了 ConsumerSubscriber 的 onSubscribe(Flow.Subscription subscription) 方法,看到
1 |
subscription.request(Long.MAX_VALUE); |
一下请求所有的元素。
publisher.submit(T item) 生产消息
SubmissionPublisher.submit(item)
发布消息后,ConsumerSubscriber
会收到 onNext
, onComplete
事件,或出错时的 onError
,对应方法
1 2 3 |
onNext(T item) void onComplete() void onError(Throwable ex) |
从上面大概能看到一个 Reactive Streams 应用有 Publisher
, Subscriber
, Subscription
多个角色在参与协作。而一个 Reactive Streams 组件要做的事情就是就是尽可能的把它们做的更完美,高效率且接口更友好。
SubmissionPublisher 可有多个订阅者
当给 SubmissionPublisher
指定多个 Subscriber
的时候,消息只需发布一次,这与 IntStream.rangeClosed(1, 3).forEach(System.out::println);
就不一样了
1 2 3 4 5 6 7 8 9 10 11 12 |
public static void main(String[] args) { CompletableFuture<Void> subTask1; CompletableFuture<Void> subTask2; try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) { subTask1 = publisher.consume(System.out::print); subTask2 = publisher.consume(System.out::print); IntStream.rangeClosed(1, 3).forEach(publisher::submit); } subTask1.join(); subTask2.join(); } |
执行后的输出顺序是不确定的,可能是下面任意情况
112233
123123
Reactive Streams 和 Actor
在进行异步消息处理时,Reactive Streams 和 Actor 是两种不同的编程模式选择。Reactive Streams 规范相比 Actor 更简单,只是说收发消息异步,有流量控制。而 Actor 编程模式涉及到 Actor 容错管理,消息路由,集群,并支持远程消息等。
还有共同之处是: 它们定义的 API 都很简单,编码时都基本不需要关注线程本身,而实际消息的传递都是背后的线程池。所以线程的配置可延迟到部署阶段来进行优化处理。
下一步,继续对 Spring 5 的响应式 MVC 应用进行实战体验
本文链接 https://yanbin.blog/java-9-talk-reactive-stream/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
讲的非常清楚,感谢分享
初学者不清楚历史,上来就被出现的一大堆类/关系/库搞晕了
[…] https://yanbin.blog/java-9-talk-reactive-stream/#more-8877 […]
[…] https://yanbin.blog/java-9-talk-reactive-stream/#more-8877 […]