我们经常见到这么一些场景:

  • 微博的列表页面
  • 各类协同工具的任务看板,比如 Teambition

流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑 - 知乎 - 图1

这类场景的一个共同特点是:

  • 由若干个小方块构成
  • 每个小方块需要以一个业务实体为主体(一条微博,一个任务),聚合一些其他关联信息(参与者,标签等)

这么一个界面,我们考虑它的完全展示,可能会有这么两种方案:

  • 服务端渲染,查询所有数据,生成 HTML 之后发送给浏览器
  • 前端渲染,查询所有数据,发送给浏览器生成 HTML 展示

微博使用的前一种,并且引入了 bigpipe 机制来生成界面,而 Teambition 则使用后一种,主要差别还是由于产品形态。

业务上的挑战

在前端渲染的情况下,这么一种界面形态,所带来的挑战有哪些呢?

  • 信息量较大,导致查询较复杂,其中有部分数据是可复用的,比如说,这么一大片面板,可能几百条任务,但是其中人员可能就 20 个,所有参与者都在这 20 个人里面。
  • 如果要做一些比较实时的交互,会比较麻烦,比如说,某个用户修改了头像,某个标签定义修改了文字,都会需要去立刻更新当前界面所有的引用部分。

所以,这就要求我们的数据查询是离散化的,任务信息和额外的关联信息分开查询,然后前端来组装,这样,一是可以减少传输数据量,二是可以分析出数据之间的关系,更新的时候容易追踪。

除此之外,Teambition 的操作会在全业务维度使用 WebSocket 来做更新推送,比如说,当前任务看板中,有某个东西变化了(其他人创建了任务、修改了字段),都会由服务端推送消息,来促使前端更新界面。

离散的数据会让我们需要使用缓存。比如说,界面建立起来之后,如果有人在其他端创建了任务,那么,本地的看板只需收到这条任务信息并创建视图,并不需要再去查询人员、标签等关联信息,因为之前已经获取过。所以,大致会是这个样子:

某视图组件的展示,需要聚合 ABC 三个实体,其中,如果哪个实体在缓存中存在,就不去服务端拉取,只拉取无缓存的实体。

这个过程带给我们第一个挑战:

查询同一种数据,可能是同步的(缓存中获取),可能是异步的(AJAX 获取),业务代码编写需要考虑两种情况。

WebSocket 推送则用来保证我们前端缓存的正确性。但是,我们需要注意到,WebSocket 的编程方式跟 AJAX 是不一样的,WebSocket 是一种订阅,跟主流程很难整合起来,而 AJAX 相对来说,可以组织得包含在主流程中。

例如,对同一种更新的不同发起方(自己修改一个东西,别人修改这个东西),这两种的后续其实是一样,但代码并不相同,需要写两份业务代码。

这样就带给我们第二个挑战:

获取数据和数据的更新通知,写法是不同的,会加大业务代码编写的复杂度。

我们的数据这么离散,从视图角度看,每块视图所需要的数据,都可能是经过比较长而复杂的组合,才能满足展示的需要。

所以,第三个挑战:

每个渲染数据,都是通过若干个查询过程(刚才提到的组合同步异步)组合而成,如何清晰地定义这种组合关系?

此外,我们可能面临这样的场景:

一组数据经过多种规则(过滤,排序)之后,又需要插入新的数据(主动新增了一条,WebSocket 推送了别人新建的一条),这些新增数据都不能直接加进来,而是也必须走一遍这些规则,再合并到结果中。

这就是第四个挑战:

对于已有数据和未来数据,如何简化它们应用同样规则的代码复杂度。

带着这些问题,我们来开始今天的思考过程。

同步和异步

在前端,经常会碰到同步、异步代码的统一。假设我们要实现一个方法:当有某个值的时候,就返回这个值,否则去服务端获取这个值。

通常的做法是使用 Promise:

  1. function getDataP() {
  2. if (a) {
  3. return Promise.resolve(a)
  4. } else {
  5. return AJAX.get('a')
  6. }
  7. }

所以,我们处理这个事情的办法就是,如果不确定是同步还是异步,那就取异步,因为它可以兼容同步,刚才代码里面的 resolve 就是强制把同步的东西也转换为兼容异步的 Promise。

