简介

Single就像是一个Observable,但是不发出一系列值(从没有到无限),它总是发出一个值或错误通知。
因此,您不必使用三种方法来响应来自Observable的通知(onNext, onError和onCompleted)来订阅Single,而是仅使用两种方法进行订阅:
onSuccess

  • Single通过此方法将Single发出的唯一数据

onError

  • Single通过此方法传递Throwable

Single将仅调用这些方法之一,并且只会调用一次。调用任一方法后,Single终止,对其的订阅结束。

操作符

像Observables一样,可以通过各种运算符来操纵Singles。一些运算符还允许在Observable和Single之间建立接口,以便您可以混合使用两种变体:

operator returns description
concat and concatWith Observable 将多个Single发出的项串联为Observable
create Single 通过显式调用订阅者方法从头开始创建Single
delay Single 推迟数据的发射
doOnError Single 返回一个Single,它在调用onError时也会调用指定的方法
doOnSuccess Single 返回一个Single,它还会在调用onSuccess时调用您指定的方法
error Single 返回一个立即通知订阅者错误的Single
flatMap Single 返回一个Single,它是对Single发出的数据进行函数映射的结果
flatMapObservable Observable 返回一个Observable,它是岁Single发出的数据的函数的结果
from Single 将Future转成Single
just Single 返回发出指定项目的Single
map Single 返回一个Single,该Single发出应用于源Single发出的项目的函数的结果
merge and mergeWith Observable 将多个Single组合成Observable
observeOn Single 指定操作符之后的任务在哪个线程调度
onErrorReturn Single 将发出错误通知的Single转换为发出指定项目的Single
subscribeOn Single 操作符之前的操作在哪个线程调度
timeout Single 如果源Single在指定时间段内未发出值,则返回发出错误通知的Single。可以使用fallback在超时的时候返回默认值。也可以和onErrorReturn
toSingle Single 将Observable转为Single
toObservable Observable 将Single转为Observable
zip and zipWith Single 返回一个Single,该Single发出一个项,该项是应用到由两个或多个其他Single发出的项的函数的结果

image.png

concat and concatWith

弹珠图

Single - 图2

Single - 图3

代码示例

  • 没有发生错误

输出的结果是hello,world!

  1. Single<String> s1 = Single.just("hello");
  2. Single<String> s2 = Single.just(",world!");
  3. s1.concatWith(s2).subscribe(System.out::print);
  • 其中一个observable发生了异常

先输出hello,然后输出异常的堆栈

  1. ExecutorService executorService = Executors.newFixedThreadPool(1);
  2. Future<String> future = executorService.submit(() -> {
  3. throw new RuntimeException("哈哈哈!");
  4. });
  5. Single<String> s2 = Single.just("hello!");
  6. Single<String> s1 = Single.fromFuture(future);
  7. s2.concatWith(s1).subscribe(System.out::println);
  8. executorService.shutdown();

create

弹珠图

Single - 图4

代码示例

  • onSuccess

输出结果hello,world!

  1. Single.create((SingleOnSubscribe<String>) emitter -> emitter.onSuccess("hello,world")).subscribe(new SingleObserver<String>() {
  2. @Override
  3. public void onSubscribe(@NonNull Disposable d) {
  4. }
  5. @Override
  6. public void onSuccess(@NonNull String s) {
  7. System.out.println(s);
  8. }
  9. @Override
  10. public void onError(@NonNull Throwable e) {
  11. }
  12. });
  • onError

    1. Single.create((SingleOnSubscribe<String>) emitter -> emitter.onError(new RuntimeException("haha"))).subscribe(new SingleObserver<String>() {
    2. @Override
    3. public void onSubscribe(@NonNull Disposable d) {
    4. }
    5. @Override
    6. public void onSuccess(@NonNull String s) {
    7. System.out.println(s);
    8. }
    9. @Override
    10. public void onError(@NonNull Throwable e) {
    11. e.printStackTrace();
    12. }
    13. });

    delay

    弹珠图

    Single - 图5
    There is also a version of this operator that allows you to perform the delay on a particular Scheduler:
    Single - 图6

    代码示例

  • 不使用schedule

