作用
定期收集发射的数据,打包发送到下游。
Buffer操作符根据原Observable创建一个新的Observable。原Observable正常发射数据,新的Observable会打包收到的数据发射给下游。如果收到的是onError通知,则会立刻通知给下游。
示例
注释:代码使用RxJava2,版本号2.2.19。
buffer(int count)
**buffer(int count, Callable<U> bufferSupplier)**
以列表(List)的形式发射不重叠的缓存,每一个缓存至多指定(count)项数据(最后发射的列表数据可能少于count项)。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
**buffer(int count, int skip)**
**buffer(int count, int skip, Callable<U> bufferSupplier)<br />**
从原Observable发射的第一个数据开始收集,每收集到skip个内容时,将这skip内容的前count个内容以列表(List)的形式打包发射给下游。skip
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
**buffer(long timespan, TimeUnit unit)**
**buffer(long timespan, TimeUnit unit, Scheduler scheduler)**
以指定的时间间隔收集数据,以List的形式打包发射到下游。
**buffer(long timespan, TimeUnit unit, int count)**
**buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count)**
**buffer( long timespan, TimeUnit unit, Scheduler scheduler, int count, Callable<U> bufferSupplier, boolean restartTimerOnMaxSize)**
每当收到count个数据或者每过了一段指定时间,都会以List的形式打包数据发射。打包的数据可能少于count项甚至为空。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
参数restartTimerOnMaxSize
**buffer(long timespan, long timeskip, TimeUnit unit)**
**buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler)**
**buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier)**
在每个收集数据的时间段(timespan)收集数据,再以List的形式打包数据发射。发射后间隔(timeskip)又开始重新收集。下图的timeshift对应的就是timeskip。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
buffer(ObservableSource<B> boundary)
buffer(Callable<? extends ObservableSource<B>> boundarySupplier)<br />
- **
buffer(ObservableSource<B> boundary, Callable<U> bufferSupplier)
buffer(ObservableSource<B> boundary, final int initialCapacity)
buffer(Callable<? extends ObservableSource<B>> boundarySupplier, Callable<U> bufferSupplier)<br />
添加一个Observable作为监视器。监视器每发射一个值都作为边界,边界之前的打包内容发射,边界之后开始下一个打包工作。类似用刀切一样,每次刀落下就会完成分块,而刀就是由监视器控制的。
参数boundarySupplier作为工厂,他的call方法会提供作为监视器的Observable。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
参数initialCapacity会预设打包容器的大小。
buffer( ObservableSource<? extends TOpening> openingIndicator, Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)
**buffer( ObservableSource<? extends TOpening> openingIndicator, Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator, Callable<U> bufferSupplier)**
上面通过添加边界(boundary)监视器的方式为打包进行分块。这里的实现可以指定打包的起点和结束点,更加地灵活。
- buffer在背压上的表现