函数响应式流库探秘 - 图1

这是上一篇 怎样按触发顺序执行异步任务 的引文,感谢阅读。。


xstream是专门为cycle.js定制开发的函数响应式流库(functional reactive stream library)。
它很简洁,只提供了Stream,Listener,Producer,MemoryStream四个概念。
我们先来学习xstream,然后再挖掘流(stream)与CPS的关系。

1. xstream的用法

(1)流(stream)

流可以看做一个事件流,流上面可以绑定多个监听器,
当流中某事件发生的时,会自动广播。

有了流之后,我们就可以对流的整体进行操作了。
在xstream中对流进行变换,是通过operator实现的,
operator处理一个或多个流,返回一个新的流。

  1. let stream2=stream1.map(/*...*/);
  2. let stream3=stream2.filter(/*...*/);

如上,mapfilter就是operator

(2)监听器(listener)

监听器用于处理当前发生的事件,时刻接受流中对所发生事件的广播。
在xstream中,监听器是一个包含nexterrorcomplete方法的对象,
流中每次事件发生,都会自动调用监听器的next方法,
流中有错误发生时,会调用error方法,
整个流停止,不再有事件发生时,调用complete方法。

  1. let listener={
  2. next:val=>{/*...*/},
  3. error:err=>{/*...*/},
  4. complete:()=>{/*...*/}
  5. };

(3)生产者(producer)

生产者用来生成流。
它是一个包含startstop方法的对象,用于表示流的开始和终止。
start函数中会使用listener,因此,listenernext方法实际上是在这里调用的。

  1. import xs from 'xstream';
  2. let producer={
  3. start(listener){
  4. // listener.next(/*...*/)
  5. },
  6. stop(){/*...*/}
  7. };
  8. let stream=xs.create(producer);

(4)有记忆的流(MemoryStream)

有记忆的流,和普通的流在operator方面和listener方面并无二致,
唯一不同的是,有记忆的流可以将当前事件中的值传给下一个事件。
(这里对主题帮助不大,我们暂且略过

2. 例子

我们学习了xstream的API,现在终于可以看到它的全貌了,

  1. import xs from 'xstream';
  2. let producer = {
  3. start: listener => {
  4. let i = 0;
  5. while (++i) {
  6. if (i > 10) {
  7. break;
  8. }
  9. listener.next(i);
  10. }
  11. },
  12. stop: () => { }
  13. };
  14. let stream1 = xs.create(producer);
  15. let stream2 = stream1.map(x => x * 2);
  16. stream2.addListener({
  17. next: val => console.log(val),
  18. error: val => { },
  19. complete: () => { }
  20. });

最后结果会输出从220的偶数。

3. CPS

我们看到实际上是在流中调用了listener,即通过listener.next(i)广播了i
然后,流经历了一系列的变换,导致流广播的值发生了改变,
体现到最后的listener中,接收的值就不是最开始的i了,
而是i经历了x=> x*2之后的值i*2

(1)对流进行抽象

认识到问题的本质后,我们可以将流看成以下形式,

  1. let stream = cont => {
  2. let i = 0;
  3. while (++i) {
  4. if (i > 10) {
  5. break;
  6. }
  7. cont(i);
  8. }
  9. }

其中,cont表示continuation
(continuation的话题比较大,这里不影响阅读,暂略

(2)挂载listener

然后我们先不考虑对流进行变换,我们直接模拟挂载listener的场景,

  1. stream(x => console.log(x));

好了,这个时候,实际上我们是将流的continuation传给了它,
结果自然是输出从110的数字了。

(3)对流进行变换

我们怎样对流进行变换呢,
实际上,我们需要做的就是将一个流变成另一个流,
或者说白了,就是改变cont,然后进行传递(CPS

这可能比较晦涩难懂,我们直接看例子吧,模拟一下x=>x*2
(这是可以运行的

  1. let stream1 = cont => {
  2. let i = 0;
  3. while (++i) {
  4. if (i > 10) {
  5. break;
  6. }
  7. cont(i);
  8. }
  9. };
  10. let stream2 = cont => {
  11. let newCont = v => cont(v * 2);
  12. stream1(newCont);
  13. };
  14. // 简写为
  15. // let stream2 = cont => stream1(v => cont(v * 2));
  16. stream2(x=>console.log(x));

(4)实现mapfiltermerge

我们来尝试实现xstream中几个常用的operator,它们都返回一个新的流。

  1. //map是对流中的每个值进行变换
  2. let map = function (fn) {
  3. let stream = this;
  4. return cont => stream(x => cont(fn(x)));
  5. };
  6. let stream2 = map.call(stream1, x => x * 2);
  7. //filter是对流中的值进行过滤
  8. let filter = function (fn) {
  9. let stream = this;
  10. return cont => stream(x => fn(x) && cont(x));
  11. };
  12. let stream3 = filter.call(stream1, x => x % 2 != 0);
  13. //merge是合并两个流
  14. let merge = function (otherStream) {
  15. let stream = this;
  16. return cont => {
  17. stream(cont);
  18. otherStream(cont);
  19. };
  20. };
  21. let stream4 = merge.call(stream2, stream3);

4. 总结与展望

xstream采用了流的概念,实现了事件源与事件处理逻辑的分离,
而且,对流的变换都是一些纯函数,组合起来更方便,
因此成就了cycle.js这个优美的框架,从而MVI全新的架构模式破土而出,
这一切,一定会在人机交互界面的解决方案上开启新的篇章啊。


参考

xstream
Cycle.js Document