1、线程调度概述

  • 在不指定线程的状况下, RxJava 遵循的是线程不变的原则,即:在哪一个线程调用 subscribe(),就在哪一个线程生产事件;在哪一个线程生产事件,就在哪一个线程消费事件。若是须要切换线程,就须要用到 Scheduler (调度器)
  • 在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,若是只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式自己的目的就是『后台处理,前台回调』的异步机制,所以异步对于 RxJava 是相当重要的。而要实现异步,则须要用到 RxJava 的另外一个概念: Schedulerhtml

    2、线程调度的使用

    2-1、常用API

  • subscribeOn():指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫作事件产生的线程。

  • observeOn():指定 Subscriber 所运行在的线程。或者叫作事件消费的线程。
  • 代码

    1. Observable.create(new ObservableOnSubscribe<Integer>() {
    2. @Override
    3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    4. Log.d(TAG, "subscribe: "+Thread.currentThread().getName());
    5. emitter.onNext(1);
    6. emitter.onNext(2);
    7. emitter.onNext(3);
    8. emitter.onComplete();
    9. }
    10. }).subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    11. .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    12. .subscribe(new Observer<Integer>() {
    13. @Override
    14. public void onSubscribe(@NonNull Disposable d) {
    15. Log.d(TAG, "onSubscribe: "+Thread.currentThread().getName());
    16. }
    17. @Override
    18. public void onNext(@NonNull Integer integer) {
    19. Log.d(TAG, "onNext: value = "+integer + " " + Thread.currentThread().getName());
    20. }
    21. @Override
    22. public void onError(@NonNull Throwable e) {
    23. Log.d(TAG, "onError: "+Thread.currentThread().getName());
    24. }
    25. @Override
    26. public void onComplete() {
    27. Log.d(TAG, "onComplete: "+Thread.currentThread().getName());
    28. }
    29. });
  • 运行结果

    1. D/MainActivity: onSubscribe: main
    2. D/MainActivity: subscribe: RxCachedThreadScheduler-1
    3. D/MainActivity: onNext: value = 1 main
    4. D/MainActivity: onNext: value = 2 main
    5. D/MainActivity: onNext: value = 3 main
    6. D/MainActivity: onComplete: main

    2-2、内置的几个线程调度

  • RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景

  • Schedulers.immediate():直接在当前线程运行,至关于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread():老是启用新线程,并在新线程执行操做。
  • Schedulers.io():I/O 操做(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差很少,区别在于 io() 的内部实现是是用一个无数量上限的线程池,能够重用空闲的线程,所以多数状况下 io() 比 newThread() 更有效率。 不要把计算工做放在 io() 中,能够避免建立没必要要的线程。
  • Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操做限制性能的操做,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待时间会浪费 CPU。
  • Schedulers.from(executor):使用指定的Executor做为调度器
  • Schedulers.trampoline():当其它排队的任务完成后,在当前线程排队开始执行
  • Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操做将在 Android 主线程运行。