1、RxJava概述

  • RxJava 是一个基于事件流、实现异步操作的库
  • Rxjava原理基于一种扩展的观察者模式,有4个角色:
    • 被观察者(Observable):产生事件
    • 观察者(Observer):接收事件,并给出响应动作
    • 订阅(Subscribe):连接被观察者 & 观察者
    • 事件(Event):被观察者 & 观察者沟通的载体
  • RxAndroid:https://github.com/ReactiveX/RxAndroid
  • RxJava:https://github.com/ReactiveX/RxJava
  • 添加依赖

    1. implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    2. implementation 'io.reactivex.rxjava2:rxjava:2.0.7'

    2、RxJava基本使用

    2-1、订阅

    ```json

    1. // 1、创建被观察者Observable
    2. Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    3. /**
    4. * 被观察者Observable的subscribe中会使用ObservableEmitter发送事件,观察者响应对应的事件
    5. */
    6. @Override
    7. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    8. Log.d(TAG, "subscribe: ");
    9. }
    10. });
  1. // 2、创建观察者Observer
  2. Observer<Integer> observer = new Observer<Integer>() {
  3. /**
  4. * 观察者接收事件前,默认最先调用复写 onSubscribe()
  5. */
  6. @Override
  7. public void onSubscribe(Disposable d) {
  8. Log.d(TAG, "onSubscribe: "+d.isDisposed());
  9. }
  10. /**
  11. * 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
  12. */
  13. @Override
  14. public void onNext(Integer value) {
  15. Log.d(TAG, "对Next事件作出响应" + value);
  16. }
  17. /**
  18. * 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
  19. */
  20. @Override
  21. public void onError(Throwable e) {
  22. Log.d(TAG, "对Error事件作出响应");
  23. }
  24. /**
  25. * 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
  26. */
  27. @Override
  28. public void onComplete() {
  29. Log.d(TAG, "对Complete事件作出响应");
  30. }
  31. };
  32. // 3、当 Observable 被订阅后,观察者的Observer的OnSubscribe方法会自动被调用,被观察者Observable的subscribe方法会被调用
  33. observable.subscribe(observer);
  1. - 上述代码的效果基本如下
  2. ```json
  3. D/MainActivity: onSubscribe: false
  4. D/MainActivity: subscribe:

