本章介绍的转化处理会让管道中的数据发生变化。在本章中我们还会介绍回压控制的另外一种方式,无损的回压力控制。
1. 转化类操作符
RxJS中除了使用创建类操作符作为数据源头外,解决大多数问题都是使用 合并、过滤、转化类操作符。
对数据的转化可以分为两种:
- 对每个数据进行转化 x => f(x),f为转化函数
步转化单个数据,而是对数据进行组合,例如 A,B,C => [A,B,C],无损回压控制就属于这一种
2. 映射数据
2.1 map
在JavaScript中就有这么一个函数,它的两个区别在于:
可以接收异步产生的数据
- 可以接收第二个参数作为context(但是不建议使用,因为使用了之后就不是纯函数了) ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/of’; import ‘rxjs/add/operator/map’;
const source$ = Observable.of(3, 1, 4);
const context = {separator: ‘:’};
const mapFunc = (function (separator) {
return function(value, index) {
return ${value} ${separator} ${index}
;
};
})(context.separator)
const result$ = source$.map(mapFunc);
result$.subscribe(
console.log,
null,
() => console.log(‘complete’)
);
<a name="kpiTS"></a>
## 2.2 mapTo
不论上游数据是什么,都返回同一个数据,可以用map来实现
```json
Observable.prototype.mapTo = function (value) {
return this.map(x => value);
};
2.3 pluck
单独从结构中提取某个字段,下例中如果参数只有name,可以拿到name;如果嵌套属性看不到,则返回undefined:
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/pluck';
const source$ = Observable.of(
{name: 'RxJS', version: 'v4'},
{name: 'React', version: 'v15'},
{name: 'React', version: 'v16'},
{name: 'RxJS', version: 'v5'}
);
const result$ = source$.pluck('name', 'what', 'else');
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
3. 缓存窗口:无损回压控制
无损回压控制把上游在一段时间内产生的数据放到一个数据集合里,可以是一个数组,操作符以buffer开头;也可以是以一个Observable对象,操作符以window开头。buffer的操作符和window的操作符完全对应。
3.1 windowTime 和 bufferTime
函数指定了缓存窗口的间隔。
windowTime产生的是对象,数据不需要延迟;而bufferTime产生的是数组,需要缓存(等待)上游的数据。
第二个参数指定了多少时间内产生一个间隔(不传地话代表和第一个参数相同)。
第三个参数指定了每个缓存区间容纳的最多的数据个数。
3.2 windowCount 和 bufferCount
函数指定了缓存区块中的数据个数,第二个参数是注定了多少个数据后,在开启一个窗口。
下面例子是第一个参数和第二个参数不一致的时候的弹珠图:
//run this file after babel. Otherwise the timing might not be correct.
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/windowCount';
const source$ = Observable.timer(0, 100);
const result$ = source$.windowCount(4);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);d
3.3 windowWhen 和 bufferWhen
函数传递一个函数closingWhen,它返回一个Observable对象,一吐出数据就关闭数据流,重新下一个区块。因为closingWhen控制的时机和上游的数据没有任何关系,因此整个函数较为鸡肋。
//run this file after babel. Otherwise the timing might not be correct.
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/bufferWhen';
const source$ = Observable.timer(0, 100);
const closingSelector = () => {
return Observable.timer(400);
};
const result$ = source$.bufferWhen(closingSelector);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
3.4 windowToggle 和 bufferToggle
第一个参数:执行的时间区间划分了缓冲区间,执行的返回数据作为第二个函数的参数。
第二个参数:根据第一个参数(无所谓什么值)来决定关闭,缓冲区。
例如下面例子,没400秒吐出一个数据,第一次吐出0,第二次吐吐出;0的时候将缓冲去在200毫秒后关闭,1点时候将缓存区在100毫秒后关闭。结果就是第一段400毫秒吐出[1,2],第二段400毫秒内吐出[5],第一段800毫秒吐出[8,9]。
//run this file after babel. Otherwise the timing might not be correct.
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/windowToggle';
const source$ = Observable.timer(0, 100);
const openings$ = Observable.timer(0, 400);
const closingSelector = value => {
console.log('#enter closingSelector', value);
return value % 2 === 0 ? Observable.timer(200) : Observable.timer(100);
};
const result$ = source$.windowToggle(openings$, closingSelector);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
3.5 window和buffer
函数传递一个$notifier,产生一个数据后,切断上一个缓存区间,开始下一个缓存区间。下面例子中,第二个参数是400,数据流永远不会结束,如果将第二个参数去除,那么$result的弹珠图回在400ms就结束。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/window';
const source$ = Observable.timer(0, 100);
const notifer$ = Observable.timer(400, 400);
const result$ = source$.window(notifer$);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
4. 高阶的map
4.1 concatMap
第一个Observable对象产生的数据完结后,第二个内部Observable对象才会被订阅。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/concatMap';
const source$ = Observable.interval(100);
const result$ = source$.concatMap(
(value, index) => {
console.log('#enter func');
return Observable.interval(100).take(5);
}
);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
下面是一个简介的用于处理拖拽功能的例子(takeUtil接收一个notifier用于关闭数据流):
<!doctype html>
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=">
<style type="text/css">
body {
position: relative;
width: 100%;
height: 100%;
}
#box {
position: absolute;
width: 100px;
height: 100px;
border: 1px solid black;
}
</style>
</head>
<body>
<div id="box"></div>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
const box = document.querySelector('#box');
const mouseDown$ = Rx.Observable.fromEvent(box, 'mousedown');
const mouseUp$ = Rx.Observable.fromEvent(box, 'mouseup');
const mouseOut$ = Rx.Observable.fromEvent(box, 'mouseout');
const mouseMove$ = Rx.Observable.fromEvent(box, 'mousemove');
const drag$ = mouseDown$.concatMap((startEvent)=> {
const initialLeft = box.offsetLeft;
const initialTop = box.offsetTop;
const stop$ = mouseUp$.merge(mouseOut$);
return mouseMove$.takeUntil(stop$).map(moveEvent => {
return {
x: moveEvent.x - startEvent.x + initialLeft,
y: moveEvent.y - startEvent.y + initialTop,
};
});
});
drag$.subscribe(event => {
box.style.left = event.x + 'px';
box.style.top = event.y + 'px';
});
</script>
</body>
</html>
4.2 mergeMap
内部Observable对象的数据,来一个给下游传递一个,不做任何等待。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/mergeMap';
const source$ = Observable.interval(200).take(2);
const result$ = source$.mergeMap(
(value, index) => {
console.log('#enter func');
return Observable.interval(100).take(5);
}
);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
mergeMap能够解决异步操作的问题,典型的就是处理ajax请求。
<!doctype html>
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=">
<style type="text/css">
</style>
</head>
<body>
<button id="send">Call API</button>
<div id="result"></div>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
const apiUrl = 'https://api.github.com/repos/ReactiveX/rxjs';
const sendButton = document.querySelector('#send');
Rx.Observable.fromEvent(sendButton, 'click').mergeMap(() => {
return Rx.Observable.ajax(apiUrl);
}).subscribe(result => {
const count = result.response.stargazers_count;
document.querySelector('#result').innerText = count;
});
</script>
</body>
</html>
4.3 switchMap
前一节中,后发送的请求,数据回来可能在前面。switchMap中,后产生的内部Observable对象优先级总是更高,只要有新的Observable对象产生,立即退订之前的内部Observable,改为从最新的内部Observable对象拿数据。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/switchMap';
const source$ = Observable.interval(100).take(2);
0 1
0 1
0 1
0 1
const result$ = source$.switchMap(
(value, index) => {
console.log('#enter func');
return Observable.interval(100).take(5);
}
);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
4.4 exhaustMap
和switchMap相反,exhaustMap实现是,先返回的Observable对象除非数据完结,否则不会调用后续的project产生新的Observable对象。
4.5 高阶的mapTo
- concactMapTo
- mergeMapTo
- switchMapTo
参数直接是Observable对象,而非返回Observable对象的函数。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/concatMapTo';
const intervalStream$ = Observable.interval(100);
const innerStream$ = Observable.interval(1000).take(3);
//const innerStream$ = Observable.of(1, 2, 3);
const downStream$ = intervalStream$.concatMapTo(
innerStream$
);
downStream$.subscribe(
console.log,
null,
() => console.log('complete')
);
4.6 expand
expand 产生的数据除了给下游,又回当作新数据从自己的上游吐出,从而导致循环(除非限定结束条件)。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/expand';
const source$ = Observable.of(1);
const result$ = source$.expand(
(value, index) => Observable.of(value * 2).delay(100)
);
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
5. 数据分组
5.1 groupBy
根据第一个参数的返回值作为分组依据,将数据分组,它的返回是一个GroupedObservable对象实例。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/groupBy';
const source$ = Observable.interval(1000);
const groupBy$ = source$.groupBy(
x => x % 2
);
groupBy$.subscribe(
console.log,
null,
() => console.log('complete')
);
在网页中,可以利用groupBy对不同类型按钮的点击事件进行分组。
<!doctype html>
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=">
<style type="text/css">
body {
position: relative;
width: 100%;
height: 100%;
}
#box {
position: absolute;
width: 100px;
height: 100px;
border: 1px solid black;
}
</style>
</head>
<body>
<button class="foo">Button One</button>
<button class="foo">Button Two</button>
<button class="bar">Button Three</button>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
const fooEventHandler = event => console.log('Foo', event);
const barEventHandler = event => console.log('Bar', event);
const click$ = Rx.Observable.fromEvent(document, 'click');
const groupByClass$ = click$.groupBy(event => event.target.className);
groupByClass$.filter(value => value.key === 'foo')
.mergeAll()
.subscribe(fooEventHandler);
groupByClass$.filter(value => value.key === 'bar')
.mergeAll()
.subscribe(barEventHandler);
</script>
</body>
</html>
5.2 partition
传递给它的函数返回true时的数据放入一个Observable对象,否则放入另一个Observable对象。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/partition';
const source$ = Observable.timer(0, 100);
const [even$, odd$] = source$.partition(x => x % 2 === 0);
even$.subscribe(value => console.log('even:', value));
odd$.subscribe(value => console.log('odd:', value));
6. 累计数据
6.1 scan
scan和reduce的区别在于,reduce必须上游所有数据产生完毕后,才会产生数据;而scan可以处理永不完结的上游数据。scan是构建交互式应用程序最重要的一个操作符,因为能够维持应用的当前状态,也可以根据流持续更新这些状态。有了它,我们就不需要一个全局变量来维持应用状态,因为状态隐藏在了每一次掉用scan之中。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/scan';
const source$ = Observable.interval(100);
const result$ = source$.scan((accumulation, value) => {
return accumulation + value;
});
result$.subscribe(
console.log,
null,
() => console.log('complete')
);
6.2 mergeScan
官方对这个操作符介绍极少,有人提议删除这个操作符。书中倒是有具体介绍,此处略去。