1、线程调度概述
- 在不指定线程的状况下, RxJava 遵循的是线程不变的原则,即:在哪一个线程调用 subscribe(),就在哪一个线程生产事件;在哪一个线程生产事件,就在哪一个线程消费事件。若是须要切换线程,就须要用到 Scheduler (调度器)
在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,若是只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式自己的目的就是『后台处理,前台回调』的异步机制,所以异步对于 RxJava 是相当重要的。而要实现异步,则须要用到 RxJava 的另外一个概念: Schedulerhtml
2、线程调度的使用
2-1、常用API
subscribeOn():指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫作事件产生的线程。
- observeOn():指定 Subscriber 所运行在的线程。或者叫作事件消费的线程。
代码
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: "+Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: "+Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext: value = "+integer + " " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: "+Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: "+Thread.currentThread().getName());
}
});
运行结果
D/MainActivity: onSubscribe: main
D/MainActivity: subscribe: RxCachedThreadScheduler-1
D/MainActivity: onNext: value = 1 main
D/MainActivity: onNext: value = 2 main
D/MainActivity: onNext: value = 3 main
D/MainActivity: onComplete: main
2-2、内置的几个线程调度
RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景
- Schedulers.immediate():直接在当前线程运行,至关于不指定线程。这是默认的 Scheduler。
- Schedulers.newThread():老是启用新线程,并在新线程执行操做。
- Schedulers.io():I/O 操做(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差很少,区别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空闲的线程,所以多数状况下 io() 比 newThread() 更有效率。 不要把计算工做放在 io() 中,能够避免建立没必要要的线程。
- Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU。
- Schedulers.from(executor):使用指定的Executor做为调度器
- Schedulers.trampoline():当其它排队的任务完成后,在当前线程排队开始执行
- Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操做将在 Android 主线程运行。