本章介绍的转化处理会让管道中的数据发生变化。在本章中我们还会介绍回压控制的另外一种方式,无损的回压力控制。

1. 转化类操作符

RxJS中除了使用创建类操作符作为数据源头外,解决大多数问题都是使用 合并、过滤、转化类操作符。

对数据的转化可以分为两种:

  1. 对每个数据进行转化 x => f(x),f为转化函数
  2. 步转化单个数据,而是对数据进行组合,例如 A,B,C => [A,B,C],无损回压控制就属于这一种

    2. 映射数据

    2.1 map

    在JavaScript中就有这么一个函数,它的两个区别在于:

  3. 可以接收异步产生的数据

  4. 可以接收第二个参数作为context(但是不建议使用,因为使用了之后就不是纯函数了) ```json import {Observable} from ‘rxjs/Observable’; import ‘rxjs/add/observable/of’; import ‘rxjs/add/operator/map’;

const source$ = Observable.of(3, 1, 4); const context = {separator: ‘:’}; const mapFunc = (function (separator) { return function(value, index) { return ${value} ${separator} ${index}; }; })(context.separator) const result$ = source$.map(mapFunc); result$.subscribe( console.log, null, () => console.log(‘complete’) );

  1. <a name="kpiTS"></a>
  2. ## 2.2 mapTo
  3. 不论上游数据是什么,都返回同一个数据,可以用map来实现
  4. ```json
  5. Observable.prototype.mapTo = function (value) {
  6. return this.map(x => value);
  7. };

2.3 pluck

单独从结构中提取某个字段,下例中如果参数只有name,可以拿到name;如果嵌套属性看不到,则返回undefined:

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/pluck';
  4. const source$ = Observable.of(
  5. {name: 'RxJS', version: 'v4'},
  6. {name: 'React', version: 'v15'},
  7. {name: 'React', version: 'v16'},
  8. {name: 'RxJS', version: 'v5'}
  9. );
  10. const result$ = source$.pluck('name', 'what', 'else');
  11. result$.subscribe(
  12. console.log,
  13. null,
  14. () => console.log('complete')
  15. );

3. 缓存窗口:无损回压控制

无损回压控制把上游在一段时间内产生的数据放到一个数据集合里,可以是一个数组,操作符以buffer开头;也可以是以一个Observable对象,操作符以window开头。buffer的操作符和window的操作符完全对应。

3.1 windowTime 和 bufferTime

函数指定了缓存窗口的间隔。

windowTime产生的是对象,数据不需要延迟;而bufferTime产生的是数组,需要缓存(等待)上游的数据。
第二个参数指定了多少时间内产生一个间隔(不传地话代表和第一个参数相同)。
第三个参数指定了每个缓存区间容纳的最多的数据个数。

3.2 windowCount 和 bufferCount

函数指定了缓存区块中的数据个数,第二个参数是注定了多少个数据后,在开启一个窗口。

下面例子是第一个参数和第二个参数不一致的时候的弹珠图:

  1. //run this file after babel. Otherwise the timing might not be correct.
  2. import {Observable} from 'rxjs/Observable';
  3. import 'rxjs/add/observable/timer';
  4. import 'rxjs/add/operator/windowCount';
  5. const source$ = Observable.timer(0, 100);
  6. const result$ = source$.windowCount(4);
  7. result$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );d

image.png

3.3 windowWhen 和 bufferWhen

函数传递一个函数closingWhen,它返回一个Observable对象,一吐出数据就关闭数据流,重新下一个区块。因为closingWhen控制的时机和上游的数据没有任何关系,因此整个函数较为鸡肋。

  1. //run this file after babel. Otherwise the timing might not be correct.
  2. import {Observable} from 'rxjs/Observable';
  3. import 'rxjs/add/observable/timer';
  4. import 'rxjs/add/operator/bufferWhen';
  5. const source$ = Observable.timer(0, 100);
  6. const closingSelector = () => {
  7. return Observable.timer(400);
  8. };
  9. const result$ = source$.bufferWhen(closingSelector);
  10. result$.subscribe(
  11. console.log,
  12. null,
  13. () => console.log('complete')
  14. );

3.4 windowToggle 和 bufferToggle

第一个参数:执行的时间区间划分了缓冲区间,执行的返回数据作为第二个函数的参数。
第二个参数:根据第一个参数(无所谓什么值)来决定关闭,缓冲区。

