搜索
您的当前位置:首页正文

RxJava 2.x实战

来源:二三娱乐

零 书籍信息

第一章 RxJava简介

1.1 你需要了解的函数响应式编程

1. 响应式编程

  1. 响应式编程的特点:
    1)异步编程:提供了合适的异步编程模型,能够挖掘多核CPU的能力,提高效率,降低延迟和阻塞等。
    2)数据流:基于数据流模型,响应式编程提供了一套统一的Stream风格的数据处理接口。与Java 8中的Stream相比,响应式编程除了支持静态数据流,还支持动态数据流,并且支持复用和同时接入多个订阅者。
    3)变化传播:以一个输入流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。
  2. 响应式编程在用户界面及基于实时系统的动画方面有广泛的应用;也在处理嵌套回调的异步事件、负责的列表过滤和变换的时候也有良好的表现。
  3. 和前端的响应式设计是不同的概念。前端的响应式设计指的是自适应。

2.函数式编程

  1. 函数式编程中,由于数据是不可变的,因此没有并发编程的问题,是线程安全的。它将计算机运算看做数学中的的函数计算,主要特点是将计算过程分解为多个可复用的函数,并且避免了状态及变量的概念。
  2. 函数式编程具有以下特点:
    1) 函数是“第一等公民” :所谓第一等公民是指函数与其他数据类型一样,处于平等地位,可以赋值给其他变量,也可以作为参数,传入另外一个函数,或者作为别的函数的返回值。
    2)闭包和高阶函数:闭包是起函数的作用并可以像对象一样操作的对象。
    3)递归:把递归作为控制流程的机制。
    4)惰性求值。
    5)没有“副作用”:指的是函数内部与外部互动,产生运算以为的其他结果。函数式编程强调没有“副作用”,意味着函数要保持独立,所有功能就是返回一个新值,没有其他行为,尤其是不能修改外部变量的值。

1.2 RxJava简介

1.RxJava的由来

RxJava是rx的Java实现。

2. Rx是什么

RX是reactive Extension的缩写,是由微软架构师所开发的一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。

3.ReactiveX的历史

微软的定义是,rx是一个函数库。
reactiveX.io给的定义是:Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

4.RX模式

  1. 使用观察者模式
  • 创建:Rx可以方便的创建事件流和数据流。
  • 组合:Rx使用查询式的操作符组合和变换数据流。
  • 监听:Rx可以订阅任何可观察的数据流并执行操作。
  1. 简化代码
  • 函数式风格:对可观察的数据流使用无副作用的输入/输出函数,避免程序里错综复杂的状态。
  • 简化代码:Rx操作符通常可以将复杂的难题简化为很少的几行代码。
  • 异步错误处理:传统的try/catch没办法处理异步寄宿,Rx提供了合适的错误处理机制。
  • 轻松使用并发:Rx的Observables和Schedulers可以让开发者摆脱底层的线程同步和各种并发问题。
    总之,RxJava是Rx在JVM平台上的一个实现,通过使用观察者序列来构建异步、基于事件的程序。

1.3 为何选择RxJava

  • 可组合
    对于单层的异步操作来说,Java中的Future对象处理非常简单有效,但是一旦涉及嵌套,它们就开始变得非常繁琐和复杂。
  • 更灵活
    RxJava的Observable不仅支持单独的标量值(就想Future可以做的),还支持数据序列,甚至是无穷的数据流。
  • 无偏见
    Rx对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式(如线程池、事件循环、非阻塞I/O)来满足我们的需求。无论我们怎样实现它,无论底层实现时阻塞还是非阻塞的,客户端代码都将与Observable的全部交互当成是异步的。

1.4 RxJava能做什么

能做的很多,但是最合适的就是多线程处理,超方便。Android和服务端都可以。

1.6 小结

RxJava是结合了多种设计模式并优化而产生的结晶。
RxJava的思维导图。


思维导图

第2章 RxJava基础知识

2.1 Observable

