简介
Single就像是一个Observable,但是不发出一系列值(从没有到无限),它总是发出一个值或错误通知。
因此,您不必使用三种方法来响应来自Observable的通知(onNext, onError和onCompleted)来订阅Single,而是仅使用两种方法进行订阅:
onSuccess
- Single通过此方法将Single发出的唯一数据
onError
- Single通过此方法传递Throwable
Single将仅调用这些方法之一,并且只会调用一次。调用任一方法后,Single终止,对其的订阅结束。
操作符
像Observables一样,可以通过各种运算符来操纵Singles。一些运算符还允许在Observable和Single之间建立接口,以便您可以混合使用两种变体:
operator | returns | description |
---|---|---|
concat and concatWith | Observable | 将多个Single发出的项串联为Observable |
create | Single | 通过显式调用订阅者方法从头开始创建Single |
delay | Single | 推迟数据的发射 |
doOnError | Single | 返回一个Single,它在调用onError时也会调用指定的方法 |
doOnSuccess | Single | 返回一个Single,它还会在调用onSuccess时调用您指定的方法 |
error | Single | 返回一个立即通知订阅者错误的Single |
flatMap | Single | 返回一个Single,它是对Single发出的数据进行函数映射的结果 |
flatMapObservable | Observable | 返回一个Observable,它是岁Single发出的数据的函数的结果 |
from | Single | 将Future转成Single |
just | Single | 返回发出指定项目的Single |
map | Single | 返回一个Single,该Single发出应用于源Single发出的项目的函数的结果 |
merge and mergeWith | Observable | 将多个Single组合成Observable |
observeOn | Single | 指定操作符之后的任务在哪个线程调度 |
onErrorReturn | Single | 将发出错误通知的Single转换为发出指定项目的Single |
subscribeOn | Single | 操作符之前的操作在哪个线程调度 |
timeout | Single | 如果源Single在指定时间段内未发出值,则返回发出错误通知的Single。可以使用fallback在超时的时候返回默认值。也可以和onErrorReturn |
toSingle | Single | 将Observable转为Single |
toObservable | Observable | 将Single转为Observable |
zip and zipWith | Single | 返回一个Single,该Single发出一个项,该项是应用到由两个或多个其他Single发出的项的函数的结果 |
concat and concatWith
弹珠图
代码示例
- 没有发生错误
输出的结果是hello,world!
Single<String> s1 = Single.just("hello");
Single<String> s2 = Single.just(",world!");
s1.concatWith(s2).subscribe(System.out::print);
- 其中一个observable发生了异常
先输出hello,然后输出异常的堆栈
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<String> future = executorService.submit(() -> {
throw new RuntimeException("哈哈哈!");
});
Single<String> s2 = Single.just("hello!");
Single<String> s1 = Single.fromFuture(future);
s2.concatWith(s1).subscribe(System.out::println);
executorService.shutdown();
create
弹珠图
代码示例
- onSuccess
输出结果hello,world!
Single.create((SingleOnSubscribe<String>) emitter -> emitter.onSuccess("hello,world")).subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull String s) {
System.out.println(s);
}
@Override
public void onError(@NonNull Throwable e) {
}
});
onError
Single.create((SingleOnSubscribe<String>) emitter -> emitter.onError(new RuntimeException("haha"))).subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull String s) {
System.out.println(s);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
}
});
delay
弹珠图
There is also a version of this operator that allows you to perform the delay on a particular Scheduler:
代码示例
不使用schedule
从输出结果看delay操作是在另一个线程执行的
输出结果: RxComputationThreadPool-1 hello,world(用的调度器是computation)
Single.just("hello,world")
.delay(100, TimeUnit.MILLISECONDS)
.subscribe(r-> System.out.println(Thread.currentThread().getName()+" "+r));
Thread.sleep(1000L);
- 使用schedule
RxCachedThreadScheduler-1 hello,world
Single.just("hello,world")
.delaySubscription(100, TimeUnit.MILLISECONDS, Schedulers.io())
.subscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
Thread.sleep(1000L);
doOnError
弹珠图
代码示例
输出结果: 先输出error…,然后输出异常堆栈
Single<String> single = Single.error(new RuntimeException());
single.doOnError(s -> System.out.println("error...")).subscribe(System.out::println);
doOnSuccess
弹珠图
代码示例
输出结果: 先输出hello,world!,然后输出success…
Single<String> single = Single.just("hello,world!");
single.doOnSuccess(s -> System.out.println("success...")).subscribe(System.out::println);
error
弹珠图
代码示例
Single.error(new RuntimeException());
flatMap
弹珠图
代码示例
Single.just("hello,world!")
.flatMap(new Function<String, SingleSource<?>>() {
@Override
public SingleSource<?> apply(String s) throws Throwable {
return (SingleSource<Object>) observer -> observer.onSuccess(s + " flatMap");
}
}).subscribe(System.out::println);
flatMapObservable
弹珠图
代码示例
Single.just("hello,world!")
.flatMapObservable((Function<String, ObservableSource<?>>) s -> Observable.fromArray(Arrays.asList(1,2,3,4,5)))
.subscribe(System.out::println);
from
弹珠图
There is also a variety that takes a Scheduler as an argument:
代码示例
Single.just("hello,world!")
.flatMap((Function<String, SingleSource<?>>) s -> Single.just(s+" flatMap")).subscribe(System.out::println);
just
map
弹珠图
代码示例
Single.just("hello")
.map(s -> s + ",world")
.subscribe(System.out::println);
merge and mergeWith
One version of merge takes a Single that emits a second Single and converts it into a Single that emits the item emitted by that second Single:
mergeWith可以将两个或者更多的Single merge成Observable
弹珠图
问题记录
Single.just("hello")
.mergeWith(Single.just(",world"))
.subscribe(System.out::println);
observeOn
弹珠图
代码示例
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {
System.out.println(Thread.currentThread().getName() + " begin emit item");
emitter.onSuccess("hello,world");
}}).observeOn(Schedulers.io())
.subscribe(s -> System.out.println(Thread.currentThread().getName() + " subscribe data " + s));
输出结果
main begin emit item
RxCachedThreadScheduler-1 subscribe data hello,world
onErrorReturn
正确时返回正确的值,发生错误时不返回异常返回用户自定义默认值
代码示例
代码示例
public static void main(String[] args) {
Single.just("hello")
.onErrorReturn(throwable -> {
System.out.println("发生了异常,返回默认值,此处输出异常信息: " + throwable.getMessage());
return "world";
}).subscribe(System.out::println);
Single.error(new RuntimeException("哈哈哈"))
.onErrorReturn((Function<Throwable, String>) throwable -> {
System.out.println("发生了异常,返回默认值,此处输出异常信息: " + throwable.getMessage());
return "world";
}).subscribe(System.out::println);
输出结果
hello
发生了异常,返回默认值,此处输出异常信息: 哈哈哈
world
subscribeOn
弹珠图
代码示例
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {
System.out.println(Thread.currentThread().getName() + " begin emit item");
emitter.onSuccess("hello,world");
}
}).subscribeOn(Schedulers.io())
.subscribe(s -> System.out.println(Thread.currentThread().getName() + " subscribe data " + s));
输出结果
RxCachedThreadScheduler-1 begin emit item
RxCachedThreadScheduler-1 subscribe data hello,world
timeout
弹珠图
如果在订阅后指定时间段内未发出任何项目,超时将导致Single异常中止并显示错误通知。通过指定的时间单位来设置此超时时间:
也可以结合scheduler操作符
timeout操作符也可以切换到备份Single,而不是在超时到期时发送错误通知:
结合scheduler操作符使用
代码示例
- 不使用scheduler
```java
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future
future = executorService.submit(new Callable () {
);@Override
public String call() throws Exception {
//模拟耗时的操作
Thread.sleep(300);
return "hello";
}
Single.fromFuture(future) .timeout(100, TimeUnit.MILLISECONDS) .subscribe(System.out::println);
executorService.shutdown();
输出结果
```java
Caused by: java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.
... 9 more
Exception in thread "RxComputationThreadPool-1" io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException:
- 使用scheduler
```java
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future
future = executorService.submit(new Callable () {
);@Override
public String call() throws Exception {
//模拟耗时的操作
Thread.sleep(300);
return "hello";
}
Single.fromFuture(future) .observeOn(Schedulers.io()) .timeout(100, TimeUnit.MILLISECONDS) .subscribe(System.out::println);
executorService.shutdown();
输出结果<br />看输出结果可以看到RxComputationThreadPool-1是这个线程抛出的异常,结合上面的例子,解释就是使用timeout操作符后会将操作调度到RxComputationThreadPool-1线程,observeOn是切换timeout操作符之后的线程,因为抛出了异常,执行中断。
```java
Caused by: java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.
... 9 more
Exception in thread "RxComputationThreadPool-1" io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException
- 使用fallback
超时返回默认值
Single.fromFuture(future)
.timeout(100, TimeUnit.MILLISECONDS,Single.just("default"))
.subscribe(System.out::println);
输出结果
default
toObservable
弹珠图
zip and zipWith
弹珠图
代码示例
Single.just("hello")
.zipWith(Single.just("world"), (s, s2) -> s.length()+s2.length()).subscribe(System.out::println);