本章介绍的转化处理会让管道中的数据发生变化。在本章中我们还会介绍回压控制的另外一种方式,无损的回压力控制。
1. 转化类操作符
RxJS中除了使用创建类操作符作为数据源头外,解决大多数问题都是使用 合并、过滤、转化类操作符。
对数据的转化可以分为两种:
- 对每个数据进行转化 x => f(x),f为转化函数
步转化单个数据,而是对数据进行组合,例如 A,B,C => [A,B,C],无损回压控制就属于这一种
2. 映射数据
2.1 map
在JavaScript中就有这么一个函数,它的两个区别在于:
可以接收异步产生的数据
- 可以接收第二个参数作为context(但是不建议使用,因为使用了之后就不是纯函数了) ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/of’; import ‘rxjs/add/operator/map’;
const source$ = Observable.of(3, 1, 4);
const context = {separator: ‘:’};
const mapFunc = (function (separator) {
return function(value, index) {
return ${value} ${separator} ${index};
};
})(context.separator)
const result$ = source$.map(mapFunc);
result$.subscribe(
console.log,
null,
() => console.log(‘complete’)
);
<a name="kpiTS"></a>## 2.2 mapTo不论上游数据是什么,都返回同一个数据,可以用map来实现```jsonObservable.prototype.mapTo = function (value) {return this.map(x => value);};
2.3 pluck
单独从结构中提取某个字段,下例中如果参数只有name,可以拿到name;如果嵌套属性看不到,则返回undefined:
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/of';import 'rxjs/add/operator/pluck';const source$ = Observable.of({name: 'RxJS', version: 'v4'},{name: 'React', version: 'v15'},{name: 'React', version: 'v16'},{name: 'RxJS', version: 'v5'});const result$ = source$.pluck('name', 'what', 'else');result$.subscribe(console.log,null,() => console.log('complete'));
3. 缓存窗口:无损回压控制
无损回压控制把上游在一段时间内产生的数据放到一个数据集合里,可以是一个数组,操作符以buffer开头;也可以是以一个Observable对象,操作符以window开头。buffer的操作符和window的操作符完全对应。
3.1 windowTime 和 bufferTime
函数指定了缓存窗口的间隔。
windowTime产生的是对象,数据不需要延迟;而bufferTime产生的是数组,需要缓存(等待)上游的数据。
第二个参数指定了多少时间内产生一个间隔(不传地话代表和第一个参数相同)。
第三个参数指定了每个缓存区间容纳的最多的数据个数。
3.2 windowCount 和 bufferCount
函数指定了缓存区块中的数据个数,第二个参数是注定了多少个数据后,在开启一个窗口。
下面例子是第一个参数和第二个参数不一致的时候的弹珠图:
//run this file after babel. Otherwise the timing might not be correct.import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/timer';import 'rxjs/add/operator/windowCount';const source$ = Observable.timer(0, 100);const result$ = source$.windowCount(4);result$.subscribe(console.log,null,() => console.log('complete'));d

