这篇文章主要讲下scheduler和thread的关系。
主要是读RxJava Threading Examples的笔记。

no thread

普通的observable的话是不会产生额外的线程,是在当前线程执行任务的。

代码示例

  1. public class NoThreadObservable {
  2. public static void main(String[] args) {
  3. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
  4. Observable.create(new ObservableOnSubscribe<Integer>() {
  5. @Override
  6. public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
  7. intList.forEach(i -> {
  8. System.out.println(Thread.currentThread().getName() + " Generated " + i);
  9. emitter.onNext(i);
  10. });
  11. }
  12. }).map(e -> {
  13. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
  14. return e + 10;
  15. }).map(e -> {
  16. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
  17. return e - 10;
  18. }).subscribe(new Observer<Integer>() {
  19. @Override
  20. public void onSubscribe(@NonNull Disposable d) {
  21. }
  22. @Override
  23. public void onNext(@NonNull Integer s) {
  24. System.out.println(Thread.currentThread().getName() + " Received " + s);
  25. }
  26. @Override
  27. public void onError(@NonNull Throwable e) {
  28. }
  29. @Override
  30. public void onComplete() {
  31. }
  32. });
  33. }
  34. }

代码的输出都是在main线程上。

输出结果

  1. main Generated 1
  2. main Shifted Up 11
  3. main Shifted Down 1
  4. main Received 1
  5. main Generated 2
  6. main Shifted Up 12
  7. main Shifted Down 2
  8. main Received 2
  9. main Generated 3
  10. main Shifted Up 13
  11. main Shifted Down 3
  12. main Received 3
  13. main Generated 4
  14. main Shifted Up 14
  15. main Shifted Down 4
  16. main Received 4
  17. main Generated 5
  18. main Shifted Up 15
  19. main Shifted Down 5
  20. main Received 5

弹珠图

NoThreadObservable.png

scheduler api

Scheduler purpose
Schedulers.computation( )) meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors
Schedulers.from(executor)) uses the specified Executor as a Scheduler
Schedulers.immediate( )) schedules work to begin immediately in the current thread
Schedulers.io( )) meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ); Schedulers.io( ) by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching
Schedulers.newThread( )) creates a new thread for each unit of work
Schedulers.trampoline( )) queues work to begin on the current thread after any already-queued work

Observable.subscribeOn

SubscribeOn运算符指定Observable将在哪个线程上开始操作,无论该运算符在运算符链中的哪个点被调用

代码示例

  1. public static void normalCase() throws InterruptedException {
  2. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
  3. Observable.create(new ObservableOnSubscribe<Integer>() {
  4. @Override
  5. public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
  6. intList.forEach(i -> {
  7. System.out.println(Thread.currentThread().getName() + " Generated " + i);
  8. emitter.onNext(i);
  9. });
  10. }
  11. }).subscribeOn(Schedulers.io())
  12. .map(e -> {
  13. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
  14. return e + 10;
  15. })
  16. .map(e -> {
  17. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
  18. return e - 10;
  19. })
  20. .subscribe(new Observer<Integer>() {
  21. @Override
  22. public void onSubscribe(@NonNull Disposable d) {
  23. }
  24. @Override
  25. public void onNext(@NonNull Integer s) {
  26. System.out.println(Thread.currentThread().getName() + " Received " + s);
  27. }
  28. @Override
  29. public void onError(@NonNull Throwable e) {
  30. }
  31. @Override
  32. public void onComplete() {
  33. }
  34. });
  35. Thread.sleep(1000);
  36. }