RxJava的使用通常需要三步。
1)创建Observable
Observable的字面意思就是被观察者,使用RxJava时需要创建一个被观察者,它会决定什么时候触发怎样的事件。可以看做是上游发送命令。
2)创建Observer
Observer即观察者,它可以在不同的线程中执行任务。这种模式极大的简化了并发操作,因为它创建了饿一个处于待命状态的观察者哨兵,可以在未来某个时刻响应Observable的通知,而不需要阻塞等待Observable发射数据。
3)使用subscribe()进行订阅
创建了Observable和Observer之后,需要使用subscribe()方法把它们连接起来。
可以简化为:

创建Observable对象,创建Observer对象,然后用subscribe()方法进行关联。

可以看一段代码示例:

Obserbale.just("hell world").subscribe(new Consume<String>(){
  @Override
  public void accept(@NonNull String s) throws Exception{
    System.out.println(s);
  }
}

subscribe()方法有多个重载的方法。
在RxJava中,被观察者(Observable)、观察者(Observer)、subscribe()方法三者缺一不可,只有使用了subscribe()方法,被观察者才会开始发送数据。


5中被观察者模式
5种被观察者模式描述

只有Flowable支持背压,所以在需要背压的情况下,则必须使用Flowable。

do操作符

do操作可以给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段时,这些回调就会被触发。在RxJava中包含了很多doXXX操作符。各个do操作符的用途如下:

操作符 用途
doOnSubscribe 一旦观察者订阅了Observable,它就会被调用
doOnLifecycle 可以在观察者订阅之后,设置是否取消订阅
doOnNext 它产生的Observable每发射一项数据就会调用它一次,它的Consumer接受发射的数据项。一般用于在subscribe之前对数据进行处理
doOnEach 它产生的Observable每发射一项数据就会调用它一次,不仅包括onNext,还包括onError和onCompleted
doAfterNext 在onNext之后执行,而doOnNext()是在onNext之前执行
doOnCompolete 当它产生Observable在正常终止调用onComplete时会被调用
doFinally 在当产生的Observable终止之后被调用,无论是否正常终止还是异常终止。doFinally优先于doAfterTerminate的调用
doAfterTerminate 注册一个Action,当Observable调用onComplete或onError时触发

2.2 Hot Observable和Cold Observable

1. Observable的分类

Observable有Hot和Cold之分。Hot Observable无论有没有订阅者订阅都会发送事件。当Hot Observable有多个订阅者时,Hot Observable与订阅者的关系时一对多的关系,可以与多个订阅者共享信息。
Cold Observalbe是只有观察者订阅了,才开始进行发射数据流代码,并且是一对一的关系。当有多个不同的订阅者时,消息是重新完整发送的。也就是说对于多个Observer,他们各自的事件是独立的。

Hot Obseravle是一个电台,大家听到的是同一首歌;Cold Observable是一张音乐CD,人们可以独立购买并听取它。

2.Cold Observable

Observable的just、create、range、fromXXX等操作符都能生成Cold Observable。尽管Cold Observable很好,但是对于某些事件不确定何时发生不确定Observable发射的元素数量的情况,还需要Hot Observable。

3.Cold Observable如何转换成Hot Observable

1)使用publish,生成ConnectableObservable。

ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS, 
            }
        }).observeOn(Schedulers.newThread()).publish();

        //ConnectableObservable需要调用connect()方法才能真正执行
        observable.connect();
        Consumer<Long> subscribe1 = new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("subscriber1:" + aLong);
            }
        };

        Consumer<Long> subscribe2 = new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("subscriber1:" + aLong);
            }
        };

        observable.subscribe(subscribe1);
        observable.subscribe(subscribe2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

输出结果为:

subscriber1:0
subscriber2:0
subscriber1:1
subscriber2:1
subscriber1:2
subscriber2:2

ConnectableObservable是线程安全的,需要调用connect()方法才会执行。
2)使用Subject/Processor
Subject和Processor的作用相同。Processor是RxJava 2.x新增的类,继承自Flowable,支持背压控制,而Subject则不支持背压控制。
Subject既是Observable,又是Observer(Subscriber)。这一点可以从Subject的源码上看到,继承自Observable,实现Observer。
Subject作为观察者,可以订阅目标Cold Observable,使对方开始发送事件。同是它又作为Observable转发或者发送新的事件,让Cold Observable借助Subject转换为Hot Observable。
Subject并不是线程安全的,如果想要其线程安全,则需要调用toSerialized()方法。

