试手 RxJava 2.x 及对线程的初步理解

在进行数据流处理过程中,需要一个高效苗条的流处理组件,比如对输入流能进行分组(窗口),能进行流量控制(Back Pressure - 背压),这也就涉及到响应式编程,流处理框架。这方面如果直接基于 Akka actor 来构建 Akka ActorSystem 也是比较复杂,依赖的组件也不少。还有构筑在 Akka actor 之上的 Akka Streams,再往上的 Flink Streaming,它们都有像滑动,滚动窗口的概念,但是依赖更不得了。一个基本的 Flink Streaming 的项目会依赖到 45 M 以上的第三方组件,如果用它来写一个数据流处理的共享组件,那真是要命。Spring 5 也开始带上了自己的 Reactive-Streams 实现 Spring Reactor, 想要把它从 Spring 中单独抽离出也非易事。

Flink Streaming 组件依赖:org.apache.fling:flink-streaming-java_2.12:1.80, 会依赖于其他诸如 akka-stream, akka-actor, flink-core, flink-clients, scala-library 等非常多的东西

而另一个著名的响应式框架 RxJava 2 就清爽多了,完全没有第三方依赖,要说有也就是定义了四个接口的 reactive-streams(2 KB 大小),就自身那个  rxjava-2.2.9.jar 包只有 2.3 M,这才叫轻量级。因为它设计来是能被应用于 Android 客户端应用的,Andriod 上的 rxandriod-1.2.1.aar 只有 9 K。所以 RxJava 2.x 太适合用来写一些小的共享组件了。

说了那么多,RxJava 是什么?直接网上抄一句定义:RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。它的并发不一定需要了解它的实现细节,RxJava  也支持像 Flink, Akka Streams 那样的窗口概念。RxJava 2.x 符合了标准的响应式流(Reactive-Streams) 规范,主要应用了观察者模式,编程中围绕着的概念有 Observable, Observer, Subscriber, Subject(既是 Observable 又是 Observer), Flowable, Publisher, Processor(既是 Subscriber 又是 Publisher) 等,其中标暗红的对象是来自于 Reactive-Streams 定义的概念。响应的编程还需了解类似于 Iterator 的 onNext, onError, onComplete 这三个基本事件。还有响应式编程通常会用的 Producer/Consumer, Source/Sink, Publisher/Subscriber,传递的数据元素通常称作 Subject。 

下面用几段基本代码来体验 RxJava 2 来做什么

Hello World

任何语言的 Hello World 代码都毫无意义的,RxJava 2 的也不例外,像下面这行

效果和 System.out.println("Hello World") 一样,说明不了什么问题。再进一步观察一个集合

稍好一些,像是 forEach() 操作。留心到 RxJava 中的 Observable 非常类似于 Java 8 的 Stream, 后续使用时头脑中可以对比它们各自的 map, flatMap, filter, groupBy 等操作。 

onNext, onError, onComplete

来一个更多一点功能的代码,用 onNext, onError, onComplete 来控制数据

以上用 Java 8 Lambda 语法书写的 subscribe 中各个 onNext, onError, onComplete 函数参数,

如果没有 #1 行代码为被注释状态时的输出为

Day: Monday
Day: Tuesday
Day: Wednesday
Done
Program exit

如果 #1 代码启用的话,输出为

Day: Monday
Day: Tuesday
Something wrong
Program exit

上面整个代码也是一个同步执行的过程,因为到目前为止还未引用新的线程,所以全部的输出操作都是在主线程上完成的,这就是为什么数据遍历完之后才输出 Program exit

基本线程模型 - observeOn() 和 subscribeOn()

如果只是单线程操作,那要 RxJava 的优势在哪儿,RxJava 支持并发,而且还把并发操作透明化,也就是我们不需要太了解它的线程模型的实现。但作为一个希望通过阅读源代码来更深入理解一个框架的人来说,透明化的东西也要对它的线程模型看个究竟。

首先来看一下以上代码各部分是由什么线程来执行的

log() 方法输出当前线程名与消息,实现代码为

执行后输出为

Thread[main,5,main]: Producer
Thread[main,5,main]: Consumer-Monday
Thread[main,5,main]: Program exit

