作用

定期收集发射的数据,打包发送到下游。
image.png
Buffer操作符根据原Observable创建一个新的Observable。原Observable正常发射数据,新的Observable会打包收到的数据发射给下游。如果收到的是onError通知,则会立刻通知给下游。

示例

注释:代码使用RxJava2,版本号2.2.19。

  • buffer(int count)
  • **buffer(int count, Callable<U> bufferSupplier)**

以列表(List)的形式发射不重叠的缓存,每一个缓存至多指定(count)项数据(最后发射的列表数据可能少于count项)。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
image.png

  • **buffer(int count, int skip)**
  • **buffer(int count, int skip, Callable<U> bufferSupplier)<br />**

从原Observable发射的第一个数据开始收集,每收集到skip个内容时,将这skip内容的前count个内容以列表(List)的形式打包发射给下游。skipcount时又会有部分不会被打包进来。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
image.png

  • **buffer(long timespan, TimeUnit unit)**
  • **buffer(long timespan, TimeUnit unit, Scheduler scheduler)**

以指定的时间间隔收集数据,以List的形式打包发射到下游。
image.png

  • **buffer(long timespan, TimeUnit unit, int count)**
  • **buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count)**
  • **buffer( long timespan, TimeUnit unit, Scheduler scheduler, int count, Callable<U> bufferSupplier, boolean restartTimerOnMaxSize)**

每当收到count个数据或者每过了一段指定时间,都会以List的形式打包数据发射。打包的数据可能少于count项甚至为空。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
参数restartTimerOnMaxSize
image.png

  • **buffer(long timespan, long timeskip, TimeUnit unit)**
  • **buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler)**
  • **buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier)**

在每个收集数据的时间段(timespan)收集数据,再以List的形式打包数据发射。发射后间隔(timeskip)又开始重新收集。下图的timeshift对应的就是timeskip。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
image.png

  • buffer(ObservableSource<B> boundary)
  • buffer(Callable<? extends ObservableSource<B>> boundarySupplier)<br />
  • **buffer(ObservableSource<B> boundary, Callable<U> bufferSupplier)
  • buffer(ObservableSource<B> boundary, final int initialCapacity)
  • buffer(Callable<? extends ObservableSource<B>> boundarySupplier, Callable<U> bufferSupplier)<br />

添加一个Observable作为监视器。监视器每发射一个值都作为边界,边界之前的打包内容发射,边界之后开始下一个打包工作。类似用刀切一样,每次刀落下就会完成分块,而刀就是由监视器控制的。
参数boundarySupplier作为工厂,他的call方法会提供作为监视器的Observable。
参数bufferSupplier作为工厂,他的call方法的返回值会指定一个集合(Collection),这个集合会成为每次打包的容器。
参数initialCapacity会预设打包容器的大小。
image.png

  • buffer( ObservableSource<? extends TOpening> openingIndicator, Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)
  • **buffer( ObservableSource<? extends TOpening> openingIndicator, Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator, Callable<U> bufferSupplier)**

上面通过添加边界(boundary)监视器的方式为打包进行分块。这里的实现可以指定打包的起点和结束点,更加地灵活。
image.png

  • buffer在背压上的表现

//TBD

参考资料

http://reactivex.io/documentation/operators/buffer.html