RxJS
RxJS是一个库,它通过使用observable序列来编写异步和基于事件的程序。它提供了一个核心类型Observable,附属类型(Observer、Schedulers、Subjects)和受[Array#extras]启发的操作符(map、reduce、filter、every等),这些数组操作符可以把异步事件作为集合来处理。
RxJS是ReactiveX变成理念的JS版本。是一种针对异步数据流的编程。
它将一切数据,包括HTTP请求、DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。
未使用RxJS时获取异步数据的方式
通过回调函数获取
- 新增一个request服务 ```typescript import { Injectable } from ‘@angular/core’;
@Injectable({ providedIn: ‘root’ }) export class RequestService {
constructor() { }
// 外部通过同步方式直接获取返回的数据
getData() { return ‘this is service data’; }
// 外部通过给getCallbackData传入回调函数获取name变量 getCallbackData(callbackFunction) { setTimeout(() => { let name = ‘张三’; callbackFunction(name); }, 1000); } }
2.
在app.module.ts中加入该服务
3.
在testrequest组件中调用request服务的相关方法
```typescript
import { Component, OnInit } from '@angular/core';
import { RequestService } from '../../services/request.service';
@Component({
selector: 'app-testreqesut',
templateUrl: './testreqesut.component.html',
styleUrls: ['./testreqesut.component.less']
})
export class TestreqesutComponent implements OnInit {
// 注入服务
constructor(public requestService:RequestService) { }
ngOnInit(): void {
// 同步方法直接调用
let requestData = this.requestService.getData();
console.log(requestData);
// 通过回调函数获取getCallbackData方法的name变量
this.requestService.getCallbackData((data) => {
console.log(data);
});
}
}
通过Promise获取
- 修改request服务 ```typescript import { Injectable } from ‘@angular/core’;
@Injectable({ providedIn: ‘root’ }) export class RequestService {
constructor() { }
getData() { return ‘this is service data’; }
// 返回一个Promise对象,外部通过getCallbackData().then(resolve(name))进行调用 getCallbackData() { return new Promise((resolve) => { setTimeout(() => { let name = ‘张三—-Promise’; resolve(name); }, 1000); });
} }
2.
修改testrequest组件
```typescript
import { Component, OnInit } from '@angular/core';
import { RequestService } from '../../services/request.service';
@Component({
selector: 'app-testreqesut',
templateUrl: './testreqesut.component.html',
styleUrls: ['./testreqesut.component.less']
})
export class TestreqesutComponent implements OnInit {
constructor(public requestService:RequestService) { }
ngOnInit(): void {
let requestData = this.requestService.getData();
console.log(requestData);
// 通过Promise的then方法进行调用
this.requestService.getCallbackData().then((data) => {
console.log(data);
});
}
}
RxJS获取异步数据
写法类似Promise。
ES6的异步对象类型为Promise < == > RxJS中的异步对象类型为Observable;
ES6的Promise返回成功数据为resolve(数据) < == > RxJS中Observable返回成功数据为observe.next(数据);
ES6的Promise返回失败数据为reject(数据) < == > RxJS中Observable返回失败数据为observe.error(数据);
ES6中Promise的调用为then方法 < == > RxJS中Observable的调用为subscribe方法;
RxJS可以中途撤回、发射多个值、提供了多种工具函数等
获取异步数据
- 修改request服务 ```typescript import { Injectable } from ‘@angular/core’;
// 引入rsjs的Observable // Angular中已经自带了RxJS,无需另外安装 import { Observable } from ‘rxjs’
@Injectable({ providedIn: ‘root’ }) export class RequestService {
constructor() { }
getData() { return ‘this is service data’; }
// 返回一个Promise对象,外部通过getCallbackData().then(回调函数(name))进行调用 getCallbackData() { return new Promise((resolve, reject) => { setTimeout(() => { let name = ‘张三—-Promise’; resolve(name); // 失败时调用 reject(‘失败了’); }, 1000); }); }
// 返回一个Observable对象,外部通过getRxjsData().subscribe(回调函数(name)) 进行调用
getRxjsData() { return new Observable((observe) => { setTimeout(() => { let name = ‘张三—-observe’; observe.next(name); // 失败时调用:observe.error(‘失败了’); }, 1000); }); } }
2.
修改testrequest组件
```typescript
import { Component, OnInit } from '@angular/core';
import { RequestService } from '../../services/request.service';
@Component({
selector: 'app-testreqesut',
templateUrl: './testreqesut.component.html',
styleUrls: ['./testreqesut.component.less']
})
export class TestreqesutComponent implements OnInit {
constructor(public requestService:RequestService) { }
ngOnInit(): void {
let requestData = this.requestService.getData();
console.log(requestData);
// 通过Promise的then方法进行调用
this.requestService.getCallbackData().then((data) => {
console.log(data);
});
// 通过Observable的subscribe方法调用
this.requestService.getRxjsData().subscribe((data) => {
console.log(data);
});
}
}
撤回订阅(中途撤回)
Promise创建之后,动作时无法撤回的。Observable不一样,动作可以通过unsubscribe()方法进行中途撤回
- 修改request服务,setTimeOut时间修改为3秒 ```typescript import { Injectable } from ‘@angular/core’; import { Observable } from ‘rxjs’
@Injectable({ providedIn: ‘root’ }) export class RequestService {
constructor() { }
// 设置为3秒后执行
getRxjsData() { return new Observable((observe) => { setTimeout(() => { let name = ‘张三—-observe’; observe.next(name); }, 3000); }); } }
2.
修改testrequest组件,设置为获取异步对象之后1秒钟(request的3秒方法还未执行)便取消订阅
```typescript
import { Component, OnInit } from '@angular/core';
import { RequestService } from '../../services/request.service';
@Component({
selector: 'app-testreqesut',
templateUrl: './testreqesut.component.html',
styleUrls: ['./testreqesut.component.less']
})
export class TestreqesutComponent implements OnInit {
constructor(public requestService:RequestService) { }
ngOnInit(): void {
// 将subscribe方法的返回结果赋值给sub变量
let sub = this.requestService.getRxjsData().subscribe((data) => {
console.log(data);
});
// 设置1秒钟后,sub变量调用unsubscribe取消订阅
setTimeout(() => {
sub.unsubscribe();
}, 1000);
}
}
订阅后多次执行
Promise不能多次执行,对于Promise来说,最终结果要么resolve(兑现),要么reject(拒绝),而且都只能触发一次,同一个Promise对象上多次调用resolve方法会抛出异常。
Observable可以不断地触发下一个值,就像next()这个方法名字所暗示的一样。
- 修改request服务,setTimeout改成setInterval(),实现多次执行observe.next方法 ```typescript import { Injectable } from ‘@angular/core’;
import { Observable } from ‘rxjs’
@Injectable({ providedIn: ‘root’ }) export class RequestService {
constructor() { }
// setTimeOut改成setInterval,多次执行observe.next
getRxjsIntervalData() { let count = 0; return new Observable((observe) => { setInterval(() => { count++; observe.next(count); }, 3000); }); } }
2.
修改testrequest组件,调用该服务
```typescript
import { Component, OnInit } from '@angular/core';
import { RequestService } from '../../services/request.service';
@Component({
selector: 'app-testreqesut',
templateUrl: './testreqesut.component.html',
styleUrls: ['./testreqesut.component.less']
})
export class TestreqesutComponent implements OnInit {
constructor(public requestService:RequestService) { }
ngOnInit(): void {
this.requestService.getRxjsIntervalData().subscribe((data) => {
console.log(data);
});
}
}
RxJS工具函数
Angular6之后使用以前的RxJS方法,必须安装rxjs-compat模块才可以使用map、filter方法。
Angular6之后官方使用的是RxJS6的新特性,所以官方给出了一个暂时延缓不需要修改rxjs代码的方法:
npm install rxjs-compat
本地使用Angular8、9,不需要另外安装rxjs-compat
示例代码:
- 编写request服务 ```typescript import { Injectable } from ‘@angular/core’; import { Observable } from ‘rxjs’;
// 加入需要用到的rxjs的工具函数操作符,例如map、filter等 import { filter,map } from ‘rxjs/operators’;
@Injectable({ providedIn: ‘root’ }) export class RequestService {
constructor() { }
// 每隔2秒输出一个数,持续输出
getRxjsIntervalData() { let count = 0; return new Observable((observe) => { setInterval(() => { count++; observe.next(count); }, 2000); }); } }
2.
编写testrequest组件
```typescript
import { Component, OnInit } from '@angular/core';
import { RequestService } from '../../services/request.service';
import { filter, map } from 'rxjs/operators';
@Component({
selector: 'app-testreqesut',
templateUrl: './testreqesut.component.html',
styleUrls: ['./testreqesut.component.less']
})
export class TestreqesutComponent implements OnInit {
constructor(public requestService:RequestService) { }
ngOnInit(): void {
// 获取observe.next返回的操作流
let stream = this.requestService.getRxjsIntervalData();
// 为流添加pipe管道,在管道内加入map、filter方法
stream
.pipe(
// 仅输出偶数
filter((value:number) => {
if(value%2 == 0) {
return true;
}
}),
// 将value转换为value的平方进行输出
map((value:number) => value**2)
)
.subscribe((data) => {
console.log(data);
});
}
}