![image.png](https://cdn.nlark.com/yuque/0/2022/png/10390466/1644916977976-a35971fb-9b65-4cdc-8095-5c8abde35776.png?x-oss-process=image/format,png#clientId=u278a0a82-1d7f-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=510&id=u738f5486&margin=%5Bobject%20Object%5D&name=image.png&originHeight=510&originWidth=900&originalType=binary&ratio=1&rotation=0&showTitle=false&size=1377826&status=done&style=none&taskId=ua1671c9d-b6ba-4def-ab61-f45687030c6&title=&width=900)
1 「创建类」操作符
前面章节都是通过new来创建Observable对象,RxJS提供了很多「创建类」操作符,应该尽量使用「创建类」操作符,避免使用 Observalbe构造函数。
2 创建同步数据流
2.1 create
本质上是返回了一个 new Observable(subscribe) 创建的对象。
2.2 of
创建制定数据集合的 Observable 对象,让代码更简单。
2.3 range
创建一个连续数字序列的 Observable对象。Observable.range(1,100)
2.4 generate
循环创建 Observable 对象。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/generate';
/*
const result = [];
for (let i=2; i<10; i+=2) {
result.push(i*i);
}
console.log(result);
*/
const stream$ = Observable.generate(
2,
value => value < 10,
value => value + 2,
value => value * value
);
stream$.subscribe(
console.log,
null,
() => console.log('complete')
);
2.5 repeat
创建重复数据的数据流,属于 实例操作符。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/repeat';
const source$ = Observable.of(1, 2, 3);
const repeated$= source$.repeat(10);
repeated$.subscribe(
console.log,
null,
() => console.log('complete')
);
上例相对简单repeat的上游是 $source,如果上游是开发者自己控制的 Observable 对象,subscriber 必须明确传递 cmplete 的实现,否则 repeat 时,无法判断上一次是否结束了。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/operator/repeat';
const source$ = Observable.create(observer => {
console.log('on subscribe');
setTimeout(() => observer.next(1), 1000);
setTimeout(() => observer.next(2), 2000);
setTimeout(() => observer.next(3), 3000);
setTimeout(() => observer.complete(), 4000);
return {
unsubscribe: () => {
console.log('on unsubscribe');
}
}
});
const repeated$ = source$.repeat(2);
//setInterval(() => console.log(), 50000);
repeated$.subscribe(
console.log,
null,
() => console.log('complete')
);
2.6 三个极简操作符
- empty,直接完结,没有参数,不产生任何数据 |
- trhow,什么都不做,直接出错,给下游传递一个Error对象 x
- never,不吐出数据,也不完结,也不产生错误,一直到永远 ————-
3. 创建异步数据的Observable对象
RxJS 提供的操作符就是让开发者在日常尽量不要考虑时间因素。
3.1 interval和timer
interval和timer的地位等同于原生JavaScript的setInterval和setTimeout,但功能并不完全一样。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/interval';
const source$ = Observable.interval(1000);
source$.subscribe(
console.log,
null,
() => console.log('complete')
);
每间隔1秒突出一个数据,从0开始,如果想从更大的数值开始,可以跟map结合使用,将返回结果+1。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
const source$ = Observable.timer(2000, 1000);
source$.subscribe(
console.log,
null,
() => console.log('complete')
);
第一个参数指定了多久后吐出第1个数据(仍旧从0开始),如果第二个参数没指定,数据流就结束了;第二个参数是间隔时间,指定了回继续吐出数据。如果两个参数一样,等同于使用interval传递一个参数。
3.2 from
from 是包容性极强的一个操作符,可以把所有像Observable对象的参数(几乎是任何对象)转化为Observable对象。例如:数字、类数组(例如arguments)、generator、字符串(会被拆开成多个字符吐出)。
3.3 fromPromise
参数是Promise对象, 如果Promise成功结束,from产生的Observable对象会立即吐出Promise成功后的数据。
3.4 fromEvent
<!doctype html>
<html>
<body>
<div>
<button id="clickMe">Click Me</button>
<div id="text">0</div>
</div>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
let clickCount = 0;
const event$ = Rx.Observable.fromEvent(document.querySelector('#clickMe'), 'click');
event$.subscribe(
() => {
document.querySelector('#text').innerText = ++clickCount
}
);
</script>
</body>
</html>
import {Observable} from 'rxjs/Observable';
import EventEmitter from 'events';
import 'rxjs/add/observable/fromEvent';
const emitter = new EventEmitter();
const source$ = Observable.fromEvent(emitter, 'msg');
source$.subscribe(
console.log,
error => console.log('catch', error),
() => console.log('complete')
);
emitter.emit('msg', 1);
emitter.emit('msg', 2);
emitter.emit('another-msg', 'oops');
emitter.emit('msg', 3);
上述例子输出1、2、3。唯一特殊的是,fromEvent 产生的是 hot Observable对象,没有subscribe之前emit的数据是 不会被接收到的。
3.5 fromEventPattern
fromEvent 要求数据源表现得像是浏览器的DOM 或者 Node.js的EventEmitter,如果数据源并不按照这样的方式产生数据,需要使用灵活度更高的操作符。
import {Observable} from 'rxjs/Observable';
import EventEmitter from 'events';
import 'rxjs/add/observable/fromEventPattern';
const emitter = new EventEmitter();
const addHandler = (handler) => {
emitter.addListener('msg', handler);
};
const removeHandler = (handler) => {
emitter.removeListener('msg', handler);
}
const source$ = Observable.fromEventPattern(addHandler, removeHandler);
const subscription = source$.subscribe(
console.log,
error => console.log('catch', error),
() => console.log('complete')
);
emitter.emit('msg', 'hello');
emitter.emit('msg', 'world');
subscription.unsubscribe();
emitter.emit('msg', 'end');
在第一个参数重,将长生的数据交给handler;第二个参数负责unsbuscribe所必须的处理。
3.6 ajax
可以根据AJAX请求的返回结果产生Observable对象。
<!doctype html>
<html>
<body>
<div>
<button id="getStar">Get RxJS Star Count</button>
<div id="text"></div>
</div>
<script src="https://unpkg.com/rxjs@5.5.2/bundles/Rx.min.js"></script>
<script>
Rx.Observable.fromEvent(
document.querySelector('#getStar'),
'click'
).subscribe(
() => {
Rx.Observable.ajax('https://api.github.com/repos/ReactiveX/rxjs', {responseType: 'json'}).
subscribe(value => {
const starCount = value.response.stargazers_count;
document.querySelector('#text').innerText = starCount;
});
}
);
</script>
</body>
</html>
3.7 repeatWhen
在repeat的重复订阅的基础上,增加了反复订阅的间隔时间。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/repeat';
const source$ = Observable.of(1, 2, 3);
const repeated$= source$.repeat(10);
repeated$.subscribe(
console.log,
null,
() => console.log('complete')
);
3.8 defer
尽早创建Observable代理(Proxy),但是不做分配资源的工作,只在被订阅的时候,才会常见真正占用资源的Observable。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/defer';
import 'rxjs/add/observable/of';
const observableFactory = () => Observable.of(1, 2, 3);
const source$ = Observable.defer(observableFactory);
source$.subscribe(console.log);
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/defer';
import fetch from 'node-fetch';
const observableFactory = () => fetch('https://api.github.com/repos/ReactiveX/rxjs');
const source$ = Observable.defer(observableFactory);
source$.subscribe(console.log);