这篇文章主要讲下scheduler和thread的关系。
主要是读RxJava Threading Examples的笔记。
no thread
普通的observable的话是不会产生额外的线程,是在当前线程执行任务的。
代码示例
public class NoThreadObservable {public static void main(String[] args) {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);});}}).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});}}
输出结果
main Generated 1main Shifted Up 11main Shifted Down 1main Received 1main Generated 2main Shifted Up 12main Shifted Down 2main Received 2main Generated 3main Shifted Up 13main Shifted Down 3main Received 3main Generated 4main Shifted Up 14main Shifted Down 4main Received 4main Generated 5main Shifted Up 15main Shifted Down 5main Received 5
弹珠图
scheduler api
| Scheduler | purpose |
|---|---|
Schedulers.computation( )) |
meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors |
Schedulers.from(executor)) |
uses the specified Executor as a Scheduler |
Schedulers.immediate( )) |
schedules work to begin immediately in the current thread |
Schedulers.io( )) |
meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ); Schedulers.io( ) by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching |
Schedulers.newThread( )) |
creates a new thread for each unit of work |
Schedulers.trampoline( )) |
queues work to begin on the current thread after any already-queued work |
Observable.subscribeOn
SubscribeOn运算符指定Observable将在哪个线程上开始操作,无论该运算符在运算符链中的哪个点被调用
代码示例
public static void normalCase() throws InterruptedException {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);});}}).subscribeOn(Schedulers.io()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(1000);}
输出结果
RxCachedThreadScheduler-1 Generated 1RxCachedThreadScheduler-1 Shifted Up 11RxCachedThreadScheduler-1 Shifted Down 1RxCachedThreadScheduler-1 Received 1RxCachedThreadScheduler-1 Generated 2RxCachedThreadScheduler-1 Shifted Up 12RxCachedThreadScheduler-1 Shifted Down 2RxCachedThreadScheduler-1 Received 2RxCachedThreadScheduler-1 Generated 3RxCachedThreadScheduler-1 Shifted Up 13RxCachedThreadScheduler-1 Shifted Down 3RxCachedThreadScheduler-1 Received 3RxCachedThreadScheduler-1 Generated 4RxCachedThreadScheduler-1 Shifted Up 14RxCachedThreadScheduler-1 Shifted Down 4RxCachedThreadScheduler-1 Received 4RxCachedThreadScheduler-1 Generated 5RxCachedThreadScheduler-1 Shifted Up 15RxCachedThreadScheduler-1 Shifted Down 5RxCachedThreadScheduler-1 Received 5
可以看到使用的是RxCachedThreadScheduler-1线程。
注意点
- 因为subscribeOn之后使用了RxCachedThreadScheduler-1线程(rxjava大量使用了immutable objects和decorate pattern模式),如果主线程不等待的话,会在subscribe之间程序结束,导致没有输出。一般来说如果我们使用web server等有至少一个非daemon thread的服务的话,是不用担心的。
subscribeOn之后是新的observable,不会对之前的observable造成影响,如果不使用的话会导致仍旧在main线程运行
public static void main(String[] args) throws InterruptedException {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable<Integer> observable = Observable.fromStream(intList.stream());Observable<Integer> scheduledObservable = observable.subscribeOn(Schedulers.io());observable.subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer integer) {System.out.println(Thread.currentThread().getName() + " " + integer);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});}
运行结果是:
main 1main 2main 3main 4main 5
弹珠图
Observable.observeOn
ObserveOn影响Observable将在该运算符出现的位置使用的线程。
代码示例
```java package com.zihao.schedulers.threadexample;
import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Arrays; import java.util.List;
/**
- observeOn *
- @author tangzihao
@Date 2021/1/4 11:35 上午 */ public class ObserveOnObservable { public static void main(String[] args) throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);})).observeOn(Schedulers.io()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(1000);
} }
<a name="ts7Yg"></a>## 输出结果```shellmain Generated 1main Generated 2main Generated 3main Generated 4main Generated 5RxCachedThreadScheduler-1 Shifted Up 11RxCachedThreadScheduler-1 Shifted Down 1RxCachedThreadScheduler-1 Received 1RxCachedThreadScheduler-1 Shifted Up 12RxCachedThreadScheduler-1 Shifted Down 2RxCachedThreadScheduler-1 Received 2RxCachedThreadScheduler-1 Shifted Up 13RxCachedThreadScheduler-1 Shifted Down 3RxCachedThreadScheduler-1 Received 3RxCachedThreadScheduler-1 Shifted Up 14RxCachedThreadScheduler-1 Shifted Down 4RxCachedThreadScheduler-1 Received 4RxCachedThreadScheduler-1 Shifted Up 15RxCachedThreadScheduler-1 Shifted Down 5RxCachedThreadScheduler-1 Received 5
注意点
- main线程和RxCachedThreadScheduler-1是在一定程度上并行的,如果增加数据量的话可以看到main和RxCachedThreadScheduler-1线程并行执行。
RxCachedThreadScheduler-1只是从scheduler取了一个线程,对于这个线程来说,执行还是串行的(This is key: observeOn() is not an instruction saying “process these values using this thread pool”, but only “process these values using one thread from this Scheduler”.)
弹珠图
Two Observable.observeOn calls in one chain
ObserveOn影响Observable将在该运算符出现的位置使用的线程,所以可以使用多次observeOn的操作符。
代码示例
public class TwoObserveOnObservable {public static void main(String[] args) throws InterruptedException {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);})).observeOn(Schedulers.io()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).observeOn(Schedulers.computation()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(1000);}}
输出结果
main Generated 1main Generated 2main Generated 3main Generated 4main Generated 5RxCachedThreadScheduler-1 Shifted Up 11RxCachedThreadScheduler-1 Shifted Up 12RxCachedThreadScheduler-1 Shifted Up 13RxComputationThreadPool-1 Shifted Down 1RxCachedThreadScheduler-1 Shifted Up 14RxComputationThreadPool-1 Received 1RxCachedThreadScheduler-1 Shifted Up 15RxComputationThreadPool-1 Shifted Down 2RxComputationThreadPool-1 Received 2RxComputationThreadPool-1 Shifted Down 3RxComputationThreadPool-1 Received 3RxComputationThreadPool-1 Shifted Down 4RxComputationThreadPool-1 Received 4RxComputationThreadPool-1 Shifted Down 5RxComputationThreadPool-1 Received 5
注意点
这个例子和文章中的有些出入,文章中的输出如下
[main] Generated 1[main] Generated 2[main] Generated 3[main] Generated 4[main] Generated 5[RxCachedThreadScheduler-1] Shifted Up 11[RxComputationThreadPool-1] Shifted Down 1[RxComputationThreadPool-1] Received 1[RxCachedThreadScheduler-1] Shifted Up 12[RxComputationThreadPool-1] Shifted Down 2[RxComputationThreadPool-1] Received 2[RxCachedThreadScheduler-1] Shifted Up 13...
弹珠图
Two Observable.subscribeOn calls in one chain
代码示例
public static void subscribeOnAndSubscribeOn() throws InterruptedException {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);});}}).subscribeOn(Schedulers.computation()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).subscribeOn(Schedulers.io()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(3000);}
输出结果
只用到了第一个subscribeOn(Schedulers.computation())的RxComputationThreadPool中的线程。
RxComputationThreadPool-1 Generated 1RxComputationThreadPool-1 Shifted Up 11RxComputationThreadPool-1 Shifted Down 1RxComputationThreadPool-1 Received 1RxComputationThreadPool-1 Generated 2RxComputationThreadPool-1 Shifted Up 12RxComputationThreadPool-1 Shifted Down 2RxComputationThreadPool-1 Received 2RxComputationThreadPool-1 Generated 3RxComputationThreadPool-1 Shifted Up 13RxComputationThreadPool-1 Shifted Down 3RxComputationThreadPool-1 Received 3RxComputationThreadPool-1 Generated 4RxComputationThreadPool-1 Shifted Up 14RxComputationThreadPool-1 Shifted Down 4RxComputationThreadPool-1 Received 4RxComputationThreadPool-1 Generated 5RxComputationThreadPool-1 Shifted Up 15RxComputationThreadPool-1 Shifted Down 5RxComputationThreadPool-1 Received 5
注意点
在one chain中,onObserve可以调用多次,onSubscribe使用第一个onSubscribe的scheduler
Observable.subscribeOn and Observable.observeOn together
先使用subscribeOn,然后使用observeOn
代码示例
public static void subscribeOnAndObserveOn() throws InterruptedException {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);});}}).subscribeOn(Schedulers.computation()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).observeOn(Schedulers.io()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(3000);}
输出结果
RxComputationThreadPool-1 Generated 1RxComputationThreadPool-1 Shifted Up 11RxComputationThreadPool-1 Generated 2RxComputationThreadPool-1 Shifted Up 12RxCachedThreadScheduler-1 Shifted Down 1RxComputationThreadPool-1 Generated 3RxComputationThreadPool-1 Shifted Up 13RxCachedThreadScheduler-1 Received 1RxComputationThreadPool-1 Generated 4RxComputationThreadPool-1 Shifted Up 14RxCachedThreadScheduler-1 Shifted Down 2RxComputationThreadPool-1 Generated 5RxCachedThreadScheduler-1 Received 2RxComputationThreadPool-1 Shifted Up 15RxCachedThreadScheduler-1 Shifted Down 3RxCachedThreadScheduler-1 Received 3RxCachedThreadScheduler-1 Shifted Down 4RxCachedThreadScheduler-1 Received 4RxCachedThreadScheduler-1 Shifted Down 5RxCachedThreadScheduler-1 Received 5
弹珠图
先使用observeOn,然后使用subscribeOn
代码示例
public static void observeOnAndSubscribeOn() throws InterruptedException {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);});}}).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).observeOn(Schedulers.io()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribeOn(Schedulers.computation()).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(3000);}
输出结果
RxComputationThreadPool-1 Shifted Up 12RxComputationThreadPool-1 Generated 3RxComputationThreadPool-1 Shifted Up 13RxCachedThreadScheduler-1 Shifted Down 1RxComputationThreadPool-1 Generated 4RxComputationThreadPool-1 Shifted Up 14RxCachedThreadScheduler-1 Received 1RxComputationThreadPool-1 Generated 5RxCachedThreadScheduler-1 Shifted Down 2RxComputationThreadPool-1 Shifted Up 15RxCachedThreadScheduler-1 Received 2RxCachedThreadScheduler-1 Shifted Down 3RxCachedThreadScheduler-1 Received 3RxCachedThreadScheduler-1 Shifted Down 4RxCachedThreadScheduler-1 Received 4RxCachedThreadScheduler-1 Shifted Down 5RxCachedThreadScheduler-1 Received 5
弹珠图
注意点
使用了Schedulers.io()和Schedulers.computation()创建两个不同的线程池中的线程
- 这种结合导致两个线程并行处理不同值的运算。使用subscribeOn调度的线程在生成和处理更多值之前不会阻塞等待下游调度程序中的工作。
- subscribeOn和observeOn的操作符顺序没有明确的要求
如果使用两个subscribeOn,将使用第一个subscribeOn的scheduler
隐式的线程切换比如使用delay操作符
public class ImplicitThreadLikeDelay {public static void main(String[] args) throws InterruptedException {List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);})).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).delay(10, TimeUnit.MILLISECONDS).map(e -> {System.out.println(Thread.currentThread().getName() + " Delayed " + e);return e;}).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(1000);}}
输出结果
main Generated 1main Shifted Up 11main Generated 2main Shifted Up 12main Generated 3main Shifted Up 13main Generated 4main Shifted Up 14main Generated 5main Shifted Up 15RxComputationThreadPool-1 Delayed 11RxComputationThreadPool-1 Shifted Down 1RxComputationThreadPool-1 Received 1RxComputationThreadPool-1 Delayed 12RxComputationThreadPool-1 Shifted Down 2RxComputationThreadPool-1 Received 2RxComputationThreadPool-1 Delayed 13RxComputationThreadPool-1 Shifted Down 3RxComputationThreadPool-1 Received 3RxComputationThreadPool-1 Delayed 14RxComputationThreadPool-1 Shifted Down 4RxComputationThreadPool-1 Received 4RxComputationThreadPool-1 Delayed 15RxComputationThreadPool-1 Shifted Down 5RxComputationThreadPool-1 Received 5
注意点
rxjava的一些operator(比如这个例子中的delay)可能会在不同的线程上调度subscriber,而不是调用subscribe方法的线程。combine observable
single thread
代码示例
public static void usingOneThread() throws InterruptedException {List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);List<Integer> intStream2 = Arrays.asList(2, 4, 6, 8);Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);})).mergeWith(Observable.create(emitter -> intStream2.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);}))).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));return e + 10;}).subscribeOn(Schedulers.io()).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(1000);}
输出结果
RxCachedThreadScheduler-1 Generated 1RxCachedThreadScheduler-1 Shifted Up 11RxCachedThreadScheduler-1 Shifted Down 1RxCachedThreadScheduler-1 Received 1RxCachedThreadScheduler-1 Generated 3RxCachedThreadScheduler-1 Shifted Up 13RxCachedThreadScheduler-1 Shifted Down 3RxCachedThreadScheduler-1 Received 3RxCachedThreadScheduler-1 Generated 5RxCachedThreadScheduler-1 Shifted Up 15RxCachedThreadScheduler-1 Shifted Down 5RxCachedThreadScheduler-1 Received 5RxCachedThreadScheduler-1 Generated 7RxCachedThreadScheduler-1 Shifted Up 17RxCachedThreadScheduler-1 Shifted Down 7RxCachedThreadScheduler-1 Received 7RxCachedThreadScheduler-1 Generated 2RxCachedThreadScheduler-1 Shifted Up 12RxCachedThreadScheduler-1 Shifted Down 2RxCachedThreadScheduler-1 Received 2RxCachedThreadScheduler-1 Generated 4RxCachedThreadScheduler-1 Shifted Up 14RxCachedThreadScheduler-1 Shifted Down 4RxCachedThreadScheduler-1 Received 4RxCachedThreadScheduler-1 Generated 6RxCachedThreadScheduler-1 Shifted Up 16RxCachedThreadScheduler-1 Shifted Down 6RxCachedThreadScheduler-1 Received 6RxCachedThreadScheduler-1 Generated 8RxCachedThreadScheduler-1 Shifted Up 18RxCachedThreadScheduler-1 Shifted Down 8RxCachedThreadScheduler-1 Received 8
multi thread
代码示例
public static void usingMultiThread() throws InterruptedException {List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);Observable<Integer> generatorSubscribed = Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {System.out.println(Thread.currentThread().getName() + " Generated " + i);emitter.onNext(i);})).subscribeOn(Schedulers.io());Observable<Integer> shiftUp1 = generatorSubscribed.map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up #1 " + (e + 10));return e + 10;});Observable<Integer> shiftUp2 = generatorSubscribed.map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Up #2 " + (e + 10));return e + 10;});shiftUp1.mergeWith(shiftUp2).map(e -> {System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));return e - 10;}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer s) {System.out.println(Thread.currentThread().getName() + " Received " + s);}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});Thread.sleep(1000);}
输出结果
RxCachedThreadScheduler-2 Generated 1RxCachedThreadScheduler-1 Generated 1RxCachedThreadScheduler-2 Shifted Up #2 11RxCachedThreadScheduler-1 Shifted Up #1 11RxCachedThreadScheduler-2 Shifted Down 1RxCachedThreadScheduler-2 Received 1RxCachedThreadScheduler-2 Generated 3RxCachedThreadScheduler-2 Shifted Up #2 13RxCachedThreadScheduler-2 Shifted Down 3RxCachedThreadScheduler-2 Received 3RxCachedThreadScheduler-2 Generated 5RxCachedThreadScheduler-2 Shifted Up #2 15RxCachedThreadScheduler-2 Shifted Down 5RxCachedThreadScheduler-2 Received 5RxCachedThreadScheduler-2 Generated 7RxCachedThreadScheduler-2 Shifted Up #2 17RxCachedThreadScheduler-2 Shifted Down 7RxCachedThreadScheduler-2 Received 7RxCachedThreadScheduler-1 Shifted Down 1RxCachedThreadScheduler-1 Received 1RxCachedThreadScheduler-1 Generated 3RxCachedThreadScheduler-1 Shifted Up #1 13RxCachedThreadScheduler-1 Shifted Down 3RxCachedThreadScheduler-1 Received 3RxCachedThreadScheduler-1 Generated 5RxCachedThreadScheduler-1 Shifted Up #1 15RxCachedThreadScheduler-1 Shifted Down 5RxCachedThreadScheduler-1 Received 5RxCachedThreadScheduler-1 Generated 7RxCachedThreadScheduler-1 Shifted Up #1 17RxCachedThreadScheduler-1 Shifted Down 7RxCachedThreadScheduler-1 Received 7
