1、RxJava概述
- RxJava 是一个基于事件流、实现异步操作的库
- Rxjava原理基于一种扩展的观察者模式,有4个角色:
- 被观察者(Observable):产生事件
- 观察者(Observer):接收事件,并给出响应动作
- 订阅(Subscribe):连接被观察者 & 观察者
- 事件(Event):被观察者 & 观察者沟通的载体
- RxAndroid:https://github.com/ReactiveX/RxAndroid
- RxJava:https://github.com/ReactiveX/RxJava
添加依赖
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
2、RxJava基本使用
2-1、订阅
```json
// 1、创建被观察者Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
/**
* 被观察者Observable的subscribe中会使用ObservableEmitter发送事件,观察者响应对应的事件
*/
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
}
});
// 2、创建观察者Observer
Observer<Integer> observer = new Observer<Integer>() {
/**
* 观察者接收事件前,默认最先调用复写 onSubscribe()
*/
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: "+d.isDisposed());
}
/**
* 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
*/
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
/**
* 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
*/
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
/**
* 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
*/
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
// 3、当 Observable 被订阅后,观察者的Observer的OnSubscribe方法会自动被调用,被观察者Observable的subscribe方法会被调用
observable.subscribe(observer);
- 上述代码的效果基本如下
```json
D/MainActivity: onSubscribe: false
D/MainActivity: subscribe:
2-2、发送事件
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
// ObservableEmitter 事件发射器,向观察者发送事件
Log.d(TAG, "subscribe: 1 start");
emitter.onNext(1);
Log.d(TAG, "subscribe: 1 end");
Log.d(TAG, "subscribe: 2 start");
emitter.onNext(2);
Log.d(TAG, "subscribe: 2 end");
Log.d(TAG, "subscribe: 3 start");
emitter.onNext(3);
Log.d(TAG, "subscribe: 3 end");
Log.d(TAG, "subscribe: onComplete start");
emitter.onComplete();
Log.d(TAG, "subscribe: onComplete end");
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: "+d.isDisposed());
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
observable.subscribe(observer);
代码执行结果
D/MainActivity: onSubscribe: false
D/MainActivity: subscribe:
D/MainActivity: subscribe: 1 start
D/MainActivity: 对Next事件作出响应1
D/MainActivity: subscribe: 1 end
D/MainActivity: subscribe: 2 start
D/MainActivity: 对Next事件作出响应2
D/MainActivity: subscribe: 2 end
D/MainActivity: subscribe: 3 start
D/MainActivity: 对Next事件作出响应3
D/MainActivity: subscribe: 3 end
D/MainActivity: subscribe: onComplete start
D/MainActivity: 对Complete事件作出响应
D/MainActivity: subscribe: onComplete end
2-3、链式调用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3、Observable的创建操作符
3-1、create
-
3-2、just
Observable.just(1, 2, 3, 4):最多10个参数
相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)、onComplete()
// 1. 创建时传入整型1、2、3、4
// 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)、onComplete()
Observable.just(1, 2, 3, 4)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3-3、fromArray
Observable.fromArray(items)
相当于执行了onNext(0)、onNext(1)、onNext(2)、onNext(3)、onComplete()
Integer[] items = {0, 1, 2, 3};
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3-4、fromIterable
相当于执行了onNext(1)、onNext(2)、onNext(3)、onComplete()
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
// 2. 通过fromIterable()将集合中的对象 / 数据发送出去
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "集合遍历");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "集合中的数据元素 = " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "遍历结束");
}
});
3-5、range
连续发送 1个事件序列,可指定范围
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 注:若设置为负数,则会抛出异常
Observable.range(3, 10)
// 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3-6、rangeLong
类似range,不过事件类型是Long
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 注:若设置为负数,则会抛出异常
Observable.rangeLong(3, 10)
// 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3-7、interval
每隔指定时间 就发送事件
- interval默认在computation调度器上执行
也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)
// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
// 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
Observable.interval(3,1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3-8、intervalRange
每隔指定时间就发送 事件,可指定发送的数据的数量
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 参数3 = 第1次事件延迟发送时间;
// 参数4 = 间隔时间数字;
// 参数5 = 时间单位
// 该例子发送的事件序列特点:
// 1. 从3开始,一共发送10个事件;
// 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从3开始递增,3 4 5 6 7 8 9 10 11 12)
Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3-9、timer
延迟指定时间后,发送1个数值0(Long类型)
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
timer操作符默认运行在一个新线程上,也可自定义线程调度器(第3个参数)timer(long,TimeUnit,Scheduler)
3-10、empty、error、never
empty():仅发送Complete事件,直接通知完成
// 仅发送Complete事件,直接通知完成
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Object value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
error:仅发送Error事件,直接通知异常
// 仅发送onError事件,直接通知异常
Observable.error(new RuntimeException()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Object value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应"+e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
never:不发送任何事件
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Object value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应"+e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
3-11、defer
直到有观察者(Observer)订阅时,才动态创建被观察者对象(Observable)+发送事件
// 第1次赋值
final Bundle bundle = new Bundle();
bundle.putInt("key",1);
// 2. 通过defer 定义被观察者对象 注:此时被观察者对象还没创建
Observable<Integer> observable = Observable.defer(new Supplier<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> get() throws Throwable {
return Observable.just(bundle.getInt("key"));
}
});
// 2. 第2次赋值
bundle.putInt("key",2);
// 3. 观察者开始订阅,注:此时,才会调用defer()创建被观察者对象(Observable)
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到的整数是" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
运行结果(订阅时才创建,此时的值是2,不是1)
D/MainActivity: 开始采用subscribe连接
D/MainActivity: 接收到的整数是2
D/MainActivity: 对Complete事件作出响应
4、subscribe订阅 Observer的几种创建方式
4-1、subscribe流程
subscribe有两类重载方法
- 注意subscribe(Observer o)方法没有返回值,因为Observer的方法onSubscribe(Disposable d)里面会返回一个
- 注意subscribe(Consumer c)方法有一个Disposable返回值,subscribe传入Consumer对象有多个重载方法,最终会转换成Observer的一个实现类LambdaObserver
```java
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe() {}
// 表示观察者只对被观察者发送的Next事件作出响应 public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者对被观察者发送的任何事件都作出响应 public final void subscribe(Observer<? super T> observer) {}
- Consumer转换成LambdaObserver对象,这是一个Observer的实现类
```java
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
LambdaObserver
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
}
4-2、Consumer
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new RuntimeException("error"));
emitter.onComplete();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.d(TAG, "accept: onNext " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
Log.d(TAG, "accept: onError "+throwable.toString());
}
}, new Action() {
@Override
public void run() throws Throwable {
Log.d(TAG, "run: onComplete");
}
});
4-3、Disposable作用
从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。
可采用 Disposable.dispose() 切断观察者与被观察者之间的连接,即观察者无法继续接收被观察者的事件,但被观察者还是可以继续发送事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件" + value + "作出响应");
if (value == 2){
mDisposable.dispose();
Log.d(TAG, "onNext: 切断连接" );
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
运行结果
D/MainActivity: 开始采用subscribe连接
D/MainActivity: 对Next事件1作出响应
D/MainActivity: 对Next事件2作出响应
D/MainActivity: onNext: 切断连接
4-4、onComplete和onError
从上游和下游的关系来讲,onComplete和onError是上游来通知下游不再接收消息的,dispose是下游主动告诉上游自己不再接收消息的。
例子
// onComplete和onError是上游来通知下游不再接收消息的
// 发送onError事件后之后onComplete就不会响应了
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new RuntimeException("error"));
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
运行结果
D/MainActivity: 开始采用subscribe连接
D/MainActivity: 对Next事件1作出响应
D/MainActivity: 对Next事件2作出响应
D/MainActivity: 对Next事件3作出响应
D/MainActivity: 对Error事件作出响应
5、各类异常
5-1、代码崩溃
发送事件的时候报错崩溃后,会直接回调onError方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 此处会崩溃,会导致直接回调onError
User user = null;
user.getAge();
// 这个不会再执行
emitter.onNext(1);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: "+d.isDisposed());
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext: value = "+integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: "+e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: "+Thread.currentThread().getName());
}
});
5-2、onNext(null)
onNext 不能传空
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 这个不会再执行
emitter.onNext(null);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: "+d.isDisposed());
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext: value = "+integer);
}
@Override
public void onError(@NonNull Throwable e) {
// onNext传null,报错
// onError: java.lang.NullPointerException: onNext called with a null value. Null values are generally not allowed in 3.x operators and sources.
Log.d(TAG, "onError: "+e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: "+Thread.currentThread().getName());
}
});