试手 RxJava 2.x 及对线程的初步理解
在进行数据流处理过程中,需要一个高效苗条的流处理组件,比如对输入流能进行分组(窗口),能进行流量控制(Back Pressure - 背压),这也就涉及到响应式编程,流处理框架。这方面如果直接基于 Akka actor 来构建 Akka ActorSystem 也是比较复杂,依赖的组件也不少。还有构筑在 Akka actor 之上的 Akka Streams,再往上的 Flink Streaming,它们都有像滑动,滚动窗口的概念,但是依赖更不得了。一个基本的 Flink Streaming 的项目会依赖到 45 M 以上的第三方组件,如果用它来写一个数据流处理的共享组件,那真是要命。Spring 5 也开始带上了自己的 Reactive-Streams 实现 Spring Reactor, 想要把它从 Spring 中单独抽离出也非易事。
说了那么多,RxJava 是什么?直接网上抄一句定义:RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。它的并发不一定需要了解它的实现细节,RxJava 也支持像 Flink, Akka Streams 那样的窗口概念。RxJava 2.x 符合了标准的响应式流(Reactive-Streams) 规范,主要应用了观察者模式,编程中围绕着的概念有 Observable, Observer, Subscriber, Subject(既是 Observable 又是 Observer), Flowable, Publisher, Processor(既是 Subscriber 又是 Publisher) 等,其中标暗红的对象是来自于 Reactive-Streams 定义的概念。响应的编程还需了解类似于 Iterator 的 onNext, onError, onComplete 这三个基本事件。还有响应式编程通常会用的 Producer/Consumer, Source/Sink, Publisher/Subscriber,传递的数据元素通常称作 Subject。
下面用几段基本代码来体验 RxJava 2 来做什么
效果和
稍好一些,像是 forEach() 操作。留心到 RxJava 中的 Observable 非常类似于 Java 8 的 Stream, 后续使用时头脑中可以对比它们各自的 map, flatMap, filter, groupBy 等操作。
以上用 Java 8 Lambda 语法书写的 subscribe 中各个 onNext, onError, onComplete 函数参数,
如果没有
首先来看一下以上代码各部分是由什么线程来执行的
log() 方法输出当前线程名与消息,实现代码为
执行后输出为
第 5 行加了
并且消费消息时加了一个延时,最后加上等待 1 秒钟,以确保主线程退出前,消息还被全部消费完成。看下现在输出为
再试一下
直接看输出来理解
observerOn() 和 subscribeOn() 双管齐下
记住前面用
subscribeOn() 只有第一次调用有效,因为 Observable 就一个,observeOn() 可以调用多次,每次调用都会影响到后续的 map, filter, subscribe 等操作。
RxJava 代码
CompletableFuture 代码
以上两段代码并非用以说明哪种写法的好坏,只是纯粹的提供两种代码风格
本文的内容比较杂,其余更多的话题有 RxJava 的 Back Pressure, buffer, 和 window 的支持等。RxJava 的 buffer 就像 Flink 的 window, 而 window 更像是 Flink 的 keyBy(x).window。 永久链接 https://yanbin.blog/rxjava-get-started/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
Flink Streaming 组件依赖:org.apache.fling:flink-streaming-java_2.12:1.80, 会依赖于其他诸如 akka-stream, akka-actor, flink-core, flink-clients, scala-library 等非常多的东西而另一个著名的响应式框架 RxJava 2 就清爽多了,完全没有第三方依赖,要说有也就是定义了四个接口的 reactive-streams(2 KB 大小),就自身那个 rxjava-2.2.9.jar 包只有 2.3 M,这才叫轻量级。因为它设计来是能被应用于 Android 客户端应用的,Andriod 上的 rxandriod-1.2.1.aar 只有 9 K。所以 RxJava 2.x 太适合用来写一些小的共享组件了。
说了那么多,RxJava 是什么?直接网上抄一句定义:RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。它的并发不一定需要了解它的实现细节,RxJava 也支持像 Flink, Akka Streams 那样的窗口概念。RxJava 2.x 符合了标准的响应式流(Reactive-Streams) 规范,主要应用了观察者模式,编程中围绕着的概念有 Observable, Observer, Subscriber, Subject(既是 Observable 又是 Observer), Flowable, Publisher, Processor(既是 Subscriber 又是 Publisher) 等,其中标暗红的对象是来自于 Reactive-Streams 定义的概念。响应的编程还需了解类似于 Iterator 的 onNext, onError, onComplete 这三个基本事件。还有响应式编程通常会用的 Producer/Consumer, Source/Sink, Publisher/Subscriber,传递的数据元素通常称作 Subject。
下面用几段基本代码来体验 RxJava 2 来做什么
Hello World
任何语言的 Hello World 代码都毫无意义的,RxJava 2 的也不例外,像下面这行1Observable.just("Hello World").subscribe(System.out::println);效果和
System.out.println("Hello World") 一样,说明不了什么问题。再进一步观察一个集合1Observable.fromIterable(Arrays.asList("one", "two", "three"))
2 .subscribe(s -> System.out.println("Hello " + s));稍好一些,像是 forEach() 操作。留心到 RxJava 中的 Observable 非常类似于 Java 8 的 Stream, 后续使用时头脑中可以对比它们各自的 map, flatMap, filter, groupBy 等操作。
onNext, onError, onComplete
来一个更多一点功能的代码,用 onNext, onError, onComplete 来控制数据 1Observable.<String>create(emitter -> {
2 emitter.onNext("Monday");
3 emitter.onNext("Tuesday");
4 // emitter.onError(new RuntimeException("Something wrong")); #1
5 emitter.onNext("Wednesday");
6 emitter.onComplete();
7}).subscribe(
8 day -> System.out.println("Day: " + day), //onNext
9 t -> System.err.println(t.getMessage()), //onError
10 () -> System.out.println("Done") //onComplete
11);
12
13System.out.println("Program exit");以上用 Java 8 Lambda 语法书写的 subscribe 中各个 onNext, onError, onComplete 函数参数,
如果没有
#1 行代码为被注释状态时的输出为Day: Monday如果
Day: Tuesday
Day: Wednesday
Done
Program exit
#1 代码启用的话,输出为Day: Monday上面整个代码也是一个同步执行的过程,因为到目前为止还未引用新的线程,所以全部的输出操作都是在主线程上完成的,这就是为什么数据遍历完之后才输出
Day: Tuesday
Something wrong
Program exit
Program exit。基本线程模型 - observeOn() 和 subscribeOn()
如果只是单线程操作,那要 RxJava 的优势在哪儿,RxJava 支持并发,而且还把并发操作透明化,也就是我们不需要太了解它的线程模型的实现。但作为一个希望通过阅读源代码来更深入理解一个框架的人来说,透明化的东西也要对它的线程模型看个究竟。首先来看一下以上代码各部分是由什么线程来执行的
1Observable.<String>create(emitter -> {
2 log("Producer");
3 emitter.onNext("Monday");
4 emitter.onComplete();
5}).subscribe(
6 day -> {
7 log("Consumer-" + day);
8 }
9);
10
11log("Program exit");log() 方法输出当前线程名与消息,实现代码为
1private static void log(String msg) {
2 System.out.println(Thread.currentThread() + ": " + msg);
3}执行后输出为
Thread[main,5,main]: Producer加上
Thread[main,5,main]: Consumer-Monday
Thread[main,5,main]: Program exit
observeOn(Scheduler), 指定一个观察者在哪个调度器上观察这个Observable 1Observable.<String>create(emitter -> {
2 log("Producer");
3 emitter.onNext("Monday");
4 emitter.onComplete();
5}).observeOn(Schedulers.newThread()) //只有消息消费用这个线程(池), 生产者仍然是主线程
6 .subscribe(
7 day -> {
8 Thread.sleep(200);
9 log("Consumer-" + day);
10 }
11 );
12
13log("Program exit");
14Thread.sleep(1000);第 5 行加了
.observeOn(Schedulers.newThread()) 让观察者在一个新的线程上去。Schedulers 中出来的各种 Scheduler 这里暂不展开,只要知道会在新的线程上去执行某件事情。并且消费消息时加了一个延时,最后加上等待 1 秒钟,以确保主线程退出前,消息还被全部消费完成。看下现在输出为
Thread[main,5,main]: Producer我们看到消息仍然是由主线程放进去的,消息放完后立即退到主线程上去了,消费消息在
Thread[main,5,main]: Program exit
Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday
observeOn() 指定的线程上。再试一下
subscribeOn(Scheduler), 指定Observable自身在哪个调度器上执行 1Observable.<String>create(emitter -> {
2 log("Producer");
3 emitter.onNext("Monday");
4 emitter.onComplete();
5}).subscribeOn(Schedulers.newThread()) //消息生产与消费都会有这个线程(池)
6 .subscribe(
7 day -> {
8 Thread.sleep(200);
9 log("Consumer-" + day);
10 }
11 );
12
13log("Program exit");
14Thread.sleep(1000);直接看输出来理解
Thread[main,5,main]: Program exit我们看到只用
Thread[RxNewThreadScheduler-1,5,main]: Producer
Thread[RxNewThreadScheduler-1,5,main]: Consumer-Monday
subscribeOn(Scheduler) 时,消息的产生与消费都在它指定的线程池上。observerOn() 和 subscribeOn() 双管齐下
1Observable.<String>create(emitter -> {
2 log("Producer");
3 emitter.onNext("Monday");
4 emitter.onComplete();
5}).subscribeOn(Schedulers.newThread()) //消息生产者用这个线程(池)
6 .observeOn(Schedulers.io()) //消息消费者用这个线程(池)
7 .subscribe(
8 day -> {
9 Thread.sleep(200);
10 log("Consumer-" + day);
11 }
12 );
13
14log("Program exit");
15Thread.sleep(1000);记住前面用
Schedulers.newThread() 的线程名称类似 RxNewThreadScheduler-1,5,main, 现在来看新的输出Thread[main,5,main]: Program exit新的线程名
Thread[RxNewThreadScheduler-1,5,main]: Producer
Thread[RxCachedThreadScheduler-1,5,main]: Consumer-Monday
RxCachedThreadScheduler-1,5,main 就是 Schedulers.io() 所对应的,所以由上面的输出表明- subscribeOn(Scheduler) 指定消息生产者用的线程(池)
- observeOn(Scheduler) 指定消息消费者用的线程(池)
subscribeOn() 只有第一次调用有效,因为 Observable 就一个,observeOn() 可以调用多次,每次调用都会影响到后续的 map, filter, subscribe 等操作。
不同的Schduler 类型
Schedulers 中的工厂方法可以创建出不同类型的线程池,简单说明如下- single(): 单线程线程池,相当于 Executors.newSingleThreadExecutor(), 但多次调用 Schedulers.single() 总是同一个线程, observable.observeOn(Schedules.single()).map(#1).observeOn(Schedules.single()).map(#2), #1 和 #2 用相同的线程
- newThread(): 创建一个新的线程, 与 single() 的区别是每次不同的线程。observable.observeOn(Schedules.newThread()).map(#1).observeOn(Schedules.newThread()).map(#2), #1 和#2 会用两个不同的线程
- computation(): 以 CPU 内核数为固定大小的线程池,适于 CPU 密集型计算
- io(): 相当于 Executors.newCachedThreadPool() 创建的线程池,适于大量 I/O 等待的操作
- trampoline(): 在当前线程上运行,好像是默认行为,很多时候好像是可以省略的
- from(Executor): 指定自定的线程池,如 Schedulers.from(Executor.newFixedThreadPool(10)), 有个好处是容易共享线程池和不用总是看到 RxXxx 那样的线程名
RxJava vs CompletableFuture
虽然正式版的 RxJava 1(Nov 2014) 是在 JDK 7(July 2011) 和 JDK 8(March 2014) 之后发布的,但是它的实现并不依赖于 JDK 7 的 ForkJoinPool 和 JDK 8 的 CompletableFuture。RxJava 与 CompletableFuture 有许多相似的功能,可以大概对比一下 RxJava 与 CompletableFuture 两种写法,不要太纠缠于细节,以下代码不是完全对等RxJava 代码
1Observable.just(request1, request2)
2 .observeOn(Schedulers.io())
3 .map(Request::get)
4 .subscribe(System.out::println);CompletableFuture 代码
1ExecutorService threadPool = Executors.newFixedThreadPool(10);
2Stream.of(request1, request2)
3 .map(request -> CompletableFuture.supplyAsync(request::get, threadPool))
4 .forEach(completableFuture ->
5 log(completableFuture.join())
6);以上两段代码并非用以说明哪种写法的好坏,只是纯粹的提供两种代码风格
一个更实用的生产消费的代码
前方的代码是加入消息消费前所有消息都准备好了,这对处理一个已有列表有用,更贴近现实的话是先设置好消费者后,消息的产生是连续不断的。1PublishProcessor<String> restProcessor = PublishProcessor.create();
2Observable.fromPublisher(restProcessor).subscribe(System.out::println);
3
4restProcessor.offer("Monday");
5restProcessor.offer("Tuesday");
6restProcessor.offer("Wednesday");
7restProcessor.onComplete();本文的内容比较杂,其余更多的话题有 RxJava 的 Back Pressure, buffer, 和 window 的支持等。RxJava 的 buffer 就像 Flink 的 window, 而 window 更像是 Flink 的 keyBy(x).window。 永久链接 https://yanbin.blog/rxjava-get-started/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。