作用
源Observable发射的数据会经过指定的方法计算得到一个Key,所有的数据按Key进行分组,每个分组由一个Observable发射。GroupBy
操作符会将Observable拆分为多个Observable,每个Observable负责源Observable数据序列的一个自序列。
在RxJava中,会被拆分为多个GroupedObservable。GroupedObservable是Observable的子类,提供了getKey方法可以获取到对应的Key。
注意:GroupedObservable会在订阅后开始缓存数据,如果你忽略了某个GroupedObservable就会引起内存泄漏。所以当你不需要观察某个GroupedObservable的时候可以使用take(0)
这样的操作符来丢弃缓存。
如果你取消订阅了某个GroupedObservable,这个GroupedObservable会终止。如果源Observable有发射了与这个Key相匹配的数据,又回重新创建一个GroupedObservable。
示例
注释:代码使用RxJava2,版本号2.2.19。
RxJava中提供了以下几种实现和扩展。(过长方法去掉了泛型内容)
groupBy(Function<? super T, ? extends K> keySelector)
groupBy(Function<? super T, ? extends K> keySelector,boolean delayError)
groupBy(Function keySelector,Function valueSelector)
groupBy(Function keySelector,Function valueSelector,boolean delayError)
groupBy(Function keySelector,Function valueSelector,boolean delayError,int bufferSize)
groupJoin(ObservableSource other,Function leftEnd,Function rightEnd,BiFunction resultSelector)<br />
String[] names = {"Lucy", "Andy", "Johnson", "Lee","Frank"};
Observable<String> fromArray = Observable.fromArray(names);
fromArray.groupBy(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
//按名字长度分组
return s.length();
}
}).subscribe(new Consumer<GroupedObservable<Integer, String>>() {
@Override
public void accept(GroupedObservable<Integer, String> integerStringGroupedObservable) {
//TODO
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
//TODO
}
});
参考资料
http://reactivex.io/documentation/operators/groupby.html