SubscribeOn and ObserveOn

我们学到了如何在一个调度器上运行一个任务。但是我们如何利用它来和Observables一起工作呢?RxJava提供了subscribeOn()方法来用于每个Observable对象。subscribeOn()方法用Scheduler来作为参数并在这个Scheduler上执行Observable调用。

在“真实世界”这个例子中,我们调整loadList()函数。首先,我们需要一个新的getApps()方法来检索已安装的应用列表:

  1. private Observable<AppInfo> getApps() {
  2. return Observable.create(subscriber -> {
  3. List<AppInfo> apps = new ArrayList<>();
  4. SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
  5. Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
  6. String serializedApps = sharedPref.getString("APPS", "");
  7. if (!"".equals(serializedApps)) {
  8. apps = new Gson().fromJson(serializedApps,appInfoType);
  9. }
  10. for (AppInfo app : apps) {
  11. subscriber.onNext(app);
  12. }
  13. subscriber.onCompleted();
  14. });
  15. }

getApps()方法返回一个AppInfo的Observable。它先从Android的SharePreferences读取到已安装的应用程序列表。反序列化,并一个接一个的发射AppInfo数据。使用新的方法来检索列表,loadList()函数改成下面这样:

  1. private void loadList() {
  2. mRecyclerView.setVisibility(View.VISIBLE);
  3. getApps().subscribe(new Observer<AppInfo>() {
  4. @Override
  5. public void onCompleted() {
  6. mSwipeRefreshLayout.setRefreshing(false);
  7. Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
  8. }
  9. @Override
  10. public void onError(Throwable e) {
  11. Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
  12. mSwipeRefreshLayout.setRefreshing(false);
  13. }
  14. @Override
  15. public void onNext(AppInfo appInfo) {
  16. mAddedApps.add(appInfo);
  17. mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
  18. }
  19. });
  20. }

如果我们运行代码,StrictMode将会报告一个不合规操作,这是因为SharePreferences会减慢I/O操作。我们所需要做的是指定getApps()需要在调度器上执行:

  1. getApps().subscribeOn(Schedulers.io())
  2. .subscribe(new Observer<AppInfo>() { [...]

Schedulers.io()将会去掉StrictMode的不合规操作,但是我们的App现在崩溃了是因为:

  1. at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.jav a:58)
  2. at java.util.concurrent.Executors$RunnableAdapter.call(Executors. java:422)
  3. at java.util.concurrent.FutureTask.run(FutureTask.java:237)
  4. at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.access$201(ScheduledThreadPoolExecutor.java:152)
  5. at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutu reTask.run(ScheduledThreadPoolExecutor.java:265)
  6. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolEx ecutor.java:1112)
  7. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolE xecutor.java:587)
  8. at java.lang.Thread.run(Thread.java:841) Caused by:
  9. android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.

Only the original thread that created a view hierarchy can touch its views.

我们再次回到Android的世界。这条信息简单的告诉我们我们试图在一个非UI线程来修改UI操作。意思是我们需要在I/O调度器上执行我们的代码。因此我们需要和I/O调度器一起执行代码,但是当结果返回时我们需要在UI线程上操作。RxJava让你能够订阅一个指定的调度器并观察它。我们只需在loadList()函数添加几行代码,那么每一项就都准备好了:

  1. getApps()
  2. .onBackpressureBuffer()
  3. .subscribeOn(Schedulers.io())
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe(new Observer<AppInfo>() { [...]

observeOn()方法将会在指定的调度器上返回结果:如例子中的UI线程。onBackpressureBuffer()方法将告诉Observable发射的数据如果比观察者消费的数据要更快的话,它必须把它们存储在缓存中并提供一个合适的时间给它们。做完这些工作之后,如果我们运行App,就会出现已安装的程序列表:

SubscribeOn and ObserveOn - 图1