作用
源Observable发射的数据会经过指定的方法计算得到一个Key,所有的数据按Key进行分组,每个分组由一个Observable发射。GroupBy操作符会将Observable拆分为多个Observable,每个Observable负责源Observable数据序列的一个自序列。
在RxJava中,会被拆分为多个GroupedObservable。GroupedObservable是Observable的子类,提供了getKey方法可以获取到对应的Key。
注意:GroupedObservable会在订阅后开始缓存数据,如果你忽略了某个GroupedObservable就会引起内存泄漏。所以当你不需要观察某个GroupedObservable的时候可以使用take(0)这样的操作符来丢弃缓存。
如果你取消订阅了某个GroupedObservable,这个GroupedObservable会终止。如果源Observable有发射了与这个Key相匹配的数据,又回重新创建一个GroupedObservable。
示例
注释:代码使用RxJava2,版本号2.2.19。
RxJava中提供了以下几种实现和扩展。(过长方法去掉了泛型内容)
groupBy(Function<? super T, ? extends K> keySelector)groupBy(Function<? super T, ? extends K> keySelector,boolean delayError)groupBy(Function keySelector,Function valueSelector)groupBy(Function keySelector,Function valueSelector,boolean delayError)groupBy(Function keySelector,Function valueSelector,boolean delayError,int bufferSize)groupJoin(ObservableSource other,Function leftEnd,Function rightEnd,BiFunction resultSelector)<br />String[] names = {"Lucy", "Andy", "Johnson", "Lee","Frank"};Observable<String> fromArray = Observable.fromArray(names);fromArray.groupBy(new Function<String, Integer>() {@Overridepublic Integer apply(String s) {//按名字长度分组return s.length();}}).subscribe(new Consumer<GroupedObservable<Integer, String>>() {@Overridepublic void accept(GroupedObservable<Integer, String> integerStringGroupedObservable) {//TODO}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) {//TODO}});
参考资料
http://reactivex.io/documentation/operators/groupby.html