3.3 windowWhen 和 bufferWhen
函数传递一个函数closingWhen,它返回一个Observable对象,一吐出数据就关闭数据流,重新下一个区块。因为closingWhen控制的时机和上游的数据没有任何关系,因此整个函数较为鸡肋。
//run this file after babel. Otherwise the timing might not be correct.import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/timer';import 'rxjs/add/operator/bufferWhen';const source$ = Observable.timer(0, 100);const closingSelector = () => {return Observable.timer(400);};const result$ = source$.bufferWhen(closingSelector);result$.subscribe(console.log,null,() => console.log('complete'));
3.4 windowToggle 和 bufferToggle
第一个参数:执行的时间区间划分了缓冲区间,执行的返回数据作为第二个函数的参数。
第二个参数:根据第一个参数(无所谓什么值)来决定关闭,缓冲区。
例如下面例子,没400秒吐出一个数据,第一次吐出0,第二次吐吐出;0的时候将缓冲去在200毫秒后关闭,1点时候将缓存区在100毫秒后关闭。结果就是第一段400毫秒吐出[1,2],第二段400毫秒内吐出[5],第一段800毫秒吐出[8,9]。
//run this file after babel. Otherwise the timing might not be correct.import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/timer';import 'rxjs/add/operator/windowToggle';const source$ = Observable.timer(0, 100);const openings$ = Observable.timer(0, 400);const closingSelector = value => {console.log('#enter closingSelector', value);return value % 2 === 0 ? Observable.timer(200) : Observable.timer(100);};const result$ = source$.windowToggle(openings$, closingSelector);result$.subscribe(console.log,null,() => console.log('complete'));
3.5 window和buffer
函数传递一个$notifier,产生一个数据后,切断上一个缓存区间,开始下一个缓存区间。下面例子中,第二个参数是400,数据流永远不会结束,如果将第二个参数去除,那么$result的弹珠图回在400ms就结束。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/timer';import 'rxjs/add/operator/window';const source$ = Observable.timer(0, 100);const notifer$ = Observable.timer(400, 400);const result$ = source$.window(notifer$);result$.subscribe(console.log,null,() => console.log('complete'));

4. 高阶的map
4.1 concatMap
第一个Observable对象产生的数据完结后,第二个内部Observable对象才会被订阅。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/interval';import 'rxjs/add/operator/take';import 'rxjs/add/operator/concatMap';const source$ = Observable.interval(100);const result$ = source$.concatMap((value, index) => {console.log('#enter func');return Observable.interval(100).take(5);});result$.subscribe(console.log,null,() => console.log('complete'));

下面是一个简介的用于处理拖拽功能的例子(takeUtil接收一个notifier用于关闭数据流):
<!doctype html><html><head><meta http-equiv="content-type" content="text/html; charset="><style type="text/css">body {position: relative;width: 100%;height: 100%;}#box {position: absolute;width: 100px;height: 100px;border: 1px solid black;}</style></head><body><div id="box"></div><script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script><script>const box = document.querySelector('#box');const mouseDown$ = Rx.Observable.fromEvent(box, 'mousedown');const mouseUp$ = Rx.Observable.fromEvent(box, 'mouseup');const mouseOut$ = Rx.Observable.fromEvent(box, 'mouseout');const mouseMove$ = Rx.Observable.fromEvent(box, 'mousemove');const drag$ = mouseDown$.concatMap((startEvent)=> {const initialLeft = box.offsetLeft;const initialTop = box.offsetTop;const stop$ = mouseUp$.merge(mouseOut$);return mouseMove$.takeUntil(stop$).map(moveEvent => {return {x: moveEvent.x - startEvent.x + initialLeft,y: moveEvent.y - startEvent.y + initialTop,};});});drag$.subscribe(event => {box.style.left = event.x + 'px';box.style.top = event.y + 'px';});</script></body></html>
4.2 mergeMap
内部Observable对象的数据,来一个给下游传递一个,不做任何等待。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/interval';import 'rxjs/add/operator/take';import 'rxjs/add/operator/mergeMap';const source$ = Observable.interval(200).take(2);const result$ = source$.mergeMap((value, index) => {console.log('#enter func');return Observable.interval(100).take(5);});result$.subscribe(console.log,null,() => console.log('complete'));

