概述
RxJava是目前团队使用的技术之一,因此进行了初步的了解。
GitHub地址:https://github.com/ReactiveX/RxJava
RxJava的核心设计思想是“观察者模式”。简言之就是:观察者模式=出版者(被观察者)+ 观察者。当被观察者发生改变时,观察者就能立刻发生对应的动作。这个实现并不需要观察者每隔一段时间(例如1S)去轮训被观察者是否改变到了自己需要的状态,而是采用订阅(Subscribe)的方式,每当被观察者改变到观察者想要的状态,就立马通知观察者,这个时候观察者再做出对应的动作(调用对应的方法或者执行某段代码)。
在Android或者iOS的按钮点击事件就是典型的观察者模式,当按钮被点击时,OnClickListener会接收到通知,然后做出对应的动作。在这个栗子中,按钮(mBtn)是被观察者,OnClickListener是观察者,订阅是通过setOnClickListener完成的。在按钮被点击时,Android framework立即通知订阅了按钮点击事件的OnClickListener,OnClickListener则调用对应的方法,输出“ 点击了按钮”。
mBtn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
ELog.d("点击了按钮");
}
});
RxJava采用的就是这个观察者模式。因此它拥有Observable(被观察者)、Observer(观察者)、subscribe(订阅)、Event(事件)。Observer通过subscribe订阅Observable,当Observable发出Event来让Observer知道,Observer再进行相应动作。
Observer/Subscriber观察者
Observer在前面已经提到过了,这里Subscriber也是观察者,他和Observer的区别在于:
1️⃣Subscriber增加了onStart()方法,该方法在事件还未发送之前调用,可用于数据重置;
2️⃣Subscriber增加了unsubscribe()方法,该方法用于取消订阅,调用该方法之后,Subscriber不再接受事件。
日常开发中,这两个方法基本没有缺别。
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String string) {
Log.d("onNext",string);
}
@Override
public void onCompleted() {
Log.d("onCompleted", "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d("onError", "Error!");
}
} ;
Subscriber和Observer的创建方式相似,不再赘述。这里对应有3个方法,分别是onNext(),onCompleted()和onError(),来对应被观察者发送的不同事件。
Observable(被观察者)
被观察者前面已经提到,RxJava创建被观察者方式:
rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("test");
subscriber.onCompleted();
}
});
通过create创建被观察者是最基本的方式,还有如下方式:
(1)just(T):依次发送参数
rx.Observable observable = rx.Observable.just("I","am","heiheihei");
调用顺序为:
onNext(“I”)——> onNext(“am”)——> onNext(“heiheihei”)——> onCompleted();
(2)from(T[]) 和 from(Iterrable<?extends T>):将数组或者Iterrable遍历,传入每个被遍历的对下你给,依次发出:
String [] paras = {"I","am","heiheihei"};
rx.Observable observable = rx.Observable.from(paras);
Subscribe(订阅)
有了被观察者和观察者,接下来就可以用观察者去订阅被观察者。
observable.subscribe(subscriber);
注意被观察者是在前面,和我们中文逻辑”观察者.subscribe(被观察者)”不一样。 以上这种方式是观察者和被观察者都完整的情况下。还有一种不完整的观察者:Action。其中Action0可以对应于Subscriber的complete函数,因为不带参数,Action1可对应于onNext和onError,带有参数。
Action0 completeAction = new Action0() {
@Override
public void call() {
}
};
Action1 <Integer> onNextAction1 = new Action1<Integer>() {
@Override
public void call(Integer o) {
}
};
Action1 <Throwable>errorAction1 = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
} ;
observable.subscribe(onNextAction1);
observable.subscribe(onNextAction1,errorAction1);
observable.subscribe(onNextAction1,errorAction1,completeAction);
线程转变
从前面的1-4我们了解了RxJava的基本概念和含义,在之前的代码中和我们并么有涉及到线程的问题,也就是线程是默认状态,在默认状态下,在哪个线程订阅事件,就在哪个线程产生事件,也在这个线程消费事件。但是这样就不能完成例如子线程请求数据,主线程刷新UI的操作。要完成这个功能,需要使用Schedule。
(1)Schedulers.immediate():默认情况,也就是在当前线程运行;
(2)Schedulers.newThread(): 启用新线程,并在新线程执行操作;
(3)Schedulers.io(): 有I/O 操作时使用的 Scheduler,包括读写文件、读写数据库、网络信息交互等。
(4)Schedulers.computation(): 计算所使用的 Scheduler,如图像计算;
(5) AndroidSchedulers.mainThread():主线程运行。
用法:
(1)subscribeOn():指定被观察者发出事件的线程,也就是事件产生线程;
(2)ObserveOn():指定观察者做出动作的线程,也就是事件消费线程。
这样说还是太抽象,我们举一个很常见的例子,从网络拉取数据,然后刷新UI:
rx.Observable observable = rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
// 从网络获取用户名字
String userName = 。。。。。。;
subscriber.onNext(userName);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()) // 指定事件发生在io线程,也就是拉取用户信息这一过程
.observeOn(AndroidSchedulers.mainThread()) // 指定事件消费发生在主线程,也就是刷新UI这一过程
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
refreshUI(s);
}
});
上面这个例子展示了在日常工作中常用的线程切换情况。
变换
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列!这句是网上抄的,感觉描述的比较清楚,还是看栗子吧。
1️⃣map(): 我所理解的map就是把传入的对象转换成另一个对象,然后再接着进行事件发送等过程。
rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("userIcon");
subscriber.onCompleted();
}
}).map(new Func1<String, Image>() {
@Override
public Image call(String s) {
return getImage(s);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Image>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Image image) {
refreshUI(image);
}
});
2️⃣flatMap():使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatmap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
图示:
直接上个例子:
Observable.just(person1,person2,person3)
.flatMap(new Func1<Person, Observable<Pets>>() {
@Override public Observable<Pets> call(Person person) {
return Observable.from(person.getPets());
}
}).subscribe(new Action1<Pet>() {
@Override public void call(Pet pat) {
// 操作
}
});