一、是什么?
“a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。简单的异步处理,扩展的观察者模式,灵活的线程控制。
感谢掘金作者Season的Rxjava2.0系列文章。
Github项目地址:
https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid
二、关键词
Observable (可观察者,即被观察者)
Observer (观察者)
subscribe (订阅)
Consumer:只接收onNext()的观察者
Map:变换
FlatMap
Zip:合并
Flowable(背压,生产者的速度大于消费者的速度)
onCompleted(): 事件队列完结
onNext():普通事件
onError(): 事件队列异常
三、基本使用
1. 被观察者(Observable)
决定事件什么时候触发
- 使用Observable创建事件队列
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
/**
* ObservableEmitter:它可以通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)
* 发出next事件、complete(完成)事件和error(错误)事件
*
* Observable可以发送无限个onNext, Observer也可以接收无限个onNext.
*
* 当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送,
* 而Observer收到onComplete事件之后将不再继续接收事件.
*
* 当Observable发送了一个onError后, Observableon的Error之后的事件将继续发送,
* 而Observer收到onError事件之后将不再继续接收事件.
*/
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 完成
emitter.onComplete();
}
});
2. 观察者(Observer)
可以调用Disposable的dispose()方法使Observer不在接收发过来的事件
如果有多个Disposable,RxJava中内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的事件。
事件触发后做什么
Observer
接口
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "error", e);
}
@Override
public void onComplete() {
Log.e(TAG, "complete");
Log.e(TAG, "rx01: ====== end ======");
}
};
- Consumer接口,只接收onNext(),不接收其它的
// Consumer(消费者)表示只关心onNext事件
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.e(TAG, "onNext: " + integer);
}
};
3. 订阅者(Subscribe)
- 创建了
Observable
和Observer
之后,再用subscribe()
方法将它们联结起来
observable.subscribe(observer);
- 当然也可以使用RxJava的链式编程
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 3; i++) {
Log.e(TAG, "发送: " + i);
e.onNext(i);
}
e.onComplete();
Log.e(TAG, "发送: " + 99);
e.onNext(99);
}
}).subscribe(new Observer<Integer>() {
// 调用dispose()会导致Observer不在接收事件
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG, "绑定: ");
disposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "接收: " + integer);
if (integer == 1) {
disposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
Log.e(TAG, "rx02: ====== end ======");
}
});
}
四、线程调度(Scheduler)
Schedulers.immediate()
:在当前线程运行,默认模式Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。Schedulers.io()
: I/O 操作,数据库,网络等Schedulers.computation()
: 计算AndroidSchedulers.mainThread()
:Android主线程
// 后台线程取数据,主线程显示,适用于大多数
observable
// 指定 subscribe(被观察者)发生在 IO 线程,事件产生线程
.subscribeOn(Schedulers.io())
// 指定 Subscriber(观察者)的回调发生在主线程,事件消费线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mSubscriber);
五、变换
1.Map
对原始Observable发出的每一项数据进行相应的操作后在发出。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});
2. flatMap
flatMap()接收一个Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.
flatMap是没有顺序的,如果要顺序发送请使用concatMap
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
// 10毫秒的延时
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, s);
}
});
六、Zip合并
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.e(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
七、Flowable(背压,生产者的速度大于消费者的速度)
如果Observable与Observer不在同一个线程,当被观察者的生产速度大于被观察者的消费速度时会抛出
MissingBackpressureException
异常。在RxJava2中新增了Flowable专门用于专门应对背压(Backpressure)问题。
/**
* Subscription.request()方法表示Subscriber要处理几个事件
* emitter.requested()方法表示减少几个事件
* 在Flowable中使用Subscription.cancel()关闭事件处理
*/
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 128; i++) {
Log.e(TAG, "emit " + i);
emitter.onNext(i);
}
}
// BackpressureStrategy.ERROR 默认128kb 超出会抛出MissingBackpressureException异常
// BackpressureStrategy.BUFFER 无大小限制
// BackpressureStrategy.DROP 存不下的事件直接丢弃
// BackpressureStrategy.LATEST 只保留最新的事件,与DROP相反
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
// .onBackpressureDrop()//添加背压策略
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe");
// 全局的
mSubscription = s;
// 处理多少个发送过来的事件
mSubscription.request(Long.MAX_VALUE);
// 取消接收事件
// mSubscription.cancel();
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
八、其它API
Scan:累加器
Filter:过滤器
take()、takeLast():只发送前N个元素、只发送后N个元素
Skip()、SkipLast():不发送前N个元素、不发送后N个元素
distinct:仅处理一次
ElementAt():只发送第N个元素
Sample():定期发射Observable最近发射的数据项
Merge():合并多个Observables的发射物,多输入,单输出
startWith():在开头插入一条