我们只用 Promise 当然也可以解决问题,但 RxJS 中的 Observable 在这一点上可以一样做到:

function getDataO() {
  if (a) {
    return Observable.of(a)
  } else {
    return Observable.fromPromise(AJAX.get('a'))
  }
}

有人要说了,你这段代码还不如 Promise,因为还是要从它转啊,优势在哪里呢?

我们来看看刚才封装出来的方法,分别是怎么使用的呢?

getDataP().then(data => {
  // Promise 只有一个返回值,响应一次
  console.log(data)
})

getDataO().subscribe(data => {
  // Observable 可以有多个返回值,响应多次
  console.log(data)
})

在这一节里,我们不对比两者优势,只看解决问题可以通过怎样的办法:

  • getData(),只能做同步的事情
  • getDataP(),可以做同步和异步的事情
  • getDataO(),可以做同步和异步的事情

结论就是,无论 Promise 还是 Observable,都可以实现同步和异步的封装。

获取和订阅

通常,我们在前端会使用观察者或者订阅发布模式来实现自定义事件这样的东西,这实际上就是一种订阅。

从视图的角度看,其实它所面临的是:

得到了一个新的任务数据,我要展示它

至于说,这个东西是怎么得到的,是主动查询来的,还是别人推送过来的,并不重要,这不是它的职责,它只管显示。

所以,我们要给它封装的是两个东西:

  • 主动查询的数据
  • 被动推送的数据

然后,就变成类似这么一个东西:

service.on('task', data => {
  // render
})

这么一来,视图这里就可以用相同的方式应对两种不同来源的数据了,service 内部可以去把两者统一,在各自的回调里面触发这个自定义事件 task。

但我们似乎忽略了什么事,视图除了响应这种事件之外,还需要去主动触发一下初始化的查询请求:

service.on('task', data => {
  // render
})

service.getData()   // 加了这么一句来主动触发请求

这样看起来还是挺别扭的,回到上一节里面我们的那个 Observable 示例:

getDataO().subscribe(data => {
  // render
})

这么一句好像就搞定了我们要求的所有事情。我们可以这么去理解这件事:

  • getDataO 是一个业务过程
  • 业务过程的结果数据可以被订阅

这样,我们就可以把获取和订阅这两件事合并到一起,视图层的关注点就简单很多了。

可组合的数据管道

依据上一节的思路,我们可以把查询过程和 WebSocket 响应过程抽象,融为一体。

说起来很容易,但关注其实现的话,就会发现这个过程是需要好多步骤的,比如说:

data1      data2      data3
  |          |          |
  ------------          |
        |               |
        -----------------
                |
              state

一个视图所需要的数据可能是这样的:

  • data1 跟 data2 通过某种组合,得到一个结果
  • 这个结果再去跟 data3 组合,得到最终结果

我们怎么去抽象这个过程呢?

注意,这里面 data1,data2,data3,可能都是之前提到过的,包含了同步和异步封装的一个过程,具体来说,就是一个 RxJS Observable。

可以把每个 Observable 视为一节数据流的管道,我们所要做的,是根据它们之间的关系,把这些管道组装起来,这样,从管道的某个入口传入数据,在末端就可以得到最终的结果。

RxJS 给我们提供了一堆操作符用于处理这些 Observable 之间的关系,比如说,我们可以这样:

const A$ = Observable.interval(1000)
const B$ = Observable.of(3)
const C$ = Observable.from([5, 6, 7])

const D$ = C$.toArray()
  .map(arr => arr.reduce((a, b) => a + b), 0)
const E$ = Observable.combineLatest(A$, B$, D$)
   .map(arr => arr.reduce((a, b) => a + b), 0)

上述的 D 就是通过 C 进行一次转换所得到的数据管道,而 E 是把 A,B,D 进行拼装之后得到的数据管道,

A ------> |
B ------> | -> E
C -> D -> |

从以上的示意图就可以看出它们之间的组合关系,通过这种方式,我们可以描述出业务逻辑的组合关系,把每个小粒度的业务封装到数据管道中,然后对它们进行组装,拼装出整体逻辑来。

现在和未来