输出结果

  1. RxCachedThreadScheduler-1 Generated 1
  2. RxCachedThreadScheduler-1 Shifted Up 11
  3. RxCachedThreadScheduler-1 Shifted Down 1
  4. RxCachedThreadScheduler-1 Received 1
  5. RxCachedThreadScheduler-1 Generated 2
  6. RxCachedThreadScheduler-1 Shifted Up 12
  7. RxCachedThreadScheduler-1 Shifted Down 2
  8. RxCachedThreadScheduler-1 Received 2
  9. RxCachedThreadScheduler-1 Generated 3
  10. RxCachedThreadScheduler-1 Shifted Up 13
  11. RxCachedThreadScheduler-1 Shifted Down 3
  12. RxCachedThreadScheduler-1 Received 3
  13. RxCachedThreadScheduler-1 Generated 4
  14. RxCachedThreadScheduler-1 Shifted Up 14
  15. RxCachedThreadScheduler-1 Shifted Down 4
  16. RxCachedThreadScheduler-1 Received 4
  17. RxCachedThreadScheduler-1 Generated 5
  18. RxCachedThreadScheduler-1 Shifted Up 15
  19. RxCachedThreadScheduler-1 Shifted Down 5
  20. RxCachedThreadScheduler-1 Received 5

可以看到使用的是RxCachedThreadScheduler-1线程。