4. Hot Observable如何转换成Cold Observable

1)ConnectableObservable的refCount操作符

RefCount
RefCount操作符把一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者/观察者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,知道最后一个观察者完成,才断开与下层可连接Observable的连接。
package chapter2;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

/**
 * @author shaopeng.wei
 * @since 2019-01-14 14:45
 * Purpose
 */
public class ConnectObservable2ColdObservable {
    public static void main(String[] args) throws InterruptedException {
        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("subscriber1:" + aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("subscriber2:" + aLong);
            }
        };


        Observable<Long> longObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> e) throws Exception {
                Observable.interval(5, TimeUnit.MILLISECONDS, Schedulers.io()).take(Integer.MAX_VALUE).subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());
        System.out.println("默认cold Observable,subscriber1先发送10ms,预期subscriber2不受干扰从0开始");
        Disposable subscribe1 = longObservable.subscribe(subscriber1);
        Thread.sleep(10L);
        Disposable subscribe2 = longObservable.subscribe(subscriber2);


        Thread.sleep(20L);
        subscribe1.dispose();
        subscribe2.dispose();

        System.out.println("和预期一直,并没有因为subscriber1先执行而导致subscriber2跟着subscriber1开始");
        System.out.println("————————————————————————————————————————————————————————————————————");
        System.out.println("Cold Observable转化为Hot 不会重新开始,所有subscriber2会跟着subscriber1的数,不会从0开始。");
        ConnectableObservable<Long> publish = longObservable.publish();
        publish.connect();

        Disposable disposable1 = publish.subscribe(subscriber1);
        Thread.sleep(10L);
        Disposable disposable2 = publish.subscribe(subscriber2);

        Thread.sleep(20L);
        disposable1.dispose();
        disposable2.dispose();
        System.out.println("发现和预期一致");
        System.out.println("————————————————————————————————————————————————————————————————————");
        System.out.println("预期和cold Observable一样");
        Observable<Long> observable = publish.refCount();
        disposable1 = observable.subscribe(subscriber1);
        Thread.sleep(20L);
        disposable2 = observable.subscribe(subscriber2);

        Thread.sleep(20L);
        disposable1.dispose();
        disposable2.dispose();
        System.out.println("发现预期并不一样。因为如果不是所有的订阅者/观察者取消了订阅,而只是部分取消,则部分的订阅者/观察者重新开始订阅时,不会从头开始数据流");

    }
}

当不是所有的订阅者/观察者取消订阅,而只是部分取消时,部分订阅者/观察者重新订阅时,数据流并不会从头开始
2)Observable的share操作符
share操作符封装了publish的.refCount()调用。

2.3 Flowable

Flowable是RxJava 2.x引入的新的实现,支持非阻塞式的背压。与FLowable相比,使用Observable较好的场景如下:

  • 一般处理不超过1000条数据,并且几乎不会出现内存溢出;
  • GUI鼠标事件,基本不会背压
  • 处理同步流
    使用Flowable较好的场景如下:
  • 处理以某种方式产生超过10KB的元素;
  • 文件读取与分析;
  • 读取数据库记录,也是一个阻塞和基于拉取模式;
  • 网络I/O流
  • 创建一个响应式非阻塞接口

2.5 Single、Completable和Maybe

1.SIngle

从SingleEmitter源码可以看出,Single的只有onSuccess和onError事件。其中onSuccess()用于发射数据(在Observable/Flowable中使用onNext()来发射数据),而且只能发射一个数据,后面即使再发射数据也不会做任何处理。
Single的SingleObservable中只有onSuccess和onError,并没有onComplete,这也是Single与其他4中观察者之间最大的区别。
Single可以通过toXXX方法转化成Observable、Flowable、Completable以及Maybe。