例如下面例子,没400秒吐出一个数据,第一次吐出0,第二次吐吐出;0的时候将缓冲去在200毫秒后关闭,1点时候将缓存区在100毫秒后关闭。结果就是第一段400毫秒吐出[1,2],第二段400毫秒内吐出[5],第一段800毫秒吐出[8,9]。

  1. //run this file after babel. Otherwise the timing might not be correct.
  2. import {Observable} from 'rxjs/Observable';
  3. import 'rxjs/add/observable/timer';
  4. import 'rxjs/add/operator/windowToggle';
  5. const source$ = Observable.timer(0, 100);
  6. const openings$ = Observable.timer(0, 400);
  7. const closingSelector = value => {
  8. console.log('#enter closingSelector', value);
  9. return value % 2 === 0 ? Observable.timer(200) : Observable.timer(100);
  10. };
  11. const result$ = source$.windowToggle(openings$, closingSelector);
  12. result$.subscribe(
  13. console.log,
  14. null,
  15. () => console.log('complete')
  16. );

image.png

3.5 window和buffer

函数传递一个$notifier,产生一个数据后,切断上一个缓存区间,开始下一个缓存区间。下面例子中,第二个参数是400,数据流永远不会结束,如果将第二个参数去除,那么$result的弹珠图回在400ms就结束。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/window';
  4. const source$ = Observable.timer(0, 100);
  5. const notifer$ = Observable.timer(400, 400);
  6. const result$ = source$.window(notifer$);
  7. result$.subscribe(
  8. console.log,
  9. null,
  10. () => console.log('complete')
  11. );

image.png

image.png

4. 高阶的map

4.1 concatMap

第一个Observable对象产生的数据完结后,第二个内部Observable对象才会被订阅。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/take';
  4. import 'rxjs/add/operator/concatMap';
  5. const source$ = Observable.interval(100);
  6. const result$ = source$.concatMap(
  7. (value, index) => {
  8. console.log('#enter func');
  9. return Observable.interval(100).take(5);
  10. }
  11. );
  12. result$.subscribe(
  13. console.log,
  14. null,
  15. () => console.log('complete')
  16. );

image.png

下面是一个简介的用于处理拖拽功能的例子(takeUtil接收一个notifier用于关闭数据流):

  1. <!doctype html>
  2. <html>
  3. <head>
  4. <meta http-equiv="content-type" content="text/html; charset=">
  5. <style type="text/css">
  6. body {
  7. position: relative;
  8. width: 100%;
  9. height: 100%;
  10. }
  11. #box {
  12. position: absolute;
  13. width: 100px;
  14. height: 100px;
  15. border: 1px solid black;
  16. }
  17. </style>
  18. </head>
  19. <body>
  20. <div id="box"></div>
  21. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  22. <script>
  23. const box = document.querySelector('#box');
  24. const mouseDown$ = Rx.Observable.fromEvent(box, 'mousedown');
  25. const mouseUp$ = Rx.Observable.fromEvent(box, 'mouseup');
  26. const mouseOut$ = Rx.Observable.fromEvent(box, 'mouseout');
  27. const mouseMove$ = Rx.Observable.fromEvent(box, 'mousemove');
  28. const drag$ = mouseDown$.concatMap((startEvent)=> {
  29. const initialLeft = box.offsetLeft;
  30. const initialTop = box.offsetTop;
  31. const stop$ = mouseUp$.merge(mouseOut$);
  32. return mouseMove$.takeUntil(stop$).map(moveEvent => {
  33. return {
  34. x: moveEvent.x - startEvent.x + initialLeft,
  35. y: moveEvent.y - startEvent.y + initialTop,
  36. };
  37. });
  38. });
  39. drag$.subscribe(event => {
  40. box.style.left = event.x + 'px';
  41. box.style.top = event.y + 'px';
  42. });
  43. </script>
  44. </body>
  45. </html>

4.2 mergeMap

内部Observable对象的数据,来一个给下游传递一个,不做任何等待。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/take';
  4. import 'rxjs/add/operator/mergeMap';
  5. const source$ = Observable.interval(200).take(2);
  6. const result$ = source$.mergeMap(
  7. (value, index) => {
  8. console.log('#enter func');
  9. return Observable.interval(100).take(5);
  10. }
  11. );
  12. result$.subscribe(
  13. console.log,
  14. null,
  15. () => console.log('complete')
  16. );

image.png

mergeMap能够解决异步操作的问题,典型的就是处理ajax请求。

  1. <!doctype html>
  2. <html>
  3. <head>
  4. <meta http-equiv="content-type" content="text/html; charset=">
  5. <style type="text/css">
  6. </style>
  7. </head>
  8. <body>
  9. <button id="send">Call API</button>
  10. <div id="result"></div>
  11. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  12. <script>
  13. const apiUrl = 'https://api.github.com/repos/ReactiveX/rxjs';
  14. const sendButton = document.querySelector('#send');
  15. Rx.Observable.fromEvent(sendButton, 'click').mergeMap(() => {
  16. return Rx.Observable.ajax(apiUrl);
  17. }).subscribe(result => {
  18. const count = result.response.stargazers_count;
  19. document.querySelector('#result').innerText = count;
  20. });
  21. </script>
  22. </body>
  23. </html>

4.3 switchMap

