之前,在 怎样按触发顺序执行异步任务 里,介绍了流的一些基本概念,在 https://github.com/frontend9/fe9-library/issues/25 中,又列举了一种轮询业务场景的实现,本文试图用这种理念去实现同样的轮询功能。本文的例子使用 xstream.js 来编写。
想要使用流的理念来实现业务功能,最重要的就是思维的抽象,比较直接的方法论是:
寻找一切变更的源头,理清事情之间的顺序和依赖关系。
在定时轮询整个场景中,我们可以发现,一切事情的起源是定时器,只有这个来源是不依赖于任何其他东西的,因此,我们得到了第一个数据流:
const periodic$ = xs.periodic(1000);
这样,就得到了一个每秒发生一次的流。
然后,这个流导致什么事情呢?
发送一个HTTP请求,并且取回结果。
并且,我们可以注意到,这个操作是由前一个流触发的,每一次定时器的变动都会触发一次请求,因此,可以进行这么一个映射:
timer => request
所以,可以得到以下代码:
const request$ = periodic$
.map(_ => xs.fromPromise(request(endPointURL)))
此处,我们把一个普通的请求转化为了流,这个流里面实际上最多只会有一个值,也就是请求结果。
需要注意的是,经过我们上面的操作,数据流的形态已经变成二阶了,也就是说,request$ 中的每个元素,都又是一个流(从 Promise 转化出来的流)
想要得到我们预期的效果,就必须对这个流降阶,最简单的方式就是 flatten:
const imageUrl$ = request$.flatten();
降阶的意义是把高阶流的值提出来,并且合并到一个低阶流。刚才我们从 Promise 转出来的流,实际上每个里面只会有一个结果,本次降阶的含义大致类似于:
[[result1], [result2], [result3]] => [result1, result2, result3]
至此,我们只需直接订阅这个 imageUrl$,就可以持续不断地从其中获得新的图片地址了。
回顾整个过程,我们好像是建立了一条线路,或者一条管道,使得数据可以在其中流通。
但我们的需求还要更复杂一些,因为它是要允许通过一个开关,来控制定时拉取操作的启用与否。
那么,我们怎么才能把开关的逻辑加进去呢?
加开关的思路是在刚才的管道上加个阀门。
看看刚才的线路:
定时器 -> 请求 -> 请求结果
这个阀门加在哪里呢,很显然是加在请求和请求结果之间,因为我们的逻辑是:当开关关闭的时候,即使有还在发的请求,它的结果我们也不要了,所以在结果这里处理是比较合适的。
构造一个开关:
const switch$: Stream<boolean> = xs.create();
然后,把它跟之前的请求流组合起来:
const result$ = xs.combine(
imageUrl$,
switch$,
)
.filter(arr => {
const isPolling = arr[1];
return isPolling;
})
.map(arr => {
const result = arr[0];
return result.message;
});
解释一下上面的代码:
combine 操作,是把若干个流组合起来,以各自最后一个值,形成数组,所以,组合之后的流,每个值都是一个数组,数组的第一个值是请求结果,第二个值是开关
filter 操作,丢弃所有开关关闭状态时候的值
map 操作,把留下的结果的 message 取出,即为符合我们预期的值
整个过程的流转关系,可以画一个图如下:
periodic$ -> request$ -> imgUrl$ -> |
| -> combine$ -> filter$ -> result$
switch$ -> |
整个视图之外的完整的逻辑如下:
import * as React from 'react';
import xs, { Stream } from 'xstream';
import { DogView } from '../DogView';
import request from '../../service/request';
const endPointURL = 'https://dog.ceo/api/breeds/image/random';
interface IAppState {
imgUrl: string;
}
export default class App extends React.Component<{}, IAppState> {
public state: IAppState = {
imgUrl: 'https://images.dog.ceo/breeds/puggle/IMG_114654.jpg',
};
private switch$: Stream<boolean> = xs.create();
private poll$: Stream<string>;
public componentWillMount() {
const request$ = xs.periodic(1000)
.map(_ => xs.fromPromise(request(endPointURL)))
.flatten();
this.poll$ = xs.combine(
request$,
this.switch$,
)
.filter(arr => {
const isPolling = arr[1];
return isPolling;
})
.map(arr => {
const result = arr[0];
return result.message;
});
this.poll$.addListener({
next: (imgUrl) => this.setState({
imgUrl,
}),
})
}
public render() {
return (
<DogView
onClickFetchImg={this.onClickFetchImg}
onStartPolling={this.onStartPolling}
onStopPolling={this.onStopPolling}
dogImgURL={this.state.imgUrl}
/>
);
}
private onClickFetchImg = async () => {
const result = await request(endPointURL);
this.setState({
imgUrl: result.message,
});
}
private onStartPolling = () => {
// 偷懒起见,这里可以用 _n,比较正式一点的话,这里可以造一个 producer,或者拿这个按钮的事件来形成新的流
this.switch$._n(true);
}
private onStopPolling = () => {
this.switch$._n(false);
}
}
从这段代码中,我们可以看到,流式编程的简洁性与高度抽象性,并且,它在工程上可以达到一种平衡,也就是:
对普通的 crud 代码,一次性的请求,还是用 async-await 去解决,不增加额外的负担
对复杂场景,是一种对代码结构侵入很小的模式,并且,对 TypeScript 的支持非常好,而且其内部实现不依赖于 JavaScript 的高级语法特性