操作符

过滤操作符

Take

take接受一个输入流和一个数值 amount 。输出流为输入流的前 amount 个值。
详细情况是:

  • n<amount 时,值会被发送
  • n==amount 时,值会被发送,同时流立即完成。

    TakeLast

    takeLast接受一个输入流和一个数值 amount 。输出流为输入流的后 amount 个值。
    详细情况是:

  • 当输入流完成后,输出流会

    • 发送由输入流发送的后 amount 个值
  • 如果输入流永远不能完成,输出流将一直等待

    TakeWhile

    takeWhile接受一个输入流和一个断言函数 predicate ,当断言函数返回 true 时,将输入流的值发送出去,一旦断言函数返回 false ,不再输出内容,并立即结束。

    TakeUntil

    takeUntil接受一个输入流和一个截止流(这个命令是根据流的作用命令的)。在截止流发送值之前,输入流的所有内容都会被发送到输出流中,一旦截止流发送值。输出流不再接收来自输入流的内容,并且立即完成。

Skip

skip接受一个输入流和一个数值 n ,输出流为输入流前 n 个值之外的值。

SkipLast

skipLast接受一个输入流和一个数值 amount ,当输入流完成之后,输出流会接收最后 amount 之外的所有值。
详细情况是:

  • 当输入流完成后,输出流会接收最后 amount 之外的值。
  • 如果输入流永远不能完成,输出流将一真等待。

    SkipWhile

    skipWhile同样接受一个输入流和一个断言函数 predicate ,当断言函数为 true 时,将忽略输入流的值,一旦断言函数返回 false ,将发送之后的值(无论断言函数返回的是 true 还是 false )。输出流会和输入流一起结束。

    SkipUntil

    skipUntil和takeUntil一样接受两个值,一个输入流和一个截止流,在截止流发送值之前,输出流会直接忽略输入流中的内容,一旦截止流发送内容,输出流将开始接收输入流中的值。

First

first接受一个输入流和一个断言函数 predicate ,输出流为断言函数返回 true 的第一个值。

Last

last 接受一个输入流和一个断言函数 predicate ,输出流为断言函数返回 true 的最后一个值。

ElementAt

elementAt接受两个必要参数输入流和索引值 index 和一个可选参数 defaultValuedefaultValue 的默认值为 undefined 。当输入流完成的时候,输出流会输出输入流中第 index 个值,如果不存在第 index 个值,则输出设置的 defaultValue 值(默认为 undefined ),同时输出流完成。

Filter

filter接受一个输入流和一个断言函数 predicate ,输出流了断言函数返回 true 的所有的值。

IgnoreElements

ignoreElements只接受一个输入流作为参数,在输出流中会忽略输入流中的所有内容,直接发送完成。

Distinct

distinct接受一个输入流,并可以接受两个可选参数,第一个可选参数为 Selector ,即以什么为基准来给输入流去重。默认是 undefined 。当 Selectorundefined 时,去重的基准为发送的值本身。当提供了一个 Selector 函数后,以函数的返回值为基准去重。
示例:
Without Selector :

  1. import { from } from "rxjs";
  2. import { disctinct } from "rxjs/operators";
  3. from([1,2,3,4,4,3,2,5])
  4. .pipe(
  5. distinct()
  6. )
  7. .subscripb(console.log); // 1,2,3,4,5

With Selector :

  1. import { from } from "rxjs";
  2. import { distinct } from "rxjs/operators";
  3. interface Person {
  4. age: number,
  5. name: string
  6. }
  7. from<Person[]>([
  8. {age:4,name:"foo",},
  9. {age:5,name:"bar"},
  10. {age:6,name:"foo"}
  11. ])
  12. .pipe(
  13. distinct((p:Person)=>p.name)
  14. )
  15. .subscribe(console.log); // {age:4,name:"foo"} ,{age:5,name:"bar"}

DisctinctUntilChanged

disctinctUntilChanged接受一个输入流和一个可选参数 compare 函数。
disctinctUntilChanged 会根据比较函数的返回值是不是为 true 来判断,输出流是否要输出该值。

  • 比较函数返回 true ,输出流不发送该值,将该值忽略。
  • 比较函数返回 false , 输出流发送该值。

在没有提供 compare 函数的情况下,会依照js 的规则做相等性比较。根据相等性比较的 truefalse 来确定是否要输出。
示例1:

  1. import { of } from 'rxjs';
  2. import { distinctUntilChanged } from 'rxjs/operators';
  3. of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe(
  4. distinctUntilChanged(),
  5. )
  6. .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4

示例2:

  1. import { of } from 'rxjs';
  2. import { distinctUntilChanged } from 'rxjs/operators';
  3. interface Person {
  4. age: number,
  5. name: string
  6. }
  7. of<Person>(
  8. { age: 4, name: 'Foo'},
  9. { age: 7, name: 'Bar'},
  10. { age: 5, name: 'Foo'},
  11. { age: 6, name: 'Foo'},
  12. ).pipe(
  13. distinctUntilChanged((p: Person, q: Person) => p.name === q.name),
  14. )
  15. .subscribe(x => console.log(x));
  16. // displays:
  17. // { age: 4, name: 'Foo' }
  18. // { age: 7, name: 'Bar' }
  19. // { age: 5, name: 'Foo' }

DistinctUntilKeyChanged

distinctUntilKeyChanged 和 distinctUntikChanged 相比,多了一个必选参数 key
示例1:

  1. import { of } from 'rxjs';
  2. import { distinctUntilKeyChanged } from 'rxjs/operators';
  3. interface Person {
  4. age: number,
  5. name: string
  6. }
  7. of<Person>(
  8. { age: 4, name: 'Foo'},
  9. { age: 7, name: 'Bar'},
  10. { age: 5, name: 'Foo'},
  11. { age: 6, name: 'Foo'},
  12. ).pipe(
  13. distinctUntilKeyChanged('name'),
  14. )
  15. .subscribe(x => console.log(x));
  16. // displays:
  17. // { age: 4, name: 'Foo' }
  18. // { age: 7, name: 'Bar' }
  19. // { age: 5, name: 'Foo' }

