本篇文章主要是阅读官网的scheduler文档。

弹珠图

弹珠图可以很好的描述rxjava的运作流程,建议先看下这篇文章Understanding Marble Diagrams for Reactive Streams

简单scheduler

输出结果: RxNewThreadScheduler-1 hello world!

  1. public class SchedulersWork {
  2. private static final CountDownLatch latch = new CountDownLatch(1);
  3. public static void main(String[] args) throws InterruptedException {
  4. Scheduler.Worker worker = Schedulers.newThread().createWorker();
  5. worker.schedule(SchedulersWork::sayHello);
  6. latch.await();
  7. worker.dispose();
  8. }
  9. public static void sayHello() {
  10. System.out.println(Thread.currentThread().getName() + " hello world!");
  11. latch.countDown();
  12. }
  13. }

递归scheduler

输出结果: RxNewThreadScheduler-1 hello world! …
循环输出直到dispose worker

  1. public class RecursiveScheduler {
  2. public static void main(String[] args) throws InterruptedException {
  3. Scheduler.Worker worker = Schedulers.newThread().createWorker();
  4. worker.schedule(new Runnable() {
  5. @Override
  6. public void run() {
  7. sayHello();
  8. //递归直到dispose
  9. worker.schedule(this);
  10. }
  11. });
  12. Thread.sleep(1000);
  13. worker.dispose();
  14. }
  15. public static void sayHello() {
  16. System.out.println(Thread.currentThread().getName() + " hello world!");
  17. }
  18. }

⚠️: 递归调用需要限制递归次数或者主动设置dispose状态,否则会出现死循环。

检查或者设置dispose状态

  1. package com.zihao.schedulers;
  2. import io.reactivex.rxjava3.core.Scheduler;
  3. import io.reactivex.rxjava3.schedulers.Schedulers;
  4. import java.util.Scanner;
  5. /**
  6. * 相比较普通的recursive 调度器,使用dispose状态检查
  7. * 如果不订阅了 就停止并释放资源
  8. *
  9. * @author tangzihao
  10. * @Date 2021/1/3 11:20 上午
  11. */
  12. public class CheckDisposeRecursiveScheduler {
  13. public static void main(String[] args) throws InterruptedException {
  14. Scheduler.Worker worker = Schedulers.newThread().createWorker();
  15. worker.schedule(new Runnable() {
  16. @Override
  17. public void run() {
  18. while (!worker.isDisposed()) {
  19. sayHello();
  20. }
  21. System.out.println("worker被终止了。。。");
  22. }
  23. });
  24. //主线程通过终端控制worker结束
  25. Scanner scan = new Scanner(System.in);
  26. System.out.println("终止worker工作: ");
  27. if (scan.hasNext()) {
  28. String str = scan.next();
  29. if (str.equals("stop")) {
  30. worker.dispose();
  31. }
  32. }
  33. scan.close();
  34. }
  35. public static void sayHello() {
  36. //do nothing 故意不输出方便终端输入
  37. }
  38. }

输出结果

  1. 终止worker工作:
  2. stop //在控制台输入 main线程
  3. worker被终止了。。。 //worker线程

延迟或周期性调度器

delayed scheduler

schedule有三个参数,调用的方法,延迟的时间,时间单位

  1. public class DelayedAndPeriodicScheduler {
  2. public static void main(String[] args) throws InterruptedException {
  3. Scheduler.Worker worker = Schedulers.newThread().createWorker();
  4. worker.schedule(DelayedAndPeriodicScheduler::sayHello,1, TimeUnit.SECONDS);
  5. Thread.sleep(2000);
  6. }
  7. public static void sayHello(){
  8. System.out.println("hello,world!");
  9. }
  10. }

periodic scheduler

schedulePeriodically有四个参数,调用的方法,延迟的时间,周期性时间,时间单位

  1. public class DelayedAndPeriodicScheduler {
  2. public static void main(String[] args) throws InterruptedException {
  3. periodicScheduler();
  4. }
  5. public static void periodicScheduler() throws InterruptedException {
  6. Scheduler.Worker worker = Schedulers.newThread().createWorker();
  7. worker.schedulePeriodically(DelayedAndPeriodicScheduler::sayHello, 500, 250, TimeUnit.MILLISECONDS);
  8. Thread.sleep(3000);
  9. }
  10. public static void sayHello() {
  11. System.out.println("hello,world!");
  12. }
  13. }