之前,在 怎样按触发顺序执行异步任务 里,介绍了流的一些基本概念,在 https://github.com/frontend9/fe9-library/issues/25 中,又列举了一种轮询业务场景的实现,本文试图用这种理念去实现同样的轮询功能。本文的例子使用 xstream.js 来编写。

    想要使用流的理念来实现业务功能,最重要的就是思维的抽象,比较直接的方法论是:

    寻找一切变更的源头,理清事情之间的顺序和依赖关系。

    在定时轮询整个场景中,我们可以发现,一切事情的起源是定时器,只有这个来源是不依赖于任何其他东西的,因此,我们得到了第一个数据流:

    1. const periodic$ = xs.periodic(1000);

    这样,就得到了一个每秒发生一次的流。

    然后,这个流导致什么事情呢?

    发送一个HTTP请求,并且取回结果。

    并且,我们可以注意到,这个操作是由前一个流触发的,每一次定时器的变动都会触发一次请求,因此,可以进行这么一个映射:

    timer => request

    所以,可以得到以下代码:

    1. const request$ = periodic$
    2. .map(_ => xs.fromPromise(request(endPointURL)))

    此处,我们把一个普通的请求转化为了流,这个流里面实际上最多只会有一个值,也就是请求结果。

    需要注意的是,经过我们上面的操作,数据流的形态已经变成二阶了,也就是说,request$ 中的每个元素,都又是一个流(从 Promise 转化出来的流)

    想要得到我们预期的效果,就必须对这个流降阶,最简单的方式就是 flatten:

    1. const imageUrl$ = request$.flatten();

    降阶的意义是把高阶流的值提出来,并且合并到一个低阶流。刚才我们从 Promise 转出来的流,实际上每个里面只会有一个结果,本次降阶的含义大致类似于:

    1. [[result1], [result2], [result3]] => [result1, result2, result3]

    至此,我们只需直接订阅这个 imageUrl$,就可以持续不断地从其中获得新的图片地址了。

    回顾整个过程,我们好像是建立了一条线路,或者一条管道,使得数据可以在其中流通。

    但我们的需求还要更复杂一些,因为它是要允许通过一个开关,来控制定时拉取操作的启用与否。

    那么,我们怎么才能把开关的逻辑加进去呢?

    加开关的思路是在刚才的管道上加个阀门。

    看看刚才的线路:

    1. 定时器 -> 请求 -> 请求结果

    这个阀门加在哪里呢,很显然是加在请求和请求结果之间,因为我们的逻辑是:当开关关闭的时候,即使有还在发的请求,它的结果我们也不要了,所以在结果这里处理是比较合适的。

    构造一个开关:

    1. const switch$: Stream<boolean> = xs.create();

    然后,把它跟之前的请求流组合起来:

    1. const result$ = xs.combine(
    2. imageUrl$,
    3. switch$,
    4. )
    5. .filter(arr => {
    6. const isPolling = arr[1];
    7. return isPolling;
    8. })
    9. .map(arr => {
    10. const result = arr[0];
    11. return result.message;
    12. });

    解释一下上面的代码:

    • combine 操作,是把若干个流组合起来,以各自最后一个值,形成数组,所以,组合之后的流,每个值都是一个数组,数组的第一个值是请求结果,第二个值是开关

    • filter 操作,丢弃所有开关关闭状态时候的值

    • map 操作,把留下的结果的 message 取出,即为符合我们预期的值

    整个过程的流转关系,可以画一个图如下:

    1. periodic$ -> request$ -> imgUrl$ -> |
    2. | -> combine$ -> filter$ -> result$
    3. switch$ -> |

    整个视图之外的完整的逻辑如下:

    1. import * as React from 'react';
    2. import xs, { Stream } from 'xstream';
    3. import { DogView } from '../DogView';
    4. import request from '../../service/request';
    5. const endPointURL = 'https://dog.ceo/api/breeds/image/random';
    6. interface IAppState {
    7. imgUrl: string;
    8. }
    9. export default class App extends React.Component<{}, IAppState> {
    10. public state: IAppState = {
    11. imgUrl: 'https://images.dog.ceo/breeds/puggle/IMG_114654.jpg',
    12. };
    13. private switch$: Stream<boolean> = xs.create();
    14. private poll$: Stream<string>;
    15. public componentWillMount() {
    16. const request$ = xs.periodic(1000)
    17. .map(_ => xs.fromPromise(request(endPointURL)))
    18. .flatten();
    19. this.poll$ = xs.combine(
    20. request$,
    21. this.switch$,
    22. )
    23. .filter(arr => {
    24. const isPolling = arr[1];
    25. return isPolling;
    26. })
    27. .map(arr => {
    28. const result = arr[0];
    29. return result.message;
    30. });
    31. this.poll$.addListener({
    32. next: (imgUrl) => this.setState({
    33. imgUrl,
    34. }),
    35. })
    36. }
    37. public render() {
    38. return (
    39. <DogView
    40. onClickFetchImg={this.onClickFetchImg}
    41. onStartPolling={this.onStartPolling}
    42. onStopPolling={this.onStopPolling}
    43. dogImgURL={this.state.imgUrl}
    44. />
    45. );
    46. }
    47. private onClickFetchImg = async () => {
    48. const result = await request(endPointURL);
    49. this.setState({
    50. imgUrl: result.message,
    51. });
    52. }
    53. private onStartPolling = () => {
    54. // 偷懒起见,这里可以用 _n,比较正式一点的话,这里可以造一个 producer,或者拿这个按钮的事件来形成新的流
    55. this.switch$._n(true);
    56. }
    57. private onStopPolling = () => {
    58. this.switch$._n(false);
    59. }
    60. }

    从这段代码中,我们可以看到,流式编程的简洁性与高度抽象性,并且,它在工程上可以达到一种平衡,也就是:

    • 对普通的 crud 代码,一次性的请求,还是用 async-await 去解决,不增加额外的负担

    • 对复杂场景,是一种对代码结构侵入很小的模式,并且,对 TypeScript 的支持非常好,而且其内部实现不依赖于 JavaScript 的高级语法特性