示例2:

  1. import { of } from 'rxjs';
  2. import { distinctUntilKeyChanged } from 'rxjs/operators';
  3. interface Person {
  4. age: number,
  5. name: string
  6. }
  7. of<Person>(
  8. { age: 4, name: 'Foo1'},
  9. { age: 7, name: 'Bar'},
  10. { age: 5, name: 'Foo2'},
  11. { age: 6, name: 'Foo3'},
  12. ).pipe(
  13. distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3)),
  14. )
  15. .subscribe(x => console.log(x));
  16. // displays:
  17. // { age: 4, name: 'Foo1' }
  18. // { age: 7, name: 'Bar' }
  19. // { age: 5, name: 'Foo2' }

auditTime

auditTime接受一个输入流和一个 duration 参数。
auditTime 接收到一个源值的时候,auditTime会直接忽略该值以及指定 duration 时间内的所有值,然后向输出流发送出指定时间 duration 中的最新值,重复执行此过程。
示例:

  1. const { interval } = Rx;
  2. const { auditTime } = RxOperators;
  3. interval(1000).pipe(auditTime(1100))

audit

audit接受一个输入流和一个 Observable 对象。
auditthrottle 类似,但不同于 throttle 的是, audit 会发送时间窗口内的最新值,还不是第一次的值。在 audit 接收的 Observable 对象内部维护着一个内容的计时器(timer),当这个内部的计时器启用的时间, audit 会忽略输入流发送的值。在初始态下,这个内部的计时器是禁用的。当输入流的第一个值到达的时候,内部计时器启用。当 Observable 发送值或者完成的时候,内部计时器禁用,并将输入流的最新值发送给输出流。之后重复这个过程。

注意:在每次内部计时器禁用后,都会生成一个新的 Observable

示例:

  1. const { interval,timer,pipe } = Rx;
  2. const { audit,tap } = RxOperators;
  3. function log(name=""){
  4. return pipe(
  5. tap(x=>console.log(`${name} ${x}`))
  6. )
  7. }
  8. interval(1000)
  9. .pipe(
  10. audit(()=>timer(200,400).pipe(log("timer"))),
  11. log("audit")
  12. )

debounceTime

debounceTime接受一个输入流和一个 duration 值。
debounceTime在 duration 中只会发送最新值,在 duration 内如果输入流产生了多个值,那么除了最后的一个值被发送到输出流外,其他的值会直接被忽略。
示例:

  1. import { fromEvent } from 'rxjs';
  2. import { debounceTime } from 'rxjs/operators';
  3. const clicks = fromEvent(document, 'click');
  4. const result = clicks.pipe(debounceTime(1000));
  5. result.subscribe(x => console.log(x));

debounce

debounce授受一个输入流和一个 Observable 对象。
Observable 发送值之前,debounce会一直忽略输入流发送的值,当 Observable 发送了新值,debounce会把输入流的最新值发送给输出流。
示例:

  1. import { fromEvent, interval } from 'rxjs';
  2. import { debounce } from 'rxjs/operators';
  3. const clicks = fromEvent(document, 'click');
  4. const result = clicks.pipe(debounce(() => interval(1000)));
  5. result.subscribe(x => console.log(x));

sampleTime

sampleTime接受一个输入流和一个时间间隔 duration 值。
sampleTime会有指定的时间间隔内对输入流进行采样,sampleTime会把采样中的最新的值发送给输出流,如果本次的采样值和上次发送给输出流的值没有改变,则不发送值。
示例:

  1. import { fromEvent } from 'rxjs';
  2. import { sampleTime } from 'rxjs/operators';
  3. const clicks = fromEvent(document, 'click');
  4. const result = clicks.pipe(sampleTime(1000));
  5. result.subscribe(x => console.log(x));

sample

sample接受一个输入流和一个 Observable 值。
sample会在 Observalbe 发送一个值或完成的时候对输入流进行采样,并把采样中的最新值发送给输出流(如果本次的采样值和上次的采样值相同,则忽略)。
示例:

  1. import { fromEvent, interval } from 'rxjs';
  2. import { sample } from 'rxjs/operators';
  3. const seconds = interval(1000);
  4. const clicks = fromEvent(document, 'click');
  5. const result = seconds.pipe(sample(clicks));
  6. result.subscribe(x => console.log(x));

single

single接受一个输入流和一个断言函数。
single会根据断言函数发送输入流中的唯一值,如果不存在这样的唯一值,那么就发送 undefined 值。如果存在多个符合断言函数的值,那么会抛出错误。
示例1:

  1. import { range } from 'rxjs';
  2. import { single } from 'rxjs/operators';
  3. const numbers = range(1,5).pipe(single());
  4. numbers.subscribe(x => console.log('never get called'), e => console.log('error'));
  5. // result
  6. // 'error'

示例2:

  1. import { range } from 'rxjs';
  2. import { single } from 'rxjs/operators';
  3. const numbers = range(1,5).pipe(single());
  4. numbers.subscribe(x => console.log('never get called'), e => console.log('error'));
  5. // result
  6. // 'error'

throttleTime

throttleTime接受一个输入流和一个时间间隔 duration
在时间间隔 duration 内,throttleTime只接收输入流的第一个值,其余的值会被忽略,直到下一个时间间隔开始,重复此过程。
示例1:

  1. import { fromEvent } from 'rxjs';
  2. import { throttleTime } from 'rxjs/operators';
  3. const clicks = fromEvent(document, 'click');
  4. const result = clicks.pipe(throttleTime(1000));
  5. result.subscribe(x => console.log(x));

示例2:

  1. import { fromEvent, asyncScheduler } from 'rxjs';
  2. import { throttleTime, withLatestFrom } from 'rxjs/operators';
  3. // defaultThottleConfig = { leading: true, trailing: false }
  4. const throttleConfig = {
  5. leading: false,
  6. trailing: true
  7. }
  8. const click = fromEvent(document, 'click');
  9. const doubleClick = click.pipe(
  10. throttleTime(400, asyncScheduler, throttleConfig)
  11. );
  12. doubleClick.subscribe((throttleValue: Event) => {
  13. console.log(`Double-clicked! Timestamp: ${throttleValue.timeStamp}`);
  14. });

throttle