在业务开发中,我们时常遇到这么一种场景:

已过滤排序的列表中加入一条新数据,要重新按照这条规则走一遍。

我用一个简单的类比来描述这件事:

每个进教室的同学都可以得到一颗糖

这句话表达了两个含义:

  • 在这句断言产生之前,对于已经在教室里的每个人,都应当去给他们发一颗糖
  • 在这句断言形成以后,再进入这个教室的每个人,都应当得到一颗糖

这里面,第一句表达的是现在,第二句表达的是未来。我们编写业务程序的时候,往往会把现在和未来分开考虑,而忽略了他们之间存在的深层次的一致性。

我们想通了这个事情之后,再反过来考虑刚才这个问题,能得到的结论是:

进入本列表的数据都应当经过某种过滤规则和某种排序规则

这才是一个合适的业务抽象,然后再编写代码就是:

const final$ = source$.map(filterA).map(sorterA)

其中,source 代表来源,而 final 代表结果。来源经过 filterA 变换、sorterA 变换之后,得到结果。

然后,我们再去考虑来源的定义:

const source$ = start$.merge(patch$)

来源等于初始数据与新增数据的合并。

然后,实现出 filterA 和 sorterA,就完成了整个这段业务逻辑的抽象定义。给 start 和 patch 分别进行定义,比如说,start 是一个查询,而 patch 是一个推送,它就是可运行的了。最后,我们在 final 上添加一个订阅,整个过程就完美地映射到了界面上。

很多时候,我们编写代码都会考虑进行合适的抽象,但这两个字代表的含义在很多场景下并不相同。

很多人会懂得把代码划分为若干方法,若干类型,若干组件,以为这样就能够把整套业务的运转过程抽象出来,其实不然。

业务逻辑的抽象是与业务单元不同的方式,前者是血脉和神经,后者是肢体和器官,两者需要结合在一起,才能够成为鲜活的整体。

一般场景下,业务单元的抽象难度相对较低,很容易理解,也容易获得关注,所以通常都能做得还不错,比如最近两年,对于组件化之类的话题,都能够谈得起来了,但对于业务逻辑的抽象,大部分项目是做得很不够的,值得深思。

视图如何使用数据流

以上,我们谈及的都是在业务逻辑的角度,如何使用 RxJS 来组织数据的获取和变更封装,最终,这些东西是需要反映到视图上去的,这里面有些什么有意思的东西呢?

我们知道,现在主流的 MV*框架都基于一个共同的理念:MDV(模型驱动视图),在这个理念下,一切对于视图的变更,首先都应当是模型的变更,然后通过模型和视图的映射关系,自动同步过去。

在这个过程中,我们可能会需要通过一些方式定义这种关系,比如 Angular 和 Vue 中的模板,React 中的 JSX 等等。

在这些体系中,如果要使用 RxJS 的 Observable,都非常简单:

data$.subscribe(data => {
  // 这里根据所使用的视图库,用不同的方式响应数据
  // 如果是 React 或者 Vue,手动把这个往 state 或者 data 设置
  // 如果是 Angular 2,可以不用这步,直接把 Observable 用 async pipe 绑定到视图
  // 如果是 CycleJS ……
})

这里面有几个点要说一下:

Angular2 对 RxJS 的使用是非常方便的,形如:let todo of todos$ | async 这种代码,可以直接绑定一个 Observable 到视图上,会自动订阅和销毁,比较简便优雅地解决了 “等待数据”,“数据结果不为空”,“数据结果为空” 这三种状态的差异。Vue 也可以用插件达到类似的效果。

CycleJS 比较特别,它整个运行过程就是基于类似 RxJS 的机制,甚至包括视图,看官方的这个 Demo:

import {run} from '@cycle/xstream-run';
import {div, label, input, hr, h1, makeDOMDriver} from '@cycle/dom';

function main(sources) {
  const sinks = {
    DOM: sources.DOM.select('.field').events('input')
      .map(ev => ev.target.value)
      .startWith('')
      .map(name =>
        div([
          label('Name:'),
          input('.field', {attrs: {type: 'text'}}),
          hr(),
          h1('Hello ' + name),
        ])
      )
  };
  return sinks;
}

