rxjs 相信大家都听说过, 没听说过的可以看看他们官网, 是一个响应式编程的库, Rxjs 跟 react 结合来用的人就比较少了, 这里专门用一个例子来举例看看如何使用 rxjs 替代 redux 此类的状态管理库来充当 react 等框架的 model 层. 下面假设读者具有了 rxjs 的基础知识, 对最基础的 API, 例如 subscribe 和 unsubscribe 不做特别介绍.

重点

实际上在前端娱乐圈里已经流行了 observable 这个概念, 比如 mobx 和 vue3 都是基于 observable 这个概念的, 有可能给部分人造成一种印象, 以为 reactive 就是 Proxy 监听一下变化然后触发一下重新渲染, 确实 observable 就是这样的东西, 然而 rxjs 的强大之处并不在于这里, 在 rxjs 里面, 并没有魔法一样的 Proxy, 只有冷冰冰的 next(), 而 rxjs 的强大之处其实在于具有强大表达能力的 operators.

rxjs 当前版本包含了可以说让人眼花缭乱的静态方法和操作符 (operators), 使得刚上手的人会感到有一定门槛, 但是其实我们常用的并不多. 下面通过一个例子来看看如何在实战中使用 rxjs 管理 react 的 model 层.

例子还是 todoList, 视图层就用 react.

首先我们需要一个把 observable 的状态同步到 react 组件的东西, 这个东西在 redux 里叫 useSelector, 这里我们实现一个叫做 useObservable 的 hook:

  1. function useObservable<Value>(ob: Observable<Value>): Value | null

具体实现就当做作业了, 这个还是很好实现的, 注意要兼容 concurrent mode, 并且当 ob 本身改变的时候需要取消原有订阅再订阅新 ob.

然后我们就可以实现一个 model 了, 代码如下

// src/todo/service/api.ts
import { fromFetch } from "rxjs/fetch"
import { map, switchMap } from "rxjs/operators"

interface ITodoItem {
    id: number
}

export interface FetchTodoListParams {
    page: number
    pageSize: number
    search: string
}

function handleRequest<T=unknown>(request: Observable<Response>){
    return request.pipe(
        switchMap(x => x.json()),
        switchMap(handleError)
        map(x => x.data as T)
    )
}

const apiService = {
    // 这个代表了一切的‘读’的方法
    loadList(params: FetchTodoListParams) {
        return fromFetch("/todolist" + new URLSearchParams((params as unknown) as Record<string, string>).toString(), {
            method: "get",
        }).pipe(
            x=>handleRequest<TodoItem[]>(x)
        )
    },
    // 这个可以代表一切的‘写’的方法, 你可以脑补成增删改
    someAction(todoItemId: ITodoItem["id"]) {
        return fromFetch("/todolist/someAction", {
            method: "post",
            body: JSON.stringify(todoItemId),
        }).pipe(
            handleRequest
        )
    },
}

export default apiService

那么上面就是一个 apiService, 这里为了方便省略了 handleError 的实现, 下面直接从 TodoListModel 里 import 这个 service, 喜欢 di 之类的同学可以用自己喜欢的 di 框架注入服务.

// src/todo/model.ts
import apiService, {FetchTodoListParams} from "todo/service/api"
import { BehaviorSubject, combineLatest, Subject } from "rxjs"
import { map, publishReplay, refCount, startWith, debounceTime, switchMap, tap } from "rxjs/operators"

class TodoListModel {
    // 注意这个是私有的, 组件不需要关心这个.
    private listNeedsUpdate = new Subject()
    public pagination = new BehaviorSubject({
        page: 0,
        pageSize: 10,
    })
    public search = new BehaviorSubject("")
    //prettier-ignore
    public todoList = combineLatest([
        this.pagination,
        this.search.pipe(
            debounceTime(600)
        ), 
        this.listNeedsUpdate.pipe(
            startWith(null)
        )
    ]).pipe(
        switchMap(([pagination, search]) => {
            return apiService.loadList({
                ...pagination,
                search,
            })
        }),
        publishReplay(),
        refCount(),
    )
    public someAction(todoItemId: number) {
        return apiService
            .someAction(todoItemId)
            .pipe(
                tap(() => {
                    this.listNeedsUpdate.next()
                })
            )
            .toPromise()
    }
    public changePagination(page: number, pageSize: number){
        this.pagination.next({
            page,
            pageSize,   
        })
    }
    public changeSearch(search: string){
        this.search.next(search)
    }
}