throttle接受一个输入流和另一个 Observable 对象。
throttle和throttleTime的区别在于,对于时间服间的控制不再使用固定的 毫秒作为依据。而是依据一个 Observable 对象。
示例:

  1. import { fromEvent } from 'rxjs';
  2. import { throttle } from 'rxjs/operators';
  3. const clicks = fromEvent(document, 'click');
  4. const result = clicks.pipe(throttle(ev => interval(1000)));
  5. result.subscribe(x => console.log(x));

组合操作符

combineLatest

combineLatest操作符是一个组合操作符,combineLatest操作符接受一个Observable数组,当数组中的所有Observable都发送值的时候,combineLatest操作符会收集每个Observable的最新值发送。只要数组中的任意一个Observable没有发送值,combineLatest就不会发送值。当数组中的所有Observable都完成之后,combineLatest完成。
combineLatest除了可以接收一个Observable数组外,还可以接收第二个resultSelector参数。这个参数是一个函数,这个函数的参数是Observable数组发送的值,返回值是当前combineLatest发送的值。
示例1:
普通的combineLatest:

  1. import {timer,combineLatest} from 'rxjs';
  2. const timerOne = timer(1000,4000);
  3. const timerTwo = timer(2000,4000);
  4. const timerThree = timer(3000,4000);
  5. const combined = combineLatest([timerOne,timerTwo,timerThree]);
  6. const subscription = combined.subscribe(x=>console.log(x));

示例2:
含有resultSelectorcombineLatest:

  1. import {timer,combineLatest} from 'rxjs';
  2. import {take} from 'rxjs/operators';
  3. const timerOne = timer(1000,4000).pipe(take(3));
  4. const timerTwo = timer(2000,4000).pipe(take(3));
  5. const timerThree = timer(3000,4000).pipe(take(2));
  6. const combinedProject = combineLatest<number[],string>(
  7. [timerOne,timerTwo,timerThree],
  8. (one,two,three)=>{
  9. return `Timer One Latest:${one},
  10. Timer Two Latest:${two},
  11. Timer Three Latest:${three}`
  12. }
  13. )
  14. combinedProject.subscribe(x=>console.log(x));

concat

concat操作符接收一系列的Observable作为参数,concat会从第一个Observable开始把发送的值逐一发送出去,直到这个Observable完成,再发送下一个Observable发送的值。
示例:

  1. import { concat, of } from "rxjs";
  2. import { delay } from "rxjs/operators";
  3. const sourceOne = of(1, 2, 3);
  4. const sourceTwo = of(4, 5, 6);
  5. const sourceThree = sourceOne.pipe(delay(3000));
  6. const example = concat(sourceThree, sourceTwo);
  7. example.subscribe((x) => console.log(`Example: Delayed source one:${x}`));

concatAll

concatAll操作符不接收参数。concatAll中作用是把流动到这里的高阶Observable打平为一阶的Observable。打平的方式就是把从第一个高阶的Observable开始,逐一和之后的Observable按顺序组合成一个Observable。
示例:

  1. import { interval } from "rxjs";
  2. import { concatAll, map } from "rxjs/operators";
  3. const samplePromise = (val: number) => new Promise((resolve) => resolve(val));
  4. const source = interval(2000);
  5. const example = source.pipe(
  6. map((x) => samplePromise(x)),
  7. concatAll()
  8. );
  9. const subscription = example.subscribe({
  10. next(value) {
  11. console.log(`Example with Promise:${value}`);
  12. },
  13. });

forkJoin

forkJoin接收一个Observable数组,其作用是在Observable数组中的所有Observable都完成的时候,把Observable数组中的最新值合并发送。
示例:

  1. import { forkJoin, interval, of } from "rxjs";
  2. import { delay, take } from "rxjs/operators";
  3. const myPromise = (val: string) =>
  4. new Promise((resolve) =>
  5. setTimeout(() => resolve(`Promise Resolved:${val}`), 5000)
  6. );
  7. const example = forkJoin([
  8. of("hello"),
  9. of("world").pipe(delay(1000)),
  10. interval(1000).pipe(take(1)),
  11. interval(1000).pipe(take(2)),
  12. myPromise("Result"),
  13. ]);
  14. const subscription = example.subscribe((value) => console.log(value));

merge

merge接收一系统的Observable,并把每个Observable发送的值即时的发送出去。
示例:

  1. import { interval, merge } from "rxjs";
  2. import { mapTo } from "rxjs/operators";
  3. const first = interval(2500);
  4. const second = interval(2000);
  5. const third = interval(1000);
  6. const fourth = interval(1500);
  7. const example = merge(
  8. first.pipe(mapTo("FIRST!")),
  9. second.pipe(mapTo("SECOND!")),
  10. third.pipe(mapTo("THIRD")),
  11. fourth.pipe(mapTo("FOURTH"))
  12. );
  13. const subscription = example.subscribe({
  14. next(value) {
  15. console.log(value);
  16. },
  17. });

mergeAll

mergeAll操作符用于把接收到的高阶Observable打平。打平的方式是,当收到一个Observable的值的时候,就发送这个值。直到所有的Observable完成。另外,mergeAll还可以接收一个number类型的参数,用来控制可以打平接收的几个Observable。
示例:

  1. import { interval } from "rxjs";
  2. import { delay, map, mergeAll, take } from "rxjs/operators";
  3. const source = interval(500).pipe(take(5));
  4. const example = source.pipe(
  5. map((x) => source.pipe(delay(1000), take(3))),
  6. mergeAll(2)
  7. );
  8. example.subscribe({
  9. next(value) {
  10. console.log(value);
  11. },
  12. complete() {
  13. console.log("Complete!");
  14. },
  15. });

pairwise

pairwise操作符的作用是把前一个值和当前值组合成数组火发送。
示例:

  1. import { interval } from "rxjs";
  2. import { pairwise, take } from "rxjs/operators";
  3. interval(1000)
  4. .pipe(pairwise(), take(5))
  5. .subscribe({
  6. next(value) {
  7. console.log(value);
  8. },
  9. });

race

