前置知识:观察者模式&java.util.Observable
RxJAVA2 = Observer + 异步处理
添加Maven依赖
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId><version>2.2.10</version></dependency>
Consumer示例
import io.reactivex.Observable;import io.reactivex.functions.Consumer;import io.reactivex.schedulers.Schedulers;public class Main {public static void main(String[] args) {Consumer<String> consumer = s -> {Thread.sleep(100L);System.out.println(Thread.currentThread().getName() + " : " + s);};Observable<String> observable = Observable.create(emitter -> {emitter.onNext("foo");emitter.onNext("foo2");emitter.onNext("foo3");});//异步调用observable.observeOn(Schedulers.newThread()).subscribe(consumer);//同步調用observable.subscribe(consumer);}}
Observer示例
import io.reactivex.Observable;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;public class Main2 {public static void main(String[] args) {Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable disposable) {System.out.println("onSubscribe:" + disposable);}@Overridepublic void onNext(String s) {System.out.println("onNext:" + s);// if ("foo3".equals(s)) {// throw new IllegalArgumentException("foo3");// }}@Overridepublic void onError(Throwable throwable) {System.out.println("onError:" + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("onComplete");}};Observable<String> observable = Observable.create(emitter -> {try {emitter.onNext("foo");emitter.onNext("foo2");emitter.onNext("foo3");emitter.onNext("foo4");} catch (Exception e) {emitter.onError(e);}emitter.onComplete();});//同步調用observable.subscribe(observer);}}
链式调用
import io.reactivex.Observable;import io.reactivex.ObservableOnSubscribe;import io.reactivex.schedulers.Schedulers;public class Main3 {public static void main(String[] args) {Observable.create((ObservableOnSubscribe<String>) emitter -> {emitter.onNext("foo");emitter.onNext("foo2");emitter.onNext("foo3");}).observeOn(Schedulers.newThread()).subscribe(s -> System.out.println(s));for (;;){}}}
