
这是上一篇 怎样按触发顺序执行异步任务 的引文,感谢阅读。。
xstream是专门为cycle.js定制开发的函数响应式流库(functional reactive stream library)。
它很简洁,只提供了Stream,Listener,Producer,MemoryStream四个概念。
我们先来学习xstream,然后再挖掘流(stream)与CPS的关系。
1. xstream的用法
(1)流(stream)
流可以看做一个事件流,流上面可以绑定多个监听器,
当流中某事件发生的时,会自动广播。
有了流之后,我们就可以对流的整体进行操作了。
在xstream中对流进行变换,是通过operator实现的,
operator处理一个或多个流,返回一个新的流。
let stream2=stream1.map(/*...*/);let stream3=stream2.filter(/*...*/);
如上,map和filter就是operator。
(2)监听器(listener)
监听器用于处理当前发生的事件,时刻接受流中对所发生事件的广播。
在xstream中,监听器是一个包含next,error,complete方法的对象,
流中每次事件发生,都会自动调用监听器的next方法,
流中有错误发生时,会调用error方法,
整个流停止,不再有事件发生时,调用complete方法。
let listener={next:val=>{/*...*/},error:err=>{/*...*/},complete:()=>{/*...*/}};
(3)生产者(producer)
生产者用来生成流。
它是一个包含start和stop方法的对象,用于表示流的开始和终止。start函数中会使用listener,因此,listener的next方法实际上是在这里调用的。
import xs from 'xstream';let producer={start(listener){// listener.next(/*...*/)},stop(){/*...*/}};let stream=xs.create(producer);
(4)有记忆的流(MemoryStream)
有记忆的流,和普通的流在operator方面和listener方面并无二致,
唯一不同的是,有记忆的流可以将当前事件中的值传给下一个事件。
(这里对主题帮助不大,我们暂且略过
2. 例子
我们学习了xstream的API,现在终于可以看到它的全貌了,
import xs from 'xstream';let producer = {start: listener => {let i = 0;while (++i) {if (i > 10) {break;}listener.next(i);}},stop: () => { }};let stream1 = xs.create(producer);let stream2 = stream1.map(x => x * 2);stream2.addListener({next: val => console.log(val),error: val => { },complete: () => { }});
最后结果会输出从2到20的偶数。
3. CPS
我们看到实际上是在流中调用了listener,即通过listener.next(i)广播了i,
然后,流经历了一系列的变换,导致流广播的值发生了改变,
体现到最后的listener中,接收的值就不是最开始的i了,
而是i经历了x=> x*2之后的值i*2。
(1)对流进行抽象
认识到问题的本质后,我们可以将流看成以下形式,
let stream = cont => {let i = 0;while (++i) {if (i > 10) {break;}cont(i);}}
其中,cont表示continuation。
(continuation的话题比较大,这里不影响阅读,暂略
(2)挂载listener
然后我们先不考虑对流进行变换,我们直接模拟挂载listener的场景,
stream(x => console.log(x));
好了,这个时候,实际上我们是将流的continuation传给了它,
结果自然是输出从1到10的数字了。
(3)对流进行变换
我们怎样对流进行变换呢,
实际上,我们需要做的就是将一个流变成另一个流,
或者说白了,就是改变cont,然后进行传递(CPS
这可能比较晦涩难懂,我们直接看例子吧,模拟一下x=>x*2,
(这是可以运行的
let stream1 = cont => {let i = 0;while (++i) {if (i > 10) {break;}cont(i);}};let stream2 = cont => {let newCont = v => cont(v * 2);stream1(newCont);};// 简写为// let stream2 = cont => stream1(v => cont(v * 2));stream2(x=>console.log(x));
(4)实现map,filter和merge
我们来尝试实现xstream中几个常用的operator,它们都返回一个新的流。
//map是对流中的每个值进行变换let map = function (fn) {let stream = this;return cont => stream(x => cont(fn(x)));};let stream2 = map.call(stream1, x => x * 2);//filter是对流中的值进行过滤let filter = function (fn) {let stream = this;return cont => stream(x => fn(x) && cont(x));};let stream3 = filter.call(stream1, x => x % 2 != 0);//merge是合并两个流let merge = function (otherStream) {let stream = this;return cont => {stream(cont);otherStream(cont);};};let stream4 = merge.call(stream2, stream3);
4. 总结与展望
xstream采用了流的概念,实现了事件源与事件处理逻辑的分离,
而且,对流的变换都是一些纯函数,组合起来更方便,
因此成就了cycle.js这个优美的框架,从而MVI全新的架构模式破土而出,
这一切,一定会在人机交互界面的解决方案上开启新的篇章啊。