race操作符是一个具有选择性的合并操作符,race操作符会选择高阶Observable中最先返回值的进行执行。
示例:

  1. import { interval, race } from "rxjs";
  2. import { mapTo, take } from "rxjs/operators";
  3. const example = race(
  4. interval(1000),
  5. interval(500).pipe(mapTo("I won!")),
  6. interval(1500)
  7. ).pipe(take(5));
  8. example.subscribe({
  9. next(value) {
  10. console.log(value);
  11. },
  12. });

startWith

startWith操作符的作用是加Observable发送值之前,在Observable的前面添加指定的值。
示例:

  1. import { interval } from "rxjs";
  2. import { startWith, take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(startWith(-3, -2, -1), take(10));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. });

withLatestFrom

withLatestFrom操作符接收一个Observable作为参数。withLatestFrom的作用是当源Observable发送值的时候,withLatestFrom会取参数Observable的最新值和源Observable为最新值组合成数组,发送。
withLatestFrom和combineLatest的取值逻辑是一样的,不同之处在于触发发送值的时机:

  • withLatestFrom发送值是时机是 源Observable发送新值的时候。
  • combineLatest发送值的时机是 无论是 只要任意一个Observable发送值,combineLatest就发送值。

示例:

  1. import { interval } from "rxjs";
  2. import { map, take, withLatestFrom } from "rxjs/operators";
  3. const source = interval(5000);
  4. const secondSource = interval(1000);
  5. const example = source.pipe(
  6. withLatestFrom(secondSource),
  7. map(([first, second]) => {
  8. return `First source (5s):${first},Second source (1s):${second}`;
  9. }),
  10. take(15)
  11. );
  12. const subscription = example.subscribe({
  13. next(x) {
  14. console.log(x);
  15. },
  16. complete() {
  17. console.log("Complete!");
  18. },
  19. });

zip

zip操作符的作用是把所有的Observable发送值的第i个值,组合一个数组,向后发送。只要有一个Observable没有发送数据或已完成,zip就不会发送数据。
示例:

  1. import { interval, zip } from "rxjs";
  2. import { take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = zip(source, source.pipe(take(2)));
  5. example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete!");
  11. },
  12. });

条件操作符

defaultIfEmpty

defaultIfEmpty操作符的作用是:如果Observable在完成前没有发送任何值,那就就把defaultIfEmpty的参数发送出去。
示例:

  1. import { interval, zip } from "rxjs";
  2. import { take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = zip(source, source.pipe(take(2)));
  5. example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete!");
  11. },
  12. });

every

every是一个判定操作符,如果所有的值都符合条件,那么返回true,否则返回false
示例:

  1. import { of } from "rxjs";
  2. import { every } from "rxjs/operators";
  3. const source = of(2, 4, 6, 8, 10);
  4. const example = source.pipe(every((x) => x % 2 === 0));
  5. example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete!");
  11. },
  12. });

find

find操作符接收一个断言函数,find会返回断言函数为true的第一个值。到Observable完成时,还没有符合断言的值时,返回undefined
示例:

  1. import { from } from "rxjs";
  2. import { find } from "rxjs/operators";
  3. const arr: number[] = [3, 9, 15, 20];
  4. const source = from(arr);
  5. const example = source.pipe(find((x) => x % 5 === 0));
  6. const subscription = example.subscribe({
  7. next(x) {
  8. console.log(x);
  9. },
  10. complete() {
  11. console.log("Complete");
  12. },
  13. });

findIndex

findIndex操作符接收一个断言函数,find会返回断言函数为true的第一个值的索引。到Observable完成时,还没有符合断言的值时,返回-1
示例:

  1. import { from } from "rxjs";
  2. import { findIndex } from "rxjs/operators";
  3. const arr: number[] = [3, 5, 15, 20];
  4. const source = from(arr);
  5. const example = source.pipe(findIndex((x) => x % 4 === 0));
  6. const subscription = example.subscribe({
  7. next(x) {
  8. console.log(x);
  9. },
  10. complete() {
  11. console.log("Complete");
  12. },
  13. });

isEmpty

isEmpty是一个逻辑操作符,当在Observable在完成时没有发送值,isEmpty发送true,否则发送false.
示例:

  1. import { EMPTY } from "rxjs";
  2. import { isEmpty } from "rxjs/operators";
  3. const result = EMPTY.pipe(isEmpty());
  4. const subscription = result.subscribe({
  5. next(x) {
  6. console.log(x);
  7. },
  8. });

创建操作符

from

from操作符可以从数组,字符串,map和Promise中创建Observable。
示例:

  1. import { from } from "rxjs";
  2. const source = from(new Promise((resolve) => resolve("Hello World!")));
  3. source.subscribe({
  4. next(value) {
  5. console.log(value);
  6. },
  7. complete() {
  8. console.log("Complete!");
  9. },
  10. });

fromEvent

fromEvent可以从 DOM 事件中创建Observable,fromEvent的第一个参数是 DOM对象,第二个参数是事件对象,第三个参数是一个布尔值,表示是否使用捕获。
示例:

  1. import { fromEvent } from 'rxjs';
  2. const clicksInDocument = fromEvent(document, 'click', true); // note optional configuration parameter
  3. // which will be passed to addEventListener
  4. const clicksInDiv = fromEvent(someDivInDocument, 'click');
  5. clicksInDocument.subscribe(() => console.log('document'));
  6. clicksInDiv.subscribe(() => console.log('div'));

interval

interval是一个按照一定的时间间隔发送数字序列的Observable。
示例:

  1. import { interval } from 'rxjs';
  2. const source = interval(1000);
  3. const subscription = source.subscribe(val=>console.log(val));

of

of操作符的作用是把按顺序把任意对象自动转换成Observable;
示例:

  1. import { of } from 'rxjs';
  2. const source = of({ name: 'Brian' }, [1, 2, 3], function hello() {
  3. return 'Hello';
  4. });
  5. const subscribe = source.subscribe(val => console.log(val));

range

range操作符的作用是按顺序发送指定区间内的值。
示例:

  1. import { range } from 'rxjs';
  2. const source = range(1,3);
  3. source.subscribe(val => console.log(val));

timer

