快速入门

  1. public class HelloWorld {
  2. public static void main(String[] args) {
  3. Flowable.fromArray("Ben", "George").subscribe(s -> System.out.println("Hello " + s + "!"));
  4. }
  5. }

创建observable

使用observable操作符

可以使用just()或者from()方法将对象,数组,集合转换为Observable对象。

  1. Observable<String> stringObservable = Observable.fromArray("a", "b", "c");
  2. stringObservable.subscribe(s -> System.out.println(s + " "));
  3. List<Integer> intList = Arrays.asList(1, 3, 5, 7);
  4. Observable<Integer> integerObservable = Observable.fromStream(intList.stream());
  5. integerObservable.subscribe(s -> System.out.println(s + " "));
  6. Observable.just("one object").subscribe(System.out::println);

对于由Observable发射的每个元素,这些转换后的Observable都会同步调用它们的任何订阅者的onNext()方法,然后将调用订阅者的onCompleted()方法。

使用create方法

image.png
可以使用Create运算符从头创建Observable。向此运算符传递一个接受观察者作为其参数的函数。编写此函数,使其表现为可观察到的状态-通过适当地调用观察者的onNext,onError和onCompleted方法。Observable必须仅一次调用观察者的onCompleted方法或一次仅调用其onError方法。

同步

  1. package com.zihao.observable.create;
  2. import io.reactivex.rxjava3.annotations.NonNull;
  3. import io.reactivex.rxjava3.core.Observable;
  4. import io.reactivex.rxjava3.core.ObservableOnSubscribe;
  5. import io.reactivex.rxjava3.core.Observer;
  6. import io.reactivex.rxjava3.disposables.Disposable;
  7. import java.util.stream.IntStream;
  8. /**
  9. * 同步observable
  10. * 不会产生额外的线程
  11. *
  12. * @author tangzihao
  13. * @Date 2020/12/28 9:10 下午
  14. */
  15. public class SyncCreateObservable {
  16. public static void main(String[] args) {
  17. Observable.create((ObservableOnSubscribe<String>) emitter -> {
  18. IntStream.rangeClosed(1, 50).forEach(i -> {
  19. if (!emitter.isDisposed()) {
  20. //如果抛出异常执行到30就不会执行了
  21. /*if (i == 30) {
  22. throw new RuntimeException("老子不开心");
  23. }*/
  24. emitter.onNext("value_" + i);
  25. }
  26. });
  27. if (!emitter.isDisposed()) {
  28. emitter.onComplete();
  29. }
  30. }).subscribe(new Observer<String>() {
  31. @Override
  32. public void onSubscribe(@NonNull Disposable d) {
  33. System.out.println("开始订阅啦");
  34. }
  35. @Override
  36. public void onNext(String s) {
  37. System.out.println(s);
  38. }
  39. @Override
  40. public void onError(Throwable throwable) {
  41. throwable.printStackTrace();
  42. }
  43. @Override
  44. public void onComplete() {
  45. System.out.println("完成了");
  46. }
  47. });
  48. }
  49. }

上面代码有几个注意点:

  • 这个是同步的代码,不会产生新的线程,主线程会等上面的onComplete后才会执行
  • emitter.isDisposed()判断是否不感兴趣了,早期版本叫做unsubscribed
  • 如果抛出异常,异常之后的都不会执行,包括onComplete方法也不会执行
  • 如果没有抛出异常的话,onError方法不会执行
  • onSubscribe,onError,onComplete只会执行一次。

    异步

    ```java package com.zihao.observable.create;

import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableOnSubscribe; import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.disposables.Disposable;

import java.util.stream.IntStream;

/**

  • @author tangzihao
  • @Date 2020/12/28 9:46 下午 */ public class AsyncCreateObservable { public static void main(String[] args) {

    1. Observable.create((ObservableOnSubscribe<String>) emitter -> new Thread(() -> {
    2. IntStream.rangeClosed(1, 50).forEach(i -> {
    3. if (!emitter.isDisposed()) {
    4. //如果抛出异常执行到30就不会执行了
    5. /*if (i == 30) {
    6. throw new RuntimeException("老子不开心");
    7. }*/
    8. emitter.onNext("value_" + i);
    9. }
    10. });
    11. if (!emitter.isDisposed()) {
    12. emitter.onComplete();
    13. }
    14. }).start()).subscribe(new Observer<String>() {
    15. @Override
    16. public void onSubscribe(@NonNull Disposable d) {
    17. System.out.println("开始订阅啦");
    18. }
    19. @Override
    20. public void onNext(String s) {
    21. System.out.println(s);
    22. }
    23. @Override
    24. public void onError(Throwable throwable) {
    25. throwable.printStackTrace();
    26. }
    27. @Override
    28. public void onComplete() {
    29. System.out.println("完成了");
    30. }
    31. });
    32. System.out.println("主线程可以正常工作。。。");

    } } ``` 上面代码有如下注意点

  • 会产生新的线程,observer不会阻塞主线程的工作
  • 其他和同步一样

    转换(transform)observable

    image.png
    示例代码
    1. public class TransformObservable {
    2. public static void main(String[] args) {
    3. Observable.create((ObservableOnSubscribe<String>) emitter -> {
    4. IntStream.rangeClosed(1, 50).forEach(i -> {
    5. if (!emitter.isDisposed()) {
    6. emitter.onNext("value_" + i);
    7. }
    8. });
    9. if (!emitter.isDisposed()) {
    10. emitter.onComplete();
    11. }
    12. })
    13. .skip(5)
    14. .take(10)
    15. .map(s -> s + "_xform")
    16. .subscribe(System.out::println);
    17. }
    18. }

    异常处理

    通过onError捕获异常

    ```java //observable代码 if (!emitter.isDisposed()) { if (i == 30) {
    1. throw new RuntimeException("老子不开心");
    } emitter.onNext(“value_” + i); }

//observer代码 @Override public void onError(Throwable throwable) { throwable.printStackTrace(); }

  1. <a name="BKtI3"></a>
  2. ### 使用onErrorResume的方式fallback
  3. 其他代码都同上
  4. ```java
  5. onErrorResumeNext(throwable -> Observable.just("发生了异常,直接onComplete了"))