简介
Single就像是一个Observable,但是不发出一系列值(从没有到无限),它总是发出一个值或错误通知。
因此,您不必使用三种方法来响应来自Observable的通知(onNext, onError和onCompleted)来订阅Single,而是仅使用两种方法进行订阅:
onSuccess
- Single通过此方法将Single发出的唯一数据
onError
- Single通过此方法传递Throwable
Single将仅调用这些方法之一,并且只会调用一次。调用任一方法后,Single终止,对其的订阅结束。
操作符
像Observables一样,可以通过各种运算符来操纵Singles。一些运算符还允许在Observable和Single之间建立接口,以便您可以混合使用两种变体:
| operator | returns | description |
|---|---|---|
| concat and concatWith | Observable | 将多个Single发出的项串联为Observable |
| create | Single | 通过显式调用订阅者方法从头开始创建Single |
| delay | Single | 推迟数据的发射 |
| doOnError | Single | 返回一个Single,它在调用onError时也会调用指定的方法 |
| doOnSuccess | Single | 返回一个Single,它还会在调用onSuccess时调用您指定的方法 |
| error | Single | 返回一个立即通知订阅者错误的Single |
| flatMap | Single | 返回一个Single,它是对Single发出的数据进行函数映射的结果 |
| flatMapObservable | Observable | 返回一个Observable,它是岁Single发出的数据的函数的结果 |
| from | Single | 将Future转成Single |
| just | Single | 返回发出指定项目的Single |
| map | Single | 返回一个Single,该Single发出应用于源Single发出的项目的函数的结果 |
| merge and mergeWith | Observable | 将多个Single组合成Observable |
| observeOn | Single | 指定操作符之后的任务在哪个线程调度 |
| onErrorReturn | Single | 将发出错误通知的Single转换为发出指定项目的Single |
| subscribeOn | Single | 操作符之前的操作在哪个线程调度 |
| timeout | Single | 如果源Single在指定时间段内未发出值,则返回发出错误通知的Single。可以使用fallback在超时的时候返回默认值。也可以和onErrorReturn |
| toSingle | Single | 将Observable转为Single |
| toObservable | Observable | 将Single转为Observable |
| zip and zipWith | Single | 返回一个Single,该Single发出一个项,该项是应用到由两个或多个其他Single发出的项的函数的结果 |
concat and concatWith
弹珠图

代码示例
- 没有发生错误
输出的结果是hello,world!
Single<String> s1 = Single.just("hello");Single<String> s2 = Single.just(",world!");s1.concatWith(s2).subscribe(System.out::print);
- 其中一个observable发生了异常
先输出hello,然后输出异常的堆栈
ExecutorService executorService = Executors.newFixedThreadPool(1);Future<String> future = executorService.submit(() -> {throw new RuntimeException("哈哈哈!");});Single<String> s2 = Single.just("hello!");Single<String> s1 = Single.fromFuture(future);s2.concatWith(s1).subscribe(System.out::println);executorService.shutdown();
create
弹珠图
代码示例
- onSuccess
输出结果hello,world!
Single.create((SingleOnSubscribe<String>) emitter -> emitter.onSuccess("hello,world")).subscribe(new SingleObserver<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onSuccess(@NonNull String s) {System.out.println(s);}@Overridepublic void onError(@NonNull Throwable e) {}});
onError
Single.create((SingleOnSubscribe<String>) emitter -> emitter.onError(new RuntimeException("haha"))).subscribe(new SingleObserver<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onSuccess(@NonNull String s) {System.out.println(s);}@Overridepublic void onError(@NonNull Throwable e) {e.printStackTrace();}});
delay
弹珠图

There is also a version of this operator that allows you to perform the delay on a particular Scheduler:
代码示例
不使用schedule
从输出结果看delay操作是在另一个线程执行的
输出结果: RxComputationThreadPool-1 hello,world(用的调度器是computation)
Single.just("hello,world").delay(100, TimeUnit.MILLISECONDS).subscribe(r-> System.out.println(Thread.currentThread().getName()+" "+r));Thread.sleep(1000L);
- 使用schedule
RxCachedThreadScheduler-1 hello,world
Single.just("hello,world").delaySubscription(100, TimeUnit.MILLISECONDS, Schedulers.io()).subscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));Thread.sleep(1000L);
doOnError
弹珠图
代码示例
输出结果: 先输出error…,然后输出异常堆栈
Single<String> single = Single.error(new RuntimeException());single.doOnError(s -> System.out.println("error...")).subscribe(System.out::println);
doOnSuccess
弹珠图
代码示例
输出结果: 先输出hello,world!,然后输出success…
Single<String> single = Single.just("hello,world!");single.doOnSuccess(s -> System.out.println("success...")).subscribe(System.out::println);
error
弹珠图
代码示例
Single.error(new RuntimeException());
flatMap
弹珠图
代码示例
Single.just("hello,world!").flatMap(new Function<String, SingleSource<?>>() {@Overridepublic SingleSource<?> apply(String s) throws Throwable {return (SingleSource<Object>) observer -> observer.onSuccess(s + " flatMap");}}).subscribe(System.out::println);
flatMapObservable
弹珠图
代码示例
Single.just("hello,world!").flatMapObservable((Function<String, ObservableSource<?>>) s -> Observable.fromArray(Arrays.asList(1,2,3,4,5))).subscribe(System.out::println);
from
弹珠图