timer接收两个参数,第一个参数表示发送第一个值之前要延迟的时间,第二个参数表示在发送了第一个值之后,每隔多长时间再发送一个新的值。如果没有给定第二个参数,则只发送了第一个值之后就结束。
示例:

  1. import { timer } from "rxjs";
  2. import { take } from "rxjs/operators";
  3. const source = timer(1000, 2000);
  4. const observer = {
  5. next<T>(x: T) {
  6. console.log(x);
  7. },
  8. complete() {},
  9. };
  10. const subscription = source.pipe(take(10)).subscribe(observer);

throwError

throwError操作符可以创建一个错误对象的Observable;
示例:

  1. import { throwError } from 'rxjs';
  2. const source = throwError('This is an error!');
  3. // 输出: 'Error: This is an error!'
  4. const subscribe = source.subscribe({
  5. next: val => console.log(val),
  6. complete: () => console.log('Complete!'),
  7. error: val => console.log(`Error: ${val}`)
  8. });

错误处理操作符

catchError

catchError操作符主要用于在 rxjs 中捕获错误。catchError中一定要返回一个Observable。
示例:

  1. import { from, of, timer } from "rxjs";
  2. import { catchError, mergeMap } from "rxjs/operators";
  3. const myBadPromise = () =>
  4. new Promise((resolve, reject) => reject("Rejected!"));
  5. const source = timer(1000);
  6. const example = source.pipe(
  7. mergeMap(() =>
  8. from(myBadPromise()).pipe(catchError((err) => of(`Bad Promise:${err}`)))
  9. )
  10. );
  11. const subscription = example.subscribe({
  12. next(v) {
  13. console.log(v);
  14. },
  15. complete() {
  16. console.log("Complete!");
  17. },
  18. error(e) {
  19. console.error(e);
  20. },
  21. });

retry

retry操作符可以让程序在出错的情况下,重试n次(n是retry接收的参数)

  1. import { interval, of, throwError } from "rxjs";
  2. import { mergeMap, retry } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(
  5. mergeMap((x) => {
  6. if (x > 5) {
  7. return throwError("Error!");
  8. }
  9. return of(x);
  10. }),
  11. retry(2)
  12. );
  13. example.subscribe({
  14. next(x) {
  15. console.log(x);
  16. },
  17. complete() {
  18. console.log("Complete!");
  19. },
  20. error(e) {
  21. console.log(e);
  22. },
  23. });

retryWhen

retryWhen操作符的作用是:当发生错误时,基于自定义的标准的重试Observable序列。retryWhen接收一函数,这个函数的参数是一个由Observable包裹的错误对象,函数的返回值应为一个Observable。
示例:

  1. import { interval, timer } from "rxjs";
  2. import { delayWhen, map, retryWhen, tap } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(
  5. map((val) => {
  6. if (val > 5) {
  7. throw val;
  8. }
  9. return val;
  10. }),
  11. retryWhen((errors) => {
  12. return errors.pipe(
  13. tap((val) => console.log(`Value ${val} was to high!`)),
  14. delayWhen((val) => timer(val * 1000))
  15. );
  16. })
  17. );
  18. example.subscribe({
  19. next(value) {
  20. console.log(value);
  21. },
  22. complete() {
  23. console.log("Complete!");
  24. },
  25. error(e) {
  26. console.error(e);
  27. },
  28. });

转换操作符

buffer

buffer是缓存操作符,buffer接收一个Observable。buffer会缓存源Observable发送的值,缓存的值会在参数Observable发送值的时候以数组的形式发送出去。值的内容为上次发送值后的值到现在发送的值。
示例:

  1. import { fromEvent, interval } from "rxjs";
  2. import { buffer } from "rxjs/operators";
  3. const myInterval = interval(1000);
  4. const bufferBy = fromEvent(document, "click");
  5. const example = myInterval.pipe(buffer(bufferBy));
  6. const subscription = example.subscribe({
  7. next(x) {
  8. console.log(`Buffered Values:${x}`);
  9. },
  10. });

bufferCount

bufferCount也是一个缓存操作符,boufferCount接收一个必选参数和一个可选参数,第一个参数表示每次需要缓存多少个值,第二个参数表示缓存中有几个值是新值,默认值是null,即缓存值中全部为新值。
示例:
没有第二个参数

  1. import { interval } from "rxjs";
  2. import { bufferCount, take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(bufferCount(3), take(10));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log(`Complete!`);
  11. },
  12. });

有第二个参数:

  1. import { interval } from "rxjs";
  2. import { bufferCount, take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(bufferCount(3, 1), take(10));
  5. example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete!");
  11. },
  12. });

bufferTime

bufferTime是一个和时间有关的缓存器,第一个参数表示的是要缓存多长时间内的数据,第二个参数(可选)是表示在前一个缓存器开启之后,多长时间开启下一个缓存器(默认是null,表示在前一个缓存器关闭后才开启下一下缓存器)。.
示例:

  1. import { interval } from "rxjs";
  2. import { bufferTime, take } from "rxjs/operators";
  3. const source = interval(500);
  4. const example = source.pipe(bufferTime(2000, 1500), take(10));
  5. example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete!");
  11. },
  12. });

bufferToggle

