4.1.3 RxJava中的观察者模式
@Test
public void rxJavaBaseUse()
{
//被观察者( 主题 )
Observable observable = Observable.create(
new Action1<Emitter<String>>()
{
@Override
public void call(Emitter<String> emitter)
{
emitter.onNext("apple");
emitter.onNext("banana");
emitter.onNext("pear");
emitter.onCompleted();
}
}, Emitter.BackpressureMode.NONE);
//订阅者(观察者)
Subscriber<String> subscriber = new Subscriber<String>()
{
@Override
public void onNext(String s)
{
log.info("onNext: {}", s);
}
@Override
public void onCompleted()
{
log.info("onCompleted");
}
@Override
public void onError(Throwable e)
{
log.info("onError");
}
};
//订阅:Observable与Subscriber之间依然通过subscribe()进行关联。
observable.subscribe(subscriber);
}
4.1.4 RxJava的不完整回调
Rxjava为了支持函数式编程,另外定义了几个函数式接口:比较重要的:Action0和Action1
4.2 创建型操作符
- create(): 使用函数从头创建一个 Observable 主题对象
- defer(): 只有当订阅者订阅才创建 Observable 主题对象,为每个订阅创建一个新的Observable 主题对象。
- range(): 创建一个弹射指定范围的整数序列的 Observable 主题对象
- interval(): 创建一个按照给定的时间间隔弹射整数序列的 Observable 主题对象
- timer(): 创建一个在给定的延时之后弹射单个数据的 Observable 主题对象。
- empty(): 创建一个什么都不做直接通知完成的 Observable 主题对象。
- error(): 创建一个什么都不做直接通知错误的 Observable 主题对象。
never(): 创建一个不弹射任何数据的 Observable 主题对象。
4.2.1 just操作符
Observable 的 just 操作符用于创建一个 Observable 主题,并且会将实参数据弹射出来。 just操作符可接收多个实参,所有实参都将被逐一弹射。
@Test
public void justDemo()
{
//发送一个字符串"hello world"
Observable.just("hello world" )
.subscribe(s -> log.info("just string->" + s));
//逐一发送1,2,3,4这四个整数
Observable.just(1, 2, 3, 4)
.subscribe(i -> log.info("just int->" + i));
}
虽然 just 操作符可以弹射多个数据,但是上限为 9 个。
4.2.2 from操作符
from 操作符以数组、 Iterable 迭代器等对象作为输入,创建一个 Observable 主题对象,然后将实参(如数组、 Iterable 迭代器等)中的数据元素逐一弹射出去
@Test
public void fromDemo()
{
//逐一发送一个字符数组的每个元素
String[] items = {"a", "b", "c", "d", "e", "f"};
Observable.from(items)
.subscribe(s -> log.info("just string->" + s));
//逐一发送送一个字符数组的每个元素
Integer[] array = {1, 2, 3, 4};
List<Integer> list = Arrays.asList(array);
Observable.from(list)
.subscribe(i -> log.info("just int->" + i));
}
4.2.3 range操作符
@Test
public void rangeDemo()
{
//逐一发一组范围内的整数序列
Observable.range(1, 10)
.subscribe(i -> log.info("just int->" + i));
}
4.2.4 interval操作符
interval 操作符创建一个 Observable 主题对象(消息流),该消息流会按照固定时间间隔发射整数序列。
@Test
public void intervalDemo() throws InterruptedException
{
Observable
.interval(100, TimeUnit.MILLISECONDS)
.subscribe(aLong -> log.info(aLong.toString()));
Thread.sleep(Integer.MAX_VALUE);
}
4.2.5 defer操作符
just、 from、 range 以及其他创建操作符都是在创建主题时弹射数据,而不是在被订阅时弹射数据。而 defer 操作符在创建主题时并不弹射数据,它会一直等待, 直到有观察者订阅才会弹射数据
4.3 过滤型操作
4.3.1 filter操作符
/**
* 演示 filter 基本使用
*/
@Test
public void filterDemo()
{
//通过filter过滤能被5整除的数
Observable.range(1, 20)
.filter(new Func1<Integer, Boolean>()
{
@Override
public Boolean call(Integer integer)
{
return integer % 5 == 0;
}
})
.subscribe(i -> log.info("filter int->" + i));
}
/**
* 演示 filter 基本使用 ,lamda 形式
*/
@Test
public void filterDemoLamda()
{
//通过filter过滤能被5整除的数
Observable.range(1, 20)
.filter(integer -> integer % 5 == 0)
.subscribe(i -> log.info("filter int->" + i));
}
4.3.2 distinct操作符
/**
* 演示 distinct 基本使用
*/
@Test
public void distinctDemo()
{
Observable.just("apple", "pair", "banana", "apple", "pair" )
.distinct()
.subscribe(s -> log.info("distinct s->" + s));
}
4.4 转换型操作符
4.4.1 map操作符
/**
* 演示 map 转换
*/
@Test
public void intervalDemo()
{
Observable.range(1, 4)
.map(i -> i * i)
.subscribe(i -> log.info(i.toString()));
}
4.4.2 flatMap操作符
flatMap 转换是一对一类型或者一对多类型的,原来弹射了几个数据,转换之后可以是更多个数据。
- flatMap 转换同样可以改变弹射的数据类型。
- flatMap 转换后的数据还是会逐个发射给下游的 Subscriber 来接收,表面上就像这些数据是由一个 Observable 发射的一样,其实是多个 Observable 发射然后合并的
/**
* 演示 flapMap 转换
*/
@Test
public void flapMapDemo()
{
/**
* 注意 flatMap 中的 just 所创建的是一个新的流
*/
Observable.range(1, 4)
.flatMap(i -> Observable.just(i * i, i * i + 1))
.subscribe(i -> log.info(i.toString()));
}
4.4.3 scan操作符
scan 操作符对一个 Observable 流序列的每一项数据应用一个累积函数, 然后将这个函数的累积结果弹射出去。
```java @Test public void scanDemo() {/**
* 定义一个 accumulator 累积函数
*/
Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>()
{
@Override
public Integer call(Integer input1, Integer input2)
{
log.info(" {} + {} = {} ", input1, input2, input1 + input2);
return input1 + input2;
}
};
/**
* 使用scan 进行流扫描
*/
Observable.range(1, 5)
.scan(accumulator)
.subscribe(new Action1<Integer>()
{
@Override
public void call(Integer sum)
{
log.info(" 累加的结果: {} ", sum);
}
});
}
<a name="lvb78"></a>
## 4.5 聚合操作符
<a name="vyNkr"></a>
## 4.5.1 count操作符
```java
/**
* 演示 count 计数操作符
*/
@Test
public void countDemo()
{
String[] items = {"one", "two", "three","fore"};
Integer count = Observable
.from(items)
.count()
.toBlocking().single();
log.info("计数的结果为 {}",count);
}
4.5.2 reduce操作符
与scan有异曲同工之妙
/**
* 演示 reduce 扫描操作符
*/
@Test
public void reduceDemo()
{
/**
* 定义一个 accumulator 累积函数
*/
Func2<Integer, Integer, Integer> accumulator = new Func2<Integer, Integer, Integer>()
{
@Override
public Integer call(Integer input1, Integer input2)
{
log.info(" {} + {} = {} ", input1, input2, input1 + input2);
return input1 + input2;
}
};
/**
* 使用 reduce 规约操作符
*/
Observable.range(1, 5)
.reduce(accumulator)
.subscribe(new Action1<Integer>()
{
@Override
public void call(Integer sum)
{
log.info(" 规约的结果: {} ", sum);
}
});
}
4.6 其他操作符
4.6.1 take操作符
@Test
public void takeDemo() throws InterruptedException
{
Observable.interval(1, TimeUnit.SECONDS) //设置间隔执行
.take(10) //10秒倒计时
.map(aLong -> 10 - aLong)
.subscribe(aLong -> log.info(aLong.toString()));
Thread.sleep(Integer.MAX_VALUE);
}
4.6.2 window操作符
RxJava 的窗口可以理解为固定数量(或者固定时间间隔)的元素分组。 假定通过 window 操作符以固定数量 n 进行窗口划分,一旦流上弹射的元素的数量足够一个窗口的数量 n, 那么输出流上将弹出一个新的元素,输出元素是一个 Observable 主题对象,该主题包含源流窗口之内的 n 个元素
/**
* window 创建操作符 创建滑动窗口
* 演示 window 创建操作符 创建滑动窗口
*/
@Test
public void simpleWindowObserverDemo()
{
List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);
Observable.from(srcList)
.window(3)
.flatMap(o -> o.toList())
.subscribe(list -> log.info(list.toString()));
}
创建重叠窗口使用函数 window(int count, int skip),其中第一个参数为窗口的元素个数,第二个参数为下一个窗口跳过的元素个数
/**
* window 创建操作符 创建滑动窗口
* 演示 window 创建操作符 创建滑动窗口
*/
@Test
public void windowObserverDemo()
{
List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);
Observable.from(srcList)
.window(3, 1)
.flatMap(o -> o.toList())
.subscribe(list -> log.info(list.toString()));
}
4.7 RxJava的Scheduler调度器
- Schedulers.io():用于获取内部的 ioScheduler 调度器实例
- Schedulers. newThread ():用于获取内部的 newThreadScheduler 调度器实例,该调度器 为 RxJava 流操作创建一个新线程。
- Schedulers.trampoline ():使用当前线程立即执行 RxJava 流操作。
Schedulers. single ():使用 RxJava 内置的单例线程执行 RxJava 流操作
4.8 背压
4.8.1 背压问题
当上下游的流操作处于不同的线程时,如果上游弹射数据的速度快于下游接收处理数据的速度,对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失, 又不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压问题。
4.8.2 背压问题的几种应对模式
在创建主题时可以使用 Observable 类的一个重载的 create 方法设置具体的背压模式, 该方法的源代码如下:
public static <T> Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) {
return unsafeCreate(new OnSubscribeCreate<T>(emitter, backpressure));
}
背压模式有多种,比较常用的有“最近模式”Emitter.BackpressureMode.LATEST。这种模式的含义为:如果消费跟不上, 那么仅仅缓存最近弹射出来的数据,将老旧一点的数据直接丢弃
背压模式:BackpressureMode.DROP: 在这种模式下, Observable 主题使用固定大小为 128 的缓冲区。如果下游订阅者无法处理,流的第一个元素就会缓存下来,后续的会被丢弃。
- BackpressureMode.LATEST
- BackpressureMode.NONE 和 BackpressureMode.ERROR: 在这两种模式中发送的数据不使用背压。
- BackpressureMode.BUFFER: 在这种模式下,有一个无限的缓冲区(初始化时是 128) ,下游消费不了的元素全部会放到缓冲区中。如果缓冲区中持续地积累, 就会导致内存耗尽,抛出OutOfMemoryException 异常