Completable在创建后,不会发射任何数据。只有onComplete和onError事件,同事Completable并没有map、flatMap等操作符,它的操作符比Observable/Flowable要少很多。
Completable经常结合andThen操作符使用。

3.Maybe

Maybe可以看做Single和Completable的结合体。

2.5 Subject和Processor

2.5.1 Subject是一种特殊的存在

Subject及时Observable也是Observer。官网称可以看做一个桥梁或者代理。

1.Subject的分类

Subject包含4中类型,分别是AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。
1) AsyncSubject
Observer会接受AsyncSubject的onComple()之前的最后一个数据。即在前面有多个onNext,只有在onComplete前的一个生效。

注意:可以看做必须调用subject.onComplete才会开始发送数据,否则观察者将不接受任何数据。

2). BehaviorSubject
Observer会先接受到BehaviorSubject被订阅之前的最后一个数据,再接受订阅之后发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。
3)ReplaySubject
ReplaySubject会发射所有来自原始Observable的数据给观察者,无论他们是合适订阅的。
ReplaySubject除了可以通过createWithSize来限制缓存数据的数量外,还可以通过createWithTime()来限制缓存时间。
4)PublishSubject
Observer只接受PublishSubject被订阅之后发送的数据。
通过前面的介绍,总结4个Subject的特性为:

Subject 发射行为
AsnycSubject 不论订阅在什么时候发生,只发射最后一个数据
BehaviorSubject 发送订阅前的一个数据和订阅后的全部数据
ReplaySubject 不论订阅在什么时候,都全部发射
PublishSubject 发送订阅之后的全部数据

第3章 创建操作符

RxJava常用操作符
  1. jush():将一个或多个对象转换成发射这个或这些对象的一个Observable。
  2. from():将一个Iterable、一个Future或者一个数组转换成一个Observable。
  3. create():使用一个函数从头创建一个Observable。
  4. defer():只有当订阅者订阅才创建Observable,为每个订阅者创建一个新的Observable。
  5. range():创建一个发射指定范围的整数序列的Observable。
  6. interval():创建一个按照给定的时间间隔发射整数序列的Observable。
  7. timer():创建一个在给定的延时之后发射耽搁数据的Observable。
  8. empty():创建一个什么都不做直接通知完成的Observable。
  9. error():创建一个什么都不做直接通知错误的Observable。
  10. never():创建一个不发射任何数据的Observable。

3.1 create、just和from

1. create

用函数从头创建一个Observable。RxJava建议我们在传递给create方法的函数时,先检查一下观察者的isDisposed状态,以便在没有观察者的时候,让我们的Observable停止发射数据,防止运行昂贵的运算。

2. just

just类似于from,但是from会将数组或Iterable数据取出然后逐个发射,而just只是简单的原样发射,将数组或iterable当做单个数据。
它可以接受一致十个参数,返回一个按参数列表顺序发射这些数据的Observable。

传入null会报npe

3.from

from有一个两个参数的方法,可以设置超时时间和超时单位。

Observable.fromFuture(future,3,TimeUnit.SECONDS)

3.2 repeat

创建一个发射特定数据重复多次的Observable。repeat会重复地发射数据,某些实现允许我们重复发射某个数据序列,还有一些允许我们限制重复的次数。

1. repeatWhen

repeatWhen不是缓存或重放原始Observable的数据序列,而是有条件重新订阅和发射原来的Observable。

2.repeatUntil

表示直到某个条件就不再重复发射数据,即当条件为true时,表示中止重复发射上游的Observable。

3.3 defer、interval和timer

1. defer

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个全新的Observable,如图:


defer操作符

2.interval

创建一个按照固定时间间隔发射整数序列的Observable。

3.timer

创建一个Observable,它在一个特定的延迟后发射一个特殊的值。timer操作符创建一个在特定的时间段之后返回一个特殊值的Observable。

第4章 RxJava的线程操作

4.1 调度器(Scheduler)种类

1. RxJava线程介绍

RxJava是为了异步编程实现的一个框架,异步是其重要的特性。