bufferToggle接收两个参数,第一个参数是一个Observable,当这个Observable在发出值的时候,bufferToggle开始缓存值,第二个参数是一个返回Observable的函数,当返回的函数发出值的时候,bufferToggle停止缓存值。可以看出,bufferToggle是一个可以通过Observable来自定义缓存开关的操作符。
示例:

  1. import { interval } from "rxjs";
  2. import { bufferToggle, take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const start = interval(5000);
  5. const closing = (val: number) => {
  6. const period = 2000;
  7. console.log(
  8. `Value ${val} emitted, starting buffer! Closing in ${period / 1000}s!`
  9. );
  10. return interval(period);
  11. };
  12. const example = source.pipe(bufferToggle(start, closing), take(10));
  13. example.subscribe({
  14. next(x) {
  15. console.log(x);
  16. },
  17. complete() {
  18. console.log("Complete!");
  19. },
  20. });

bufferWhen

bufferWhen接收一个返回Observable的函数作为参数,当这个Observable发送值的时候,bufferWhen立即停止缓存值,并发送已经缓存的值,同时开启下一个缓存。
示例:

  1. import { interval } from "rxjs";
  2. import { bufferWhen, take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const when = () => interval(5000);
  5. const example = source.pipe(bufferWhen(when), take(8));
  6. const subscription = example.subscribe({
  7. next(x) {
  8. console.log(x);
  9. },
  10. complete() {
  11. console.log("Complete!");
  12. },
  13. });

concatmap

将值映射成内部 observable,并按顺序订阅和发出。
示例:

  1. import { of } from "rxjs";
  2. import { concatMap } from "rxjs/operators";
  3. const source = of("hello", "world");
  4. const examplePromise = (val: string) => new Promise((resolve) => resolve(val));
  5. const example = source.pipe(concatMap((val) => examplePromise(val)));
  6. const subscription = example.subscribe({
  7. next(x) {
  8. console.log(x);
  9. },
  10. complete() {
  11. console.log("Complete!");
  12. },
  13. });

concatMapTo

comcatMapTo和concatMap类型,只是concatMapTo不会接收Obsebale传递来的参数。
示例:

  1. import { interval, of } from "rxjs";
  2. import { concatMapTo, delay, take } from "rxjs/operators";
  3. const source = interval(500).pipe(take(5));
  4. const fakeRequest = of("Network request complete").pipe(delay(3000));
  5. const example = source.pipe(concatMapTo(fakeRequest));
  6. const subscription = example.subscribe({
  7. next(x) {
  8. console.log(x);
  9. },
  10. complete() {
  11. console.log("Complete");
  12. },
  13. });

exhaustMap

映射成内部 observable,忽略其他值直到该 observable 完成。
示例:

  1. import { interval, merge, of } from "rxjs";
  2. import { delay, exhaustMap, take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const delayed = source.pipe(delay(10), take(4));
  5. const example = merge(delayed, of(true)).pipe(
  6. exhaustMap((_) => source.pipe(take(5)))
  7. );
  8. const subscription = example.subscribe({
  9. next(x) {
  10. console.log(x);
  11. },
  12. complete() {
  13. console.log("Complete");
  14. },
  15. });

expand

expand可以递归调用内部的函数。
示例:

  1. import { of } from "rxjs";
  2. import { expand, take } from "rxjs/operators";
  3. const source = of(2);
  4. const example = source.pipe(
  5. expand((val) => {
  6. console.log(`Cur:${val}`);
  7. return of(val * 2);
  8. }),
  9. take(6)
  10. );
  11. const subscription = example.subscribe({
  12. next(x) {
  13. console.log(x);
  14. },
  15. complete() {
  16. console.log("Complete");
  17. },
  18. });

groupBy

groupBy对Observable发送的值进行分组。
示例:

  1. import { from } from "rxjs";
  2. import { groupBy, mergeMap, toArray } from "rxjs/operators";
  3. type People = { name: string; age: number };
  4. const people: People[] = [
  5. { name: "Sue", age: 25 },
  6. { name: "Joe", age: 30 },
  7. { name: "Frank", age: 30 },
  8. { name: "Sarah", age: 25 },
  9. ];
  10. const source = from(people);
  11. const example = source.pipe(
  12. groupBy((person) => person.age),
  13. mergeMap((group) => group.pipe(toArray()))
  14. );
  15. const subscription = example.subscribe({
  16. next(x) {
  17. console.log(x);
  18. },
  19. complete() {
  20. console.log("Complete!");
  21. },
  22. });

map

map操作符可以对源Observable的每个值应用投射函数。
示例:

  1. import { from } from "rxjs";
  2. import { map } from "rxjs/operators";
  3. type Person = { name: string; age: number };
  4. const people: Person[] = [
  5. { name: "Frank", age: 25 },
  6. { name: "Joe", age: 30 },
  7. { name: "Ryan", age: 24 },
  8. ];
  9. const source = from(people);
  10. const example = source.pipe(map((x) => x.name));
  11. const subscription = example.subscribe({
  12. next(x) {
  13. console.log(x);
  14. },
  15. complete() {
  16. console.log("Complete");
  17. },
  18. });

mapTo

mapTo可以把源Obervable发送的值映射成常量。
示例:

  1. import { interval } from "rxjs";
  2. import { mapTo, take } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(mapTo("Hello World!"), take(10));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete");
  11. },
  12. });

mergeMap

mergeMap可以打平Observable。
示例:

  1. import { of } from "rxjs";
  2. import { mergeMap } from "rxjs/operators";
  3. const source = of("Hello");
  4. const example = source.pipe(mergeMap((val) => of(`${val} world!`)));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete");
  11. },
  12. });

partition

parition可以根据断言把Observable分为两组。
示例:

  1. import { from, merge, of, partition } from "rxjs";
  2. import { catchError, map } from "rxjs/operators";
  3. const arr: number[] = [1, 2, 3, 4, 5, 6];
  4. const source = from(arr);
  5. const example = source.pipe(
  6. map((val) => {
  7. if (val > 3) {
  8. throw `${val} greater than 3`;
  9. }
  10. return { success: true, value: val, error: null };
  11. }),
  12. catchError((val) => of({ error: val, value: null, success: false }))
  13. );
  14. const [success, error] = partition(example, (val) => val.success);
  15. const subscription = merge(
  16. success.pipe(
  17. map((x) => {
  18. console.log(x.value);
  19. return x.value;
  20. })
  21. ),
  22. error.pipe(
  23. map((x) => {
  24. console.error(x.error);
  25. return x.value;
  26. })
  27. )
  28. ).subscribe({
  29. next(x) {
  30. console.log(x);
  31. },
  32. });

reduce

reduce是一个聚合操作符,它是将Observable的值归并为单个值,当Observable完成时再将这个值发出。
示例:

  1. import { of } from "rxjs";
  2. import { reduce } from "rxjs/operators";
  3. const source = of(1, 2, 3, 4);
  4. const example = source.pipe(reduce((acc, value) => acc + value));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete");
  11. },
  12. });

scan

scan同样也是一个聚合操作符,它也是把Observable的值归并到一个值,但他会在Observable发送值之后就把当下的聚合值发送出去,也就是说Observable每发送一个值,scan就发生执行一次聚合,然后发送一个值。
示例:

  1. import { of } from "rxjs";
  2. import { scan } from "rxjs/operators";
  3. const source = of(1, 2, 3);
  4. const example = source.pipe(scan((acc, value) => acc + value, 0));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete");
  11. },
  12. });

