一、是什么?
“a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。简单的异步处理,扩展的观察者模式,灵活的线程控制。
感谢掘金作者Season的Rxjava2.0系列文章。
Github项目地址:
https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid
二、关键词
Observable (可观察者,即被观察者)
Observer (观察者)
subscribe (订阅)
Consumer:只接收onNext()的观察者
Map:变换
FlatMap
Zip:合并
Flowable(背压,生产者的速度大于消费者的速度)
onCompleted(): 事件队列完结
onNext():普通事件
onError(): 事件队列异常
三、基本使用
1. 被观察者(Observable)
决定事件什么时候触发
- 使用Observable创建事件队列
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {/*** ObservableEmitter:它可以通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)* 发出next事件、complete(完成)事件和error(错误)事件** Observable可以发送无限个onNext, Observer也可以接收无限个onNext.** 当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送,* 而Observer收到onComplete事件之后将不再继续接收事件.** 当Observable发送了一个onError后, Observableon的Error之后的事件将继续发送,* 而Observer收到onError事件之后将不再继续接收事件.*/@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);// 完成emitter.onComplete();}});
2. 观察者(Observer)
可以调用Disposable的dispose()方法使Observer不在接收发过来的事件
如果有多个Disposable,RxJava中内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的事件。
事件触发后做什么
Observer接口
Observer<Integer> observer = new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "subscribe");}@Overridepublic void onNext(Integer value) {Log.e(TAG, "" + value);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "error", e);}@Overridepublic void onComplete() {Log.e(TAG, "complete");Log.e(TAG, "rx01: ====== end ======");}};
- Consumer接口,只接收onNext(),不接收其它的
// Consumer(消费者)表示只关心onNext事件Consumer<Integer> consumer = new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName());Log.e(TAG, "onNext: " + integer);}};
3. 订阅者(Subscribe)
- 创建了
Observable和Observer之后,再用subscribe()方法将它们联结起来
observable.subscribe(observer);
- 当然也可以使用RxJava的链式编程
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 0; i < 3; i++) {Log.e(TAG, "发送: " + i);e.onNext(i);}e.onComplete();Log.e(TAG, "发送: " + 99);e.onNext(99);}}).subscribe(new Observer<Integer>() {// 调用dispose()会导致Observer不在接收事件private Disposable disposable;@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.e(TAG, "绑定: ");disposable = d;}@Overridepublic void onNext(@NonNull Integer integer) {Log.e(TAG, "接收: " + integer);if (integer == 1) {disposable.dispose();}}@Overridepublic void onError(@NonNull Throwable e) {Log.e(TAG, "onError: ", e);}@Overridepublic void onComplete() {Log.e(TAG, "onComplete: ");Log.e(TAG, "rx02: ====== end ======");}});}
四、线程调度(Scheduler)
Schedulers.immediate():在当前线程运行,默认模式Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。Schedulers.io(): I/O 操作,数据库,网络等Schedulers.computation(): 计算AndroidSchedulers.mainThread():Android主线程
// 后台线程取数据,主线程显示,适用于大多数observable// 指定 subscribe(被观察者)发生在 IO 线程,事件产生线程.subscribeOn(Schedulers.io())// 指定 Subscriber(观察者)的回调发生在主线程,事件消费线程.observeOn(AndroidSchedulers.mainThread()).subscribe(mSubscriber);
五、变换
1.Map
对原始Observable发出的每一项数据进行相应的操作后在发出。
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);}}).map(new Function<Integer, String>() {@Overridepublic String apply(Integer integer) throws Exception {return "This is result " + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e(TAG, s);}});
2. flatMap
flatMap()接收一个Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.
flatMap是没有顺序的,如果要顺序发送请使用concatMap
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);}}).flatMap(new Function<Integer, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(Integer integer) throws Exception {final List<String> list = new ArrayList<>();for (int i = 0; i < 3; i++) {list.add("I am value " + integer);}// 10毫秒的延时return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e(TAG, s);}});
六、Zip合并
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {@Overridepublic String apply(Integer integer, String s) throws Exception {return integer + s;}}).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "onSubscribe");}@Overridepublic void onNext(String value) {Log.e(TAG, "onNext: " + value);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError");}@Overridepublic void onComplete() {Log.e(TAG, "onComplete");}});
七、Flowable(背压,生产者的速度大于消费者的速度)
如果Observable与Observer不在同一个线程,当被观察者的生产速度大于被观察者的消费速度时会抛出
MissingBackpressureException异常。在RxJava2中新增了Flowable专门用于专门应对背压(Backpressure)问题。
/*** Subscription.request()方法表示Subscriber要处理几个事件* emitter.requested()方法表示减少几个事件* 在Flowable中使用Subscription.cancel()关闭事件处理*/Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 128; i++) {Log.e(TAG, "emit " + i);emitter.onNext(i);}}// BackpressureStrategy.ERROR 默认128kb 超出会抛出MissingBackpressureException异常// BackpressureStrategy.BUFFER 无大小限制// BackpressureStrategy.DROP 存不下的事件直接丢弃// BackpressureStrategy.LATEST 只保留最新的事件,与DROP相反}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())// .onBackpressureDrop()//添加背压策略.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.e(TAG, "onSubscribe");// 全局的mSubscription = s;// 处理多少个发送过来的事件mSubscription.request(Long.MAX_VALUE);// 取消接收事件// mSubscription.cancel();}@Overridepublic void onNext(Integer integer) {Log.e(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.e(TAG, "onComplete");}});
八、其它API
Scan:累加器
Filter:过滤器
take()、takeLast():只发送前N个元素、只发送后N个元素
Skip()、SkipLast():不发送前N个元素、不发送后N个元素
distinct:仅处理一次
ElementAt():只发送第N个元素
Sample():定期发射Observable最近发射的数据项
Merge():合并多个Observables的发射物,多输入,单输出
startWith():在开头插入一条
