操作符
过滤操作符
Take
take接受一个输入流和一个数值 amount
。输出流为输入流的前 amount
个值。
详细情况是:
- 当
n<amount
时,值会被发送 -
TakeLast
takeLast接受一个输入流和一个数值
amount
。输出流为输入流的后amount
个值。
详细情况是: 当输入流完成后,输出流会
- 发送由输入流发送的后
amount
个值
- 发送由输入流发送的后
- 如果输入流永远不能完成,输出流将一直等待
TakeWhile
takeWhile接受一个输入流和一个断言函数predicate
,当断言函数返回true
时,将输入流的值发送出去,一旦断言函数返回false
,不再输出内容,并立即结束。TakeUntil
takeUntil接受一个输入流和一个截止流(这个命令是根据流的作用命令的)。在截止流发送值之前,输入流的所有内容都会被发送到输出流中,一旦截止流发送值。输出流不再接收来自输入流的内容,并且立即完成。
Skip
skip接受一个输入流和一个数值 n
,输出流为输入流前 n
个值之外的值。
SkipLast
skipLast接受一个输入流和一个数值 amount
,当输入流完成之后,输出流会接收最后 amount
之外的所有值。
详细情况是:
- 当输入流完成后,输出流会接收最后
amount
之外的值。 - 如果输入流永远不能完成,输出流将一真等待。
SkipWhile
skipWhile同样接受一个输入流和一个断言函数predicate
,当断言函数为true
时,将忽略输入流的值,一旦断言函数返回false
,将发送之后的值(无论断言函数返回的是true
还是false
)。输出流会和输入流一起结束。SkipUntil
skipUntil和takeUntil一样接受两个值,一个输入流和一个截止流,在截止流发送值之前,输出流会直接忽略输入流中的内容,一旦截止流发送内容,输出流将开始接收输入流中的值。
First
first接受一个输入流和一个断言函数 predicate
,输出流为断言函数返回 true
的第一个值。
Last
last 接受一个输入流和一个断言函数 predicate
,输出流为断言函数返回 true
的最后一个值。
ElementAt
elementAt接受两个必要参数输入流和索引值 index
和一个可选参数 defaultValue
。 defaultValue
的默认值为 undefined
。当输入流完成的时候,输出流会输出输入流中第 index
个值,如果不存在第 index
个值,则输出设置的 defaultValue
值(默认为 undefined
),同时输出流完成。
Filter
filter接受一个输入流和一个断言函数 predicate
,输出流了断言函数返回 true
的所有的值。
IgnoreElements
ignoreElements只接受一个输入流作为参数,在输出流中会忽略输入流中的所有内容,直接发送完成。
Distinct
distinct接受一个输入流,并可以接受两个可选参数,第一个可选参数为 Selector
,即以什么为基准来给输入流去重。默认是 undefined
。当 Selector
为 undefined
时,去重的基准为发送的值本身。当提供了一个 Selector
函数后,以函数的返回值为基准去重。
示例:
Without Selector
:
import { from } from "rxjs";
import { disctinct } from "rxjs/operators";
from([1,2,3,4,4,3,2,5])
.pipe(
distinct()
)
.subscripb(console.log); // 1,2,3,4,5
With Selector
:
import { from } from "rxjs";
import { distinct } from "rxjs/operators";
interface Person {
age: number,
name: string
}
from<Person[]>([
{age:4,name:"foo",},
{age:5,name:"bar"},
{age:6,name:"foo"}
])
.pipe(
distinct((p:Person)=>p.name)
)
.subscribe(console.log); // {age:4,name:"foo"} ,{age:5,name:"bar"}
DisctinctUntilChanged
disctinctUntilChanged接受一个输入流和一个可选参数 compare
函数。
disctinctUntilChanged 会根据比较函数的返回值是不是为 true
来判断,输出流是否要输出该值。
- 比较函数返回
true
,输出流不发送该值,将该值忽略。 - 比较函数返回
false
, 输出流发送该值。
在没有提供 compare
函数的情况下,会依照js 的规则做相等性比较。根据相等性比较的 true
和 false
来确定是否要输出。
示例1:
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe(
distinctUntilChanged(),
)
.subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4
示例2:
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
interface Person {
age: number,
name: string
}
of<Person>(
{ age: 4, name: 'Foo'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo'},
{ age: 6, name: 'Foo'},
).pipe(
distinctUntilChanged((p: Person, q: Person) => p.name === q.name),
)
.subscribe(x => console.log(x));
// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
DistinctUntilKeyChanged
distinctUntilKeyChanged 和 distinctUntikChanged 相比,多了一个必选参数 key
。
示例1:
import { of } from 'rxjs';
import { distinctUntilKeyChanged } from 'rxjs/operators';
interface Person {
age: number,
name: string
}
of<Person>(
{ age: 4, name: 'Foo'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo'},
{ age: 6, name: 'Foo'},
).pipe(
distinctUntilKeyChanged('name'),
)
.subscribe(x => console.log(x));
// displays:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }
示例2:
import { of } from 'rxjs';
import { distinctUntilKeyChanged } from 'rxjs/operators';
interface Person {
age: number,
name: string
}
of<Person>(
{ age: 4, name: 'Foo1'},
{ age: 7, name: 'Bar'},
{ age: 5, name: 'Foo2'},
{ age: 6, name: 'Foo3'},
).pipe(
distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3)),
)
.subscribe(x => console.log(x));
// displays:
// { age: 4, name: 'Foo1' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo2' }
auditTime
auditTime接受一个输入流和一个 duration
参数。
当 auditTime
接收到一个源值的时候,auditTime会直接忽略该值以及指定 duration
时间内的所有值,然后向输出流发送出指定时间 duration
中的最新值,重复执行此过程。
示例:
const { interval } = Rx;
const { auditTime } = RxOperators;
interval(1000).pipe(auditTime(1100))
audit
audit接受一个输入流和一个 Observable
对象。audit
和 throttle
类似,但不同于 throttle
的是, audit
会发送时间窗口内的最新值,还不是第一次的值。在 audit
接收的 Observable
对象内部维护着一个内容的计时器(timer),当这个内部的计时器启用的时间, audit
会忽略输入流发送的值。在初始态下,这个内部的计时器是禁用的。当输入流的第一个值到达的时候,内部计时器启用。当 Observable
发送值或者完成的时候,内部计时器禁用,并将输入流的最新值发送给输出流。之后重复这个过程。
注意:在每次内部计时器禁用后,都会生成一个新的
Observable
示例:
const { interval,timer,pipe } = Rx;
const { audit,tap } = RxOperators;
function log(name=""){
return pipe(
tap(x=>console.log(`${name} ${x}`))
)
}
interval(1000)
.pipe(
audit(()=>timer(200,400).pipe(log("timer"))),
log("audit")
)
debounceTime
debounceTime接受一个输入流和一个 duration
值。
debounceTime在 duration
中只会发送最新值,在 duration
内如果输入流产生了多个值,那么除了最后的一个值被发送到输出流外,其他的值会直接被忽略。
示例:
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(debounceTime(1000));
result.subscribe(x => console.log(x));
debounce
debounce授受一个输入流和一个 Observable
对象。
在 Observable
发送值之前,debounce会一直忽略输入流发送的值,当 Observable
发送了新值,debounce会把输入流的最新值发送给输出流。
示例:
import { fromEvent, interval } from 'rxjs';
import { debounce } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(debounce(() => interval(1000)));
result.subscribe(x => console.log(x));
sampleTime
sampleTime接受一个输入流和一个时间间隔 duration
值。
sampleTime会有指定的时间间隔内对输入流进行采样,sampleTime会把采样中的最新的值发送给输出流,如果本次的采样值和上次发送给输出流的值没有改变,则不发送值。
示例:
import { fromEvent } from 'rxjs';
import { sampleTime } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(sampleTime(1000));
result.subscribe(x => console.log(x));
sample
sample接受一个输入流和一个 Observable
值。
sample会在 Observalbe
发送一个值或完成的时候对输入流进行采样,并把采样中的最新值发送给输出流(如果本次的采样值和上次的采样值相同,则忽略)。
示例:
import { fromEvent, interval } from 'rxjs';
import { sample } from 'rxjs/operators';
const seconds = interval(1000);
const clicks = fromEvent(document, 'click');
const result = seconds.pipe(sample(clicks));
result.subscribe(x => console.log(x));
single
single接受一个输入流和一个断言函数。
single会根据断言函数发送输入流中的唯一值,如果不存在这样的唯一值,那么就发送 undefined
值。如果存在多个符合断言函数的值,那么会抛出错误。
示例1:
import { range } from 'rxjs';
import { single } from 'rxjs/operators';
const numbers = range(1,5).pipe(single());
numbers.subscribe(x => console.log('never get called'), e => console.log('error'));
// result
// 'error'
示例2:
import { range } from 'rxjs';
import { single } from 'rxjs/operators';
const numbers = range(1,5).pipe(single());
numbers.subscribe(x => console.log('never get called'), e => console.log('error'));
// result
// 'error'
throttleTime
throttleTime接受一个输入流和一个时间间隔 duration
。
在时间间隔 duration
内,throttleTime只接收输入流的第一个值,其余的值会被忽略,直到下一个时间间隔开始,重复此过程。
示例1:
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(throttleTime(1000));
result.subscribe(x => console.log(x));
示例2:
import { fromEvent, asyncScheduler } from 'rxjs';
import { throttleTime, withLatestFrom } from 'rxjs/operators';
// defaultThottleConfig = { leading: true, trailing: false }
const throttleConfig = {
leading: false,
trailing: true
}
const click = fromEvent(document, 'click');
const doubleClick = click.pipe(
throttleTime(400, asyncScheduler, throttleConfig)
);
doubleClick.subscribe((throttleValue: Event) => {
console.log(`Double-clicked! Timestamp: ${throttleValue.timeStamp}`);
});
throttle
throttle接受一个输入流和另一个 Observable
对象。
throttle和throttleTime的区别在于,对于时间服间的控制不再使用固定的 毫秒作为依据。而是依据一个 Observable
对象。
示例:
import { fromEvent } from 'rxjs';
import { throttle } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(throttle(ev => interval(1000)));
result.subscribe(x => console.log(x));
组合操作符
combineLatest
combineLatest操作符是一个组合操作符,combineLatest操作符接受一个Observable数组,当数组中的所有Observable都发送值的时候,combineLatest操作符会收集每个Observable的最新值发送。只要数组中的任意一个Observable没有发送值,combineLatest就不会发送值。当数组中的所有Observable都完成之后,combineLatest完成。
combineLatest除了可以接收一个Observable数组外,还可以接收第二个resultSelector
参数。这个参数是一个函数,这个函数的参数是Observable数组发送的值,返回值是当前combineLatest发送的值。
示例1:
普通的combineLatest
:
import {timer,combineLatest} from 'rxjs';
const timerOne = timer(1000,4000);
const timerTwo = timer(2000,4000);
const timerThree = timer(3000,4000);
const combined = combineLatest([timerOne,timerTwo,timerThree]);
const subscription = combined.subscribe(x=>console.log(x));
示例2:
含有resultSelector
的combineLatest
:
import {timer,combineLatest} from 'rxjs';
import {take} from 'rxjs/operators';
const timerOne = timer(1000,4000).pipe(take(3));
const timerTwo = timer(2000,4000).pipe(take(3));
const timerThree = timer(3000,4000).pipe(take(2));
const combinedProject = combineLatest<number[],string>(
[timerOne,timerTwo,timerThree],
(one,two,three)=>{
return `Timer One Latest:${one},
Timer Two Latest:${two},
Timer Three Latest:${three}`
}
)
combinedProject.subscribe(x=>console.log(x));
concat
concat操作符接收一系列的Observable作为参数,concat会从第一个Observable开始把发送的值逐一发送出去,直到这个Observable完成,再发送下一个Observable发送的值。
示例:
import { concat, of } from "rxjs";
import { delay } from "rxjs/operators";
const sourceOne = of(1, 2, 3);
const sourceTwo = of(4, 5, 6);
const sourceThree = sourceOne.pipe(delay(3000));
const example = concat(sourceThree, sourceTwo);
example.subscribe((x) => console.log(`Example: Delayed source one:${x}`));
concatAll
concatAll操作符不接收参数。concatAll中作用是把流动到这里的高阶Observable打平为一阶的Observable。打平的方式就是把从第一个高阶的Observable开始,逐一和之后的Observable按顺序组合成一个Observable。
示例:
import { interval } from "rxjs";
import { concatAll, map } from "rxjs/operators";
const samplePromise = (val: number) => new Promise((resolve) => resolve(val));
const source = interval(2000);
const example = source.pipe(
map((x) => samplePromise(x)),
concatAll()
);
const subscription = example.subscribe({
next(value) {
console.log(`Example with Promise:${value}`);
},
});
forkJoin
forkJoin接收一个Observable数组,其作用是在Observable数组中的所有Observable都完成的时候,把Observable数组中的最新值合并发送。
示例:
import { forkJoin, interval, of } from "rxjs";
import { delay, take } from "rxjs/operators";
const myPromise = (val: string) =>
new Promise((resolve) =>
setTimeout(() => resolve(`Promise Resolved:${val}`), 5000)
);
const example = forkJoin([
of("hello"),
of("world").pipe(delay(1000)),
interval(1000).pipe(take(1)),
interval(1000).pipe(take(2)),
myPromise("Result"),
]);
const subscription = example.subscribe((value) => console.log(value));
merge
merge接收一系统的Observable,并把每个Observable发送的值即时的发送出去。
示例:
import { interval, merge } from "rxjs";
import { mapTo } from "rxjs/operators";
const first = interval(2500);
const second = interval(2000);
const third = interval(1000);
const fourth = interval(1500);
const example = merge(
first.pipe(mapTo("FIRST!")),
second.pipe(mapTo("SECOND!")),
third.pipe(mapTo("THIRD")),
fourth.pipe(mapTo("FOURTH"))
);
const subscription = example.subscribe({
next(value) {
console.log(value);
},
});
mergeAll
mergeAll操作符用于把接收到的高阶Observable打平。打平的方式是,当收到一个Observable的值的时候,就发送这个值。直到所有的Observable完成。另外,mergeAll还可以接收一个number
类型的参数,用来控制可以打平接收的几个Observable。
示例:
import { interval } from "rxjs";
import { delay, map, mergeAll, take } from "rxjs/operators";
const source = interval(500).pipe(take(5));
const example = source.pipe(
map((x) => source.pipe(delay(1000), take(3))),
mergeAll(2)
);
example.subscribe({
next(value) {
console.log(value);
},
complete() {
console.log("Complete!");
},
});
pairwise
pairwise操作符的作用是把前一个值和当前值组合成数组火发送。
示例:
import { interval } from "rxjs";
import { pairwise, take } from "rxjs/operators";
interval(1000)
.pipe(pairwise(), take(5))
.subscribe({
next(value) {
console.log(value);
},
});
race
race操作符是一个具有选择性的合并操作符,race操作符会选择高阶Observable中最先返回值的进行执行。
示例:
import { interval, race } from "rxjs";
import { mapTo, take } from "rxjs/operators";
const example = race(
interval(1000),
interval(500).pipe(mapTo("I won!")),
interval(1500)
).pipe(take(5));
example.subscribe({
next(value) {
console.log(value);
},
});
startWith
startWith操作符的作用是加Observable发送值之前,在Observable的前面添加指定的值。
示例:
import { interval } from "rxjs";
import { startWith, take } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(startWith(-3, -2, -1), take(10));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
});
withLatestFrom
withLatestFrom操作符接收一个Observable作为参数。withLatestFrom的作用是当源Observable发送值的时候,withLatestFrom会取参数Observable的最新值和源Observable为最新值组合成数组,发送。
withLatestFrom和combineLatest的取值逻辑是一样的,不同之处在于触发发送值的时机:
- withLatestFrom发送值是时机是 源Observable发送新值的时候。
- combineLatest发送值的时机是 无论是 只要任意一个Observable发送值,combineLatest就发送值。
示例:
import { interval } from "rxjs";
import { map, take, withLatestFrom } from "rxjs/operators";
const source = interval(5000);
const secondSource = interval(1000);
const example = source.pipe(
withLatestFrom(secondSource),
map(([first, second]) => {
return `First source (5s):${first},Second source (1s):${second}`;
}),
take(15)
);
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
zip
zip操作符的作用是把所有的Observable发送值的第i个值,组合一个数组,向后发送。只要有一个Observable没有发送数据或已完成,zip就不会发送数据。
示例:
import { interval, zip } from "rxjs";
import { take } from "rxjs/operators";
const source = interval(1000);
const example = zip(source, source.pipe(take(2)));
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
条件操作符
defaultIfEmpty
defaultIfEmpty操作符的作用是:如果Observable在完成前没有发送任何值,那就就把defaultIfEmpty的参数发送出去。
示例:
import { interval, zip } from "rxjs";
import { take } from "rxjs/operators";
const source = interval(1000);
const example = zip(source, source.pipe(take(2)));
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
every
every是一个判定操作符,如果所有的值都符合条件,那么返回true
,否则返回false
。
示例:
import { of } from "rxjs";
import { every } from "rxjs/operators";
const source = of(2, 4, 6, 8, 10);
const example = source.pipe(every((x) => x % 2 === 0));
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
find
find操作符接收一个断言函数,find会返回断言函数为true
的第一个值。到Observable完成时,还没有符合断言的值时,返回undefined
。
示例:
import { from } from "rxjs";
import { find } from "rxjs/operators";
const arr: number[] = [3, 9, 15, 20];
const source = from(arr);
const example = source.pipe(find((x) => x % 5 === 0));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
findIndex
findIndex操作符接收一个断言函数,find会返回断言函数为true
的第一个值的索引。到Observable完成时,还没有符合断言的值时,返回-1
。
示例:
import { from } from "rxjs";
import { findIndex } from "rxjs/operators";
const arr: number[] = [3, 5, 15, 20];
const source = from(arr);
const example = source.pipe(findIndex((x) => x % 4 === 0));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
isEmpty
isEmpty是一个逻辑操作符,当在Observable在完成时没有发送值,isEmpty发送true
,否则发送false
.
示例:
import { EMPTY } from "rxjs";
import { isEmpty } from "rxjs/operators";
const result = EMPTY.pipe(isEmpty());
const subscription = result.subscribe({
next(x) {
console.log(x);
},
});
创建操作符
from
from操作符可以从数组,字符串,map和Promise中创建Observable。
示例:
import { from } from "rxjs";
const source = from(new Promise((resolve) => resolve("Hello World!")));
source.subscribe({
next(value) {
console.log(value);
},
complete() {
console.log("Complete!");
},
});
fromEvent
fromEvent可以从 DOM 事件中创建Observable,fromEvent的第一个参数是 DOM对象,第二个参数是事件对象,第三个参数是一个布尔值,表示是否使用捕获。
示例:
import { fromEvent } from 'rxjs';
const clicksInDocument = fromEvent(document, 'click', true); // note optional configuration parameter
// which will be passed to addEventListener
const clicksInDiv = fromEvent(someDivInDocument, 'click');
clicksInDocument.subscribe(() => console.log('document'));
clicksInDiv.subscribe(() => console.log('div'));
interval
interval是一个按照一定的时间间隔发送数字序列的Observable。
示例:
import { interval } from 'rxjs';
const source = interval(1000);
const subscription = source.subscribe(val=>console.log(val));
of
of操作符的作用是把按顺序把任意对象自动转换成Observable;
示例:
import { of } from 'rxjs';
const source = of({ name: 'Brian' }, [1, 2, 3], function hello() {
return 'Hello';
});
const subscribe = source.subscribe(val => console.log(val));
range
range操作符的作用是按顺序发送指定区间内的值。
示例:
import { range } from 'rxjs';
const source = range(1,3);
source.subscribe(val => console.log(val));
timer
timer接收两个参数,第一个参数表示发送第一个值之前要延迟的时间,第二个参数表示在发送了第一个值之后,每隔多长时间再发送一个新的值。如果没有给定第二个参数,则只发送了第一个值之后就结束。
示例:
import { timer } from "rxjs";
import { take } from "rxjs/operators";
const source = timer(1000, 2000);
const observer = {
next<T>(x: T) {
console.log(x);
},
complete() {},
};
const subscription = source.pipe(take(10)).subscribe(observer);
throwError
throwError操作符可以创建一个错误对象的Observable;
示例:
import { throwError } from 'rxjs';
const source = throwError('This is an error!');
// 输出: 'Error: This is an error!'
const subscribe = source.subscribe({
next: val => console.log(val),
complete: () => console.log('Complete!'),
error: val => console.log(`Error: ${val}`)
});
错误处理操作符
catchError
catchError操作符主要用于在 rxjs 中捕获错误。catchError中一定要返回一个Observable。
示例:
import { from, of, timer } from "rxjs";
import { catchError, mergeMap } from "rxjs/operators";
const myBadPromise = () =>
new Promise((resolve, reject) => reject("Rejected!"));
const source = timer(1000);
const example = source.pipe(
mergeMap(() =>
from(myBadPromise()).pipe(catchError((err) => of(`Bad Promise:${err}`)))
)
);
const subscription = example.subscribe({
next(v) {
console.log(v);
},
complete() {
console.log("Complete!");
},
error(e) {
console.error(e);
},
});
retry
retry操作符可以让程序在出错的情况下,重试n次(n是retry接收的参数)
import { interval, of, throwError } from "rxjs";
import { mergeMap, retry } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(
mergeMap((x) => {
if (x > 5) {
return throwError("Error!");
}
return of(x);
}),
retry(2)
);
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
error(e) {
console.log(e);
},
});
retryWhen
retryWhen操作符的作用是:当发生错误时,基于自定义的标准的重试Observable序列。retryWhen接收一函数,这个函数的参数是一个由Observable包裹的错误对象,函数的返回值应为一个Observable。
示例:
import { interval, timer } from "rxjs";
import { delayWhen, map, retryWhen, tap } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(
map((val) => {
if (val > 5) {
throw val;
}
return val;
}),
retryWhen((errors) => {
return errors.pipe(
tap((val) => console.log(`Value ${val} was to high!`)),
delayWhen((val) => timer(val * 1000))
);
})
);
example.subscribe({
next(value) {
console.log(value);
},
complete() {
console.log("Complete!");
},
error(e) {
console.error(e);
},
});
转换操作符
buffer
buffer是缓存操作符,buffer接收一个Observable。buffer会缓存源Observable发送的值,缓存的值会在参数Observable发送值的时候以数组的形式发送出去。值的内容为上次发送值后的值到现在发送的值。
示例:
import { fromEvent, interval } from "rxjs";
import { buffer } from "rxjs/operators";
const myInterval = interval(1000);
const bufferBy = fromEvent(document, "click");
const example = myInterval.pipe(buffer(bufferBy));
const subscription = example.subscribe({
next(x) {
console.log(`Buffered Values:${x}`);
},
});
bufferCount
bufferCount也是一个缓存操作符,boufferCount接收一个必选参数和一个可选参数,第一个参数表示每次需要缓存多少个值,第二个参数表示缓存中有几个值是新值,默认值是null,即缓存值中全部为新值。
示例:
没有第二个参数
import { interval } from "rxjs";
import { bufferCount, take } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(bufferCount(3), take(10));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log(`Complete!`);
},
});
有第二个参数:
import { interval } from "rxjs";
import { bufferCount, take } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(bufferCount(3, 1), take(10));
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
bufferTime
bufferTime是一个和时间有关的缓存器,第一个参数表示的是要缓存多长时间内的数据,第二个参数(可选)是表示在前一个缓存器开启之后,多长时间开启下一个缓存器(默认是null,表示在前一个缓存器关闭后才开启下一下缓存器)。.
示例:
import { interval } from "rxjs";
import { bufferTime, take } from "rxjs/operators";
const source = interval(500);
const example = source.pipe(bufferTime(2000, 1500), take(10));
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
bufferToggle
bufferToggle接收两个参数,第一个参数是一个Observable,当这个Observable在发出值的时候,bufferToggle开始缓存值,第二个参数是一个返回Observable的函数,当返回的函数发出值的时候,bufferToggle停止缓存值。可以看出,bufferToggle是一个可以通过Observable来自定义缓存开关的操作符。
示例:
import { interval } from "rxjs";
import { bufferToggle, take } from "rxjs/operators";
const source = interval(1000);
const start = interval(5000);
const closing = (val: number) => {
const period = 2000;
console.log(
`Value ${val} emitted, starting buffer! Closing in ${period / 1000}s!`
);
return interval(period);
};
const example = source.pipe(bufferToggle(start, closing), take(10));
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
bufferWhen
bufferWhen接收一个返回Observable的函数作为参数,当这个Observable发送值的时候,bufferWhen立即停止缓存值,并发送已经缓存的值,同时开启下一个缓存。
示例:
import { interval } from "rxjs";
import { bufferWhen, take } from "rxjs/operators";
const source = interval(1000);
const when = () => interval(5000);
const example = source.pipe(bufferWhen(when), take(8));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
concatmap
将值映射成内部 observable,并按顺序订阅和发出。
示例:
import { of } from "rxjs";
import { concatMap } from "rxjs/operators";
const source = of("hello", "world");
const examplePromise = (val: string) => new Promise((resolve) => resolve(val));
const example = source.pipe(concatMap((val) => examplePromise(val)));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
concatMapTo
comcatMapTo和concatMap类型,只是concatMapTo不会接收Obsebale传递来的参数。
示例:
import { interval, of } from "rxjs";
import { concatMapTo, delay, take } from "rxjs/operators";
const source = interval(500).pipe(take(5));
const fakeRequest = of("Network request complete").pipe(delay(3000));
const example = source.pipe(concatMapTo(fakeRequest));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
exhaustMap
映射成内部 observable,忽略其他值直到该 observable 完成。
示例:
import { interval, merge, of } from "rxjs";
import { delay, exhaustMap, take } from "rxjs/operators";
const source = interval(1000);
const delayed = source.pipe(delay(10), take(4));
const example = merge(delayed, of(true)).pipe(
exhaustMap((_) => source.pipe(take(5)))
);
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
expand
expand可以递归调用内部的函数。
示例:
import { of } from "rxjs";
import { expand, take } from "rxjs/operators";
const source = of(2);
const example = source.pipe(
expand((val) => {
console.log(`Cur:${val}`);
return of(val * 2);
}),
take(6)
);
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
groupBy
groupBy对Observable发送的值进行分组。
示例:
import { from } from "rxjs";
import { groupBy, mergeMap, toArray } from "rxjs/operators";
type People = { name: string; age: number };
const people: People[] = [
{ name: "Sue", age: 25 },
{ name: "Joe", age: 30 },
{ name: "Frank", age: 30 },
{ name: "Sarah", age: 25 },
];
const source = from(people);
const example = source.pipe(
groupBy((person) => person.age),
mergeMap((group) => group.pipe(toArray()))
);
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
map
map操作符可以对源Observable的每个值应用投射函数。
示例:
import { from } from "rxjs";
import { map } from "rxjs/operators";
type Person = { name: string; age: number };
const people: Person[] = [
{ name: "Frank", age: 25 },
{ name: "Joe", age: 30 },
{ name: "Ryan", age: 24 },
];
const source = from(people);
const example = source.pipe(map((x) => x.name));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
mapTo
mapTo可以把源Obervable发送的值映射成常量。
示例:
import { interval } from "rxjs";
import { mapTo, take } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(mapTo("Hello World!"), take(10));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
mergeMap
mergeMap可以打平Observable。
示例:
import { of } from "rxjs";
import { mergeMap } from "rxjs/operators";
const source = of("Hello");
const example = source.pipe(mergeMap((val) => of(`${val} world!`)));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
partition
parition可以根据断言把Observable分为两组。
示例:
import { from, merge, of, partition } from "rxjs";
import { catchError, map } from "rxjs/operators";
const arr: number[] = [1, 2, 3, 4, 5, 6];
const source = from(arr);
const example = source.pipe(
map((val) => {
if (val > 3) {
throw `${val} greater than 3`;
}
return { success: true, value: val, error: null };
}),
catchError((val) => of({ error: val, value: null, success: false }))
);
const [success, error] = partition(example, (val) => val.success);
const subscription = merge(
success.pipe(
map((x) => {
console.log(x.value);
return x.value;
})
),
error.pipe(
map((x) => {
console.error(x.error);
return x.value;
})
)
).subscribe({
next(x) {
console.log(x);
},
});
reduce
reduce是一个聚合操作符,它是将Observable的值归并为单个值,当Observable完成时再将这个值发出。
示例:
import { of } from "rxjs";
import { reduce } from "rxjs/operators";
const source = of(1, 2, 3, 4);
const example = source.pipe(reduce((acc, value) => acc + value));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
scan
scan同样也是一个聚合操作符,它也是把Observable的值归并到一个值,但他会在Observable发送值之后就把当下的聚合值发送出去,也就是说Observable每发送一个值,scan就发生执行一次聚合,然后发送一个值。
示例:
import { of } from "rxjs";
import { scan } from "rxjs/operators";
const source = of(1, 2, 3);
const example = source.pipe(scan((acc, value) => acc + value, 0));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
switchMap
swithcMap是一个打平操作符,在拥有众多的Observale传入switchMap的时候,switchMap会选择把其中的一个订阅的Observable发送,而取消对于其他Observable的订阅。
示例:
import { EMPTY, fromEvent, interval, merge } from "rxjs";
import { mapTo, scan, startWith, switchMap, takeWhile } from "rxjs/operators";
const countdownSeconds = 10;
const setHtml = (id: string) => (val: string) =>
(document.getElementById(id)!.innerHTML = val);
const interval$ = interval(1000).pipe(mapTo(-1));
const pause = document.getElementById("pause");
const resume = document.getElementById("resume");
const pause$ = fromEvent(pause!, "click").pipe(mapTo(false));
const resume$ = fromEvent(resume!, "click").pipe(mapTo(true));
const example = merge(pause$, resume$).pipe(
startWith(true),
switchMap((val) => (val ? interval$ : EMPTY)),
scan((acc, value) => acc + value, countdownSeconds),
takeWhile((v) => v >= 0)
);
const subscription = example.subscribe({
next(x) {
setHtml("remaining")(`${x}`);
},
complete() {
console.log("Complete");
},
});
window
window是一个具有分组能力的转换操作符。window接收一个Observable,当Observable发送出值的时候,window就把之前收集到的值,作为一个Observable发送出去,开发下一次的收集。也就是说,window可以根据接收的Observable把Observable分成多个Observable。
示例:
import { interval, timer } from "rxjs";
import { mergeAll, scan, take, window } from "rxjs/operators";
const source = timer(0, 1000);
const example = source.pipe(window(interval(3000)), take(10));
example.pipe(scan((acc) => acc + 1, 0)).subscribe({
next(x) {
console.log(`Window:${x}`);
},
complete() {
console.log("Complete");
},
});
const subscription = example.pipe(mergeAll()).subscribe({
next(x) {
console.log(x);
},
});
windowCount
windowCount和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowCount下一个收集的触发时机是收集的个数。
示例:
import { interval } from "rxjs";
import { mergeAll, take, tap, windowCount } from "rxjs/operators";
const source = interval(3000);
const example = source.pipe(
windowCount(3),
tap(() => console.log(`Window!`)),
mergeAll(),
take(9)
);
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
windowTime
windowTime和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowTime下一个收集的触发时机是到达指定的时间。
示例:
import { timer } from "rxjs";
import { mergeAll, take, tap, windowTime } from "rxjs/operators";
const source = timer(0, 1000);
const example = source.pipe(
windowTime(3000),
tap(() => console.info(`WindowTime`))
);
const subscription = example.pipe(mergeAll(), take(10)).subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
windowWhen
windowWhen和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowWhen是在接收到的函数返回的Observable发送值的时候,关闭这次收集,开始下次收集。
示例:
import { interval, timer } from "rxjs";
import { mergeAll, take, tap, windowWhen } from "rxjs/operators";
const source = timer(0, 1000);
const example = source.pipe(
windowWhen(() => interval(3000)),
tap(() => console.info("New Window!"))
);
const subscription = example.pipe(mergeAll(), take(8)).subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
windowToggle
windowWhen和window具有类似的功能,只是window触发下一个收集的时间是接收的Observable发送值,而windowToggle有两个参数,第一个参数是一个Observable,第二个参数是一个返回Observable 的函数,当第一个参数的Observable发送值的时候,开发收集,当第二个参数返回的Observable发送值的时候,停止这次收集,并把收集的值作为Observable发送出去。
示例:
工具操作符
tap
tap是一个工具操作符,它的设计是用来在输入输出流不改变的情况下,执行一些副作用的代码。从而使得代码的其他部分的函数式更强。最常使用的就是调试代码。
示例:
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
of(Math.random()).pipe(
tap(console.log),
map(n => n > 0.5 ? 'big' : 'small')
).subscribe(console.log);
delay
delay同样是一个工具操作符,它和tap一样,也能保证输入和输出的内容的一致性。但和tap不同的是,delay的作用是延迟源Observable发送的值的时间。delay可以接收一个Number
类型的值,也可以接收一个Date
类型的值。当参数是一个Number
类型的值时,延迟时间的数值对应的毫秒数。当参数是一个Date
类型的值时,延迟时间是到达指定的时间。
示例:
import { timer } from "rxjs";
import { delay, take } from "rxjs/operators";
const source = timer(0, 1000);
const example = source.pipe(delay(500), take(8));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
delayWhen
delayWhen和delay一样是一个延迟Obserable发送值的工作操作符,但delayWhen延迟的时机取决于接收的参数中返回的Obserable发送值的时机。
示例:
import { interval } from "rxjs";
import { delayWhen, take } from "rxjs/operators";
const source = interval(800);
const example = source.pipe(
delayWhen((val) => interval(val * 1000)),
take(8)
);
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
dematerialize
dematerialize的作用是把RxJS中的ObservableNotification
对象展开。
示例:
import { of } from "rxjs";
import { dematerialize } from "rxjs/operators";
import type { ObservableNotification } from "rxjs";
const notifA: ObservableNotification<string> = { kind: "N", value: "A" };
const notifB: ObservableNotification<string> = { kind: "N", value: "B" };
const notifC: ObservableNotification<null> = {
kind: "E",
error: new TypeError("x.toUpperCase is not a function"),
};
const materialize = of(notifA, notifB, notifC);
const example = materialize.pipe(dematerialize());
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
error(e) {
console.log(e);
},
});
materialize
materialize的作用和RxJS的作用相反,是把一个值包装为ObservableNotification
对象。
示例:
import { Observable, of } from "rxjs";
import { map, materialize } from "rxjs/operators";
const letters: Observable<any> = of("a", "b", 13, "c");
const upperCase = letters.pipe(map((x) => x.toUpperCase()));
const materialized = upperCase.pipe(materialize());
const subscription = materialized.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
timeInterval
timeInterval操作符的作用是在Observable原有值的基础上添加一个interval
字段,这个interval
字段表示在两个Observable发送的两个值之间的时间间隔,而之前Observable的值使用value
来包装。组成以下的格式的内容:{value:value,interval:interval}
。
示例:
import { interval } from "rxjs";
import { take, timeInterval } from "rxjs/operators";
const seconds = interval(1000);
const subscription = seconds.pipe(timeInterval(), take(8)).subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
timestamp
timestamp操作符的作用是把Observable发送的值进行包装,形成下面的格式:{value:value,timestamp:number}
。这里的value就是之前Observable发送的值,而timestamp的值就是value发送时的时间戳。
示例:
import { interval } from "rxjs";
import { take, timestamp } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(timestamp(), take(8));
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete!");
},
});
timeout
timeout操作符可以接收一个参数,这个参数是一个对象:{first?:number,each?:number,with?:(x:TimeoutInfo)=>Observable}
,这里的first
值表示在发送第一个值之前最多经历的时间,如果在这个时间内没有发送值,就会执行with
提供的函数,而在没有指定with
的情况下就会报TimeoutError
的错误。这里的each
表示每两个值之间的最大时间间隔,如果大于这个时间间隔,也会执行with
提供的函数,而在没有指定with
的情况下就会报TimeoutError
的错误。
示例:
import { interval } from "rxjs";
import { take, timeInterval, timeout } from "rxjs/operators";
const slow$ = interval(1000);
const fast$ = interval(500);
const example = slow$.pipe(
timeout({
first: 800,
with: (val) => {
console.log(val);
return fast$;
},
}),
timeInterval(),
take(16)
);
example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
TimeoutInfo:
export interface TimeoutInfo<T, M = unknown> {
/** timeout配置中提供的可选meta项 */
readonly meta: M;
/** 在发生超时前,Observable已经发送的值的数量 */
readonly seen: number;
/** 在发生超时前,Observable发送的最后值 */
readonly lastValue: T | null;
}
toArray
toArray操作符会收集Observable发送的所有值,当Observable完成时,把收集到的值以数组的形式发送出去。
示例:
import { interval } from "rxjs";
import { take, toArray } from "rxjs/operators";
const source = interval(1000);
const example = source.pipe(take(8), toArray());
const subscription = example.subscribe({
next(x) {
console.log(x);
},
complete() {
console.log("Complete");
},
});
observeOn
示例:
import { animationFrameScheduler, fromEvent, interval } from "rxjs";
import { mergeMap, observeOn, startWith, take, tap } from "rxjs/operators";
const someStart: HTMLElement = document.querySelector("#someStart")!;
const someDiv: HTMLElement = document.querySelector("#someDiv")!;
const intervals = interval(10);
const example = fromEvent(someStart, "click").pipe(
mergeMap(() => intervals.pipe(observeOn(animationFrameScheduler))),
startWith(0),
take(100)
);
example.subscribe({
next(x) {
someDiv.style.height = x + "px";
},
complete() {
console.log("Complete");
},
});
使用
测试
对于rxjs
的测试,我们通常是使用弹珠测试来进行的。其已集成在了rxjs
中。下面是和jest
一起使用的示例:
import { from } form 'rxjs';
import { TestScheduler } from 'rxjs/testing';
describe('test example', ()=>{
let testScheduler: TestScheduler;
beforeEach(() => {
testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
})
})
it('should be example', ()=> {
testScfheduler.run(({ expectObservable }) => {
const expectedMarble = '(abc|)';
const observable$ = from([1, 2, 3]);
const expected = {a: 1, b: 2, c: 3};
expectObservable(observable$).toBe(expectedMarble, expected);
})
})
})
更多的测试请参考:[RxJS测试](https://www.yuque.com/silencezhpf/js/rxjs_test~)