然后我们的组件就可以在任何地方依赖这个服务了:

export function ListView(){
    const list = useObservable(todoListModel.todoList)
    return <div>
        <List dataSource={list} renderItem={...} />
    </div>
}
export function TableView(){
    const list = useObservable(todoListModel.todoList)
    return <div>
        <Table dataSource={list} columns={...} />
    </div>
}
export function SearchView(){
    const pagination = useObservable(todoListModel.params)
    return <div>
        <Pagination page={pagination.page} pageSize={pagination.pageSize} onChange={(page, pageSize)=>{
            todoListModel.changeParams({
                page,
                pageSize
            })
        }}>
        <Search value={pagination.search} onSearch={newSearch=>{
            todoListModel.changeParams({
                search: newSearch
            })
        }} />
    </div>
}

为什么

下面我首先讲一下这里用到的每个操作符是什么, 为什么要这么写.

首先从 apiService 开始

fromFetch

这个跟 fetch api 一模一样, 唯一的区别就是 fetch api 返回的是 promise, 而 fromFetch 返回的是 observable, 当 observable 被退订的时候, 会去调用 abortControler 的 abort 来取消这个请求.

switchMap

它非常类似 Array.prototype 的 flatMap, 以及 Promise.prototype.then 的一半作用. 其作用就是把嵌套的 Observable 摊平, 放在我们这里就是类似 redux-saga 的 takeLatest, 当有新请求的时候取消仍然在跑的旧请求. 了解函数式编程的同学知道这个是 monad 的 bind 函数. 把嵌套的 observable 摊平的方法有很多,其他还有 mergeMap 和 exhaustMap 等,用的最多的还是 switchMap。

map

它非常类似 Array.prototype 的 map, 以及 Promise.prototype.then 的另一半作用. 其作用就是把 Observable 里的值转换成新的值, 这个可以说是最常用的操作符.

可以看出其实 switchMap 和 map 在这里就是 then 的作用.

然后是 model.

combineLatest (静态方法)

它类似 Promise.all, 需要注意的是, 如果它的参数 observable 的其中一个迟迟不 next 的话, 它就没办法将数据传下去, 因此我们这里用了 startWith, 其道理是讲得通的, 即使列表没有发生变化 (发生变化这个事件被我们的 needsUpdate 代表了), 我们初始的时候也需要调用一次请求拿到数据.

debounceTime

这个很好理解, 输入框需要 debounce 才能发出请求, 否则就会输入一个字母发一次请求.

tap

它类似 Array.prototype 的 forEach, 其作用在于执行副作用, 这里用于在 someAction 完成之后触发重新拉取数据

publishReplay & refCount 重要!

这个是灵魂所在.

要讲清楚这个, 首先需要搞懂冷热 observable 的概念. 这个在网上能搜到很多说法, 并且对于 rx 新手来说往往难以理解, 我在这里用一套我自己总结的 (可能过分简化的) 方法来说明:

我们用 Promise 来类比, 如果 Promise 也有冷热的话, 就是:

let cold = ()=>{
    return new Promise(resolve=>setTimeout(resolve, 600))
}
let hot = new Promise(resolve=>setTimeout(resolve, 600))

也就是说, Observable 的冷热就是 (我们先抛开 rxjs 不管, 讨论广义的 observable):

let cold = ()=>多个可以订阅的值;
let hot = 多个可以订阅的值;

相信大家能注意到其区别了, 冷的 observable, 对于每个订阅者来说会分别创建单独的数据流, 而热的 observable 是所有订阅者共享一个数据流.

冷的 observable 的例子就是 API 请求. 这一点能够从 fromFetch 和 fetch 的区别看出来.

  • 对于 fetch 来说, 当你调用 fetch(“/todo”) 的时候, 浏览器就会立刻发出请求.

  • 因此我们用 fetch 定义一个 api 调用会这么写: const fetchTodo = ()=>fetch("/todo"), 注意这是一个‘冷’的 Promise

  • 如果是 fromFetch, 则不会立刻发起请求, 而是在有人订阅的时候才会发出请求.

  • 因此我们用 fromFetch 定义一个 api 调用应该这么写: const fetchTodo = fromFetch("/todo"), 注意这个跟上面的‘冷’的 Promise 对应了

