弹珠图
弹珠图可以很好的描述rxjava的运作流程,建议先看下这篇文章Understanding Marble Diagrams for Reactive Streams
简单scheduler
输出结果: RxNewThreadScheduler-1 hello world!
public class SchedulersWork {private static final CountDownLatch latch = new CountDownLatch(1);public static void main(String[] args) throws InterruptedException {Scheduler.Worker worker = Schedulers.newThread().createWorker();worker.schedule(SchedulersWork::sayHello);latch.await();worker.dispose();}public static void sayHello() {System.out.println(Thread.currentThread().getName() + " hello world!");latch.countDown();}}
递归scheduler
输出结果: RxNewThreadScheduler-1 hello world! …
循环输出直到dispose worker
public class RecursiveScheduler {public static void main(String[] args) throws InterruptedException {Scheduler.Worker worker = Schedulers.newThread().createWorker();worker.schedule(new Runnable() {@Overridepublic void run() {sayHello();//递归直到disposeworker.schedule(this);}});Thread.sleep(1000);worker.dispose();}public static void sayHello() {System.out.println(Thread.currentThread().getName() + " hello world!");}}
⚠️: 递归调用需要限制递归次数或者主动设置dispose状态,否则会出现死循环。
检查或者设置dispose状态
package com.zihao.schedulers;import io.reactivex.rxjava3.core.Scheduler;import io.reactivex.rxjava3.schedulers.Schedulers;import java.util.Scanner;/*** 相比较普通的recursive 调度器,使用dispose状态检查* 如果不订阅了 就停止并释放资源** @author tangzihao* @Date 2021/1/3 11:20 上午*/public class CheckDisposeRecursiveScheduler {public static void main(String[] args) throws InterruptedException {Scheduler.Worker worker = Schedulers.newThread().createWorker();worker.schedule(new Runnable() {@Overridepublic void run() {while (!worker.isDisposed()) {sayHello();}System.out.println("worker被终止了。。。");}});//主线程通过终端控制worker结束Scanner scan = new Scanner(System.in);System.out.println("终止worker工作: ");if (scan.hasNext()) {String str = scan.next();if (str.equals("stop")) {worker.dispose();}}scan.close();}public static void sayHello() {//do nothing 故意不输出方便终端输入}}
输出结果
终止worker工作:stop //在控制台输入 main线程worker被终止了。。。 //worker线程
延迟或周期性调度器
delayed scheduler
schedule有三个参数,调用的方法,延迟的时间,时间单位
public class DelayedAndPeriodicScheduler {public static void main(String[] args) throws InterruptedException {Scheduler.Worker worker = Schedulers.newThread().createWorker();worker.schedule(DelayedAndPeriodicScheduler::sayHello,1, TimeUnit.SECONDS);Thread.sleep(2000);}public static void sayHello(){System.out.println("hello,world!");}}
periodic scheduler
schedulePeriodically有四个参数,调用的方法,延迟的时间,周期性时间,时间单位
public class DelayedAndPeriodicScheduler {public static void main(String[] args) throws InterruptedException {periodicScheduler();}public static void periodicScheduler() throws InterruptedException {Scheduler.Worker worker = Schedulers.newThread().createWorker();worker.schedulePeriodically(DelayedAndPeriodicScheduler::sayHello, 500, 250, TimeUnit.MILLISECONDS);Thread.sleep(3000);}public static void sayHello() {System.out.println("hello,world!");}}