从输出结果看delay操作是在另一个线程执行的
输出结果: RxComputationThreadPool-1 hello,world(用的调度器是computation)

  1. Single.just("hello,world")
  2. .delay(100, TimeUnit.MILLISECONDS)
  3. .subscribe(r-> System.out.println(Thread.currentThread().getName()+" "+r));
  4. Thread.sleep(1000L);
  • 使用schedule

RxCachedThreadScheduler-1 hello,world

  1. Single.just("hello,world")
  2. .delaySubscription(100, TimeUnit.MILLISECONDS, Schedulers.io())
  3. .subscribe(r -> System.out.println(Thread.currentThread().getName() + " " + r));
  4. Thread.sleep(1000L);

doOnError

弹珠图

Single - 图7

代码示例

输出结果: 先输出error…,然后输出异常堆栈

  1. Single<String> single = Single.error(new RuntimeException());
  2. single.doOnError(s -> System.out.println("error...")).subscribe(System.out::println);

doOnSuccess

弹珠图

Single - 图8

代码示例

输出结果: 先输出hello,world!,然后输出success…

  1. Single<String> single = Single.just("hello,world!");
  2. single.doOnSuccess(s -> System.out.println("success...")).subscribe(System.out::println);

error

弹珠图

Single - 图9

代码示例

  1. Single.error(new RuntimeException());

flatMap

弹珠图

Single - 图10

代码示例

  1. Single.just("hello,world!")
  2. .flatMap(new Function<String, SingleSource<?>>() {
  3. @Override
  4. public SingleSource<?> apply(String s) throws Throwable {
  5. return (SingleSource<Object>) observer -> observer.onSuccess(s + " flatMap");
  6. }
  7. }).subscribe(System.out::println);

flatMapObservable

弹珠图

Single - 图11

代码示例

  1. Single.just("hello,world!")
  2. .flatMapObservable((Function<String, ObservableSource<?>>) s -> Observable.fromArray(Arrays.asList(1,2,3,4,5)))
  3. .subscribe(System.out::println);

from

弹珠图

Single - 图12
There is also a variety that takes a Scheduler as an argument: Single - 图13

代码示例

  1. Single.just("hello,world!")
  2. .flatMap((Function<String, SingleSource<?>>) s -> Single.just(s+" flatMap")).subscribe(System.out::println);

just

Single - 图14

map

弹珠图

Single - 图15

代码示例

  1. Single.just("hello")
  2. .map(s -> s + ",world")
  3. .subscribe(System.out::println);

merge and mergeWith

One version of merge takes a Single that emits a second Single and converts it into a Single that emits the item emitted by that second Single:
mergeWith可以将两个或者更多的Single merge成Observable

弹珠图

Single - 图16

问题记录

  1. Single.just("hello")
  2. .mergeWith(Single.just(",world"))
  3. .subscribe(System.out::println);

observeOn

弹珠图

使用observeOn后在另一个线程执行之后的操作
Single - 图17

代码示例

  1. Single.create(new SingleOnSubscribe<String>() {
  2. @Override
  3. public void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {
  4. System.out.println(Thread.currentThread().getName() + " begin emit item");
  5. emitter.onSuccess("hello,world");
  6. }}).observeOn(Schedulers.io())
  7. .subscribe(s -> System.out.println(Thread.currentThread().getName() + " subscribe data " + s));

输出结果

  1. main begin emit item
  2. RxCachedThreadScheduler-1 subscribe data hello,world

onErrorReturn

正确时返回正确的值,发生错误时不返回异常返回用户自定义默认值

代码示例

Single - 图18