加上 observeOn(Scheduler), 指定一个观察者在哪个调度器上观察这个Observable

第 5 行加了 .observeOn(Schedulers.newThread()) 让观察者在一个新的线程上去。Schedulers 中出来的各种 Scheduler 这里暂不展开,只要知道会在新的线程上去执行某件事情。

并且消费消息时加了一个延时,最后加上等待 1 秒钟,以确保主线程退出前,消息还被全部消费完成。看下现在输出为

Thread[main,5,main]: Producer
Thread[main,5,main]: Program exit
Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday

我们看到消息仍然是由主线程放进去的,消息放完后立即退到主线程上去了,消费消息在 observeOn() 指定的线程上。

再试一下 subscribeOn(Scheduler), 指定Observable自身在哪个调度器上执行

直接看输出来理解

Thread[main,5,main]: Program exit
Thread[RxNewThreadScheduler-1,5,main]: Producer
Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday

我们看到只用 subscribeOn(Scheduler) 时,消息的产生与消费都在它指定的线程池上。

observerOn() 和 subscribeOn() 双管齐下

记住前面用 Schedulers.newThread() 的线程名称类似 RxNewThreadScheduler-1,5,main, 现在来看新的输出

Thread[main,5,main]: Program exit
Thread[RxNewThreadScheduler-1,5,main]: Producer
Thread[RxCachedThreadScheduler-1,5,main]: Consumer-Monday

新的线程名 RxCachedThreadScheduler-1,5,main 就是 Schedulers.io() 所对应的,所以由上面的输出表明

  1. subscribeOn(Scheduler) 指定消息生产者用的线程(池)
  2. observeOn(Scheduler) 指定消息消费者用的线程(池)

subscribeOn() 只有第一次调用有效,因为 Observable 就一个,observeOn() 可以调用多次,每次调用都会影响到后续的 map, filter, subscribe 等操作。

不同的Schduler 类型

Schedulers 中的工厂方法可以创建出不同类型的线程池,简单说明如下

  1. single():  单线程线程池,相当于 Executors.newSingleThreadExecutor(), 但多次调用  Schedulers.single() 总是同一个线程, observable.observeOn(Schedules.single()).map(#1).observeOn(Schedules.single()).map(#2), #1 和 #2 用相同的线程
  2. newThread(): 创建一个新的线程, 与 single()  的区别是每次不同的线程。observable.observeOn(Schedules.newThread()).map(#1).observeOn(Schedules.newThread()).map(#2), #1 和#2 会用两个不同的线程
  3. computation(): 以 CPU 内核数为固定大小的线程池,适于 CPU 密集型计算
  4. io():  相当于 Executors.newCachedThreadPool() 创建的线程池,适于大量 I/O 等待的操作
  5. trampoline(): 在当前线程上运行,好像是默认行为,很多时候好像是可以省略的
  6. from(Executor): 指定自定的线程池,如 Schedulers.from(Executor.newFixedThreadPool(10)), 有个好处是容易共享线程池和不用总是看到 RxXxx 那样的线程名

RxJava vs CompletableFuture

虽然正式版的 RxJava 1(Nov 2014) 是在 JDK 7(July 2011) 和 JDK 8(March 2014) 之后发布的,但是它的实现并不依赖于 JDK 7 的 ForkJoinPool 和 JDK 8 的 CompletableFuture。RxJava 与 CompletableFuture 有许多相似的功能,可以大概对比一下 RxJava 与  CompletableFuture 两种写法,不要太纠缠于细节,以下代码不是完全对等

RxJava 代码

CompletableFuture 代码

以上两段代码并非用以说明哪种写法的好坏,只是纯粹的提供两种代码风格

一个更实用的生产消费的代码

前方的代码是加入消息消费前所有消息都准备好了,这对处理一个已有列表有用,更贴近现实的话是先设置好消费者后,消息的产生是连续不断的。

本文的内容比较杂,其余更多的话题有 RxJava 的 Back Pressure, buffer, 和 window 的支持等。RxJava 的 buffer 就像 Flink 的 window, 而 window 更像是 Flink 的 keyBy(x).window。

本文链接 https://yanbin.blog/rxjava-get-started/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments