4.1.3 RxJava中的观察者模式

  1. @Test
  2. public void rxJavaBaseUse()
  3. {
  4. //被观察者( 主题 )
  5. Observable observable = Observable.create(
  6. new Action1<Emitter<String>>()
  7. {
  8. @Override
  9. public void call(Emitter<String> emitter)
  10. {
  11. emitter.onNext("apple");
  12. emitter.onNext("banana");
  13. emitter.onNext("pear");
  14. emitter.onCompleted();
  15. }
  16. }, Emitter.BackpressureMode.NONE);
  17. //订阅者(观察者)
  18. Subscriber<String> subscriber = new Subscriber<String>()
  19. {
  20. @Override
  21. public void onNext(String s)
  22. {
  23. log.info("onNext: {}", s);
  24. }
  25. @Override
  26. public void onCompleted()
  27. {
  28. log.info("onCompleted");
  29. }
  30. @Override
  31. public void onError(Throwable e)
  32. {
  33. log.info("onError");
  34. }
  35. };
  36. //订阅:Observable与Subscriber之间依然通过subscribe()进行关联。
  37. observable.subscribe(subscriber);
  38. }

4.1.4 RxJava的不完整回调

Rxjava为了支持函数式编程,另外定义了几个函数式接口:比较重要的:Action0和Action1

4.2 创建型操作符

  1. create(): 使用函数从头创建一个 Observable 主题对象
  2. defer(): 只有当订阅者订阅才创建 Observable 主题对象,为每个订阅创建一个新的Observable 主题对象。
  3. range(): 创建一个弹射指定范围的整数序列的 Observable 主题对象
  4. interval(): 创建一个按照给定的时间间隔弹射整数序列的 Observable 主题对象
  5. timer(): 创建一个在给定的延时之后弹射单个数据的 Observable 主题对象。
  6. empty(): 创建一个什么都不做直接通知完成的 Observable 主题对象。
  7. error(): 创建一个什么都不做直接通知错误的 Observable 主题对象。
  8. never(): 创建一个不弹射任何数据的 Observable 主题对象。

    4.2.1 just操作符

    Observable 的 just 操作符用于创建一个 Observable 主题,并且会将实参数据弹射出来。 just操作符可接收多个实参,所有实参都将被逐一弹射。

    1. @Test
    2. public void justDemo()
    3. {
    4. //发送一个字符串"hello world"
    5. Observable.just("hello world" )
    6. .subscribe(s -> log.info("just string->" + s));
    7. //逐一发送1,2,3,4这四个整数
    8. Observable.just(1, 2, 3, 4)
    9. .subscribe(i -> log.info("just int->" + i));
    10. }

    虽然 just 操作符可以弹射多个数据,但是上限为 9 个。

    4.2.2 from操作符

    from 操作符以数组、 Iterable 迭代器等对象作为输入,创建一个 Observable 主题对象,然后将实参(如数组、 Iterable 迭代器等)中的数据元素逐一弹射出去

    1. @Test
    2. public void fromDemo()
    3. {
    4. //逐一发送一个字符数组的每个元素
    5. String[] items = {"a", "b", "c", "d", "e", "f"};
    6. Observable.from(items)
    7. .subscribe(s -> log.info("just string->" + s));
    8. //逐一发送送一个字符数组的每个元素
    9. Integer[] array = {1, 2, 3, 4};
    10. List<Integer> list = Arrays.asList(array);
    11. Observable.from(list)
    12. .subscribe(i -> log.info("just int->" + i));
    13. }

    4.2.3 range操作符

    1. @Test
    2. public void rangeDemo()
    3. {
    4. //逐一发一组范围内的整数序列
    5. Observable.range(1, 10)
    6. .subscribe(i -> log.info("just int->" + i));
    7. }

    4.2.4 interval操作符

    interval 操作符创建一个 Observable 主题对象(消息流),该消息流会按照固定时间间隔发射整数序列。

    1. @Test
    2. public void intervalDemo() throws InterruptedException
    3. {
    4. Observable
    5. .interval(100, TimeUnit.MILLISECONDS)
    6. .subscribe(aLong -> log.info(aLong.toString()));
    7. Thread.sleep(Integer.MAX_VALUE);
    8. }

    4.2.5 defer操作符

    just、 from、 range 以及其他创建操作符都是在创建主题时弹射数据,而不是在被订阅时弹射数据。而 defer 操作符在创建主题时并不弹射数据,它会一直等待, 直到有观察者订阅才会弹射数据

    4.3 过滤型操作

    4.3.1 filter操作符

    1. /**
    2. * 演示 filter 基本使用
    3. */
    4. @Test
    5. public void filterDemo()
    6. {
    7. //通过filter过滤能被5整除的数
    8. Observable.range(1, 20)
    9. .filter(new Func1<Integer, Boolean>()
    10. {
    11. @Override
    12. public Boolean call(Integer integer)
    13. {
    14. return integer % 5 == 0;
    15. }
    16. })
    17. .subscribe(i -> log.info("filter int->" + i));
    18. }
    19. /**
    20. * 演示 filter 基本使用 ,lamda 形式
    21. */
    22. @Test
    23. public void filterDemoLamda()
    24. {
    25. //通过filter过滤能被5整除的数
    26. Observable.range(1, 20)
    27. .filter(integer -> integer % 5 == 0)
    28. .subscribe(i -> log.info("filter int->" + i));
    29. }

    4.3.2 distinct操作符

    1. /**
    2. * 演示 distinct 基本使用
    3. */
    4. @Test
    5. public void distinctDemo()
    6. {
    7. Observable.just("apple", "pair", "banana", "apple", "pair" )
    8. .distinct()
    9. .subscribe(s -> log.info("distinct s->" + s));
    10. }

    4.4 转换型操作符

    4.4.1 map操作符

    1. /**
    2. * 演示 map 转换
    3. */
    4. @Test
    5. public void intervalDemo()
    6. {
    7. Observable.range(1, 4)
    8. .map(i -> i * i)
    9. .subscribe(i -> log.info(i.toString()));
    10. }

    4.4.2 flatMap操作符

  9. flatMap 转换是一对一类型或者一对多类型的,原来弹射了几个数据,转换之后可以是更多个数据。

  10. flatMap 转换同样可以改变弹射的数据类型。
  11. flatMap 转换后的数据还是会逐个发射给下游的 Subscriber 来接收,表面上就像这些数据是由一个 Observable 发射的一样,其实是多个 Observable 发射然后合并的
    1. /**
    2. * 演示 flapMap 转换
    3. */
    4. @Test
    5. public void flapMapDemo()
    6. {
    7. /**
    8. * 注意 flatMap 中的 just 所创建的是一个新的流
    9. */
    10. Observable.range(1, 4)
    11. .flatMap(i -> Observable.just(i * i, i * i + 1))
    12. .subscribe(i -> log.info(i.toString()));
    13. }
    image.png

    4.4.3 scan操作符

    scan 操作符对一个 Observable 流序列的每一项数据应用一个累积函数, 然后将这个函数的累积结果弹射出去。
    image.png ```java @Test public void scanDemo() {
    1. /**
    2. * 定义一个 accumulator 累积函数
    3. */
    4. Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>()
    5. {
    6. @Override
    7. public Integer call(Integer input1, Integer input2)
    8. {
    9. log.info(" {} + {} = {} ", input1, input2, input1 + input2);
    10. return input1 + input2;
    11. }
    12. };
  1. /**
  2. * 使用scan 进行流扫描
  3. */
  4. Observable.range(1, 5)
  5. .scan(accumulator)
  6. .subscribe(new Action1<Integer>()
  7. {
  8. @Override
  9. public void call(Integer sum)
  10. {
  11. log.info(" 累加的结果: {} ", sum);
  12. }
  13. });
  14. }
  1. <a name="lvb78"></a>
  2. ## 4.5 聚合操作符
  3. <a name="vyNkr"></a>
  4. ## 4.5.1 count操作符
  5. ```java
  6. /**
  7. * 演示 count 计数操作符
  8. */
  9. @Test
  10. public void countDemo()
  11. {
  12. String[] items = {"one", "two", "three","fore"};
  13. Integer count = Observable
  14. .from(items)
  15. .count()
  16. .toBlocking().single();
  17. log.info("计数的结果为 {}",count);
  18. }

4.5.2 reduce操作符