run(main, { DOM: makeDOMDriver('#app-container') });

这里面,注意 DOM.select 这段。这里,明显是在界面还不存在的情况下就开始 select,开始添加事件监听了,这就是我刚才提到的预先定义规则,统一现在与未来:如果界面有. field,就立刻添加监听,如果没有,等有了就添加。

那么,我们从视图的角度,还可以对 RxJS 得出什么思考呢?

  1. 可以实现异步的计算属性。

在上次这篇数据的关联计算里简单提了一下,其实整篇是在给这篇做伏笔。

  1. 我们有没有考虑过,如何从视图的角度去组织这些数据流?

一个分析过程可以是这样:

  • 检阅某视图,发现它需要数据 a,b,c
  • 把它们的来源分别定义为数据流 A,B,C
  • 分析 A,B,C 的来源,发现 A 来源于 D 和 E;B 来源于 E 和 F;C 来源于 G
  • 分别定义这些来源,合并相同的部分,得到多条直达视图的管道流
  • 然后定义这些管道流的组合过程,做合适的抽象

小结

使用 RxJS,我们可以达到以下目的:

  • 同步与异步的统一
  • 获取和订阅的统一
  • 现在与未来的统一
  • 可组合的数据变更过程

还有:

  • 数据与视图的精确绑定
  • 条件变更之后的自动重新计算

Teambition SDK

Teambition 新版数据层使用 RxJS 构建,不依赖任何展现框架,可以被任何展现框架使用,甚至可以在 NodeJS 中使用,对外提供了一整套 Reactive 的 API,可以查阅文档和代码来了解详细的实现机制。

基于这套机制,可以很轻松实现一套基于 Teambition 平台的独立视图,欢迎第三方开发者发挥自己的想象,用它构建出各种各样有趣的东西。我们也会逐步添加一些示例。

如何理解整个机制

怎么理解这么一套机制呢,可以想象一下这张图:

流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑 - 知乎 - 图2

把 Teambition SDK 看作一个 CPU,API 就是他对外提供的引脚,视图组件接在这些引脚上,每次调用 API,就如同从一个引脚输入数据,但可能触发多个引脚对外发送数据。细节可以参见 SDK 的设计文档。

另外,对于 RxJS 数据流的组合,也可以参见这篇文章,你点开链接之后可能心想:这两者有什么关系!

翻到最后那个图,从侧面看到多个波叠加,你想象一下,如果把视图的状态理解为一个时间轴上的流,它可以被视为若干个其他流的叠加,这么多流叠加起来,在当前时刻的值,就是能够表达我们所见视图的全部状态数据。

这么想一遍是不是就容易理解多了?

我第一次看到 RxJS 相关理念大概是 5 年前,当时老赵他们在讨论这个,我看了几天之后的感觉就是对智商形成了巨大考验,直到最近一两年才算是入门了,不过仅限与业务应用,背后的深层数学理论仍然是不通的。现在的程度,大概相当于一个勉强能应用四则运算解应用题的小学生吧。

还有一个问题是,虽然刚才又是贴图又是贴链接,显得好厉害,但我大学时候的数字电路和信号系统都是挂了的,但最近回头想这些东西,发现突然好像能理解了,果然很多东西背后的思想是一致的。

后记

今年年初,我在知乎回答了一个问题:前端如何更好的实现接口的缓存和更新?

正是这篇文章引起的思考使得我加入 Teambition,因为这正是一个完美的场景,入职之后跟团队的 同学详细描述了思路,经过他半年的持续努力,实现了这样的一个东西,挺不容易的。

目前我们的 Mobile Web 版本使用了这个 SDK,Web 还没有深度接入,因为需要解决新老数据的同步问题,正在努力中。

广告:招一个人参与 Teambition SDK 的维护,70% 以上时间是维护这个东西,偶尔参与业务上的一些改进,要求熟悉 RxJS。

另外,如果有对于这种情况下的视图改进有兴趣的,也可以联系我,也要一个。

这篇文章,我讲了两次,第一次是半个月之前开源中国在重庆举办的分享,另一次是昨天下午在饿了么的分享。

今天总结写出来,给大家分享一下。
https://zhuanlan.zhihu.com/p/23305264