这是上一篇 怎样按触发顺序执行异步任务 的引文,感谢阅读。。
xstream是专门为cycle.js定制开发的函数响应式流库(functional reactive stream library)。
它很简洁,只提供了Stream,Listener,Producer,MemoryStream四个概念。
我们先来学习xstream,然后再挖掘流(stream)与CPS的关系。
1. xstream的用法
(1)流(stream)
流可以看做一个事件流,流上面可以绑定多个监听器,
当流中某事件发生的时,会自动广播。
有了流之后,我们就可以对流的整体进行操作了。
在xstream中对流进行变换,是通过operator
实现的,
operator处理一个或多个流,返回一个新的流。
let stream2=stream1.map(/*...*/);
let stream3=stream2.filter(/*...*/);
如上,map
和filter
就是operator
。
(2)监听器(listener)
监听器用于处理当前发生的事件,时刻接受流中对所发生事件的广播。
在xstream中,监听器是一个包含next
,error
,complete
方法的对象,
流中每次事件发生,都会自动调用监听器的next
方法,
流中有错误发生时,会调用error
方法,
整个流停止,不再有事件发生时,调用complete
方法。
let listener={
next:val=>{/*...*/},
error:err=>{/*...*/},
complete:()=>{/*...*/}
};
(3)生产者(producer)
生产者用来生成流。
它是一个包含start
和stop
方法的对象,用于表示流的开始和终止。start
函数中会使用listener
,因此,listener
的next
方法实际上是在这里调用的。
import xs from 'xstream';
let producer={
start(listener){
// listener.next(/*...*/)
},
stop(){/*...*/}
};
let stream=xs.create(producer);
(4)有记忆的流(MemoryStream)
有记忆的流,和普通的流在operator
方面和listener
方面并无二致,
唯一不同的是,有记忆的流可以将当前事件中的值传给下一个事件。
(这里对主题帮助不大,我们暂且略过
2. 例子
我们学习了xstream的API,现在终于可以看到它的全貌了,
import xs from 'xstream';
let producer = {
start: listener => {
let i = 0;
while (++i) {
if (i > 10) {
break;
}
listener.next(i);
}
},
stop: () => { }
};
let stream1 = xs.create(producer);
let stream2 = stream1.map(x => x * 2);
stream2.addListener({
next: val => console.log(val),
error: val => { },
complete: () => { }
});
最后结果会输出从2
到20
的偶数。
3. CPS
我们看到实际上是在流中调用了listener
,即通过listener.next(i)
广播了i
,
然后,流经历了一系列的变换,导致流广播的值发生了改变,
体现到最后的listener
中,接收的值就不是最开始的i
了,
而是i
经历了x=> x*2
之后的值i*2
。
(1)对流进行抽象
认识到问题的本质后,我们可以将流看成以下形式,
let stream = cont => {
let i = 0;
while (++i) {
if (i > 10) {
break;
}
cont(i);
}
}
其中,cont表示continuation。
(continuation的话题比较大,这里不影响阅读,暂略
(2)挂载listener
然后我们先不考虑对流进行变换,我们直接模拟挂载listener
的场景,
stream(x => console.log(x));
好了,这个时候,实际上我们是将流的continuation
传给了它,
结果自然是输出从1
到10
的数字了。
(3)对流进行变换
我们怎样对流进行变换呢,
实际上,我们需要做的就是将一个流变成另一个流,
或者说白了,就是改变cont
,然后进行传递(CPS
这可能比较晦涩难懂,我们直接看例子吧,模拟一下x=>x*2
,
(这是可以运行的
let stream1 = cont => {
let i = 0;
while (++i) {
if (i > 10) {
break;
}
cont(i);
}
};
let stream2 = cont => {
let newCont = v => cont(v * 2);
stream1(newCont);
};
// 简写为
// let stream2 = cont => stream1(v => cont(v * 2));
stream2(x=>console.log(x));
(4)实现map
,filter
和merge
我们来尝试实现xstream
中几个常用的operator
,它们都返回一个新的流。
//map是对流中的每个值进行变换
let map = function (fn) {
let stream = this;
return cont => stream(x => cont(fn(x)));
};
let stream2 = map.call(stream1, x => x * 2);
//filter是对流中的值进行过滤
let filter = function (fn) {
let stream = this;
return cont => stream(x => fn(x) && cont(x));
};
let stream3 = filter.call(stream1, x => x % 2 != 0);
//merge是合并两个流
let merge = function (otherStream) {
let stream = this;
return cont => {
stream(cont);
otherStream(cont);
};
};
let stream4 = merge.call(stream2, stream3);
4. 总结与展望
xstream采用了流的概念,实现了事件源与事件处理逻辑的分离,
而且,对流的变换都是一些纯函数,组合起来更方便,
因此成就了cycle.js这个优美的框架,从而MVI全新的架构模式破土而出,
这一切,一定会在人机交互界面的解决方案上开启新的篇章啊。