前一节中,后发送的请求,数据回来可能在前面。switchMap中,后产生的内部Observable对象优先级总是更高,只要有新的Observable对象产生,立即退订之前的内部Observable,改为从最新的内部Observable对象拿数据。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/take';
  4. import 'rxjs/add/operator/switchMap';
  5. const source$ = Observable.interval(100).take(2);
  6. 0 1
  7. 0 1
  8. 0 1
  9. 0 1
  10. const result$ = source$.switchMap(
  11. (value, index) => {
  12. console.log('#enter func');
  13. return Observable.interval(100).take(5);
  14. }
  15. );
  16. result$.subscribe(
  17. console.log,
  18. null,
  19. () => console.log('complete')
  20. );

image.png

4.4 exhaustMap

和switchMap相反,exhaustMap实现是,先返回的Observable对象除非数据完结,否则不会调用后续的project产生新的Observable对象。

4.5 高阶的mapTo

  • concactMapTo
  • mergeMapTo
  • switchMapTo

参数直接是Observable对象,而非返回Observable对象的函数。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/observable/of';
  4. import 'rxjs/add/operator/take';
  5. import 'rxjs/add/operator/concatMapTo';
  6. const intervalStream$ = Observable.interval(100);
  7. const innerStream$ = Observable.interval(1000).take(3);
  8. //const innerStream$ = Observable.of(1, 2, 3);
  9. const downStream$ = intervalStream$.concatMapTo(
  10. innerStream$
  11. );
  12. downStream$.subscribe(
  13. console.log,
  14. null,
  15. () => console.log('complete')
  16. );

4.6 expand

expand 产生的数据除了给下游,又回当作新数据从自己的上游吐出,从而导致循环(除非限定结束条件)。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/observable/of';
  4. import 'rxjs/add/operator/delay';
  5. import 'rxjs/add/operator/expand';
  6. const source$ = Observable.of(1);
  7. const result$ = source$.expand(
  8. (value, index) => Observable.of(value * 2).delay(100)
  9. );
  10. result$.subscribe(
  11. console.log,
  12. null,
  13. () => console.log('complete')
  14. );

5. 数据分组

5.1 groupBy

根据第一个参数的返回值作为分组依据,将数据分组,它的返回是一个GroupedObservable对象实例。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/groupBy';
  4. const source$ = Observable.interval(1000);
  5. const groupBy$ = source$.groupBy(
  6. x => x % 2
  7. );
  8. groupBy$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

image.png

在网页中,可以利用groupBy对不同类型按钮的点击事件进行分组。

  1. <!doctype html>
  2. <html>
  3. <head>
  4. <meta http-equiv="content-type" content="text/html; charset=">
  5. <style type="text/css">
  6. body {
  7. position: relative;
  8. width: 100%;
  9. height: 100%;
  10. }
  11. #box {
  12. position: absolute;
  13. width: 100px;
  14. height: 100px;
  15. border: 1px solid black;
  16. }
  17. </style>
  18. </head>
  19. <body>
  20. <button class="foo">Button One</button>
  21. <button class="foo">Button Two</button>
  22. <button class="bar">Button Three</button>
  23. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  24. <script>
  25. const fooEventHandler = event => console.log('Foo', event);
  26. const barEventHandler = event => console.log('Bar', event);
  27. const click$ = Rx.Observable.fromEvent(document, 'click');
  28. const groupByClass$ = click$.groupBy(event => event.target.className);
  29. groupByClass$.filter(value => value.key === 'foo')
  30. .mergeAll()
  31. .subscribe(fooEventHandler);
  32. groupByClass$.filter(value => value.key === 'bar')
  33. .mergeAll()
  34. .subscribe(barEventHandler);
  35. </script>
  36. </body>
  37. </html>

5.2 partition

传递给它的函数返回true时的数据放入一个Observable对象,否则放入另一个Observable对象。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. import 'rxjs/add/operator/partition';
  4. const source$ = Observable.timer(0, 100);
  5. const [even$, odd$] = source$.partition(x => x % 2 === 0);
  6. even$.subscribe(value => console.log('even:', value));
  7. odd$.subscribe(value => console.log('odd:', value));

6. 累计数据

6.1 scan

scan和reduce的区别在于,reduce必须上游所有数据产生完毕后,才会产生数据;而scan可以处理永不完结的上游数据。scan是构建交互式应用程序最重要的一个操作符,因为能够维持应用的当前状态,也可以根据流持续更新这些状态。有了它,我们就不需要一个全局变量来维持应用状态,因为状态隐藏在了每一次掉用scan之中。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. import 'rxjs/add/operator/scan';
  4. const source$ = Observable.interval(100);
  5. const result$ = source$.scan((accumulation, value) => {
  6. return accumulation + value;
  7. });
  8. result$.subscribe(
  9. console.log,
  10. null,
  11. () => console.log('complete')
  12. );

6.2 mergeScan

官方对这个操作符介绍极少,有人提议删除这个操作符。书中倒是有具体介绍,此处略去。