2-2、发送事件

  1. Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
  2. @Override
  3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
  4. Log.d(TAG, "subscribe: ");
  5. // ObservableEmitter 事件发射器,向观察者发送事件
  6. Log.d(TAG, "subscribe: 1 start");
  7. emitter.onNext(1);
  8. Log.d(TAG, "subscribe: 1 end");
  9. Log.d(TAG, "subscribe: 2 start");
  10. emitter.onNext(2);
  11. Log.d(TAG, "subscribe: 2 end");
  12. Log.d(TAG, "subscribe: 3 start");
  13. emitter.onNext(3);
  14. Log.d(TAG, "subscribe: 3 end");
  15. Log.d(TAG, "subscribe: onComplete start");
  16. emitter.onComplete();
  17. Log.d(TAG, "subscribe: onComplete end");
  18. }
  19. });
  20. Observer<Integer> observer = new Observer<Integer>() {
  21. @Override
  22. public void onSubscribe(Disposable d) {
  23. Log.d(TAG, "onSubscribe: "+d.isDisposed());
  24. }
  25. @Override
  26. public void onNext(Integer value) {
  27. Log.d(TAG, "对Next事件作出响应" + value);
  28. }
  29. @Override
  30. public void onError(Throwable e) {
  31. Log.d(TAG, "对Error事件作出响应");
  32. }
  33. @Override
  34. public void onComplete() {
  35. Log.d(TAG, "对Complete事件作出响应");
  36. }
  37. };
  38. observable.subscribe(observer);
  • 代码执行结果

    1. D/MainActivity: onSubscribe: false
    2. D/MainActivity: subscribe:
    3. D/MainActivity: subscribe: 1 start
    4. D/MainActivity: Next事件作出响应1
    5. D/MainActivity: subscribe: 1 end
    6. D/MainActivity: subscribe: 2 start
    7. D/MainActivity: Next事件作出响应2
    8. D/MainActivity: subscribe: 2 end
    9. D/MainActivity: subscribe: 3 start
    10. D/MainActivity: Next事件作出响应3
    11. D/MainActivity: subscribe: 3 end
    12. D/MainActivity: subscribe: onComplete start
    13. D/MainActivity: Complete事件作出响应
    14. D/MainActivity: subscribe: onComplete end

    2-3、链式调用

    1. Observable.create(new ObservableOnSubscribe<Integer>() {
    2. @Override
    3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    4. emitter.onNext(1);
    5. emitter.onNext(2);
    6. emitter.onNext(3);
    7. emitter.onComplete();
    8. }
    9. }).subscribe(new Observer<Integer>() {
    10. @Override
    11. public void onSubscribe(Disposable d) {
    12. Log.d(TAG, "开始采用subscribe连接");
    13. }
    14. // 默认最先调用复写的 onSubscribe()
    15. @Override
    16. public void onNext(Integer value) {
    17. Log.d(TAG, "对Next事件"+ value +"作出响应" );
    18. }
    19. @Override
    20. public void onError(Throwable e) {
    21. Log.d(TAG, "对Error事件作出响应");
    22. }
    23. @Override
    24. public void onComplete() {
    25. Log.d(TAG, "对Complete事件作出响应");
    26. }
    27. });

    3、Observable的创建操作符

    3-1、create

  • 参考上述2里面的代码例子

    3-2、just

  • Observable.just(1, 2, 3, 4):最多10个参数

  • 相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)、onComplete()

    1. // 1. 创建时传入整型1、2、3、4
    2. // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)、onComplete()
    3. Observable.just(1, 2, 3, 4)
    4. .subscribe(new Observer<Integer>() {
    5. @Override
    6. public void onSubscribe(Disposable d) {
    7. Log.d(TAG, "开始采用subscribe连接");
    8. }
    9. // 默认最先调用复写的 onSubscribe()
    10. @Override
    11. public void onNext(Integer value) {
    12. Log.d(TAG, "接收到了事件" + value);
    13. }
    14. @Override
    15. public void onError(Throwable e) {
    16. Log.d(TAG, "对Error事件作出响应");
    17. }
    18. @Override
    19. public void onComplete() {
    20. Log.d(TAG, "对Complete事件作出响应");
    21. }
    22. });

    3-3、fromArray

  • Observable.fromArray(items)

  • 相当于执行了onNext(0)、onNext(1)、onNext(2)、onNext(3)、onComplete()

    1. Integer[] items = {0, 1, 2, 3};
    2. Observable.fromArray(items)
    3. .subscribe(new Observer<Integer>() {
    4. @Override
    5. public void onSubscribe(Disposable d) {
    6. Log.d(TAG, "开始采用subscribe连接");
    7. }
    8. @Override
    9. public void onNext(Integer value) {
    10. Log.d(TAG, "接收到了事件" + value);
    11. }
    12. @Override
    13. public void onError(Throwable e) {
    14. Log.d(TAG, "对Error事件作出响应");
    15. }
    16. @Override
    17. public void onComplete() {
    18. Log.d(TAG, "对Complete事件作出响应");
    19. }
    20. });

    3-4、fromIterable

  • 相当于执行了onNext(1)、onNext(2)、onNext(3)、onComplete()

    1. List<Integer> list = new ArrayList<>();
    2. list.add(1);
    3. list.add(2);
    4. list.add(3);
    5. // 2. 通过fromIterable()将集合中的对象 / 数据发送出去
    6. Observable.fromIterable(list)
    7. .subscribe(new Observer<Integer>() {
    8. @Override
    9. public void onSubscribe(Disposable d) {
    10. Log.d(TAG, "集合遍历");
    11. }
    12. @Override
    13. public void onNext(Integer value) {
    14. Log.d(TAG, "集合中的数据元素 = " + value);
    15. }
    16. @Override
    17. public void onError(Throwable e) {
    18. Log.d(TAG, "对Error事件作出响应");
    19. }
    20. @Override
    21. public void onComplete() {
    22. Log.d(TAG, "遍历结束");
    23. }
    24. });

    3-5、range

  • 连续发送 1个事件序列,可指定范围

    1. // 参数说明:
    2. // 参数1 = 事件序列起始点;
    3. // 参数2 = 事件数量;
    4. // 注:若设置为负数,则会抛出异常
    5. Observable.range(3, 10)
    6. // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
    7. .subscribe(new Observer<Integer>() {
    8. @Override
    9. public void onSubscribe(Disposable d) {
    10. Log.d(TAG, "开始采用subscribe连接");
    11. }
    12. // 默认最先调用复写的 onSubscribe()
    13. @Override
    14. public void onNext(Integer value) {
    15. Log.d(TAG, "接收到了事件" + value);
    16. }
    17. @Override
    18. public void onError(Throwable e) {
    19. Log.d(TAG, "对Error事件作出响应");
    20. }
    21. @Override
    22. public void onComplete() {
    23. Log.d(TAG, "对Complete事件作出响应");
    24. }
    25. });

    3-6、rangeLong

  • 类似range,不过事件类型是Long

    1. // 参数说明:
    2. // 参数1 = 事件序列起始点;
    3. // 参数2 = 事件数量;
    4. // 注:若设置为负数,则会抛出异常
    5. Observable.rangeLong(3, 10)
    6. // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
    7. .subscribe(new Observer<Long>() {
    8. @Override
    9. public void onSubscribe(Disposable d) {
    10. Log.d(TAG, "开始采用subscribe连接");
    11. }
    12. // 默认最先调用复写的 onSubscribe()
    13. @Override
    14. public void onNext(Long value) {
    15. Log.d(TAG, "接收到了事件" + value);
    16. }
    17. @Override
    18. public void onError(Throwable e) {
    19. Log.d(TAG, "对Error事件作出响应");
    20. }
    21. @Override
    22. public void onComplete() {
    23. Log.d(TAG, "对Complete事件作出响应");
    24. }
    25. });

    3-7、interval

  • 每隔指定时间 就发送事件

  • interval默认在computation调度器上执行
  • 也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)

    1. // 参数说明:
    2. // 参数1 = 第1次延迟时间;
    3. // 参数2 = 间隔时间数字;
    4. // 参数3 = 时间单位;
    5. // 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
    6. Observable.interval(3,1, TimeUnit.SECONDS)
    7. .subscribe(new Observer<Long>() {
    8. @Override
    9. public void onSubscribe(Disposable d) {
    10. Log.d(TAG, "开始采用subscribe连接");
    11. }
    12. // 默认最先调用复写的 onSubscribe()
    13. @Override
    14. public void onNext(Long value) {
    15. Log.d(TAG, "接收到了事件"+ value );
    16. }
    17. @Override
    18. public void onError(Throwable e) {
    19. Log.d(TAG, "对Error事件作出响应");
    20. }
    21. @Override
    22. public void onComplete() {
    23. Log.d(TAG, "对Complete事件作出响应");
    24. }
    25. });

    3-8、intervalRange

  • 每隔指定时间就发送 事件,可指定发送的数据的数量

    1. // 参数说明:
    2. // 参数1 = 事件序列起始点;
    3. // 参数2 = 事件数量;
    4. // 参数3 = 第1次事件延迟发送时间;
    5. // 参数4 = 间隔时间数字;
    6. // 参数5 = 时间单位
    7. // 该例子发送的事件序列特点:
    8. // 1. 从3开始,一共发送10个事件;
    9. // 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从3开始递增,3 4 5 6 7 8 9 10 11 12)
    10. Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
    11. .subscribe(new Observer<Long>() {
    12. @Override
    13. public void onSubscribe(Disposable d) {
    14. Log.d(TAG, "开始采用subscribe连接");
    15. }
    16. @Override
    17. public void onNext(Long value) {
    18. Log.d(TAG, "接收到了事件" + value);
    19. }
    20. @Override
    21. public void onError(Throwable e) {
    22. Log.d(TAG, "对Error事件作出响应");
    23. }
    24. @Override
    25. public void onComplete() {
    26. Log.d(TAG, "对Complete事件作出响应");
    27. }
    28. });

    3-9、timer

  • 延迟指定时间后,发送1个数值0(Long类型)

    1. Observable.timer(2, TimeUnit.SECONDS)
    2. .subscribe(new Observer<Long>() {
    3. @Override
    4. public void onSubscribe(Disposable d) {
    5. Log.d(TAG, "开始采用subscribe连接");
    6. }
    7. @Override
    8. public void onNext(Long value) {
    9. Log.d(TAG, "接收到了事件" + value);
    10. }
    11. @Override
    12. public void onError(Throwable e) {
    13. Log.d(TAG, "对Error事件作出响应");
    14. }
    15. @Override
    16. public void onComplete() {
    17. Log.d(TAG, "对Complete事件作出响应");
    18. }
    19. });
  • timer操作符默认运行在一个新线程上,也可自定义线程调度器(第3个参数)timer(long,TimeUnit,Scheduler)

    3-10、empty、error、never

  • empty():仅发送Complete事件,直接通知完成

    1. // 仅发送Complete事件,直接通知完成
    2. Observable.empty().subscribe(new Observer<Object>() {
    3. @Override
    4. public void onSubscribe(Disposable d) {
    5. Log.d(TAG, "开始采用subscribe连接");
    6. }
    7. @Override
    8. public void onNext(Object value) {
    9. Log.d(TAG, "接收到了事件" + value);
    10. }
    11. @Override
    12. public void onError(Throwable e) {
    13. Log.d(TAG, "对Error事件作出响应");
    14. }
    15. @Override
    16. public void onComplete() {
    17. Log.d(TAG, "对Complete事件作出响应");
    18. }
    19. });
  • error:仅发送Error事件,直接通知异常

    1. // 仅发送onError事件,直接通知异常
    2. Observable.error(new RuntimeException()).subscribe(new Observer<Object>() {
    3. @Override
    4. public void onSubscribe(Disposable d) {
    5. Log.d(TAG, "开始采用subscribe连接");
    6. }
    7. @Override
    8. public void onNext(Object value) {
    9. Log.d(TAG, "接收到了事件" + value);
    10. }
    11. @Override
    12. public void onError(Throwable e) {
    13. Log.d(TAG, "对Error事件作出响应"+e.toString());
    14. }
    15. @Override
    16. public void onComplete() {
    17. Log.d(TAG, "对Complete事件作出响应");
    18. }
    19. });
  • never:不发送任何事件

    1. Observable.never().subscribe(new Observer<Object>() {
    2. @Override
    3. public void onSubscribe(Disposable d) {
    4. Log.d(TAG, "开始采用subscribe连接");
    5. }
    6. @Override
    7. public void onNext(Object value) {
    8. Log.d(TAG, "接收到了事件" + value);
    9. }
    10. @Override
    11. public void onError(Throwable e) {
    12. Log.d(TAG, "对Error事件作出响应"+e.toString());
    13. }
    14. @Override
    15. public void onComplete() {
    16. Log.d(TAG, "对Complete事件作出响应");
    17. }
    18. });

    3-11、defer

  • 直到有观察者(Observer)订阅时,才动态创建被观察者对象(Observable)+发送事件

    1. // 第1次赋值
    2. final Bundle bundle = new Bundle();
    3. bundle.putInt("key",1);
    4. // 2. 通过defer 定义被观察者对象 注:此时被观察者对象还没创建
    5. Observable<Integer> observable = Observable.defer(new Supplier<ObservableSource<? extends Integer>>() {
    6. @Override
    7. public ObservableSource<? extends Integer> get() throws Throwable {
    8. return Observable.just(bundle.getInt("key"));
    9. }
    10. });
    11. // 2. 第2次赋值
    12. bundle.putInt("key",2);
    13. // 3. 观察者开始订阅,注:此时,才会调用defer()创建被观察者对象(Observable)
    14. observable.subscribe(new Observer<Integer>() {
    15. @Override
    16. public void onSubscribe(Disposable d) {
    17. Log.d(TAG, "开始采用subscribe连接");
    18. }
    19. @Override
    20. public void onNext(Integer value) {
    21. Log.d(TAG, "接收到的整数是" + value);
    22. }
    23. @Override
    24. public void onError(Throwable e) {
    25. Log.d(TAG, "对Error事件作出响应");
    26. }
    27. @Override
    28. public void onComplete() {
    29. Log.d(TAG, "对Complete事件作出响应");
    30. }
    31. });
  • 运行结果(订阅时才创建,此时的值是2,不是1)

    1. D/MainActivity: 开始采用subscribe连接
    2. D/MainActivity: 接收到的整数是2
    3. D/MainActivity: Complete事件作出响应

    4、subscribe订阅 Observer的几种创建方式

    4-1、subscribe流程

  • subscribe有两类重载方法

    • 注意subscribe(Observer o)方法没有返回值,因为Observer的方法onSubscribe(Disposable d)里面会返回一个
    • 注意subscribe(Consumer c)方法有一个Disposable返回值,subscribe传入Consumer对象有多个重载方法,最终会转换成Observer的一个实现类LambdaObserver ```java // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
      public final Disposable subscribe() {}

// 表示观察者只对被观察者发送的Next事件作出响应 public final Disposable subscribe(Consumer<? super T> onNext) {}

// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}

// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}

// 表示观察者对被观察者发送的任何事件都作出响应 public final void subscribe(Observer<? super T> observer) {}

  1. - Consumer转换成LambdaObserver对象,这是一个Observer的实现类
  2. ```java
  3. public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
  4. Action onComplete, Consumer<? super Disposable> onSubscribe) {
  5. ObjectHelper.requireNonNull(onNext, "onNext is null");
  6. ObjectHelper.requireNonNull(onError, "onError is null");
  7. ObjectHelper.requireNonNull(onComplete, "onComplete is null");
  8. ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
  9. LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
  10. subscribe(ls);
  11. return ls;
  12. }
  • LambdaObserver

    1. public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    2. private static final long serialVersionUID = -7251123623727029452L;
    3. final Consumer<? super T> onNext;
    4. final Consumer<? super Throwable> onError;
    5. final Action onComplete;
    6. final Consumer<? super Disposable> onSubscribe;
    7. public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
    8. Action onComplete,
    9. Consumer<? super Disposable> onSubscribe) {
    10. super();
    11. this.onNext = onNext;
    12. this.onError = onError;
    13. this.onComplete = onComplete;
    14. this.onSubscribe = onSubscribe;
    15. }
    16. }

    4-2、Consumer

    1. Observable.create(new ObservableOnSubscribe<Integer>() {
    2. @Override
    3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    4. emitter.onNext(1);
    5. emitter.onNext(2);
    6. emitter.onNext(3);
    7. emitter.onError(new RuntimeException("error"));
    8. emitter.onComplete();
    9. }
    10. }).subscribe(new Consumer<Integer>() {
    11. @Override
    12. public void accept(Integer integer) throws Throwable {
    13. Log.d(TAG, "accept: onNext " + integer);
    14. }
    15. }, new Consumer<Throwable>() {
    16. @Override
    17. public void accept(Throwable throwable) throws Throwable {
    18. Log.d(TAG, "accept: onError "+throwable.toString());
    19. }
    20. }, new Action() {
    21. @Override
    22. public void run() throws Throwable {
    23. Log.d(TAG, "run: onComplete");
    24. }
    25. });

    4-3、Disposable作用

  • 从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。

  • 可采用 Disposable.dispose() 切断观察者与被观察者之间的连接,即观察者无法继续接收被观察者的事件,但被观察者还是可以继续发送事件

    1. Observable.create(new ObservableOnSubscribe<Integer>() {
    2. @Override
    3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    4. emitter.onNext(1);
    5. emitter.onNext(2);
    6. emitter.onNext(3);
    7. emitter.onComplete();
    8. }
    9. }).subscribe(new Observer<Integer>() {
    10. private Disposable mDisposable;
    11. @Override
    12. public void onSubscribe(Disposable d) {
    13. mDisposable = d;
    14. Log.d(TAG, "开始采用subscribe连接");
    15. }
    16. // 默认最先调用复写的 onSubscribe()
    17. @Override
    18. public void onNext(Integer value) {
    19. Log.d(TAG, "对Next事件" + value + "作出响应");
    20. if (value == 2){
    21. mDisposable.dispose();
    22. Log.d(TAG, "onNext: 切断连接" );
    23. }
    24. }
    25. @Override
    26. public void onError(Throwable e) {
    27. Log.d(TAG, "对Error事件作出响应");
    28. }
    29. @Override
    30. public void onComplete() {
    31. Log.d(TAG, "对Complete事件作出响应");
    32. }
    33. });
  • 运行结果

    1. D/MainActivity: 开始采用subscribe连接
    2. D/MainActivity: Next事件1作出响应
    3. D/MainActivity: Next事件2作出响应
    4. D/MainActivity: onNext: 切断连接

    4-4、onComplete和onError

  • 从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。

  • 例子

    1. // onCompleteonError是上游来通知下游不再接收消息的
    2. // 发送onError事件后之后onComplete就不会响应了
    3. Observable.create(new ObservableOnSubscribe<Integer>() {
    4. @Override
    5. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    6. emitter.onNext(1);
    7. emitter.onNext(2);
    8. emitter.onNext(3);
    9. emitter.onError(new RuntimeException("error"));
    10. emitter.onComplete();
    11. }
    12. }).subscribe(new Observer<Integer>() {
    13. @Override
    14. public void onSubscribe(Disposable d) {
    15. Log.d(TAG, "开始采用subscribe连接");
    16. }
    17. // 默认最先调用复写的 onSubscribe()
    18. @Override
    19. public void onNext(Integer value) {
    20. Log.d(TAG, "对Next事件"+ value +"作出响应" );
    21. }
    22. @Override
    23. public void onError(Throwable e) {
    24. Log.d(TAG, "对Error事件作出响应");
    25. }
    26. @Override
    27. public void onComplete() {
    28. Log.d(TAG, "对Complete事件作出响应");
    29. }
    30. });
  • 运行结果

    1. D/MainActivity: 开始采用subscribe连接
    2. D/MainActivity: Next事件1作出响应
    3. D/MainActivity: Next事件2作出响应
    4. D/MainActivity: Next事件3作出响应
    5. D/MainActivity: Error事件作出响应

    5、各类异常

    5-1、代码崩溃

  • 发送事件的时候报错崩溃后,会直接回调onError方法

    1. Observable.create(new ObservableOnSubscribe<Integer>() {
    2. @Override
    3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    4. // 此处会崩溃,会导致直接回调onError
    5. User user = null;
    6. user.getAge();
    7. // 这个不会再执行
    8. emitter.onNext(1);
    9. }
    10. }).subscribe(new Observer<Integer>() {
    11. @Override
    12. public void onSubscribe(@NonNull Disposable d) {
    13. Log.d(TAG, "onSubscribe: "+d.isDisposed());
    14. }
    15. @Override
    16. public void onNext(@NonNull Integer integer) {
    17. Log.d(TAG, "onNext: value = "+integer);
    18. }
    19. @Override
    20. public void onError(@NonNull Throwable e) {
    21. Log.d(TAG, "onError: "+e.toString());
    22. }
    23. @Override
    24. public void onComplete() {
    25. Log.d(TAG, "onComplete: "+Thread.currentThread().getName());
    26. }
    27. });

    5-2、onNext(null)

  • onNext 不能传空

    1. Observable.create(new ObservableOnSubscribe<Integer>() {
    2. @Override
    3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    4. // 这个不会再执行
    5. emitter.onNext(null);
    6. }
    7. }).subscribe(new Observer<Integer>() {
    8. @Override
    9. public void onSubscribe(@NonNull Disposable d) {
    10. Log.d(TAG, "onSubscribe: "+d.isDisposed());
    11. }
    12. @Override
    13. public void onNext(@NonNull Integer integer) {
    14. Log.d(TAG, "onNext: value = "+integer);
    15. }
    16. @Override
    17. public void onError(@NonNull Throwable e) {
    18. // onNextnull,报错
    19. // onError: java.lang.NullPointerException: onNext called with a null value. Null values are generally not allowed in 3.x operators and sources.
    20. Log.d(TAG, "onError: "+e.toString());
    21. }
    22. @Override
    23. public void onComplete() {
    24. Log.d(TAG, "onComplete: "+Thread.currentThread().getName());
    25. }
    26. });