一、是什么?

  1. “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。简单的异步处理,扩展的观察者模式,灵活的线程控制。

  2. 感谢掘金作者Season的Rxjava2.0系列文章。

  3. Github项目地址:https://github.com/ReactiveX/RxJava https://github.com/ReactiveX/RxAndroid

二、关键词

  1. Observable (可观察者,即被观察者)

  2. Observer (观察者)

  3. subscribe (订阅)

  4. Consumer:只接收onNext()的观察者

  5. Map:变换

  6. FlatMap

  7. Zip:合并

  8. Flowable(背压,生产者的速度大于消费者的速度)

  9. onCompleted(): 事件队列完结

  10. onNext():普通事件

  11. onError(): 事件队列异常

三、基本使用

1. 被观察者(Observable)

决定事件什么时候触发

  1. 使用Observable创建事件队列
  1. Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
  2. /**
  3. * ObservableEmitter:它可以通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)
  4. * 发出next事件、complete(完成)事件和error(错误)事件
  5. *
  6. * Observable可以发送无限个onNext, Observer也可以接收无限个onNext.
  7. *
  8. * 当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送,
  9. * 而Observer收到onComplete事件之后将不再继续接收事件.
  10. *
  11. * 当Observable发送了一个onError后, Observableon的Error之后的事件将继续发送,
  12. * 而Observer收到onError事件之后将不再继续接收事件.
  13. */
  14. @Override
  15. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
  16. emitter.onNext(1);
  17. emitter.onNext(2);
  18. emitter.onNext(3);
  19. // 完成
  20. emitter.onComplete();
  21. }
  22. });

2. 观察者(Observer)

可以调用Disposable的dispose()方法使Observer不在接收发过来的事件
如果有多个Disposable,RxJava中内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的事件。

事件触发后做什么

  1. Observer接口
  1. Observer<Integer> observer = new Observer<Integer>() {
  2. @Override
  3. public void onSubscribe(Disposable d) {
  4. Log.e(TAG, "subscribe");
  5. }
  6. @Override
  7. public void onNext(Integer value) {
  8. Log.e(TAG, "" + value);
  9. }
  10. @Override
  11. public void onError(Throwable e) {
  12. Log.e(TAG, "error", e);
  13. }
  14. @Override
  15. public void onComplete() {
  16. Log.e(TAG, "complete");
  17. Log.e(TAG, "rx01: ====== end ======");
  18. }
  19. };
  1. Consumer接口,只接收onNext(),不接收其它的
  1. // Consumer(消费者)表示只关心onNext事件
  2. Consumer<Integer> consumer = new Consumer<Integer>() {
  3. @Override
  4. public void accept(Integer integer) throws Exception {
  5. Log.e(TAG, "Observer thread is :" + Thread.currentThread().getName());
  6. Log.e(TAG, "onNext: " + integer);
  7. }
  8. };

3. 订阅者(Subscribe)

  1. 创建了 ObservableObserver 之后,再用 subscribe() 方法将它们联结起来
  1. observable.subscribe(observer);
  1. 当然也可以使用RxJava的链式编程
  1. Observable.create(new ObservableOnSubscribe<Integer>() {
  2. @Override
  3. public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
  4. for (int i = 0; i < 3; i++) {
  5. Log.e(TAG, "发送: " + i);
  6. e.onNext(i);
  7. }
  8. e.onComplete();
  9. Log.e(TAG, "发送: " + 99);
  10. e.onNext(99);
  11. }
  12. }).subscribe(new Observer<Integer>() {
  13. // 调用dispose()会导致Observer不在接收事件
  14. private Disposable disposable;
  15. @Override
  16. public void onSubscribe(@NonNull Disposable d) {
  17. Log.e(TAG, "绑定: ");
  18. disposable = d;
  19. }
  20. @Override
  21. public void onNext(@NonNull Integer integer) {
  22. Log.e(TAG, "接收: " + integer);
  23. if (integer == 1) {
  24. disposable.dispose();
  25. }
  26. }
  27. @Override
  28. public void onError(@NonNull Throwable e) {
  29. Log.e(TAG, "onError: ", e);
  30. }
  31. @Override
  32. public void onComplete() {
  33. Log.e(TAG, "onComplete: ");
  34. Log.e(TAG, "rx02: ====== end ======");
  35. }
  36. });
  37. }

四、线程调度(Scheduler)

  • Schedulers.immediate():在当前线程运行,默认模式

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

  • Schedulers.io(): I/O 操作,数据库,网络等

  • Schedulers.computation(): 计算

  • AndroidSchedulers.mainThread():Android主线程

  1. // 后台线程取数据,主线程显示,适用于大多数
  2. observable
  3. // 指定 subscribe(被观察者)发生在 IO 线程,事件产生线程
  4. .subscribeOn(Schedulers.io())
  5. // 指定 Subscriber(观察者)的回调发生在主线程,事件消费线程
  6. .observeOn(AndroidSchedulers.mainThread())
  7. .subscribe(mSubscriber);