热的 observable 的例子就是按钮的点击事件, 按钮的点击事件并不会由于你订阅还是没有订阅而改变其数据流.

那冷和热跟 publishReplay 有什么关系呢?

首先, 对于 todoList 这个 API 来说, 我们是不喜欢每个对 todoList 的订阅都单独发出请求的, 而 fromFetch 这种冷的 Observable 就是这种行为, 所以我们需要把它变成热的, 也就是把不共享的, 冷的数据流, 变成共享的, 热的数据流. 所以我们需要 publishXXX, rx 里有 publish, publishBehavior, publishLast, publishReplay 等 publish 系列的操作符, 干的都是类似的事情, 那就是把冷的 observable 变成热的.

其次, 如果订阅状态的组件在请求完成之后才订阅, 我们希望它也能读取到请求结果, 因此我们需要数据流有重放机制, 在订阅发生的时候重放最近的一个值, 因此我们选择了 publishReplay 这个操作符.

这还没完, 如果我们仅仅是把 publishReplay 放到 pipe 里面的话, 我们发现下游订阅组件没有收到任何数据改变, 这是为什么呢?

想想如果是你来实现一个把冷的 Promise 转换成热的 Promise 的函数, 你会怎么实现?

let cold = ()=>{
    return new Promise(resolve=>setTimeout(resolve, 600))
}

function publish(cold){
    return cold()
}

这样类型上能说得通, 但是放到 observable 的世界里是不行的, 因为我们不能在 publishXXX() 进行冷热转换的时候就立刻运行冷 observable 的内部逻辑 (上面的例子就是发出 API 请求).

所以再回到 Promise 的世界里来, 我们可以做到更好, 那就是在有 then 的时候才去运行 cold

let cold = ()=>{
    return new Promise(resolve=>setTimeout(resolve, 600))
}

function publish(cold){
    let p
    function then(onresolve, onreject){
        if(!p){
            p = cold()
        }
        return p.then(resolve, onreject)
    }
    return {
        then,
        catch(onreject){
            return then(undefined, onreject)
        }
    }
}

把这种类比放到 rxjs 的世界来说, 就是在第一个订阅者来的时候, 去订阅我们的冷的 observable, 然后后续的订阅者也共享这个数据流, 然后还能做到一点就是在最后一个订阅者退订的时候, 退订冷的 observable.

这个恰恰就是 refCount 操作符所做的事情. 从名字上可以看出它是引用计数的意思, 因为这个行为非常类似有指针的语言对指针进行引用计数来 gc 的做法.

把 publishReplay 和 refCount 总结起来, 他们共同完成的事情就是: 当第一个 useObservable hook 的作用运行的时候, 就去发出请求, 第二个以及第 n 个 useObervable 都共用这一份响应. 并且如果 useObservable hook 所在组件全都 unmount 了, 也就是引用变成了 0 了, 那就立刻调用 abortController 来取消这个不需要的请求.

如果用 redux 来做这个, 需要几行代码? 相信大家能看出 rxjs 精简到令人发指的优点了.

另外一个好处就是完全解耦了模型和视图, 视图甚至不需要有一个 “请求数据” 的 dispatch 的副作用. 只需要声明对数据流本身的依赖. 而模型会在第一个依赖来临的时候自动填充状态, 并在依赖已经不被需要的时候立刻清除所有需要清理的副作用(例如 AbortSignal.abort()).

附加题: 跟踪 loading 和 error

我们可以实现这么一个 custom operator 来对请求的 loading 和 error 进行跟踪, 同样作为附加题来让大家实现:

import {OperatorFunction, ObservableInput} from "rxjs"

type TrackedResponse<R> = {
    loading: true
} | {
    loading: false,
    error: Error
} | {
    loading: false,
    data: R
}

function trackLoadingAndError<Params, Response>(request: (params: Params)=>ObservableInput<Response>): OperatorFunction<Params, TrackedResponse<Response>>

用这个操作符替换掉上面 todoList 的 switchMap 之后, 我们就得到了 list 数据的 loading 值和 error, 并且此操作符可以在任何地方得到复用.
https://zhuanlan.zhihu.com/p/342649155