1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/10390466/1644916977976-a35971fb-9b65-4cdc-8095-5c8abde35776.png?x-oss-process=image/format,png#clientId=u278a0a82-1d7f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=510&id=u738f5486&margin=%5Bobject%20Object%5D&name=image.png&originHeight=510&originWidth=900&originalType=binary&ratio=1&rotation=0&showTitle=false&size=1377826&status=done&style=none&taskId=ua1671c9d-b6ba-4def-ab61-f45687030c6&title=&width=900)

1 「创建类」操作符

前面章节都是通过new来创建Observable对象,RxJS提供了很多「创建类」操作符,应该尽量使用「创建类」操作符,避免使用 Observalbe构造函数。

2 创建同步数据流

对于同步数据流,数据之间的时间间隔不存在。

2.1 create

本质上是返回了一个 new Observable(subscribe) 创建的对象。

2.2 of

创建制定数据集合的 Observable 对象,让代码更简单。

2.3 range

创建一个连续数字序列的 Observable对象。Observable.range(1,100)

2.4 generate

循环创建 Observable 对象。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/generate';
  3. /*
  4. const result = [];
  5. for (let i=2; i<10; i+=2) {
  6. result.push(i*i);
  7. }
  8. console.log(result);
  9. */
  10. const stream$ = Observable.generate(
  11. 2,
  12. value => value < 10,
  13. value => value + 2,
  14. value => value * value
  15. );
  16. stream$.subscribe(
  17. console.log,
  18. null,
  19. () => console.log('complete')
  20. );

2.5 repeat

创建重复数据的数据流,属于 实例操作符

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/repeat';
  4. const source$ = Observable.of(1, 2, 3);
  5. const repeated$= source$.repeat(10);
  6. repeated$.subscribe(
  7. console.log,
  8. null,
  9. () => console.log('complete')
  10. );

上例相对简单repeat的上游是 $source,如果上游是开发者自己控制的 Observable 对象,subscriber 必须明确传递 cmplete 的实现,否则 repeat 时,无法判断上一次是否结束了。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/operator/repeat';
  3. const source$ = Observable.create(observer => {
  4. console.log('on subscribe');
  5. setTimeout(() => observer.next(1), 1000);
  6. setTimeout(() => observer.next(2), 2000);
  7. setTimeout(() => observer.next(3), 3000);
  8. setTimeout(() => observer.complete(), 4000);
  9. return {
  10. unsubscribe: () => {
  11. console.log('on unsubscribe');
  12. }
  13. }
  14. });
  15. const repeated$ = source$.repeat(2);
  16. //setInterval(() => console.log(), 50000);
  17. repeated$.subscribe(
  18. console.log,
  19. null,
  20. () => console.log('complete')
  21. );

2.6 三个极简操作符

  • empty,直接完结,没有参数,不产生任何数据 |
  • trhow,什么都不做,直接出错,给下游传递一个Error对象 x
  • never,不吐出数据,也不完结,也不产生错误,一直到永远 ————-

3. 创建异步数据的Observable对象

RxJS 提供的操作符就是让开发者在日常尽量不要考虑时间因素。

3.1 interval和timer

interval和timer的地位等同于原生JavaScript的setInterval和setTimeout,但功能并不完全一样。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/interval';
  3. const source$ = Observable.interval(1000);
  4. source$.subscribe(
  5. console.log,
  6. null,
  7. () => console.log('complete')
  8. );

每间隔1秒突出一个数据,从0开始,如果想从更大的数值开始,可以跟map结合使用,将返回结果+1。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/timer';
  3. const source$ = Observable.timer(2000, 1000);
  4. source$.subscribe(
  5. console.log,
  6. null,
  7. () => console.log('complete')
  8. );

第一个参数指定了多久后吐出第1个数据(仍旧从0开始),如果第二个参数没指定,数据流就结束了;第二个参数是间隔时间,指定了回继续吐出数据。如果两个参数一样,等同于使用interval传递一个参数。

3.2 from

from 是包容性极强的一个操作符,可以把所有像Observable对象的参数(几乎是任何对象)转化为Observable对象。例如:数字、类数组(例如arguments)、generator、字符串(会被拆开成多个字符吐出)。

3.3 fromPromise

参数是Promise对象, 如果Promise成功结束,from产生的Observable对象会立即吐出Promise成功后的数据。

