ZIP

在一种新的可能场景中处理多个数据来源时会带来:多从个Observables接收数据,处理它们,然后将它们合并成一个新的可观测序列来使用。RxJava有一个特殊的方法可以完成:zip()合并两个或者多个Observables发射出的数据项,根据指定的函数Func*变换它们,并发射一个新值。下图展示了zip()方法如何处理发射的“numbers”和“letters”然后将它们合并一个新的数据项:

ZIP - 图1

对于“真实世界”的例子来说,我们将使用已安装的应用列表和一个新的动态的Observable来让例子变得有点有趣味。

  1. Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

tictocObservable变量使用interval()函数每秒生成一个Long类型的数据:虽简单但有效,正如之前所说的,我们需要一个Func对象。因为它需要传两个参数,所以是Func2:

  1. private AppInfo updateTitle(AppInfoappInfo, Long time) {
  2. appInfo.setName(time + " " + appInfo.getName());
  3. return appInfo;
  4. }

现在我们的loadList()函数变成这样:

  1. private void loadList(List<AppInfo> apps) {
  2. mRecyclerView.setVisibility(View.VISIBLE);
  3. Observable<AppInfo> observableApp = Observable.from(apps);
  4. Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);
  5. Observable.zip(observableApp, tictoc,
  6. (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
  7. .observeOn(AndroidSchedulers.mainThread())
  8. .subscribe(new Observer<AppInfo>() {
  9. @Override
  10. public void onCompleted() {
  11. Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
  12. }
  13. @Override
  14. public void onError(Throwable e) {
  15. mSwipeRefreshLayout.setRefreshing(false);
  16. Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
  17. }
  18. @Override
  19. public void onNext(AppInfoappInfo) {
  20. if (mSwipeRefreshLayout.isRefreshing()) {
  21. mSwipeRefreshLayout.setRefreshing(false);
  22. }
  23. mAddedApps.add(appInfo);
  24. int position = mAddedApps.size() - 1;
  25. mAdapter.addApplication(position, appInfo);
  26. mRecyclerView.smoothScrollToPosition(position);
  27. }
  28. });
  29. }

正如你看到的那样,zip()函数有三个参数:两个Observables和一个Func2

仔细一看会发现observeOn()函数。它将在下一章中讲解:现在我们可以小试一下。

结果如下:

ZIP - 图2