switchMap

swithcMap是一个打平操作符,在拥有众多的Observale传入switchMap的时候,switchMap会选择把其中的一个订阅的Observable发送,而取消对于其他Observable的订阅。
示例:

  1. import { EMPTY, fromEvent, interval, merge } from "rxjs";
  2. import { mapTo, scan, startWith, switchMap, takeWhile } from "rxjs/operators";
  3. const countdownSeconds = 10;
  4. const setHtml = (id: string) => (val: string) =>
  5. (document.getElementById(id)!.innerHTML = val);
  6. const interval$ = interval(1000).pipe(mapTo(-1));
  7. const pause = document.getElementById("pause");
  8. const resume = document.getElementById("resume");
  9. const pause$ = fromEvent(pause!, "click").pipe(mapTo(false));
  10. const resume$ = fromEvent(resume!, "click").pipe(mapTo(true));
  11. const example = merge(pause$, resume$).pipe(
  12. startWith(true),
  13. switchMap((val) => (val ? interval$ : EMPTY)),
  14. scan((acc, value) => acc + value, countdownSeconds),
  15. takeWhile((v) => v >= 0)
  16. );
  17. const subscription = example.subscribe({
  18. next(x) {
  19. setHtml("remaining")(`${x}`);
  20. },
  21. complete() {
  22. console.log("Complete");
  23. },
  24. });

window

window是一个具有分组能力的转换操作符。window接收一个Observable,当Observable发送出值的时候,window就把之前收集到的值,作为一个Observable发送出去,开发下一次的收集。也就是说,window可以根据接收的Observable把Observable分成多个Observable。
示例:

  1. import { interval, timer } from "rxjs";
  2. import { mergeAll, scan, take, window } from "rxjs/operators";
  3. const source = timer(0, 1000);
  4. const example = source.pipe(window(interval(3000)), take(10));
  5. example.pipe(scan((acc) => acc + 1, 0)).subscribe({
  6. next(x) {
  7. console.log(`Window:${x}`);
  8. },
  9. complete() {
  10. console.log("Complete");
  11. },
  12. });
  13. const subscription = example.pipe(mergeAll()).subscribe({
  14. next(x) {
  15. console.log(x);
  16. },
  17. });

windowCount

windowCount和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowCount下一个收集的触发时机是收集的个数。
示例:

  1. import { interval } from "rxjs";
  2. import { mergeAll, take, tap, windowCount } from "rxjs/operators";
  3. const source = interval(3000);
  4. const example = source.pipe(
  5. windowCount(3),
  6. tap(() => console.log(`Window!`)),
  7. mergeAll(),
  8. take(9)
  9. );
  10. const subscription = example.subscribe({
  11. next(x) {
  12. console.log(x);
  13. },
  14. complete() {
  15. console.log("Complete");
  16. },
  17. });

windowTime

windowTime和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowTime下一个收集的触发时机是到达指定的时间。
示例:

  1. import { timer } from "rxjs";
  2. import { mergeAll, take, tap, windowTime } from "rxjs/operators";
  3. const source = timer(0, 1000);
  4. const example = source.pipe(
  5. windowTime(3000),
  6. tap(() => console.info(`WindowTime`))
  7. );
  8. const subscription = example.pipe(mergeAll(), take(10)).subscribe({
  9. next(x) {
  10. console.log(x);
  11. },
  12. complete() {
  13. console.log("Complete");
  14. },
  15. });

windowWhen

windowWhen和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowWhen是在接收到的函数返回的Observable发送值的时候,关闭这次收集,开始下次收集。
示例:

  1. import { interval, timer } from "rxjs";
  2. import { mergeAll, take, tap, windowWhen } from "rxjs/operators";
  3. const source = timer(0, 1000);
  4. const example = source.pipe(
  5. windowWhen(() => interval(3000)),
  6. tap(() => console.info("New Window!"))
  7. );
  8. const subscription = example.pipe(mergeAll(), take(8)).subscribe({
  9. next(x) {
  10. console.log(x);
  11. },
  12. complete() {
  13. console.log("Complete");
  14. },
  15. });

windowToggle

windowWhen和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowToggle有两个参数,第一个参数是一个Observable,第二个参数是一个返回Observable 的函数,当第一个参数的Observable发送值的时候,开发收集,当第二个参数返回的Observable发送值的时候,停止这次收集,并把收集的值作为Observable发送出去。
示例:

工具操作符

tap

tap是一个工具操作符,它的设计是用来在输入输出流不改变的情况下,执行一些副作用的代码。从而使得代码的其他部分的函数式更强。最常使用的就是调试代码。
示例:

  1. import { of } from 'rxjs';
  2. import { tap, map } from 'rxjs/operators';
  3. of(Math.random()).pipe(
  4. tap(console.log),
  5. map(n => n > 0.5 ? 'big' : 'small')
  6. ).subscribe(console.log);

delay

delay同样是一个工具操作符,它和tap一样,也能保证输入和输出的内容的一致性。但和tap不同的是,delay的作用是延迟源Observable发送的值的时间。delay可以接收一个Number类型的值,也可以接收一个Date类型的值。当参数是一个Number类型的值时,延迟时间的数值对应的毫秒数。当参数是一个Date类型的值时,延迟时间是到达指定的时间。
示例:

  1. import { timer } from "rxjs";
  2. import { delay, take } from "rxjs/operators";
  3. const source = timer(0, 1000);
  4. const example = source.pipe(delay(500), take(8));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete!");
  11. },
  12. });

delayWhen

delayWhen和delay一样是一个延迟Obserable发送值的工作操作符,但delayWhen延迟的时机取决于接收的参数中返回的Obserable发送值的时机。
示例:

  1. import { interval } from "rxjs";
  2. import { delayWhen, take } from "rxjs/operators";
  3. const source = interval(800);
  4. const example = source.pipe(
  5. delayWhen((val) => interval(val * 1000)),
  6. take(8)
  7. );
  8. const subscription = example.subscribe({
  9. next(x) {
  10. console.log(x);
  11. },
  12. complete() {
  13. console.log("Complete");
  14. },
  15. });

dematerialize

