操作符
过滤操作符
Take
take接受一个输入流和一个数值 amount 。输出流为输入流的前 amount 个值。
详细情况是:
- 当
n<amount时,值会被发送 -
TakeLast
takeLast接受一个输入流和一个数值
amount。输出流为输入流的后amount个值。
详细情况是: 当输入流完成后,输出流会
- 发送由输入流发送的后
amount个值
- 发送由输入流发送的后
- 如果输入流永远不能完成,输出流将一直等待
TakeWhile
takeWhile接受一个输入流和一个断言函数predicate,当断言函数返回true时,将输入流的值发送出去,一旦断言函数返回false,不再输出内容,并立即结束。TakeUntil
takeUntil接受一个输入流和一个截止流(这个命令是根据流的作用命令的)。在截止流发送值之前,输入流的所有内容都会被发送到输出流中,一旦截止流发送值。输出流不再接收来自输入流的内容,并且立即完成。
Skip
skip接受一个输入流和一个数值 n ,输出流为输入流前 n 个值之外的值。
SkipLast
skipLast接受一个输入流和一个数值 amount ,当输入流完成之后,输出流会接收最后 amount 之外的所有值。
详细情况是:
- 当输入流完成后,输出流会接收最后
amount之外的值。 - 如果输入流永远不能完成,输出流将一真等待。
SkipWhile
skipWhile同样接受一个输入流和一个断言函数predicate,当断言函数为true时,将忽略输入流的值,一旦断言函数返回false,将发送之后的值(无论断言函数返回的是true还是false)。输出流会和输入流一起结束。SkipUntil
skipUntil和takeUntil一样接受两个值,一个输入流和一个截止流,在截止流发送值之前,输出流会直接忽略输入流中的内容,一旦截止流发送内容,输出流将开始接收输入流中的值。
First
first接受一个输入流和一个断言函数 predicate ,输出流为断言函数返回 true 的第一个值。
Last
last 接受一个输入流和一个断言函数 predicate ,输出流为断言函数返回 true 的最后一个值。
ElementAt
elementAt接受两个必要参数输入流和索引值 index 和一个可选参数 defaultValue 。 defaultValue 的默认值为 undefined 。当输入流完成的时候,输出流会输出输入流中第 index 个值,如果不存在第 index 个值,则输出设置的 defaultValue 值(默认为 undefined ),同时输出流完成。
Filter
filter接受一个输入流和一个断言函数 predicate ,输出流了断言函数返回 true 的所有的值。
IgnoreElements
ignoreElements只接受一个输入流作为参数,在输出流中会忽略输入流中的所有内容,直接发送完成。
Distinct
distinct接受一个输入流,并可以接受两个可选参数,第一个可选参数为 Selector ,即以什么为基准来给输入流去重。默认是 undefined 。当 Selector 为 undefined 时,去重的基准为发送的值本身。当提供了一个 Selector 函数后,以函数的返回值为基准去重。
示例:
Without Selector :
import { from } from "rxjs";import { disctinct } from "rxjs/operators";from([1,2,3,4,4,3,2,5]).pipe(distinct()).subscripb(console.log); // 1,2,3,4,5
With Selector :
import { from } from "rxjs";import { distinct } from "rxjs/operators";interface Person {age: number,name: string}from<Person[]>([{age:4,name:"foo",},{age:5,name:"bar"},{age:6,name:"foo"}]).pipe(distinct((p:Person)=>p.name)).subscribe(console.log); // {age:4,name:"foo"} ,{age:5,name:"bar"}
DisctinctUntilChanged
disctinctUntilChanged接受一个输入流和一个可选参数 compare 函数。
disctinctUntilChanged 会根据比较函数的返回值是不是为 true 来判断,输出流是否要输出该值。
- 比较函数返回
true,输出流不发送该值,将该值忽略。 - 比较函数返回
false, 输出流发送该值。
在没有提供 compare 函数的情况下,会依照js 的规则做相等性比较。根据相等性比较的 true 和 false 来确定是否要输出。
示例1:
import { of } from 'rxjs';import { distinctUntilChanged } from 'rxjs/operators';of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe(distinctUntilChanged(),).subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
示例2:
import { of } from 'rxjs';import { distinctUntilChanged } from 'rxjs/operators';interface Person {age: number,name: string}of<Person>({ age: 4, name: 'Foo'},{ age: 7, name: 'Bar'},{ age: 5, name: 'Foo'},{ age: 6, name: 'Foo'},).pipe(distinctUntilChanged((p: Person, q: Person) => p.name === q.name),).subscribe(x => console.log(x));// displays:// { age: 4, name: 'Foo' }// { age: 7, name: 'Bar' }// { age: 5, name: 'Foo' }
DistinctUntilKeyChanged
distinctUntilKeyChanged 和 distinctUntikChanged 相比,多了一个必选参数 key 。
示例1:
import { of } from 'rxjs';import { distinctUntilKeyChanged } from 'rxjs/operators';interface Person {age: number,name: string}of<Person>({ age: 4, name: 'Foo'},{ age: 7, name: 'Bar'},{ age: 5, name: 'Foo'},{ age: 6, name: 'Foo'},).pipe(distinctUntilKeyChanged('name'),).subscribe(x => console.log(x));// displays:// { age: 4, name: 'Foo' }// { age: 7, name: 'Bar' }// { age: 5, name: 'Foo' }
示例2:
import { of } from 'rxjs';import { distinctUntilKeyChanged } from 'rxjs/operators';interface Person {age: number,name: string}of<Person>({ age: 4, name: 'Foo1'},{ age: 7, name: 'Bar'},{ age: 5, name: 'Foo2'},{ age: 6, name: 'Foo3'},).pipe(distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3)),).subscribe(x => console.log(x));// displays:// { age: 4, name: 'Foo1' }// { age: 7, name: 'Bar' }// { age: 5, name: 'Foo2' }
auditTime
auditTime接受一个输入流和一个 duration 参数。
当 auditTime 接收到一个源值的时候,auditTime会直接忽略该值以及指定 duration 时间内的所有值,然后向输出流发送出指定时间 duration 中的最新值,重复执行此过程。
示例:
const { interval } = Rx;const { auditTime } = RxOperators;interval(1000).pipe(auditTime(1100))
audit
audit接受一个输入流和一个 Observable 对象。audit 和 throttle 类似,但不同于 throttle 的是, audit 会发送时间窗口内的最新值,还不是第一次的值。在 audit 接收的 Observable 对象内部维护着一个内容的计时器(timer),当这个内部的计时器启用的时间, audit 会忽略输入流发送的值。在初始态下,这个内部的计时器是禁用的。当输入流的第一个值到达的时候,内部计时器启用。当 Observable 发送值或者完成的时候,内部计时器禁用,并将输入流的最新值发送给输出流。之后重复这个过程。
注意:在每次内部计时器禁用后,都会生成一个新的
Observable
示例:
const { interval,timer,pipe } = Rx;const { audit,tap } = RxOperators;function log(name=""){return pipe(tap(x=>console.log(`${name} ${x}`)))}interval(1000).pipe(audit(()=>timer(200,400).pipe(log("timer"))),log("audit"))
debounceTime
debounceTime接受一个输入流和一个 duration 值。
debounceTime在 duration 中只会发送最新值,在 duration 内如果输入流产生了多个值,那么除了最后的一个值被发送到输出流外,其他的值会直接被忽略。
示例:
import { fromEvent } from 'rxjs';import { debounceTime } from 'rxjs/operators';const clicks = fromEvent(document, 'click');const result = clicks.pipe(debounceTime(1000));result.subscribe(x => console.log(x));
debounce
debounce授受一个输入流和一个 Observable 对象。
在 Observable 发送值之前,debounce会一直忽略输入流发送的值,当 Observable 发送了新值,debounce会把输入流的最新值发送给输出流。
示例:
import { fromEvent, interval } from 'rxjs';import { debounce } from 'rxjs/operators';const clicks = fromEvent(document, 'click');const result = clicks.pipe(debounce(() => interval(1000)));result.subscribe(x => console.log(x));
sampleTime
sampleTime接受一个输入流和一个时间间隔 duration 值。
sampleTime会有指定的时间间隔内对输入流进行采样,sampleTime会把采样中的最新的值发送给输出流,如果本次的采样值和上次发送给输出流的值没有改变,则不发送值。
示例:
import { fromEvent } from 'rxjs';import { sampleTime } from 'rxjs/operators';const clicks = fromEvent(document, 'click');const result = clicks.pipe(sampleTime(1000));result.subscribe(x => console.log(x));
sample
sample接受一个输入流和一个 Observable 值。
sample会在 Observalbe 发送一个值或完成的时候对输入流进行采样,并把采样中的最新值发送给输出流(如果本次的采样值和上次的采样值相同,则忽略)。
示例:
import { fromEvent, interval } from 'rxjs';import { sample } from 'rxjs/operators';const seconds = interval(1000);const clicks = fromEvent(document, 'click');const result = seconds.pipe(sample(clicks));result.subscribe(x => console.log(x));
single
single接受一个输入流和一个断言函数。
single会根据断言函数发送输入流中的唯一值,如果不存在这样的唯一值,那么就发送 undefined 值。如果存在多个符合断言函数的值,那么会抛出错误。
示例1:
import { range } from 'rxjs';import { single } from 'rxjs/operators';const numbers = range(1,5).pipe(single());numbers.subscribe(x => console.log('never get called'), e => console.log('error'));// result// 'error'
示例2:
import { range } from 'rxjs';import { single } from 'rxjs/operators';const numbers = range(1,5).pipe(single());numbers.subscribe(x => console.log('never get called'), e => console.log('error'));// result// 'error'
throttleTime
throttleTime接受一个输入流和一个时间间隔 duration 。
在时间间隔 duration 内,throttleTime只接收输入流的第一个值,其余的值会被忽略,直到下一个时间间隔开始,重复此过程。
示例1:
import { fromEvent } from 'rxjs';import { throttleTime } from 'rxjs/operators';const clicks = fromEvent(document, 'click');const result = clicks.pipe(throttleTime(1000));result.subscribe(x => console.log(x));
示例2:
import { fromEvent, asyncScheduler } from 'rxjs';import { throttleTime, withLatestFrom } from 'rxjs/operators';// defaultThottleConfig = { leading: true, trailing: false }const throttleConfig = {leading: false,trailing: true}const click = fromEvent(document, 'click');const doubleClick = click.pipe(throttleTime(400, asyncScheduler, throttleConfig));doubleClick.subscribe((throttleValue: Event) => {console.log(`Double-clicked! Timestamp: ${throttleValue.timeStamp}`);});
throttle
throttle接受一个输入流和另一个 Observable 对象。
throttle和throttleTime的区别在于,对于时间服间的控制不再使用固定的 毫秒作为依据。而是依据一个 Observable 对象。
示例:
import { fromEvent } from 'rxjs';import { throttle } from 'rxjs/operators';const clicks = fromEvent(document, 'click');const result = clicks.pipe(throttle(ev => interval(1000)));result.subscribe(x => console.log(x));
组合操作符
combineLatest
combineLatest操作符是一个组合操作符,combineLatest操作符接受一个Observable数组,当数组中的所有Observable都发送值的时候,combineLatest操作符会收集每个Observable的最新值发送。只要数组中的任意一个Observable没有发送值,combineLatest就不会发送值。当数组中的所有Observable都完成之后,combineLatest完成。
combineLatest除了可以接收一个Observable数组外,还可以接收第二个resultSelector参数。这个参数是一个函数,这个函数的参数是Observable数组发送的值,返回值是当前combineLatest发送的值。
示例1:
普通的combineLatest:
import {timer,combineLatest} from 'rxjs';const timerOne = timer(1000,4000);const timerTwo = timer(2000,4000);const timerThree = timer(3000,4000);const combined = combineLatest([timerOne,timerTwo,timerThree]);const subscription = combined.subscribe(x=>console.log(x));
示例2:
含有resultSelector的combineLatest:
import {timer,combineLatest} from 'rxjs';import {take} from 'rxjs/operators';const timerOne = timer(1000,4000).pipe(take(3));const timerTwo = timer(2000,4000).pipe(take(3));const timerThree = timer(3000,4000).pipe(take(2));const combinedProject = combineLatest<number[],string>([timerOne,timerTwo,timerThree],(one,two,three)=>{return `Timer One Latest:${one},Timer Two Latest:${two},Timer Three Latest:${three}`})combinedProject.subscribe(x=>console.log(x));
concat
concat操作符接收一系列的Observable作为参数,concat会从第一个Observable开始把发送的值逐一发送出去,直到这个Observable完成,再发送下一个Observable发送的值。
示例:
import { concat, of } from "rxjs";import { delay } from "rxjs/operators";const sourceOne = of(1, 2, 3);const sourceTwo = of(4, 5, 6);const sourceThree = sourceOne.pipe(delay(3000));const example = concat(sourceThree, sourceTwo);example.subscribe((x) => console.log(`Example: Delayed source one:${x}`));
concatAll
concatAll操作符不接收参数。concatAll中作用是把流动到这里的高阶Observable打平为一阶的Observable。打平的方式就是把从第一个高阶的Observable开始,逐一和之后的Observable按顺序组合成一个Observable。
示例:
import { interval } from "rxjs";import { concatAll, map } from "rxjs/operators";const samplePromise = (val: number) => new Promise((resolve) => resolve(val));const source = interval(2000);const example = source.pipe(map((x) => samplePromise(x)),concatAll());const subscription = example.subscribe({next(value) {console.log(`Example with Promise:${value}`);},});
forkJoin
forkJoin接收一个Observable数组,其作用是在Observable数组中的所有Observable都完成的时候,把Observable数组中的最新值合并发送。
示例:
import { forkJoin, interval, of } from "rxjs";import { delay, take } from "rxjs/operators";const myPromise = (val: string) =>new Promise((resolve) =>setTimeout(() => resolve(`Promise Resolved:${val}`), 5000));const example = forkJoin([of("hello"),of("world").pipe(delay(1000)),interval(1000).pipe(take(1)),interval(1000).pipe(take(2)),myPromise("Result"),]);const subscription = example.subscribe((value) => console.log(value));
merge
merge接收一系统的Observable,并把每个Observable发送的值即时的发送出去。
示例:
import { interval, merge } from "rxjs";import { mapTo } from "rxjs/operators";const first = interval(2500);const second = interval(2000);const third = interval(1000);const fourth = interval(1500);const example = merge(first.pipe(mapTo("FIRST!")),second.pipe(mapTo("SECOND!")),third.pipe(mapTo("THIRD")),fourth.pipe(mapTo("FOURTH")));const subscription = example.subscribe({next(value) {console.log(value);},});
mergeAll
mergeAll操作符用于把接收到的高阶Observable打平。打平的方式是,当收到一个Observable的值的时候,就发送这个值。直到所有的Observable完成。另外,mergeAll还可以接收一个number类型的参数,用来控制可以打平接收的几个Observable。
示例:
import { interval } from "rxjs";import { delay, map, mergeAll, take } from "rxjs/operators";const source = interval(500).pipe(take(5));const example = source.pipe(map((x) => source.pipe(delay(1000), take(3))),mergeAll(2));example.subscribe({next(value) {console.log(value);},complete() {console.log("Complete!");},});
pairwise
pairwise操作符的作用是把前一个值和当前值组合成数组火发送。
示例:
import { interval } from "rxjs";import { pairwise, take } from "rxjs/operators";interval(1000).pipe(pairwise(), take(5)).subscribe({next(value) {console.log(value);},});
race
race操作符是一个具有选择性的合并操作符,race操作符会选择高阶Observable中最先返回值的进行执行。
示例:
import { interval, race } from "rxjs";import { mapTo, take } from "rxjs/operators";const example = race(interval(1000),interval(500).pipe(mapTo("I won!")),interval(1500)).pipe(take(5));example.subscribe({next(value) {console.log(value);},});
startWith
startWith操作符的作用是加Observable发送值之前,在Observable的前面添加指定的值。
示例:
import { interval } from "rxjs";import { startWith, take } from "rxjs/operators";const source = interval(1000);const example = source.pipe(startWith(-3, -2, -1), take(10));const subscription = example.subscribe({next(x) {console.log(x);},});
withLatestFrom
withLatestFrom操作符接收一个Observable作为参数。withLatestFrom的作用是当源Observable发送值的时候,withLatestFrom会取参数Observable的最新值和源Observable为最新值组合成数组,发送。
withLatestFrom和combineLatest的取值逻辑是一样的,不同之处在于触发发送值的时机:
- withLatestFrom发送值是时机是 源Observable发送新值的时候。
- combineLatest发送值的时机是 无论是 只要任意一个Observable发送值,combineLatest就发送值。
示例:
import { interval } from "rxjs";import { map, take, withLatestFrom } from "rxjs/operators";const source = interval(5000);const secondSource = interval(1000);const example = source.pipe(withLatestFrom(secondSource),map(([first, second]) => {return `First source (5s):${first},Second source (1s):${second}`;}),take(15));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
zip
zip操作符的作用是把所有的Observable发送值的第i个值,组合一个数组,向后发送。只要有一个Observable没有发送数据或已完成,zip就不会发送数据。
示例:
import { interval, zip } from "rxjs";import { take } from "rxjs/operators";const source = interval(1000);const example = zip(source, source.pipe(take(2)));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
条件操作符
defaultIfEmpty
defaultIfEmpty操作符的作用是:如果Observable在完成前没有发送任何值,那就就把defaultIfEmpty的参数发送出去。
示例:
import { interval, zip } from "rxjs";import { take } from "rxjs/operators";const source = interval(1000);const example = zip(source, source.pipe(take(2)));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
every
every是一个判定操作符,如果所有的值都符合条件,那么返回true,否则返回false。
示例:
import { of } from "rxjs";import { every } from "rxjs/operators";const source = of(2, 4, 6, 8, 10);const example = source.pipe(every((x) => x % 2 === 0));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
find
find操作符接收一个断言函数,find会返回断言函数为true的第一个值。到Observable完成时,还没有符合断言的值时,返回undefined。
示例:
import { from } from "rxjs";import { find } from "rxjs/operators";const arr: number[] = [3, 9, 15, 20];const source = from(arr);const example = source.pipe(find((x) => x % 5 === 0));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
findIndex
findIndex操作符接收一个断言函数,find会返回断言函数为true的第一个值的索引。到Observable完成时,还没有符合断言的值时,返回-1。
示例:
import { from } from "rxjs";import { findIndex } from "rxjs/operators";const arr: number[] = [3, 5, 15, 20];const source = from(arr);const example = source.pipe(findIndex((x) => x % 4 === 0));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
isEmpty
isEmpty是一个逻辑操作符,当在Observable在完成时没有发送值,isEmpty发送true,否则发送false.
示例:
import { EMPTY } from "rxjs";import { isEmpty } from "rxjs/operators";const result = EMPTY.pipe(isEmpty());const subscription = result.subscribe({next(x) {console.log(x);},});
创建操作符
from
from操作符可以从数组,字符串,map和Promise中创建Observable。
示例:
import { from } from "rxjs";const source = from(new Promise((resolve) => resolve("Hello World!")));source.subscribe({next(value) {console.log(value);},complete() {console.log("Complete!");},});
fromEvent
fromEvent可以从 DOM 事件中创建Observable,fromEvent的第一个参数是 DOM对象,第二个参数是事件对象,第三个参数是一个布尔值,表示是否使用捕获。
示例:
import { fromEvent } from 'rxjs';const clicksInDocument = fromEvent(document, 'click', true); // note optional configuration parameter// which will be passed to addEventListenerconst clicksInDiv = fromEvent(someDivInDocument, 'click');clicksInDocument.subscribe(() => console.log('document'));clicksInDiv.subscribe(() => console.log('div'));
interval
interval是一个按照一定的时间间隔发送数字序列的Observable。
示例:
import { interval } from 'rxjs';const source = interval(1000);const subscription = source.subscribe(val=>console.log(val));
of
of操作符的作用是把按顺序把任意对象自动转换成Observable;
示例:
import { of } from 'rxjs';const source = of({ name: 'Brian' }, [1, 2, 3], function hello() {return 'Hello';});const subscribe = source.subscribe(val => console.log(val));
range
range操作符的作用是按顺序发送指定区间内的值。
示例:
import { range } from 'rxjs';const source = range(1,3);source.subscribe(val => console.log(val));
timer
timer接收两个参数,第一个参数表示发送第一个值之前要延迟的时间,第二个参数表示在发送了第一个值之后,每隔多长时间再发送一个新的值。如果没有给定第二个参数,则只发送了第一个值之后就结束。
示例:
import { timer } from "rxjs";import { take } from "rxjs/operators";const source = timer(1000, 2000);const observer = {next<T>(x: T) {console.log(x);},complete() {},};const subscription = source.pipe(take(10)).subscribe(observer);
throwError
throwError操作符可以创建一个错误对象的Observable;
示例:
import { throwError } from 'rxjs';const source = throwError('This is an error!');// 输出: 'Error: This is an error!'const subscribe = source.subscribe({next: val => console.log(val),complete: () => console.log('Complete!'),error: val => console.log(`Error: ${val}`)});
错误处理操作符
catchError
catchError操作符主要用于在 rxjs 中捕获错误。catchError中一定要返回一个Observable。
示例:
import { from, of, timer } from "rxjs";import { catchError, mergeMap } from "rxjs/operators";const myBadPromise = () =>new Promise((resolve, reject) => reject("Rejected!"));const source = timer(1000);const example = source.pipe(mergeMap(() =>from(myBadPromise()).pipe(catchError((err) => of(`Bad Promise:${err}`)))));const subscription = example.subscribe({next(v) {console.log(v);},complete() {console.log("Complete!");},error(e) {console.error(e);},});
retry
retry操作符可以让程序在出错的情况下,重试n次(n是retry接收的参数)
import { interval, of, throwError } from "rxjs";import { mergeMap, retry } from "rxjs/operators";const source = interval(1000);const example = source.pipe(mergeMap((x) => {if (x > 5) {return throwError("Error!");}return of(x);}),retry(2));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},error(e) {console.log(e);},});
retryWhen
retryWhen操作符的作用是:当发生错误时,基于自定义的标准的重试Observable序列。retryWhen接收一函数,这个函数的参数是一个由Observable包裹的错误对象,函数的返回值应为一个Observable。
示例:
import { interval, timer } from "rxjs";import { delayWhen, map, retryWhen, tap } from "rxjs/operators";const source = interval(1000);const example = source.pipe(map((val) => {if (val > 5) {throw val;}return val;}),retryWhen((errors) => {return errors.pipe(tap((val) => console.log(`Value ${val} was to high!`)),delayWhen((val) => timer(val * 1000)));}));example.subscribe({next(value) {console.log(value);},complete() {console.log("Complete!");},error(e) {console.error(e);},});
转换操作符
buffer
buffer是缓存操作符,buffer接收一个Observable。buffer会缓存源Observable发送的值,缓存的值会在参数Observable发送值的时候以数组的形式发送出去。值的内容为上次发送值后的值到现在发送的值。
示例:
import { fromEvent, interval } from "rxjs";import { buffer } from "rxjs/operators";const myInterval = interval(1000);const bufferBy = fromEvent(document, "click");const example = myInterval.pipe(buffer(bufferBy));const subscription = example.subscribe({next(x) {console.log(`Buffered Values:${x}`);},});
bufferCount
bufferCount也是一个缓存操作符,boufferCount接收一个必选参数和一个可选参数,第一个参数表示每次需要缓存多少个值,第二个参数表示缓存中有几个值是新值,默认值是null,即缓存值中全部为新值。
示例:
没有第二个参数
import { interval } from "rxjs";import { bufferCount, take } from "rxjs/operators";const source = interval(1000);const example = source.pipe(bufferCount(3), take(10));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log(`Complete!`);},});
有第二个参数:
import { interval } from "rxjs";import { bufferCount, take } from "rxjs/operators";const source = interval(1000);const example = source.pipe(bufferCount(3, 1), take(10));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
bufferTime
bufferTime是一个和时间有关的缓存器,第一个参数表示的是要缓存多长时间内的数据,第二个参数(可选)是表示在前一个缓存器开启之后,多长时间开启下一个缓存器(默认是null,表示在前一个缓存器关闭后才开启下一下缓存器)。.
示例:
import { interval } from "rxjs";import { bufferTime, take } from "rxjs/operators";const source = interval(500);const example = source.pipe(bufferTime(2000, 1500), take(10));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
bufferToggle
bufferToggle接收两个参数,第一个参数是一个Observable,当这个Observable在发出值的时候,bufferToggle开始缓存值,第二个参数是一个返回Observable的函数,当返回的函数发出值的时候,bufferToggle停止缓存值。可以看出,bufferToggle是一个可以通过Observable来自定义缓存开关的操作符。
示例:
import { interval } from "rxjs";import { bufferToggle, take } from "rxjs/operators";const source = interval(1000);const start = interval(5000);const closing = (val: number) => {const period = 2000;console.log(`Value ${val} emitted, starting buffer! Closing in ${period / 1000}s!`);return interval(period);};const example = source.pipe(bufferToggle(start, closing), take(10));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
bufferWhen
bufferWhen接收一个返回Observable的函数作为参数,当这个Observable发送值的时候,bufferWhen立即停止缓存值,并发送已经缓存的值,同时开启下一个缓存。
示例:
import { interval } from "rxjs";import { bufferWhen, take } from "rxjs/operators";const source = interval(1000);const when = () => interval(5000);const example = source.pipe(bufferWhen(when), take(8));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
concatmap
将值映射成内部 observable,并按顺序订阅和发出。
示例:
import { of } from "rxjs";import { concatMap } from "rxjs/operators";const source = of("hello", "world");const examplePromise = (val: string) => new Promise((resolve) => resolve(val));const example = source.pipe(concatMap((val) => examplePromise(val)));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
concatMapTo
comcatMapTo和concatMap类型,只是concatMapTo不会接收Obsebale传递来的参数。
示例:
import { interval, of } from "rxjs";import { concatMapTo, delay, take } from "rxjs/operators";const source = interval(500).pipe(take(5));const fakeRequest = of("Network request complete").pipe(delay(3000));const example = source.pipe(concatMapTo(fakeRequest));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
exhaustMap
映射成内部 observable,忽略其他值直到该 observable 完成。
示例:
import { interval, merge, of } from "rxjs";import { delay, exhaustMap, take } from "rxjs/operators";const source = interval(1000);const delayed = source.pipe(delay(10), take(4));const example = merge(delayed, of(true)).pipe(exhaustMap((_) => source.pipe(take(5))));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
expand
expand可以递归调用内部的函数。
示例:
import { of } from "rxjs";import { expand, take } from "rxjs/operators";const source = of(2);const example = source.pipe(expand((val) => {console.log(`Cur:${val}`);return of(val * 2);}),take(6));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
groupBy
groupBy对Observable发送的值进行分组。
示例:
import { from } from "rxjs";import { groupBy, mergeMap, toArray } from "rxjs/operators";type People = { name: string; age: number };const people: People[] = [{ name: "Sue", age: 25 },{ name: "Joe", age: 30 },{ name: "Frank", age: 30 },{ name: "Sarah", age: 25 },];const source = from(people);const example = source.pipe(groupBy((person) => person.age),mergeMap((group) => group.pipe(toArray())));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
map
map操作符可以对源Observable的每个值应用投射函数。
示例:
import { from } from "rxjs";import { map } from "rxjs/operators";type Person = { name: string; age: number };const people: Person[] = [{ name: "Frank", age: 25 },{ name: "Joe", age: 30 },{ name: "Ryan", age: 24 },];const source = from(people);const example = source.pipe(map((x) => x.name));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
mapTo
mapTo可以把源Obervable发送的值映射成常量。
示例:
import { interval } from "rxjs";import { mapTo, take } from "rxjs/operators";const source = interval(1000);const example = source.pipe(mapTo("Hello World!"), take(10));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
mergeMap
mergeMap可以打平Observable。
示例:
import { of } from "rxjs";import { mergeMap } from "rxjs/operators";const source = of("Hello");const example = source.pipe(mergeMap((val) => of(`${val} world!`)));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
partition
parition可以根据断言把Observable分为两组。
示例:
import { from, merge, of, partition } from "rxjs";import { catchError, map } from "rxjs/operators";const arr: number[] = [1, 2, 3, 4, 5, 6];const source = from(arr);const example = source.pipe(map((val) => {if (val > 3) {throw `${val} greater than 3`;}return { success: true, value: val, error: null };}),catchError((val) => of({ error: val, value: null, success: false })));const [success, error] = partition(example, (val) => val.success);const subscription = merge(success.pipe(map((x) => {console.log(x.value);return x.value;})),error.pipe(map((x) => {console.error(x.error);return x.value;}))).subscribe({next(x) {console.log(x);},});
reduce
reduce是一个聚合操作符,它是将Observable的值归并为单个值,当Observable完成时再将这个值发出。
示例:
import { of } from "rxjs";import { reduce } from "rxjs/operators";const source = of(1, 2, 3, 4);const example = source.pipe(reduce((acc, value) => acc + value));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
scan
scan同样也是一个聚合操作符,它也是把Observable的值归并到一个值,但他会在Observable发送值之后就把当下的聚合值发送出去,也就是说Observable每发送一个值,scan就发生执行一次聚合,然后发送一个值。
示例:
import { of } from "rxjs";import { scan } from "rxjs/operators";const source = of(1, 2, 3);const example = source.pipe(scan((acc, value) => acc + value, 0));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
switchMap
swithcMap是一个打平操作符,在拥有众多的Observale传入switchMap的时候,switchMap会选择把其中的一个订阅的Observable发送,而取消对于其他Observable的订阅。
示例:
import { EMPTY, fromEvent, interval, merge } from "rxjs";import { mapTo, scan, startWith, switchMap, takeWhile } from "rxjs/operators";const countdownSeconds = 10;const setHtml = (id: string) => (val: string) =>(document.getElementById(id)!.innerHTML = val);const interval$ = interval(1000).pipe(mapTo(-1));const pause = document.getElementById("pause");const resume = document.getElementById("resume");const pause$ = fromEvent(pause!, "click").pipe(mapTo(false));const resume$ = fromEvent(resume!, "click").pipe(mapTo(true));const example = merge(pause$, resume$).pipe(startWith(true),switchMap((val) => (val ? interval$ : EMPTY)),scan((acc, value) => acc + value, countdownSeconds),takeWhile((v) => v >= 0));const subscription = example.subscribe({next(x) {setHtml("remaining")(`${x}`);},complete() {console.log("Complete");},});
window
window是一个具有分组能力的转换操作符。window接收一个Observable,当Observable发送出值的时候,window就把之前收集到的值,作为一个Observable发送出去,开发下一次的收集。也就是说,window可以根据接收的Observable把Observable分成多个Observable。
示例:
import { interval, timer } from "rxjs";import { mergeAll, scan, take, window } from "rxjs/operators";const source = timer(0, 1000);const example = source.pipe(window(interval(3000)), take(10));example.pipe(scan((acc) => acc + 1, 0)).subscribe({next(x) {console.log(`Window:${x}`);},complete() {console.log("Complete");},});const subscription = example.pipe(mergeAll()).subscribe({next(x) {console.log(x);},});
windowCount
windowCount和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowCount下一个收集的触发时机是收集的个数。
示例:
import { interval } from "rxjs";import { mergeAll, take, tap, windowCount } from "rxjs/operators";const source = interval(3000);const example = source.pipe(windowCount(3),tap(() => console.log(`Window!`)),mergeAll(),take(9));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
windowTime
windowTime和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowTime下一个收集的触发时机是到达指定的时间。
示例:
import { timer } from "rxjs";import { mergeAll, take, tap, windowTime } from "rxjs/operators";const source = timer(0, 1000);const example = source.pipe(windowTime(3000),tap(() => console.info(`WindowTime`)));const subscription = example.pipe(mergeAll(), take(10)).subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
windowWhen
windowWhen和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowWhen是在接收到的函数返回的Observable发送值的时候,关闭这次收集,开始下次收集。
示例:
import { interval, timer } from "rxjs";import { mergeAll, take, tap, windowWhen } from "rxjs/operators";const source = timer(0, 1000);const example = source.pipe(windowWhen(() => interval(3000)),tap(() => console.info("New Window!")));const subscription = example.pipe(mergeAll(), take(8)).subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
windowToggle
windowWhen和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowToggle有两个参数,第一个参数是一个Observable,第二个参数是一个返回Observable 的函数,当第一个参数的Observable发送值的时候,开发收集,当第二个参数返回的Observable发送值的时候,停止这次收集,并把收集的值作为Observable发送出去。
示例:
工具操作符
tap
tap是一个工具操作符,它的设计是用来在输入输出流不改变的情况下,执行一些副作用的代码。从而使得代码的其他部分的函数式更强。最常使用的就是调试代码。
示例:
import { of } from 'rxjs';import { tap, map } from 'rxjs/operators';of(Math.random()).pipe(tap(console.log),map(n => n > 0.5 ? 'big' : 'small')).subscribe(console.log);
delay
delay同样是一个工具操作符,它和tap一样,也能保证输入和输出的内容的一致性。但和tap不同的是,delay的作用是延迟源Observable发送的值的时间。delay可以接收一个Number类型的值,也可以接收一个Date类型的值。当参数是一个Number类型的值时,延迟时间的数值对应的毫秒数。当参数是一个Date类型的值时,延迟时间是到达指定的时间。
示例:
import { timer } from "rxjs";import { delay, take } from "rxjs/operators";const source = timer(0, 1000);const example = source.pipe(delay(500), take(8));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
delayWhen
delayWhen和delay一样是一个延迟Obserable发送值的工作操作符,但delayWhen延迟的时机取决于接收的参数中返回的Obserable发送值的时机。
示例:
import { interval } from "rxjs";import { delayWhen, take } from "rxjs/operators";const source = interval(800);const example = source.pipe(delayWhen((val) => interval(val * 1000)),take(8));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
dematerialize
dematerialize的作用是把RxJS中的ObservableNotification对象展开。
示例:
import { of } from "rxjs";import { dematerialize } from "rxjs/operators";import type { ObservableNotification } from "rxjs";const notifA: ObservableNotification<string> = { kind: "N", value: "A" };const notifB: ObservableNotification<string> = { kind: "N", value: "B" };const notifC: ObservableNotification<null> = {kind: "E",error: new TypeError("x.toUpperCase is not a function"),};const materialize = of(notifA, notifB, notifC);const example = materialize.pipe(dematerialize());const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},error(e) {console.log(e);},});
materialize
materialize的作用和RxJS的作用相反,是把一个值包装为ObservableNotification对象。
示例:
import { Observable, of } from "rxjs";import { map, materialize } from "rxjs/operators";const letters: Observable<any> = of("a", "b", 13, "c");const upperCase = letters.pipe(map((x) => x.toUpperCase()));const materialized = upperCase.pipe(materialize());const subscription = materialized.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
timeInterval
timeInterval操作符的作用是在Observable原有值的基础上添加一个interval字段,这个interval字段表示在两个Observable发送的两个值之间的时间间隔,而之前Observable的值使用value来包装。组成以下的格式的内容:{value:value,interval:interval}。
示例:
import { interval } from "rxjs";import { take, timeInterval } from "rxjs/operators";const seconds = interval(1000);const subscription = seconds.pipe(timeInterval(), take(8)).subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
timestamp
timestamp操作符的作用是把Observable发送的值进行包装,形成下面的格式:{value:value,timestamp:number}。这里的value就是之前Observable发送的值,而timestamp的值就是value发送时的时间戳。
示例:
import { interval } from "rxjs";import { take, timestamp } from "rxjs/operators";const source = interval(1000);const example = source.pipe(timestamp(), take(8));const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete!");},});
timeout
timeout操作符可以接收一个参数,这个参数是一个对象:{first?:number,each?:number,with?:(x:TimeoutInfo)=>Observable},这里的first值表示在发送第一个值之前最多经历的时间,如果在这个时间内没有发送值,就会执行with提供的函数,而在没有指定with的情况下就会报TimeoutError的错误。这里的each表示每两个值之间的最大时间间隔,如果大于这个时间间隔,也会执行with提供的函数,而在没有指定with的情况下就会报TimeoutError的错误。
示例:
import { interval } from "rxjs";import { take, timeInterval, timeout } from "rxjs/operators";const slow$ = interval(1000);const fast$ = interval(500);const example = slow$.pipe(timeout({first: 800,with: (val) => {console.log(val);return fast$;},}),timeInterval(),take(16));example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
TimeoutInfo:
export interface TimeoutInfo<T, M = unknown> {/** timeout配置中提供的可选meta项 */readonly meta: M;/** 在发生超时前,Observable已经发送的值的数量 */readonly seen: number;/** 在发生超时前,Observable发送的最后值 */readonly lastValue: T | null;}
toArray
toArray操作符会收集Observable发送的所有值,当Observable完成时,把收集到的值以数组的形式发送出去。
示例:
import { interval } from "rxjs";import { take, toArray } from "rxjs/operators";const source = interval(1000);const example = source.pipe(take(8), toArray());const subscription = example.subscribe({next(x) {console.log(x);},complete() {console.log("Complete");},});
observeOn
示例:
import { animationFrameScheduler, fromEvent, interval } from "rxjs";import { mergeMap, observeOn, startWith, take, tap } from "rxjs/operators";const someStart: HTMLElement = document.querySelector("#someStart")!;const someDiv: HTMLElement = document.querySelector("#someDiv")!;const intervals = interval(10);const example = fromEvent(someStart, "click").pipe(mergeMap(() => intervals.pipe(observeOn(animationFrameScheduler))),startWith(0),take(100));example.subscribe({next(x) {someDiv.style.height = x + "px";},complete() {console.log("Complete");},});
使用
测试
对于rxjs的测试,我们通常是使用弹珠测试来进行的。其已集成在了rxjs中。下面是和jest一起使用的示例:
import { from } form 'rxjs';import { TestScheduler } from 'rxjs/testing';describe('test example', ()=>{let testScheduler: TestScheduler;beforeEach(() => {testScheduler = new TestScheduler((actual, expected) => {expect(actual).toEqual(expected);})})it('should be example', ()=> {testScfheduler.run(({ expectObservable }) => {const expectedMarble = '(abc|)';const observable$ = from([1, 2, 3]);const expected = {a: 1, b: 2, c: 3};expectObservable(observable$).toBe(expectedMarble, expected);})})})
更多的测试请参考:[RxJS测试](https://www.yuque.com/silencezhpf/js/rxjs_test~)