3.4 fromEvent

  1. <!doctype html>
  2. <html>
  3. <body>
  4. <div>
  5. <button id="clickMe">Click Me</button>
  6. <div id="text">0</div>
  7. </div>
  8. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  9. <script>
  10. let clickCount = 0;
  11. const event$ = Rx.Observable.fromEvent(document.querySelector('#clickMe'), 'click');
  12. event$.subscribe(
  13. () => {
  14. document.querySelector('#text').innerText = ++clickCount
  15. }
  16. );
  17. </script>
  18. </body>
  19. </html>
  1. import {Observable} from 'rxjs/Observable';
  2. import EventEmitter from 'events';
  3. import 'rxjs/add/observable/fromEvent';
  4. const emitter = new EventEmitter();
  5. const source$ = Observable.fromEvent(emitter, 'msg');
  6. source$.subscribe(
  7. console.log,
  8. error => console.log('catch', error),
  9. () => console.log('complete')
  10. );
  11. emitter.emit('msg', 1);
  12. emitter.emit('msg', 2);
  13. emitter.emit('another-msg', 'oops');
  14. emitter.emit('msg', 3);

上述例子输出1、2、3。唯一特殊的是,fromEvent 产生的是 hot Observable对象,没有subscribe之前emit的数据是 不会被接收到的。

3.5 fromEventPattern

fromEvent 要求数据源表现得像是浏览器的DOM 或者 Node.js的EventEmitter,如果数据源并不按照这样的方式产生数据,需要使用灵活度更高的操作符。

  1. import {Observable} from 'rxjs/Observable';
  2. import EventEmitter from 'events';
  3. import 'rxjs/add/observable/fromEventPattern';
  4. const emitter = new EventEmitter();
  5. const addHandler = (handler) => {
  6. emitter.addListener('msg', handler);
  7. };
  8. const removeHandler = (handler) => {
  9. emitter.removeListener('msg', handler);
  10. }
  11. const source$ = Observable.fromEventPattern(addHandler, removeHandler);
  12. const subscription = source$.subscribe(
  13. console.log,
  14. error => console.log('catch', error),
  15. () => console.log('complete')
  16. );
  17. emitter.emit('msg', 'hello');
  18. emitter.emit('msg', 'world');
  19. subscription.unsubscribe();
  20. emitter.emit('msg', 'end');

在第一个参数重,将长生的数据交给handler;第二个参数负责unsbuscribe所必须的处理。

3.6 ajax

可以根据AJAX请求的返回结果产生Observable对象。

  1. <!doctype html>
  2. <html>
  3. <body>
  4. <div>
  5. <button id="getStar">Get RxJS Star Count</button>
  6. <div id="text"></div>
  7. </div>
  8. <script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
  9. <script>
  10. Rx.Observable.fromEvent(
  11. document.querySelector('#getStar'),
  12. 'click'
  13. ).subscribe(
  14. () => {
  15. Rx.Observable.ajax('https://api.github.com/repos/ReactiveX/rxjs', {responseType: 'json'}).
  16. subscribe(value => {
  17. const starCount = value.response.stargazers_count;
  18. document.querySelector('#text').innerText = starCount;
  19. });
  20. }
  21. );
  22. </script>
  23. </body>
  24. </html>

3.7 repeatWhen

在repeat的重复订阅的基础上,增加了反复订阅的间隔时间。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/of';
  3. import 'rxjs/add/operator/repeat';
  4. const source$ = Observable.of(1, 2, 3);
  5. const repeated$= source$.repeat(10);
  6. repeated$.subscribe(
  7. console.log,
  8. null,
  9. () => console.log('complete')
  10. );

3.8 defer

尽早创建Observable代理(Proxy),但是不做分配资源的工作,只在被订阅的时候,才会常见真正占用资源的Observable。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/defer';
  3. import 'rxjs/add/observable/of';
  4. const observableFactory = () => Observable.of(1, 2, 3);
  5. const source$ = Observable.defer(observableFactory);
  6. source$.subscribe(console.log);
  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/defer';
  3. import fetch from 'node-fetch';
  4. const observableFactory = () => fetch('https://api.github.com/repos/ReactiveX/rxjs');
  5. const source$ = Observable.defer(observableFactory);
  6. source$.subscribe(console.log);