mergeMap能够解决异步操作的问题,典型的就是处理ajax请求。
<!doctype html><html><head><meta http-equiv="content-type" content="text/html; charset="><style type="text/css"></style></head><body><button id="send">Call API</button><div id="result"></div><script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script><script>const apiUrl = 'https://api.github.com/repos/ReactiveX/rxjs';const sendButton = document.querySelector('#send');Rx.Observable.fromEvent(sendButton, 'click').mergeMap(() => {return Rx.Observable.ajax(apiUrl);}).subscribe(result => {const count = result.response.stargazers_count;document.querySelector('#result').innerText = count;});</script></body></html>
4.3 switchMap
前一节中,后发送的请求,数据回来可能在前面。switchMap中,后产生的内部Observable对象优先级总是更高,只要有新的Observable对象产生,立即退订之前的内部Observable,改为从最新的内部Observable对象拿数据。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/interval';import 'rxjs/add/operator/take';import 'rxjs/add/operator/switchMap';const source$ = Observable.interval(100).take(2);0 10 10 10 1const result$ = source$.switchMap((value, index) => {console.log('#enter func');return Observable.interval(100).take(5);});result$.subscribe(console.log,null,() => console.log('complete'));
4.4 exhaustMap
和switchMap相反,exhaustMap实现是,先返回的Observable对象除非数据完结,否则不会调用后续的project产生新的Observable对象。
4.5 高阶的mapTo
- concactMapTo
- mergeMapTo
- switchMapTo
参数直接是Observable对象,而非返回Observable对象的函数。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/interval';import 'rxjs/add/observable/of';import 'rxjs/add/operator/take';import 'rxjs/add/operator/concatMapTo';const intervalStream$ = Observable.interval(100);const innerStream$ = Observable.interval(1000).take(3);//const innerStream$ = Observable.of(1, 2, 3);const downStream$ = intervalStream$.concatMapTo(innerStream$);downStream$.subscribe(console.log,null,() => console.log('complete'));
4.6 expand
expand 产生的数据除了给下游,又回当作新数据从自己的上游吐出,从而导致循环(除非限定结束条件)。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/interval';import 'rxjs/add/observable/of';import 'rxjs/add/operator/delay';import 'rxjs/add/operator/expand';const source$ = Observable.of(1);const result$ = source$.expand((value, index) => Observable.of(value * 2).delay(100));result$.subscribe(console.log,null,() => console.log('complete'));
5. 数据分组
5.1 groupBy
根据第一个参数的返回值作为分组依据,将数据分组,它的返回是一个GroupedObservable对象实例。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/interval';import 'rxjs/add/operator/groupBy';const source$ = Observable.interval(1000);const groupBy$ = source$.groupBy(x => x % 2);groupBy$.subscribe(console.log,null,() => console.log('complete'));

在网页中,可以利用groupBy对不同类型按钮的点击事件进行分组。
<!doctype html><html><head><meta http-equiv="content-type" content="text/html; charset="><style type="text/css">body {position: relative;width: 100%;height: 100%;}#box {position: absolute;width: 100px;height: 100px;border: 1px solid black;}</style></head><body><button class="foo">Button One</button><button class="foo">Button Two</button><button class="bar">Button Three</button><script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script><script>const fooEventHandler = event => console.log('Foo', event);const barEventHandler = event => console.log('Bar', event);const click$ = Rx.Observable.fromEvent(document, 'click');const groupByClass$ = click$.groupBy(event => event.target.className);groupByClass$.filter(value => value.key === 'foo').mergeAll().subscribe(fooEventHandler);groupByClass$.filter(value => value.key === 'bar').mergeAll().subscribe(barEventHandler);</script></body></html>
5.2 partition
传递给它的函数返回true时的数据放入一个Observable对象,否则放入另一个Observable对象。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/timer';import 'rxjs/add/operator/partition';const source$ = Observable.timer(0, 100);const [even$, odd$] = source$.partition(x => x % 2 === 0);even$.subscribe(value => console.log('even:', value));odd$.subscribe(value => console.log('odd:', value));
6. 累计数据
6.1 scan
scan和reduce的区别在于,reduce必须上游所有数据产生完毕后,才会产生数据;而scan可以处理永不完结的上游数据。scan是构建交互式应用程序最重要的一个操作符,因为能够维持应用的当前状态,也可以根据流持续更新这些状态。有了它,我们就不需要一个全局变量来维持应用状态,因为状态隐藏在了每一次掉用scan之中。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/interval';import 'rxjs/add/operator/scan';const source$ = Observable.interval(100);const result$ = source$.scan((accumulation, value) => {return accumulation + value;});result$.subscribe(console.log,null,() => console.log('complete'));
6.2 mergeScan
官方对这个操作符介绍极少,有人提议删除这个操作符。书中倒是有具体介绍,此处略去。