There is also a variety that takes a Scheduler as an argument:
代码示例
Single.just("hello,world!").flatMap((Function<String, SingleSource<?>>) s -> Single.just(s+" flatMap")).subscribe(System.out::println);
just
map
弹珠图
代码示例
Single.just("hello").map(s -> s + ",world").subscribe(System.out::println);
merge and mergeWith
One version of merge takes a Single that emits a second Single and converts it into a Single that emits the item emitted by that second Single:
mergeWith可以将两个或者更多的Single merge成Observable
弹珠图
问题记录
Single.just("hello").mergeWith(Single.just(",world")).subscribe(System.out::println);
observeOn
弹珠图
代码示例
Single.create(new SingleOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {System.out.println(Thread.currentThread().getName() + " begin emit item");emitter.onSuccess("hello,world");}}).observeOn(Schedulers.io()).subscribe(s -> System.out.println(Thread.currentThread().getName() + " subscribe data " + s));
输出结果
main begin emit itemRxCachedThreadScheduler-1 subscribe data hello,world
onErrorReturn
正确时返回正确的值,发生错误时不返回异常返回用户自定义默认值
代码示例
代码示例
public static void main(String[] args) {Single.just("hello").onErrorReturn(throwable -> {System.out.println("发生了异常,返回默认值,此处输出异常信息: " + throwable.getMessage());return "world";}).subscribe(System.out::println);Single.error(new RuntimeException("哈哈哈")).onErrorReturn((Function<Throwable, String>) throwable -> {System.out.println("发生了异常,返回默认值,此处输出异常信息: " + throwable.getMessage());return "world";}).subscribe(System.out::println);
输出结果
hello发生了异常,返回默认值,此处输出异常信息: 哈哈哈world
subscribeOn
弹珠图
代码示例
Single.create(new SingleOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {System.out.println(Thread.currentThread().getName() + " begin emit item");emitter.onSuccess("hello,world");}}).subscribeOn(Schedulers.io()).subscribe(s -> System.out.println(Thread.currentThread().getName() + " subscribe data " + s));
输出结果
RxCachedThreadScheduler-1 begin emit itemRxCachedThreadScheduler-1 subscribe data hello,world
timeout
弹珠图
如果在订阅后指定时间段内未发出任何项目,超时将导致Single异常中止并显示错误通知。通过指定的时间单位来设置此超时时间:
也可以结合scheduler操作符
timeout操作符也可以切换到备份Single,而不是在超时到期时发送错误通知:
结合scheduler操作符使用
代码示例
- 不使用scheduler
```java
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future
future = executorService.submit(new Callable () {
);@Overridepublic String call() throws Exception {//模拟耗时的操作Thread.sleep(300);return "hello";}
Single.fromFuture(future) .timeout(100, TimeUnit.MILLISECONDS) .subscribe(System.out::println);
executorService.shutdown();
输出结果```javaCaused by: java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.... 9 moreException in thread "RxComputationThreadPool-1" io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException:
- 使用scheduler
```java
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future
future = executorService.submit(new Callable () {
);@Overridepublic String call() throws Exception {//模拟耗时的操作Thread.sleep(300);return "hello";}
Single.fromFuture(future) .observeOn(Schedulers.io()) .timeout(100, TimeUnit.MILLISECONDS) .subscribe(System.out::println);
executorService.shutdown();
输出结果<br />看输出结果可以看到RxComputationThreadPool-1是这个线程抛出的异常,结合上面的例子,解释就是使用timeout操作符后会将操作调度到RxComputationThreadPool-1线程,observeOn是切换timeout操作符之后的线程,因为抛出了异常,执行中断。```javaCaused by: java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.... 9 moreException in thread "RxComputationThreadPool-1" io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException
- 使用fallback
超时返回默认值
Single.fromFuture(future).timeout(100, TimeUnit.MILLISECONDS,Single.just("default")).subscribe(System.out::println);
输出结果
default
toObservable
弹珠图
zip and zipWith
弹珠图
代码示例
Single.just("hello").zipWith(Single.just("world"), (s, s2) -> s.length()+s2.length()).subscribe(System.out::println);
