RxJava基础知识

 

Posted by     DH on March 8, 2019

概述

RxJava是目前团队使用的技术之一,因此进行了初步的了解。

GitHub地址:https://github.com/ReactiveX/RxJava

RxJava的核心设计思想是“观察者模式”。简言之就是:观察者模式=出版者(被观察者)+ 观察者。当被观察者发生改变时,观察者就能立刻发生对应的动作。这个实现并不需要观察者每隔一段时间(例如1S)去轮训被观察者是否改变到了自己需要的状态,而是采用订阅(Subscribe)的方式,每当被观察者改变到观察者想要的状态,就立马通知观察者,这个时候观察者再做出对应的动作(调用对应的方法或者执行某段代码)。

在Android或者iOS的按钮点击事件就是典型的观察者模式,当按钮被点击时,OnClickListener会接收到通知,然后做出对应的动作。在这个栗子中,按钮(mBtn)是被观察者,OnClickListener是观察者,订阅是通过setOnClickListener完成的。在按钮被点击时,Android framework立即通知订阅了按钮点击事件的OnClickListener,OnClickListener则调用对应的方法,输出“ 点击了按钮”。

mBtn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                ELog.d("点击了按钮");
            }
        });

RxJava采用的就是这个观察者模式。因此它拥有Observable(被观察者)、Observer(观察者)、subscribe(订阅)、Event(事件)。Observer通过subscribe订阅Observable,当Observable发出Event来让Observer知道,Observer再进行相应动作。

Observer/Subscriber观察者

Observer在前面已经提到过了,这里Subscriber也是观察者,他和Observer的区别在于:

1️⃣Subscriber增加了onStart()方法,该方法在事件还未发送之前调用,可用于数据重置;

2️⃣Subscriber增加了unsubscribe()方法,该方法用于取消订阅,调用该方法之后,Subscriber不再接受事件。

日常开发中,这两个方法基本没有缺别。

Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String string) {
                Log.d("onNext",string);
            }

            @Override
            public void onCompleted() {
                Log.d("onCompleted", "Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("onError", "Error!");
            }
        } ;

Subscriber和Observer的创建方式相似,不再赘述。这里对应有3个方法,分别是onNext(),onCompleted()和onError(),来对应被观察者发送的不同事件。

Observable(被观察者)

被观察者前面已经提到,RxJava创建被观察者方式:

rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("test");
                subscriber.onCompleted();
            }
        });

通过create创建被观察者是最基本的方式,还有如下方式:

(1)just(T):依次发送参数

rx.Observable observable = rx.Observable.just("I","am","heiheihei");

调用顺序为:

onNext(“I”)——>
onNext(“am”)——>
onNext(“heiheihei”)——>
onCompleted();

(2)from(T[]) 和 from(Iterrable<?extends T>):将数组或者Iterrable遍历,传入每个被遍历的对下你给,依次发出:

String [] paras = {"I","am","heiheihei"};
rx.Observable observable = rx.Observable.from(paras);

Subscribe(订阅)

有了被观察者和观察者,接下来就可以用观察者去订阅被观察者。

observable.subscribe(subscriber);

注意被观察者是在前面,和我们中文逻辑”观察者.subscribe(被观察者)”不一样。 以上这种方式是观察者和被观察者都完整的情况下。还有一种不完整的观察者:Action。其中Action0可以对应于Subscriber的complete函数,因为不带参数,Action1可对应于onNext和onError,带有参数。

Action0 completeAction = new Action0() {
     @Override
      public void call() {
        
      }
 };

Action1 <Integer> onNextAction1 = new Action1<Integer>() {
      @Override
      public void call(Integer o) {

         }
 };

Action1 <Throwable>errorAction1 = new Action1<Throwable>() {
      @Override
      public void call(Throwable throwable) {

          }
} ;
        
observable.subscribe(onNextAction1);
observable.subscribe(onNextAction1,errorAction1);
observable.subscribe(onNextAction1,errorAction1,completeAction);

线程转变

从前面的1-4我们了解了RxJava的基本概念和含义,在之前的代码中和我们并么有涉及到线程的问题,也就是线程是默认状态,在默认状态下,在哪个线程订阅事件,就在哪个线程产生事件,也在这个线程消费事件。但是这样就不能完成例如子线程请求数据,主线程刷新UI的操作。要完成这个功能,需要使用Schedule。

(1)Schedulers.immediate():默认情况,也就是在当前线程运行;

(2)Schedulers.newThread(): 启用新线程,并在新线程执行操作;

(3)Schedulers.io(): 有I/O 操作时使用的 Scheduler,包括读写文件、读写数据库、网络信息交互等。

(4)Schedulers.computation(): 计算所使用的 Scheduler,如图像计算;

(5) AndroidSchedulers.mainThread():主线程运行。

用法:

(1)subscribeOn():指定被观察者发出事件的线程,也就是事件产生线程;

(2)ObserveOn():指定观察者做出动作的线程,也就是事件消费线程。

这样说还是太抽象,我们举一个很常见的例子,从网络拉取数据,然后刷新UI:

rx.Observable observable =  rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                // 从网络获取用户名字
                String userName = 。。。。。。;
                subscriber.onNext(userName);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io()) // 指定事件发生在io线程,也就是拉取用户信息这一过程
                .observeOn(AndroidSchedulers.mainThread()) // 指定事件消费发生在主线程,也就是刷新UI这一过程
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        refreshUI(s);
                    }
                });

上面这个例子展示了在日常工作中常用的线程切换情况。

变换

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列!这句是网上抄的,感觉描述的比较清楚,还是看栗子吧。

1️⃣map(): 我所理解的map就是把传入的对象转换成另一个对象,然后再接着进行事件发送等过程。

rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("userIcon");
                subscriber.onCompleted();
            }
        }).map(new Func1<String, Image>() {
            @Override
            public Image call(String s) {
                return getImage(s);
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Image>() {
                    @Override
                    public void onCompleted() {
                        
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Image image) {
                        refreshUI(image);
                    }
                });

2️⃣flatMap():使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatmap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

图示:

直接上个例子:

Observable.just(person1,person2,person3)
  .flatMap(new Func1<Person, Observable<Pets>>() {
      @Override public Observable<Pets> call(Person person) {
        return Observable.from(person.getPets());
      }
    }).subscribe(new Action1<Pet>() {
      @Override public void call(Pet pat) {
        // 操作
      }
    });