dematerialize的作用是把RxJS中的ObservableNotification对象展开。
示例:

  1. import { of } from "rxjs";
  2. import { dematerialize } from "rxjs/operators";
  3. import type { ObservableNotification } from "rxjs";
  4. const notifA: ObservableNotification<string> = { kind: "N", value: "A" };
  5. const notifB: ObservableNotification<string> = { kind: "N", value: "B" };
  6. const notifC: ObservableNotification<null> = {
  7. kind: "E",
  8. error: new TypeError("x.toUpperCase is not a function"),
  9. };
  10. const materialize = of(notifA, notifB, notifC);
  11. const example = materialize.pipe(dematerialize());
  12. const subscription = example.subscribe({
  13. next(x) {
  14. console.log(x);
  15. },
  16. complete() {
  17. console.log("Complete");
  18. },
  19. error(e) {
  20. console.log(e);
  21. },
  22. });

materialize

materialize的作用和RxJS的作用相反,是把一个值包装为ObservableNotification对象。
示例:

  1. import { Observable, of } from "rxjs";
  2. import { map, materialize } from "rxjs/operators";
  3. const letters: Observable<any> = of("a", "b", 13, "c");
  4. const upperCase = letters.pipe(map((x) => x.toUpperCase()));
  5. const materialized = upperCase.pipe(materialize());
  6. const subscription = materialized.subscribe({
  7. next(x) {
  8. console.log(x);
  9. },
  10. complete() {
  11. console.log("Complete");
  12. },
  13. });

timeInterval

timeInterval操作符的作用是在Observable原有值的基础上添加一个interval字段,这个interval字段表示在两个Observable发送的两个值之间的时间间隔,而之前Observable的值使用value来包装。组成以下的格式的内容:{value:value,interval:interval}
示例:

  1. import { interval } from "rxjs";
  2. import { take, timeInterval } from "rxjs/operators";
  3. const seconds = interval(1000);
  4. const subscription = seconds.pipe(timeInterval(), take(8)).subscribe({
  5. next(x) {
  6. console.log(x);
  7. },
  8. complete() {
  9. console.log("Complete!");
  10. },
  11. });

timestamp

timestamp操作符的作用是把Observable发送的值进行包装,形成下面的格式:{value:value,timestamp:number}。这里的value就是之前Observable发送的值,而timestamp的值就是value发送时的时间戳。
示例:

  1. import { interval } from "rxjs";
  2. import { take, timestamp } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(timestamp(), take(8));
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete!");
  11. },
  12. });

timeout

timeout操作符可以接收一个参数,这个参数是一个对象:{first?:number,each?:number,with?:(x:TimeoutInfo)=>Observable},这里的first值表示在发送第一个值之前最多经历的时间,如果在这个时间内没有发送值,就会执行with提供的函数,而在没有指定with的情况下就会报TimeoutError的错误。这里的each表示每两个值之间的最大时间间隔,如果大于这个时间间隔,也会执行with提供的函数,而在没有指定with的情况下就会报TimeoutError的错误。
示例:

  1. import { interval } from "rxjs";
  2. import { take, timeInterval, timeout } from "rxjs/operators";
  3. const slow$ = interval(1000);
  4. const fast$ = interval(500);
  5. const example = slow$.pipe(
  6. timeout({
  7. first: 800,
  8. with: (val) => {
  9. console.log(val);
  10. return fast$;
  11. },
  12. }),
  13. timeInterval(),
  14. take(16)
  15. );
  16. example.subscribe({
  17. next(x) {
  18. console.log(x);
  19. },
  20. complete() {
  21. console.log("Complete");
  22. },
  23. });

TimeoutInfo:

  1. export interface TimeoutInfo<T, M = unknown> {
  2. /** timeout配置中提供的可选meta项 */
  3. readonly meta: M;
  4. /** 在发生超时前,Observable已经发送的值的数量 */
  5. readonly seen: number;
  6. /** 在发生超时前,Observable发送的最后值 */
  7. readonly lastValue: T | null;
  8. }

toArray

toArray操作符会收集Observable发送的所有值,当Observable完成时,把收集到的值以数组的形式发送出去。
示例:

  1. import { interval } from "rxjs";
  2. import { take, toArray } from "rxjs/operators";
  3. const source = interval(1000);
  4. const example = source.pipe(take(8), toArray());
  5. const subscription = example.subscribe({
  6. next(x) {
  7. console.log(x);
  8. },
  9. complete() {
  10. console.log("Complete");
  11. },
  12. });

observeOn

示例:

  1. import { animationFrameScheduler, fromEvent, interval } from "rxjs";
  2. import { mergeMap, observeOn, startWith, take, tap } from "rxjs/operators";
  3. const someStart: HTMLElement = document.querySelector("#someStart")!;
  4. const someDiv: HTMLElement = document.querySelector("#someDiv")!;
  5. const intervals = interval(10);
  6. const example = fromEvent(someStart, "click").pipe(
  7. mergeMap(() => intervals.pipe(observeOn(animationFrameScheduler))),
  8. startWith(0),
  9. take(100)
  10. );
  11. example.subscribe({
  12. next(x) {
  13. someDiv.style.height = x + "px";
  14. },
  15. complete() {
  16. console.log("Complete");
  17. },
  18. });

使用

测试

对于rxjs的测试,我们通常是使用弹珠测试来进行的。其已集成在了rxjs中。下面是和jest一起使用的示例:

  1. import { from } form 'rxjs';
  2. import { TestScheduler } from 'rxjs/testing';
  3. describe('test example', ()=>{
  4. let testScheduler: TestScheduler;
  5. beforeEach(() => {
  6. testScheduler = new TestScheduler((actual, expected) => {
  7. expect(actual).toEqual(expected);
  8. })
  9. })
  10. it('should be example', ()=> {
  11. testScfheduler.run(({ expectObservable }) => {
  12. const expectedMarble = '(abc|)';
  13. const observable$ = from([1, 2, 3]);
  14. const expected = {a: 1, b: 2, c: 3};
  15. expectObservable(observable$).toBe(expectedMarble, expected);
  16. })
  17. })
  18. })
  1. 更多的测试请参考:[RxJS测试](https://www.yuque.com/silencezhpf/js/rxjs_test~)