2.Scheduler

Scheduler是RxJava对线程控制器的一个抽象,RxJava内置了多个Scheduler的实现,他们基本满足绝大多数使用场景。

Scheduler 作用
single 使用定长为1的线程池,重复利用这个线程
newThread 每次启用新线程,并在新线程中执行操作
computation 使用固定的线程池,大小为CPU核数,适用于CPU密集型计算
io 适合I/O操作。行为模式和newTread()差不多,区别在于io()内部是一个无数量上限的线程池,可以重用空闲的线程。
trampoline 直接在当前线程运行,如果当前线程中有其他任务正在之慈宁宫,则会先暂停其他任务
Scheduler.from

4.2 RxJava线程模型

2. 线程调度

默认情况下不做任何线程处理,Observable和Observer处于同一线程中。如果想要切换线程,则可以使用subscribeOn()和ObserverOn()
1)subscribeOn
subscribeOn()改变上游发射数据的线程池,只有第一次有效。
2)observeOn
observeOn用来指定下游操作运行在特定的线程调度器Scheduler上。
多次执行,每次执行都会切换。

4.3 Scheduler的测试

TestScheduler是专门用于测试的调度器,与其他调度器的区别是:TestScheduler只有被调用了时间才会继续,是一种特殊的、非线程安全的调度器,用于测试一些不引入真是并发性、允许手动推进虚拟时间的调度器。
TestScheduler用来测试一些需要精确时间的任务是非常合适的,减少了等待时间。

第5章 变换操作符和过滤操作符

RxJava的变换操作符主要包括以下几种。

  • map:对序列的每一项都用一个函数来变换Observable发射的数据序列
  • flapMap、concatMap、flatMapIterable:将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
  • switchMap:将Observable发射的数据集合变换为Observables集合,然后值发射这些Observables

第6、7章

这两章都是一些操作符,没太多深入信息快速浏览。

第8章 RxJava的背压

8.1 背压

指在异步场景下,被观察者发送时间速度远快于观察者处理的速度,从而导致buffer溢出,这种现象叫做背压。

  1. 背压必须是在异步的场景下才会出现,即被观察者和观察者处于不同的线程中。
  2. RxJava是基于push模型的,生产者有数据就发送给消费者。
  3. 在RxJava 2.x中,只有新增的Flowable是支持背压的。

8.2 RxJava 2.x的背压策略

RxJava 2.x中,默认队列大小为128,并且要求所有操作符强制支持背压。一共有5种背压策略。

1. MISSING

此策略表示创建的Flowable没有指定背压策略,不会做缓存或丢弃处理,需要下游通过背压操作符指定背压策略

2. ERROR

表示如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。

3. BUFFER

此策略表示异步患处吃同Observable的一样,没有固定大小,可以无限制添加数据,不会抛出MissingBackpressureException异常,但是会导致OOM。

4. DROP

此策略表示,如果FLowable的异步缓冲池满了,则会丢掉将要放入缓存池中的数据。

5.LATEST

此策略表示,如果缓存池满了,会丢掉将要放入缓存池中的数据,但是不管缓存池状态如何,LASTEST策略会将最后一条数据强行放入缓存池中。

第9章 Disposable和Transformer的使用

9.1 Disposable

Observable.subscribe()方法会返回一个Disposable的对象,可以通过调用dispose方法来切断数据流。

CompositeDisposable

可以使用CompositeDisposable.add()方法把多个Disposable对象添加到容器中,然后通过CompositeDisposable.clear()即可切断所有的事件。

第10章 RxJava的并行编程

10.1 RxJava并行操作

并发是指一个处理器同事处理多个任务。并行是多个处理器或者多核处理器同时处理多个不同的任务。

总结

这本书是一本流水账式的操作手册,感觉写的不是很好。对于初学者没有写如何上手,对于资深者又没有太深入的东西。全程快速的扫描了一遍,主要是弥补部分遗漏的知识点,看完收获不是很大。总体来说不值得卖纸质版,当当的5块钱电子版的定价就不错。

Top