Merge

在”异步的世界“中经常会创建这样的场景,我们有多个来源但是又只想有一个结果:多输入,单输出。RxJava的merge()方法将帮助你把两个甚至更多的Observables合并到他们发射的数据项里。下图给出了把两个序列合并在一个最终发射的Observable。

Merge - 图1

正如你看到的那样,发射的数据被交叉合并到一个Observable里面。注意如果你同步的合并Observable,它们将连接在一起并且不会交叉。

像往常一样,我们用我们的App和已安装的App列表来创建了一个“真实世界”的例子。为此我们还需要第二个Observable。我们可以创建一个单独的应用列表然后让它逆序排列。当然这没有实际的意义,只是为了这个例子。对于第二个列表,我们的loadList()函数像下面这样:

  1. private void loadList(List<AppInfo> apps) {
  2. mRecyclerView.setVisibility(View.VISIBLE);
  3. List reversedApps = Lists.reverse(apps);
  4. Observable<AppInfo> observableApps =Observable.from(apps);
  5. Observable<AppInfo> observableReversedApps =Observable.from(reversedApps);
  6. Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
  7. mergedObserbable.subscribe(new Observer<AppInfo>(){
  8. @Override
  9. public void onCompleted() {
  10. mSwipeRefreshLayout.setRefreshing(false);
  11. Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
  12. }
  13. @Override
  14. public void onError(Throwable e) {
  15. Toast.makeText(getActivity(), "One of the two Observable threw an error!", Toast.LENGTH_SHORT).show();
  16. mSwipeRefreshLayout.setRefreshing(false);
  17. }
  18. @Override
  19. public void onNext(AppInfoappInfo) {
  20. mAddedApps.add(appInfo);
  21. mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
  22. }
  23. });
  24. }

我们创建了Observable和observableApps数据项以及新的observableReversedApps逆序列表。使用Observable.merge(),我们可以创建新的Observable MergedObservable,它在单个可观测序列中发射源Observables发出的所有数据。

正如你能看到的,每个方法签名都是一样的,因此我们的观察者无需在意任何不同就可以复用代码。结果如下: Merge - 图2

注意错误时的toast消息,你可以认为每个Observable抛出的错误都将会打断合并。如果你需要避免这种情况,RxJava提供了mergeDelayError(),它能从一个Observable中继续发射数据即便是其中有一个抛出了错误。当所有的Observables都完成时,mergeDelayError()将会发射onError(),如下图所示:

Merge - 图3