注意点

  1. 因为subscribeOn之后使用了RxCachedThreadScheduler-1线程(rxjava大量使用了immutable objects和decorate pattern模式),如果主线程不等待的话,会在subscribe之间程序结束,导致没有输出。一般来说如果我们使用web server等有至少一个非daemon thread的服务的话,是不用担心的。
  2. subscribeOn之后是新的observable,不会对之前的observable造成影响,如果不使用的话会导致仍旧在main线程运行

    1. public static void main(String[] args) throws InterruptedException {
    2. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    3. Observable<Integer> observable = Observable.fromStream(intList.stream());
    4. Observable<Integer> scheduledObservable = observable.subscribeOn(Schedulers.io());
    5. observable.subscribe(new Observer<Integer>() {
    6. @Override
    7. public void onSubscribe(@NonNull Disposable d) {
    8. }
    9. @Override
    10. public void onNext(@NonNull Integer integer) {
    11. System.out.println(Thread.currentThread().getName() + " " + integer);
    12. }
    13. @Override
    14. public void onError(@NonNull Throwable e) {
    15. }
    16. @Override
    17. public void onComplete() {
    18. }
    19. });
    20. }

    运行结果是:

    1. main 1
    2. main 2
    3. main 3
    4. main 4
    5. main 5

    弹珠图

    Observable.subscribeOn.png

    Observable.observeOn

    ObserveOn影响Observable将在该运算符出现的位置使用的线程。

    代码示例

    ```java package com.zihao.schedulers.threadexample;

import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Arrays; import java.util.List;

/**

  • observeOn *
  • @author tangzihao
  • @Date 2021/1/4 11:35 上午 */ public class ObserveOnObservable { public static void main(String[] args) throws InterruptedException {

    1. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    2. Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
    3. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    4. emitter.onNext(i);
    5. })).observeOn(Schedulers.io())
    6. .map(e -> {
    7. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
    8. return e + 10;
    9. })
    10. .map(e -> {
    11. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    12. return e - 10;
    13. })
    14. .subscribe(new Observer<Integer>() {
    15. @Override
    16. public void onSubscribe(@NonNull Disposable d) {
    17. }
    18. @Override
    19. public void onNext(@NonNull Integer s) {
    20. System.out.println(Thread.currentThread().getName() + " Received " + s);
    21. }
    22. @Override
    23. public void onError(@NonNull Throwable e) {
    24. }
    25. @Override
    26. public void onComplete() {
    27. }
    28. });
    29. Thread.sleep(1000);

    } }

    1. <a name="ts7Yg"></a>
    2. ## 输出结果
    3. ```shell
    4. main Generated 1
    5. main Generated 2
    6. main Generated 3
    7. main Generated 4
    8. main Generated 5
    9. RxCachedThreadScheduler-1 Shifted Up 11
    10. RxCachedThreadScheduler-1 Shifted Down 1
    11. RxCachedThreadScheduler-1 Received 1
    12. RxCachedThreadScheduler-1 Shifted Up 12
    13. RxCachedThreadScheduler-1 Shifted Down 2
    14. RxCachedThreadScheduler-1 Received 2
    15. RxCachedThreadScheduler-1 Shifted Up 13
    16. RxCachedThreadScheduler-1 Shifted Down 3
    17. RxCachedThreadScheduler-1 Received 3
    18. RxCachedThreadScheduler-1 Shifted Up 14
    19. RxCachedThreadScheduler-1 Shifted Down 4
    20. RxCachedThreadScheduler-1 Received 4
    21. RxCachedThreadScheduler-1 Shifted Up 15
    22. RxCachedThreadScheduler-1 Shifted Down 5
    23. RxCachedThreadScheduler-1 Received 5

    注意点

  1. main线程和RxCachedThreadScheduler-1是在一定程度上并行的,如果增加数据量的话可以看到main和RxCachedThreadScheduler-1线程并行执行。
  2. RxCachedThreadScheduler-1只是从scheduler取了一个线程,对于这个线程来说,执行还是串行的(This is key: observeOn() is not an instruction saying “process these values using this thread pool”, but only “process these values using one thread from this Scheduler”.)

    弹珠图

    Observable.observeOn.png

    Two Observable.observeOn calls in one chain

    ObserveOn影响Observable将在该运算符出现的位置使用的线程,所以可以使用多次observeOn的操作符。

    代码示例

    1. public class TwoObserveOnObservable {
    2. public static void main(String[] args) throws InterruptedException {
    3. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    4. Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
    5. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    6. emitter.onNext(i);
    7. })).observeOn(Schedulers.io())
    8. .map(e -> {
    9. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
    10. return e + 10;
    11. })
    12. .observeOn(Schedulers.computation())
    13. .map(e -> {
    14. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    15. return e - 10;
    16. })
    17. .subscribe(new Observer<Integer>() {
    18. @Override
    19. public void onSubscribe(@NonNull Disposable d) {
    20. }
    21. @Override
    22. public void onNext(@NonNull Integer s) {
    23. System.out.println(Thread.currentThread().getName() + " Received " + s);
    24. }
    25. @Override
    26. public void onError(@NonNull Throwable e) {
    27. }
    28. @Override
    29. public void onComplete() {
    30. }
    31. });
    32. Thread.sleep(1000);
    33. }
    34. }

    输出结果

    1. main Generated 1
    2. main Generated 2
    3. main Generated 3
    4. main Generated 4
    5. main Generated 5
    6. RxCachedThreadScheduler-1 Shifted Up 11
    7. RxCachedThreadScheduler-1 Shifted Up 12
    8. RxCachedThreadScheduler-1 Shifted Up 13
    9. RxComputationThreadPool-1 Shifted Down 1
    10. RxCachedThreadScheduler-1 Shifted Up 14
    11. RxComputationThreadPool-1 Received 1
    12. RxCachedThreadScheduler-1 Shifted Up 15
    13. RxComputationThreadPool-1 Shifted Down 2
    14. RxComputationThreadPool-1 Received 2
    15. RxComputationThreadPool-1 Shifted Down 3
    16. RxComputationThreadPool-1 Received 3
    17. RxComputationThreadPool-1 Shifted Down 4
    18. RxComputationThreadPool-1 Received 4
    19. RxComputationThreadPool-1 Shifted Down 5
    20. RxComputationThreadPool-1 Received 5

    注意点

  3. 这个例子和文章中的有些出入,文章中的输出如下

    1. [main] Generated 1
    2. [main] Generated 2
    3. [main] Generated 3
    4. [main] Generated 4
    5. [main] Generated 5
    6. [RxCachedThreadScheduler-1] Shifted Up 11
    7. [RxComputationThreadPool-1] Shifted Down 1
    8. [RxComputationThreadPool-1] Received 1
    9. [RxCachedThreadScheduler-1] Shifted Up 12
    10. [RxComputationThreadPool-1] Shifted Down 2
    11. [RxComputationThreadPool-1] Received 2
    12. [RxCachedThreadScheduler-1] Shifted Up 13
    13. ...

    弹珠图

    TwoObserveOnObservable (1).png

    Two Observable.subscribeOn calls in one chain

    代码示例

    1. public static void subscribeOnAndSubscribeOn() throws InterruptedException {
    2. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    3. Observable.create(new ObservableOnSubscribe<Integer>() {
    4. @Override
    5. public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
    6. intList.forEach(i -> {
    7. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    8. emitter.onNext(i);
    9. });
    10. }
    11. }).subscribeOn(Schedulers.computation())
    12. .map(e -> {
    13. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
    14. return e + 10;
    15. })
    16. .subscribeOn(Schedulers.io())
    17. .map(e -> {
    18. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    19. return e - 10;
    20. })
    21. .subscribe(new Observer<Integer>() {
    22. @Override
    23. public void onSubscribe(@NonNull Disposable d) {
    24. }
    25. @Override
    26. public void onNext(@NonNull Integer s) {
    27. System.out.println(Thread.currentThread().getName() + " Received " + s);
    28. }
    29. @Override
    30. public void onError(@NonNull Throwable e) {
    31. }
    32. @Override
    33. public void onComplete() {
    34. }
    35. });
    36. Thread.sleep(3000);
    37. }

    输出结果

    只用到了第一个subscribeOn(Schedulers.computation())的RxComputationThreadPool中的线程。

    1. RxComputationThreadPool-1 Generated 1
    2. RxComputationThreadPool-1 Shifted Up 11
    3. RxComputationThreadPool-1 Shifted Down 1
    4. RxComputationThreadPool-1 Received 1
    5. RxComputationThreadPool-1 Generated 2
    6. RxComputationThreadPool-1 Shifted Up 12
    7. RxComputationThreadPool-1 Shifted Down 2
    8. RxComputationThreadPool-1 Received 2
    9. RxComputationThreadPool-1 Generated 3
    10. RxComputationThreadPool-1 Shifted Up 13
    11. RxComputationThreadPool-1 Shifted Down 3
    12. RxComputationThreadPool-1 Received 3
    13. RxComputationThreadPool-1 Generated 4
    14. RxComputationThreadPool-1 Shifted Up 14
    15. RxComputationThreadPool-1 Shifted Down 4
    16. RxComputationThreadPool-1 Received 4
    17. RxComputationThreadPool-1 Generated 5
    18. RxComputationThreadPool-1 Shifted Up 15
    19. RxComputationThreadPool-1 Shifted Down 5
    20. RxComputationThreadPool-1 Received 5

    注意点

    在one chain中,onObserve可以调用多次,onSubscribe使用第一个onSubscribe的scheduler

    Observable.subscribeOn and Observable.observeOn together

    先使用subscribeOn,然后使用observeOn

    代码示例

    1. public static void subscribeOnAndObserveOn() throws InterruptedException {
    2. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    3. Observable.create(new ObservableOnSubscribe<Integer>() {
    4. @Override
    5. public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
    6. intList.forEach(i -> {
    7. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    8. emitter.onNext(i);
    9. });
    10. }
    11. }).subscribeOn(Schedulers.computation())
    12. .map(e -> {
    13. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
    14. return e + 10;
    15. })
    16. .observeOn(Schedulers.io())
    17. .map(e -> {
    18. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    19. return e - 10;
    20. })
    21. .subscribe(new Observer<Integer>() {
    22. @Override
    23. public void onSubscribe(@NonNull Disposable d) {
    24. }
    25. @Override
    26. public void onNext(@NonNull Integer s) {
    27. System.out.println(Thread.currentThread().getName() + " Received " + s);
    28. }
    29. @Override
    30. public void onError(@NonNull Throwable e) {
    31. }
    32. @Override
    33. public void onComplete() {
    34. }
    35. });
    36. Thread.sleep(3000);
    37. }

    输出结果

    1. RxComputationThreadPool-1 Generated 1
    2. RxComputationThreadPool-1 Shifted Up 11
    3. RxComputationThreadPool-1 Generated 2
    4. RxComputationThreadPool-1 Shifted Up 12
    5. RxCachedThreadScheduler-1 Shifted Down 1
    6. RxComputationThreadPool-1 Generated 3
    7. RxComputationThreadPool-1 Shifted Up 13
    8. RxCachedThreadScheduler-1 Received 1
    9. RxComputationThreadPool-1 Generated 4
    10. RxComputationThreadPool-1 Shifted Up 14
    11. RxCachedThreadScheduler-1 Shifted Down 2
    12. RxComputationThreadPool-1 Generated 5
    13. RxCachedThreadScheduler-1 Received 2
    14. RxComputationThreadPool-1 Shifted Up 15
    15. RxCachedThreadScheduler-1 Shifted Down 3
    16. RxCachedThreadScheduler-1 Received 3
    17. RxCachedThreadScheduler-1 Shifted Down 4
    18. RxCachedThreadScheduler-1 Received 4
    19. RxCachedThreadScheduler-1 Shifted Down 5
    20. RxCachedThreadScheduler-1 Received 5

    弹珠图

    SubscribeOnObserveOn.png

    先使用observeOn,然后使用subscribeOn

    代码示例

    1. public static void observeOnAndSubscribeOn() throws InterruptedException {
    2. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    3. Observable.create(new ObservableOnSubscribe<Integer>() {
    4. @Override
    5. public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
    6. intList.forEach(i -> {
    7. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    8. emitter.onNext(i);
    9. });
    10. }
    11. }).map(e -> {
    12. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
    13. return e + 10;
    14. }).observeOn(Schedulers.io()).map(e -> {
    15. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    16. return e - 10;
    17. }).subscribeOn(Schedulers.computation()).subscribe(new Observer<Integer>() {
    18. @Override
    19. public void onSubscribe(@NonNull Disposable d) {
    20. }
    21. @Override
    22. public void onNext(@NonNull Integer s) {
    23. System.out.println(Thread.currentThread().getName() + " Received " + s);
    24. }
    25. @Override
    26. public void onError(@NonNull Throwable e) {
    27. }
    28. @Override
    29. public void onComplete() {
    30. }
    31. });
    32. Thread.sleep(3000);
    33. }

    输出结果

    1. RxComputationThreadPool-1 Shifted Up 12
    2. RxComputationThreadPool-1 Generated 3
    3. RxComputationThreadPool-1 Shifted Up 13
    4. RxCachedThreadScheduler-1 Shifted Down 1
    5. RxComputationThreadPool-1 Generated 4
    6. RxComputationThreadPool-1 Shifted Up 14
    7. RxCachedThreadScheduler-1 Received 1
    8. RxComputationThreadPool-1 Generated 5
    9. RxCachedThreadScheduler-1 Shifted Down 2
    10. RxComputationThreadPool-1 Shifted Up 15
    11. RxCachedThreadScheduler-1 Received 2
    12. RxCachedThreadScheduler-1 Shifted Down 3
    13. RxCachedThreadScheduler-1 Received 3
    14. RxCachedThreadScheduler-1 Shifted Down 4
    15. RxCachedThreadScheduler-1 Received 4
    16. RxCachedThreadScheduler-1 Shifted Down 5
    17. RxCachedThreadScheduler-1 Received 5

    弹珠图

    ObserveOnSubscribeOn.png

    注意点

  4. 使用了Schedulers.io()和Schedulers.computation()创建两个不同的线程池中的线程

  5. 这种结合导致两个线程并行处理不同值的运算。使用subscribeOn调度的线程在生成和处理更多值之前不会阻塞等待下游调度程序中的工作。
  6. subscribeOn和observeOn的操作符顺序没有明确的要求
  7. 如果使用两个subscribeOn,将使用第一个subscribeOn的scheduler

    隐式的线程切换比如使用delay操作符

    1. public class ImplicitThreadLikeDelay {
    2. public static void main(String[] args) throws InterruptedException {
    3. List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    4. Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
    5. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    6. emitter.onNext(i);
    7. }))
    8. .map(e -> {
    9. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
    10. return e + 10;
    11. })
    12. .delay(10, TimeUnit.MILLISECONDS)
    13. .map(e -> {
    14. System.out.println(Thread.currentThread().getName() + " Delayed " + e);
    15. return e;
    16. })
    17. .map(e -> {
    18. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    19. return e - 10;
    20. })
    21. .subscribe(new Observer<Integer>() {
    22. @Override
    23. public void onSubscribe(@NonNull Disposable d) {
    24. }
    25. @Override
    26. public void onNext(@NonNull Integer s) {
    27. System.out.println(Thread.currentThread().getName() + " Received " + s);
    28. }
    29. @Override
    30. public void onError(@NonNull Throwable e) {
    31. }
    32. @Override
    33. public void onComplete() {
    34. }
    35. });
    36. Thread.sleep(1000);
    37. }
    38. }

    输出结果

    1. main Generated 1
    2. main Shifted Up 11
    3. main Generated 2
    4. main Shifted Up 12
    5. main Generated 3
    6. main Shifted Up 13
    7. main Generated 4
    8. main Shifted Up 14
    9. main Generated 5
    10. main Shifted Up 15
    11. RxComputationThreadPool-1 Delayed 11
    12. RxComputationThreadPool-1 Shifted Down 1
    13. RxComputationThreadPool-1 Received 1
    14. RxComputationThreadPool-1 Delayed 12
    15. RxComputationThreadPool-1 Shifted Down 2
    16. RxComputationThreadPool-1 Received 2
    17. RxComputationThreadPool-1 Delayed 13
    18. RxComputationThreadPool-1 Shifted Down 3
    19. RxComputationThreadPool-1 Received 3
    20. RxComputationThreadPool-1 Delayed 14
    21. RxComputationThreadPool-1 Shifted Down 4
    22. RxComputationThreadPool-1 Received 4
    23. RxComputationThreadPool-1 Delayed 15
    24. RxComputationThreadPool-1 Shifted Down 5
    25. RxComputationThreadPool-1 Received 5

    注意点
    rxjava的一些operator(比如这个例子中的delay)可能会在不同的线程上调度subscriber,而不是调用subscribe方法的线程。

    combine observable

    single thread

    代码示例

    1. public static void usingOneThread() throws InterruptedException {
    2. List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
    3. List<Integer> intStream2 = Arrays.asList(2, 4, 6, 8);
    4. Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
    5. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    6. emitter.onNext(i);
    7. })).mergeWith(Observable.create(emitter -> intStream2.forEach(i -> {
    8. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    9. emitter.onNext(i);
    10. })))
    11. .map(e -> {
    12. System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
    13. return e + 10;
    14. })
    15. .subscribeOn(Schedulers.io())
    16. .map(e -> {
    17. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    18. return e - 10;
    19. })
    20. .subscribe(new Observer<Integer>() {
    21. @Override
    22. public void onSubscribe(@NonNull Disposable d) {
    23. }
    24. @Override
    25. public void onNext(@NonNull Integer s) {
    26. System.out.println(Thread.currentThread().getName() + " Received " + s);
    27. }
    28. @Override
    29. public void onError(@NonNull Throwable e) {
    30. }
    31. @Override
    32. public void onComplete() {
    33. }
    34. });
    35. Thread.sleep(1000);
    36. }

    输出结果

    1. RxCachedThreadScheduler-1 Generated 1
    2. RxCachedThreadScheduler-1 Shifted Up 11
    3. RxCachedThreadScheduler-1 Shifted Down 1
    4. RxCachedThreadScheduler-1 Received 1
    5. RxCachedThreadScheduler-1 Generated 3
    6. RxCachedThreadScheduler-1 Shifted Up 13
    7. RxCachedThreadScheduler-1 Shifted Down 3
    8. RxCachedThreadScheduler-1 Received 3
    9. RxCachedThreadScheduler-1 Generated 5
    10. RxCachedThreadScheduler-1 Shifted Up 15
    11. RxCachedThreadScheduler-1 Shifted Down 5
    12. RxCachedThreadScheduler-1 Received 5
    13. RxCachedThreadScheduler-1 Generated 7
    14. RxCachedThreadScheduler-1 Shifted Up 17
    15. RxCachedThreadScheduler-1 Shifted Down 7
    16. RxCachedThreadScheduler-1 Received 7
    17. RxCachedThreadScheduler-1 Generated 2
    18. RxCachedThreadScheduler-1 Shifted Up 12
    19. RxCachedThreadScheduler-1 Shifted Down 2
    20. RxCachedThreadScheduler-1 Received 2
    21. RxCachedThreadScheduler-1 Generated 4
    22. RxCachedThreadScheduler-1 Shifted Up 14
    23. RxCachedThreadScheduler-1 Shifted Down 4
    24. RxCachedThreadScheduler-1 Received 4
    25. RxCachedThreadScheduler-1 Generated 6
    26. RxCachedThreadScheduler-1 Shifted Up 16
    27. RxCachedThreadScheduler-1 Shifted Down 6
    28. RxCachedThreadScheduler-1 Received 6
    29. RxCachedThreadScheduler-1 Generated 8
    30. RxCachedThreadScheduler-1 Shifted Up 18
    31. RxCachedThreadScheduler-1 Shifted Down 8
    32. RxCachedThreadScheduler-1 Received 8

    multi thread

    代码示例

    1. public static void usingMultiThread() throws InterruptedException {
    2. List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
    3. Observable<Integer> generatorSubscribed = Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
    4. System.out.println(Thread.currentThread().getName() + " Generated " + i);
    5. emitter.onNext(i);
    6. })).subscribeOn(Schedulers.io());
    7. Observable<Integer> shiftUp1 = generatorSubscribed.map(e -> {
    8. System.out.println(Thread.currentThread().getName() + " Shifted Up #1 " + (e + 10));
    9. return e + 10;
    10. });
    11. Observable<Integer> shiftUp2 = generatorSubscribed.map(e -> {
    12. System.out.println(Thread.currentThread().getName() + " Shifted Up #2 " + (e + 10));
    13. return e + 10;
    14. });
    15. shiftUp1.mergeWith(shiftUp2)
    16. .map(e -> {
    17. System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
    18. return e - 10;
    19. }).subscribe(new Observer<Integer>() {
    20. @Override
    21. public void onSubscribe(@NonNull Disposable d) {
    22. }
    23. @Override
    24. public void onNext(@NonNull Integer s) {
    25. System.out.println(Thread.currentThread().getName() + " Received " + s);
    26. }
    27. @Override
    28. public void onError(@NonNull Throwable e) {
    29. }
    30. @Override
    31. public void onComplete() {
    32. }
    33. });
    34. Thread.sleep(1000);
    35. }

    输出结果

    1. RxCachedThreadScheduler-2 Generated 1
    2. RxCachedThreadScheduler-1 Generated 1
    3. RxCachedThreadScheduler-2 Shifted Up #2 11
    4. RxCachedThreadScheduler-1 Shifted Up #1 11
    5. RxCachedThreadScheduler-2 Shifted Down 1
    6. RxCachedThreadScheduler-2 Received 1
    7. RxCachedThreadScheduler-2 Generated 3
    8. RxCachedThreadScheduler-2 Shifted Up #2 13
    9. RxCachedThreadScheduler-2 Shifted Down 3
    10. RxCachedThreadScheduler-2 Received 3
    11. RxCachedThreadScheduler-2 Generated 5
    12. RxCachedThreadScheduler-2 Shifted Up #2 15
    13. RxCachedThreadScheduler-2 Shifted Down 5
    14. RxCachedThreadScheduler-2 Received 5
    15. RxCachedThreadScheduler-2 Generated 7
    16. RxCachedThreadScheduler-2 Shifted Up #2 17
    17. RxCachedThreadScheduler-2 Shifted Down 7
    18. RxCachedThreadScheduler-2 Received 7
    19. RxCachedThreadScheduler-1 Shifted Down 1
    20. RxCachedThreadScheduler-1 Received 1
    21. RxCachedThreadScheduler-1 Generated 3
    22. RxCachedThreadScheduler-1 Shifted Up #1 13
    23. RxCachedThreadScheduler-1 Shifted Down 3
    24. RxCachedThreadScheduler-1 Received 3
    25. RxCachedThreadScheduler-1 Generated 5
    26. RxCachedThreadScheduler-1 Shifted Up #1 15
    27. RxCachedThreadScheduler-1 Shifted Down 5
    28. RxCachedThreadScheduler-1 Received 5
    29. RxCachedThreadScheduler-1 Generated 7
    30. RxCachedThreadScheduler-1 Shifted Up #1 17
    31. RxCachedThreadScheduler-1 Shifted Down 7
    32. RxCachedThreadScheduler-1 Received 7