代码示例

  1. public static void main(String[] args) {
  2. Single.just("hello")
  3. .onErrorReturn(throwable -> {
  4. System.out.println("发生了异常,返回默认值,此处输出异常信息: " + throwable.getMessage());
  5. return "world";
  6. }).subscribe(System.out::println);
  7. Single.error(new RuntimeException("哈哈哈"))
  8. .onErrorReturn((Function<Throwable, String>) throwable -> {
  9. System.out.println("发生了异常,返回默认值,此处输出异常信息: " + throwable.getMessage());
  10. return "world";
  11. }).subscribe(System.out::println);

输出结果

  1. hello
  2. 发生了异常,返回默认值,此处输出异常信息: 哈哈哈
  3. world

subscribeOn

subscribeOn会把其之前的所有操作在某个线程执行

弹珠图

Single - 图19

代码示例

  1. Single.create(new SingleOnSubscribe<String>() {
  2. @Override
  3. public void subscribe(@NonNull SingleEmitter<String> emitter) throws Throwable {
  4. System.out.println(Thread.currentThread().getName() + " begin emit item");
  5. emitter.onSuccess("hello,world");
  6. }
  7. }).subscribeOn(Schedulers.io())
  8. .subscribe(s -> System.out.println(Thread.currentThread().getName() + " subscribe data " + s));

输出结果

  1. RxCachedThreadScheduler-1 begin emit item
  2. RxCachedThreadScheduler-1 subscribe data hello,world

timeout

弹珠图

如果在订阅后指定时间段内未发出任何项目,超时将导致Single异常中止并显示错误通知。通过指定的时间单位来设置此超时时间:
Single - 图20
也可以结合scheduler操作符
Single - 图21
timeout操作符也可以切换到备份Single,而不是在超时到期时发送错误通知:
Single - 图22
结合scheduler操作符使用
Single - 图23

代码示例

  • 不使用scheduler ```java ExecutorService executorService = Executors.newFixedThreadPool(2); Future future = executorService.submit(new Callable() {
    1. @Override
    2. public String call() throws Exception {
    3. //模拟耗时的操作
    4. Thread.sleep(300);
    5. return "hello";
    6. }
    );

Single.fromFuture(future) .timeout(100, TimeUnit.MILLISECONDS) .subscribe(System.out::println);

executorService.shutdown();

  1. 输出结果
  2. ```java
  3. Caused by: java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.
  4. ... 9 more
  5. Exception in thread "RxComputationThreadPool-1" io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException:
  • 使用scheduler ```java ExecutorService executorService = Executors.newFixedThreadPool(2); Future future = executorService.submit(new Callable() {
    1. @Override
    2. public String call() throws Exception {
    3. //模拟耗时的操作
    4. Thread.sleep(300);
    5. return "hello";
    6. }
    );

Single.fromFuture(future) .observeOn(Schedulers.io()) .timeout(100, TimeUnit.MILLISECONDS) .subscribe(System.out::println);

executorService.shutdown();

  1. 输出结果<br />看输出结果可以看到RxComputationThreadPool-1是这个线程抛出的异常,结合上面的例子,解释就是使用timeout操作符后会将操作调度到RxComputationThreadPool-1线程,observeOn是切换timeout操作符之后的线程,因为抛出了异常,执行中断。
  2. ```java
  3. Caused by: java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.
  4. ... 9 more
  5. Exception in thread "RxComputationThreadPool-1" io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException
  • 使用fallback

超时返回默认值

  1. Single.fromFuture(future)
  2. .timeout(100, TimeUnit.MILLISECONDS,Single.just("default"))
  3. .subscribe(System.out::println);

输出结果

  1. default

toObservable

将Single转为Observable

弹珠图

Single - 图24

zip and zipWith

弹珠图

Single - 图25

代码示例

  1. Single.just("hello")
  2. .zipWith(Single.just("world"), (s, s2) -> s.length()+s2.length()).subscribe(System.out::println);