弹珠图
弹珠图可以很好地描述 RxJava 的运作流程,建议先阅读这篇文章:《Understanding Marble Diagrams for Reactive Streams》。
简单scheduler
输出结果: RxNewThreadScheduler-1 hello world!
/**
 * Minimal scheduler demo: hands a single task to a worker created from
 * {@code Schedulers.newThread()} and blocks the main thread until the task has run.
 */
public class SchedulersWork {
    /** Counted down by {@link #sayHello()} so that {@code main} knows the task has finished. */
    private static final CountDownLatch TASK_DONE = new CountDownLatch(1);

    /**
     * Prints the current thread's name followed by a greeting, then releases the latch
     * that {@code main} is waiting on.
     */
    public static void sayHello() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " hello world!");
        TASK_DONE.countDown();
    }

    /**
     * Schedules {@link #sayHello()} on a fresh worker, waits for it to complete and
     * then disposes the worker.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        Scheduler newThreadScheduler = Schedulers.newThread();
        Scheduler.Worker worker = newThreadScheduler.createWorker();
        worker.schedule(() -> sayHello());
        // Block until the scheduled task signals completion before releasing the worker.
        TASK_DONE.await();
        worker.dispose();
    }
}
递归scheduler
输出结果: RxNewThreadScheduler-1 hello world! …
循环输出直到dispose worker
/**
 * Recursive scheduling demo: a task that re-submits itself to the same worker every
 * time it runs, until the worker is disposed by the main thread.
 */
public class RecursiveScheduler {
    /** Prints the current thread's name followed by a greeting. */
    public static void sayHello() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " hello world!");
    }

    /**
     * Starts the self-rescheduling task, lets it run for one second and then disposes
     * the worker.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final Scheduler.Worker worker = Schedulers.newThread().createWorker();

        // A named local class is used (rather than a lambda) because the task must
        // pass itself via `this` when it re-schedules.
        class SelfRescheduling implements Runnable {
            @Override
            public void run() {
                sayHello();
                // Recurse until the worker is disposed.
                worker.schedule(this);
            }
        }

        worker.schedule(new SelfRescheduling());
        Thread.sleep(1000);
        worker.dispose();
    }
}
⚠️: 递归调用需要限制递归次数或者主动设置dispose状态,否则会出现死循环。
检查或者设置dispose状态
package com.zihao.schedulers;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Scanner;
/**
 * Compared with the plain recursive scheduler example, this variant has the task poll
 * the worker's disposed state and stop once the worker has been disposed.
 *
 * <p>The main thread reads tokens from standard input; typing {@code stop} disposes the
 * worker. The worker is also disposed when standard input reaches end-of-file, so the
 * polling loop never keeps running after {@code main} has finished.
 *
 * @author tangzihao
 * Date: 2021/1/3 11:20 AM
 */
public class CheckDisposeRecursiveScheduler {
    /**
     * Starts the polling task and waits for the user to type {@code stop}.
     *
     * @param args unused
     * @throws InterruptedException declared for consistency with the other examples
     */
    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                // Keep working until the main thread disposes the worker.
                while (!worker.isDisposed()) {
                    sayHello();
                }
                System.out.println("worker被终止了。。。");
            }
        });
        // The main thread controls the worker's termination through the terminal.
        try (Scanner scan = new Scanner(System.in)) {
            System.out.println("终止worker工作: ");
            // Keep reading until "stop" is entered; any other token is ignored
            // instead of silently ending main with the worker still running.
            while (scan.hasNext()) {
                if ("stop".equals(scan.next())) {
                    break;
                }
            }
        } finally {
            // Always release the worker, including on end-of-input.
            worker.dispose();
        }
    }

    /** Intentionally prints nothing so that terminal input is not drowned out. */
    public static void sayHello() {
        // do nothing
    }
}
输出结果
终止worker工作:
stop //在控制台输入 main线程
worker被终止了。。。 //worker线程
延迟或周期性调度器
delayed scheduler
schedule 的延迟重载有三个参数:要执行的任务、延迟时间、时间单位
/**
 * Delayed scheduling demo: runs {@link #sayHello()} once, one second after it is
 * scheduled.
 */
public class DelayedAndPeriodicScheduler {
    /**
     * Schedules the delayed task, waits two seconds so that it has time to run, and
     * then disposes the worker.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        try {
            worker.schedule(DelayedAndPeriodicScheduler::sayHello, 1, TimeUnit.SECONDS);
            // Sleep longer than the delay so the task runs before the worker is released.
            Thread.sleep(2000);
        } finally {
            // Release the worker even if the sleep is interrupted.
            worker.dispose();
        }
    }

    /** Prints a greeting. */
    public static void sayHello() {
        System.out.println("hello,world!");
    }
}
periodic scheduler
schedulePeriodically 有四个参数:要执行的任务、首次执行前的延迟时间、执行周期、时间单位
/**
 * Periodic scheduling demo: runs {@link #sayHello()} repeatedly on a worker created
 * from {@code Schedulers.newThread()}.
 */
public class DelayedAndPeriodicScheduler {
    /**
     * Entry point; delegates to {@link #periodicScheduler()}.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        periodicScheduler();
    }

    /**
     * Schedules {@link #sayHello()} with an initial delay of 500 ms and a period of
     * 250 ms, lets it run for three seconds, and then disposes the worker so the
     * periodic task does not keep firing after this method returns.
     *
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    public static void periodicScheduler() throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        try {
            worker.schedulePeriodically(DelayedAndPeriodicScheduler::sayHello, 500, 250, TimeUnit.MILLISECONDS);
            Thread.sleep(3000);
        } finally {
            // Stop the periodic task and release the worker, even on interruption.
            worker.dispose();
        }
    }

    /** Prints a greeting. */
    public static void sayHello() {
        System.out.println("hello,world!");
    }
}