快速入门
public class HelloWorld {
public static void main(String[] args) {
Flowable.fromArray("Ben", "George").subscribe(s -> System.out.println("Hello " + s + "!"));
}
}
创建observable
使用observable操作符
可以使用just()或者from()方法将对象,数组,集合转换为Observable对象。
Observable<String> stringObservable = Observable.fromArray("a", "b", "c");
stringObservable.subscribe(s -> System.out.println(s + " "));
List<Integer> intList = Arrays.asList(1, 3, 5, 7);
Observable<Integer> integerObservable = Observable.fromStream(intList.stream());
integerObservable.subscribe(s -> System.out.println(s + " "));
Observable.just("one object").subscribe(System.out::println);
对于由Observable发射的每个元素,这些转换后的Observable都会同步调用它们的任何订阅者的onNext()方法,然后将调用订阅者的onCompleted()方法。
使用create方法
可以使用Create运算符从头创建Observable。向此运算符传递一个接受观察者作为其参数的函数。编写此函数,使其表现为可观察到的状态-通过适当地调用观察者的onNext,onError和onCompleted方法。Observable必须仅一次调用观察者的onCompleted方法或一次仅调用其onError方法。
同步
package com.zihao.observable.create;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.stream.IntStream;
/**
* 同步observable
* 不会产生额外的线程
*
* @author tangzihao
* @Date 2020/12/28 9:10 下午
*/
public class SyncCreateObservable {
public static void main(String[] args) {
Observable.create((ObservableOnSubscribe<String>) emitter -> {
IntStream.rangeClosed(1, 50).forEach(i -> {
if (!emitter.isDisposed()) {
//如果抛出异常执行到30就不会执行了
/*if (i == 30) {
throw new RuntimeException("老子不开心");
}*/
emitter.onNext("value_" + i);
}
});
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("开始订阅啦");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("完成了");
}
});
}
}
上面代码有几个注意点:
- 这个是同步的代码,不会产生新的线程,主线程会等上面的onComplete后才会执行
- emitter.isDisposed()判断是否不感兴趣了,早期版本叫做unsubscribed
- 如果抛出异常,异常之后的都不会执行,包括onComplete方法也不会执行
- 如果没有抛出异常的话,onError方法不会执行
- onSubscribe,onError,onComplete只会执行一次。
异步
```java package com.zihao.observable.create;
import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.disposables.Disposable;
import java.util.stream.IntStream;
/**
- @author tangzihao
@Date 2020/12/28 9:46 下午 */ public class AsyncCreateObservable { public static void main(String[] args) {
Observable.create((ObservableOnSubscribe<String>) emitter -> new Thread(() -> {
IntStream.rangeClosed(1, 50).forEach(i -> {
if (!emitter.isDisposed()) {
//如果抛出异常执行到30就不会执行了
/*if (i == 30) {
throw new RuntimeException("老子不开心");
}*/
emitter.onNext("value_" + i);
}
});
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}).start()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("开始订阅啦");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("完成了");
}
});
System.out.println("主线程可以正常工作。。。");
} } ``` 上面代码有如下注意点
- 会产生新的线程,observer不会阻塞主线程的工作
- 其他和同步一样
转换(transform)observable
示例代码public class TransformObservable {
public static void main(String[] args) {
Observable.create((ObservableOnSubscribe<String>) emitter -> {
IntStream.rangeClosed(1, 50).forEach(i -> {
if (!emitter.isDisposed()) {
emitter.onNext("value_" + i);
}
});
if (!emitter.isDisposed()) {
emitter.onComplete();
}
})
.skip(5)
.take(10)
.map(s -> s + "_xform")
.subscribe(System.out::println);
}
}
异常处理
通过onError捕获异常
```java //observable代码 if (!emitter.isDisposed()) { if (i == 30) {
} emitter.onNext(“value_” + i); }throw new RuntimeException("老子不开心");
//observer代码 @Override public void onError(Throwable throwable) { throwable.printStackTrace(); }
<a name="BKtI3"></a>
### 使用onErrorResume的方式fallback
其他代码都同上
```java
onErrorResumeNext(throwable -> Observable.just("发生了异常,直接onComplete了"))