image.png
与scan有异曲同工之妙

  1. /**
  2. * 演示 reduce 扫描操作符
  3. */
  4. @Test
  5. public void reduceDemo()
  6. {
  7. /**
  8. * 定义一个 accumulator 累积函数
  9. */
  10. Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>()
  11. {
  12. @Override
  13. public Integer call(Integer input1, Integer input2)
  14. {
  15. log.info(" {} + {} = {} ", input1, input2, input1 + input2);
  16. return input1 + input2;
  17. }
  18. };
  19. /**
  20. * 使用 reduce 规约操作符
  21. */
  22. Observable.range(1, 5)
  23. .reduce(accumulator)
  24. .subscribe(new Action1<Integer>()
  25. {
  26. @Override
  27. public void call(Integer sum)
  28. {
  29. log.info(" 规约的结果: {} ", sum);
  30. }
  31. });
  32. }

4.6 其他操作符

4.6.1 take操作符

  1. @Test
  2. public void takeDemo() throws InterruptedException
  3. {
  4. Observable.interval(1, TimeUnit.SECONDS) //设置间隔执行
  5. .take(10) //10秒倒计时
  6. .map(aLong -> 10 - aLong)
  7. .subscribe(aLong -> log.info(aLong.toString()));
  8. Thread.sleep(Integer.MAX_VALUE);
  9. }

4.6.2 window操作符

RxJava 的窗口可以理解为固定数量(或者固定时间间隔)的元素分组。 假定通过 window 操作符以固定数量 n 进行窗口划分,一旦流上弹射的元素的数量足够一个窗口的数量 n, 那么输出流上将弹出一个新的元素,输出元素是一个 Observable 主题对象,该主题包含源流窗口之内的 n 个元素
image.png

  1. /**
  2. * window 创建操作符 创建滑动窗口
  3. * 演示 window 创建操作符 创建滑动窗口
  4. */
  5. @Test
  6. public void simpleWindowObserverDemo()
  7. {
  8. List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);
  9. Observable.from(srcList)
  10. .window(3)
  11. .flatMap(o -> o.toList())
  12. .subscribe(list -> log.info(list.toString()));
  13. }

创建重叠窗口使用函数 window(int count, int skip),其中第一个参数为窗口的元素个数,第二个参数为下一个窗口跳过的元素个数
image.png

  1. /**
  2. * window 创建操作符 创建滑动窗口
  3. * 演示 window 创建操作符 创建滑动窗口
  4. */
  5. @Test
  6. public void windowObserverDemo()
  7. {
  8. List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);
  9. Observable.from(srcList)
  10. .window(3, 1)
  11. .flatMap(o -> o.toList())
  12. .subscribe(list -> log.info(list.toString()));
  13. }

4.7 RxJava的Scheduler调度器

  1. Schedulers.io():用于获取内部的 ioScheduler 调度器实例
  2. Schedulers. newThread ():用于获取内部的 newThreadScheduler 调度器实例,该调度器 为 RxJava 流操作创建一个新线程。
  3. Schedulers.trampoline ():使用当前线程立即执行 RxJava 流操作。
  4. Schedulers. single ():使用 RxJava 内置的单例线程执行 RxJava 流操作

    4.8 背压

    4.8.1 背压问题

    当上下游的流操作处于不同的线程时,如果上游弹射数据的速度快于下游接收处理数据的速度,对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失, 又不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压问题。

    4.8.2 背压问题的几种应对模式

    在创建主题时可以使用 Observable 类的一个重载的 create 方法设置具体的背压模式, 该方法的源代码如下:

    1. public static <T> Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) {
    2. return unsafeCreate(new OnSubscribeCreate<T>(emitter, backpressure));
    3. }

    背压模式有多种,比较常用的有“最近模式”Emitter.BackpressureMode.LATEST。这种模式的含义为:如果消费跟不上, 那么仅仅缓存最近弹射出来的数据,将老旧一点的数据直接丢弃
    背压模式:

  5. BackpressureMode.DROP: 在这种模式下, Observable 主题使用固定大小为 128 的缓冲区。如果下游订阅者无法处理,流的第一个元素就会缓存下来,后续的会被丢弃。

  6. BackpressureMode.LATEST
  7. BackpressureMode.NONE 和 BackpressureMode.ERROR: 在这两种模式中发送的数据不使用背压。
  8. BackpressureMode.BUFFER: 在这种模式下,有一个无限的缓冲区(初始化时是 128) ,下游消费不了的元素全部会放到缓冲区中。如果缓冲区中持续地积累, 就会导致内存耗尽,抛出OutOfMemoryException 异常