五、变换

1.Map

对原始Observable发出的每一项数据进行相应的操作后在发出。

  1. Observable.create(new ObservableOnSubscribe<Integer>() {
  2. @Override
  3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
  4. emitter.onNext(1);
  5. emitter.onNext(2);
  6. emitter.onNext(3);
  7. }
  8. }).map(new Function<Integer, String>() {
  9. @Override
  10. public String apply(Integer integer) throws Exception {
  11. return "This is result " + integer;
  12. }
  13. }).subscribe(new Consumer<String>() {
  14. @Override
  15. public void accept(String s) throws Exception {
  16. Log.e(TAG, s);
  17. }
  18. });

2. flatMap

flatMap()接收一个Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.

flatMap是没有顺序的,如果要顺序发送请使用concatMap

  1. Observable.create(new ObservableOnSubscribe<Integer>() {
  2. @Override
  3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
  4. emitter.onNext(1);
  5. emitter.onNext(2);
  6. emitter.onNext(3);
  7. }
  8. }).flatMap(new Function<Integer, ObservableSource<String>>() {
  9. @Override
  10. public ObservableSource<String> apply(Integer integer) throws Exception {
  11. final List<String> list = new ArrayList<>();
  12. for (int i = 0; i < 3; i++) {
  13. list.add("I am value " + integer);
  14. }
  15. // 10毫秒的延时
  16. return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
  17. }
  18. }).subscribeOn(Schedulers.io())
  19. .observeOn(AndroidSchedulers.mainThread())
  20. .subscribe(new Consumer<String>() {
  21. @Override
  22. public void accept(String s) throws Exception {
  23. Log.e(TAG, s);
  24. }
  25. });

六、Zip合并

Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

  1. Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
  2. @Override
  3. public String apply(Integer integer, String s) throws Exception {
  4. return integer + s;
  5. }
  6. }).subscribe(new Observer<String>() {
  7. @Override
  8. public void onSubscribe(Disposable d) {
  9. Log.e(TAG, "onSubscribe");
  10. }
  11. @Override
  12. public void onNext(String value) {
  13. Log.e(TAG, "onNext: " + value);
  14. }
  15. @Override
  16. public void onError(Throwable e) {
  17. Log.e(TAG, "onError");
  18. }
  19. @Override
  20. public void onComplete() {
  21. Log.e(TAG, "onComplete");
  22. }
  23. });

七、Flowable(背压,生产者的速度大于消费者的速度)

如果Observable与Observer不在同一个线程,当被观察者的生产速度大于被观察者的消费速度时会抛出MissingBackpressureException异常。在RxJava2中新增了Flowable专门用于专门应对背压(Backpressure)问题。

  1. /**
  2. * Subscription.request()方法表示Subscriber要处理几个事件
  3. * emitter.requested()方法表示减少几个事件
  4. * 在Flowable中使用Subscription.cancel()关闭事件处理
  5. */
  6. Flowable.create(new FlowableOnSubscribe<Integer>() {
  7. @Override
  8. public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
  9. for (int i = 0; i < 128; i++) {
  10. Log.e(TAG, "emit " + i);
  11. emitter.onNext(i);
  12. }
  13. }
  14. // BackpressureStrategy.ERROR 默认128kb 超出会抛出MissingBackpressureException异常
  15. // BackpressureStrategy.BUFFER 无大小限制
  16. // BackpressureStrategy.DROP 存不下的事件直接丢弃
  17. // BackpressureStrategy.LATEST 只保留最新的事件,与DROP相反
  18. }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
  19. // .onBackpressureDrop()//添加背压策略
  20. .observeOn(AndroidSchedulers.mainThread())
  21. .subscribe(new Subscriber<Integer>() {
  22. @Override
  23. public void onSubscribe(Subscription s) {
  24. Log.e(TAG, "onSubscribe");
  25. // 全局的
  26. mSubscription = s;
  27. // 处理多少个发送过来的事件
  28. mSubscription.request(Long.MAX_VALUE);
  29. // 取消接收事件
  30. // mSubscription.cancel();
  31. }
  32. @Override
  33. public void onNext(Integer integer) {
  34. Log.e(TAG, "onNext: " + integer);
  35. }
  36. @Override
  37. public void onError(Throwable t) {
  38. Log.w(TAG, "onError: ", t);
  39. }
  40. @Override
  41. public void onComplete() {
  42. Log.e(TAG, "onComplete");
  43. }
  44. });

八、其它API

  1. Scan:累加器

  2. Filter:过滤器

  3. take()、takeLast():只发送前N个元素、只发送后N个元素

  4. Skip()、SkipLast():不发送前N个元素、不发送后N个元素

  5. distinct:仅处理一次

  6. ElementAt():只发送第N个元素

  7. Sample():定期发射Observable最近发射的数据项

  8. Merge():合并多个Observables的发射物,多输入,单输出

  9. startWith():在开头插入一条

九、附录

  1. 本例源码:https://github.com/sdwfqin/AndroidSamples