:::tips
- 使用 Rx.Observable.create() 创建响应式流。该流通过显式的通知每一个 Observer (或者说是 “Subscriber”) 数据事件onNext() 或者错误事件onError()。
与 Java StreamAPI 相比,Observable 是可重用的。 :::
RxJava
public static void rxjava(){Observable<Object> observable = Observable.create(subscriber -> {for (int i = 0; i < 10; i++) {// 传输响应式流中的元素subscriber.onNext("rx1 -- " + i);}// 通知订阅者,响应式流结束subscriber.onCompleted();});observable.subscribe(item -> System.out.println("下一个元素是:" + item),ex -> System.err.println("异常信息:" + ex.getMessage()),() -> System.out.println("响应式流结束"));}
RxJava2
public static void rxjava2(){Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {for (int i = 0; i < 10; i++) {observableEmitter.onNext("rxjava2 - " + i);}observableEmitter.onComplete();}});observable.subscribe(System.out::println,System.err::println,() -> System.out.println("流结束"));}
RxJava3
public static void rxjava3(){Observable.create((ObservableOnSubscribe<String>) emitter -> {for (int i = 0; i < 10; i++) {emitter.onNext("rxjava3 - " + i);}emitter.onComplete();}).subscribe(System.out::println,System.err::println,() -> System.out.println("流结束"));}
