作用

源Observable发射的数据会经过指定的方法计算得到一个Key,所有的数据按Key进行分组,每个分组由一个Observable发射。GroupBy操作符会将Observable拆分为多个Observable,每个Observable负责源Observable数据序列的一个自序列。
image.png
在RxJava中,会被拆分为多个GroupedObservable。GroupedObservable是Observable的子类,提供了getKey方法可以获取到对应的Key。

注意:GroupedObservable会在订阅后开始缓存数据,如果你忽略了某个GroupedObservable就会引起内存泄漏。所以当你不需要观察某个GroupedObservable的时候可以使用take(0)这样的操作符来丢弃缓存。

如果你取消订阅了某个GroupedObservable,这个GroupedObservable会终止。如果源Observable有发射了与这个Key相匹配的数据,又回重新创建一个GroupedObservable。

示例

注释:代码使用RxJava2,版本号2.2.19。
RxJava中提供了以下几种实现和扩展。(过长方法去掉了泛型内容)

  • groupBy(Function<? super T, ? extends K> keySelector)
  • groupBy(Function<? super T, ? extends K> keySelector,boolean delayError)
  • groupBy(Function keySelector,Function valueSelector)
  • groupBy(Function keySelector,Function valueSelector,boolean delayError)
  • groupBy(Function keySelector,Function valueSelector,boolean delayError,int bufferSize)
  • groupJoin(ObservableSource other,Function leftEnd,Function rightEnd,BiFunction resultSelector)<br />
    1. String[] names = {"Lucy", "Andy", "Johnson", "Lee","Frank"};
    2. Observable<String> fromArray = Observable.fromArray(names);
    3. fromArray.groupBy(new Function<String, Integer>() {
    4. @Override
    5. public Integer apply(String s) {
    6. //按名字长度分组
    7. return s.length();
    8. }
    9. }).subscribe(new Consumer<GroupedObservable<Integer, String>>() {
    10. @Override
    11. public void accept(GroupedObservable<Integer, String> integerStringGroupedObservable) {
    12. //TODO
    13. }
    14. }, new Consumer<Throwable>() {
    15. @Override
    16. public void accept(Throwable throwable) {
    17. //TODO
    18. }
    19. });

    参考资料

    http://reactivex.io/documentation/operators/groupby.html