背景
- Rxjava2.0相对Rxjava1.0有了很多的更新,因此需要花时间好好研究下。
- 文章的讲述顺序:观察者模式、操作符、线程调度。
- 有人用水管来表示观察者和被观察者,我觉得也挺形象的:
- 如上图所示,上游就是我们的被观察者Observable,下游就是观察者Observer
观察者模式
- Rxjava以观察者模式为骨架,在2.0中依然如此。不过此次更新,出现了两种观察者模式:
- Observable(被观察者)/Observer(观察者)
- Flowable(被观察者)/Subscriber(观察者)
- RxJava2.X中,Observeable用于订阅Observer,是不支持背压的,而Flowable用于订阅Subscriber,是支持背压(Backpressure)的。
- 在1.0中,关于背压最大的遗憾,就是集中在Observable这个类中,导致有的Observable支持背压,有的不支持。为了解决这种缺憾,新版本把支持背压和不支持背压的Observable区分开来。
Observable/Observer
- Observable的正常用法如下:
|
|
这里出现一个新类:ObservableEmitter,Emitter是发射器的意思,那这个类的作用就是发出事件。它有三种类型的事件:onNext(T value)、onComplete()、onError(Throwable error),不过发送事件有一些规则要遵守:
- 上游可以发送无限个onNext, 下游也可以接收无限个onNext.
- 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
- 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
- 上游可以不发送onComplete或onError.
- 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
- 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
还有一个新类是Disposable,从字面意思“被抛弃的”,我们应该能猜到,它用于中断观察者和被观察者之间的联系,即取消订阅。不过需要注意的是:调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件。Disposable的用处不止这些, 后面讲解到了线程的调度之后, 我们会发现它的重要性. 随着后续深入的讲解, 我们会在更多的地方发现它的身影。
还有一个变化时subscribe(),有多个重载的方法:
123456public final Disposable subscribe() {}public final Disposable subscribe(Consumer<? super T> onNext) {}public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}public final void subscribe(Observer<? super T> observer) {}最后一个带有Observer参数的我们已经使用过了,这里对其他几个方法进行说明:
- 不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
- 带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见。
因为Observable/Observer不支持背压,所以使用它们的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线,仅供各位参考)
Flowable/Subscriber
- Flowable 的使用方法如下:
|
|
- Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。
- 从代码运行结果我们看出,当我们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会立刻执行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)这个方法调用之前做好初始化的工作;
- Flowable也可以通过creat()来创建:
|
|
- Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的。
- 不同于上面的方式,当用Flowable.create创建Flowable时,即使调用了subscription.request(n)方法,也会等onSubscribe()方法中后面的代码都执行完之后,才开始调用onNext。
- 尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。
其他观察者模式
除了上面两种观察者,还有一类观察者:
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
其实这三者都差不多,Maybe/MaybeObserver可以说是前两者的复合体,因此以Maybe/MaybeObserver为例简单介绍一下这种观察者模式的用法
|
|
- 上面就是Maybe/MaybeObserver的普通用法,你可以看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以用这种观察者模式。
下面是上面提到的所有观察者的接口:
1234567891011121314151617181920//Observable接口interface ObservableSource<T> {void subscribe(Observer<? super T> observer);}//Single接口interface SingleSource<T> {void subscribe(SingleObserver<? super T> observer);}//Completable接口interface CompletableSource {void subscribe(CompletableObserver observer);}//Maybe接口interface MaybeSource<T> {void subscribe(MaybeObserver<? super T> observer);}//Flowable接口public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);}其实我们可以看到,每一种观察者都继承自各自的接口,这也就把他们能完全的区分开,各自独立(特别是Observable和Flowable),保证了他们各自的拓展或者配套的操作符不会相互影响。这也是框架设计者的用意。
其他更新
从2.x开始,被观察者不再接收null作为数据源。
在这个图中,黄色水管表示子线程,深蓝色水管表示主线程。
操作符相关
- 操作符本质上变动不大,多是包名或者类的变动。
Action
- 改动如下:
|
|
Function
- 同上也是命名方式的改变:
|
|
变换操作符
map
在背景中,我们引进了一张图片用水管来比喻观察者和被观察者,这里我们也可以引进一张图片
图中map函数作用是将圆形事件转成矩形事件,从而导致下游的事件都变成了矩形事件。代码例子:
|
|
flatmap
- flatmap是一个非常强大的操作符,它将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。
我们还是再用一张图片来描述这个流程:
如图所示,上游每发送一个事件, flatMap都将创建一个新的水管, 然后发送转换之后的新的事件, 下游接收到的就是这些新的水管发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 也就是图中所看到的, 并不是事件1就在事件2的前面。如果需要保证顺序则需要使用 concatMap (它的用法和flatMap一致)。
|
|
- 代码的运行结果是:
|
|
- 从运行结果来看,也说明flatmap的运行结果是无序的。
总结
- 上面大致介绍了Rxjava2的一些变化,逻辑是混乱的,因为我也是边学习,边记录。后面熟悉之后再回过头来整理一遍。