RxJS

RxJS是一个库,它通过使用observable序列来编写异步和基于事件的程序。它提供了一个核心类型Observable,附属类型(Observer、Schedulers、Subjects)和受[Array#extras]启发的操作符(map、reduce、filter、every等),这些数组操作符可以把异步事件作为集合来处理。

RxJS是ReactiveX变成理念的JS版本。是一种针对异步数据流的编程。

它将一切数据,包括HTTP请求、DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。

未使用RxJS时获取异步数据的方式

通过回调函数获取

  1. 新增一个request服务 ```typescript import { Injectable } from ‘@angular/core’;

@Injectable({ providedIn: ‘root’ }) export class RequestService {

constructor() { }

  1. // 外部通过同步方式直接获取返回的数据

getData() { return ‘this is service data’; }

// 外部通过给getCallbackData传入回调函数获取name变量 getCallbackData(callbackFunction) { setTimeout(() => { let name = ‘张三’; callbackFunction(name); }, 1000); } }

  1. 2.
  2. app.module.ts中加入该服务
  3. 3.
  4. testrequest组件中调用request服务的相关方法
  5. ```typescript
  6. import { Component, OnInit } from '@angular/core';
  7. import { RequestService } from '../../services/request.service';
  8. @Component({
  9. selector: 'app-testreqesut',
  10. templateUrl: './testreqesut.component.html',
  11. styleUrls: ['./testreqesut.component.less']
  12. })
  13. export class TestreqesutComponent implements OnInit {
  14. // 注入服务
  15. constructor(public requestService:RequestService) { }
  16. ngOnInit(): void {
  17. // 同步方法直接调用
  18. let requestData = this.requestService.getData();
  19. console.log(requestData);
  20. // 通过回调函数获取getCallbackData方法的name变量
  21. this.requestService.getCallbackData((data) => {
  22. console.log(data);
  23. });
  24. }
  25. }

通过Promise获取

  1. 修改request服务 ```typescript import { Injectable } from ‘@angular/core’;

@Injectable({ providedIn: ‘root’ }) export class RequestService {

constructor() { }

getData() { return ‘this is service data’; }

// 返回一个Promise对象,外部通过getCallbackData().then(resolve(name))进行调用 getCallbackData() { return new Promise((resolve) => { setTimeout(() => { let name = ‘张三—-Promise’; resolve(name); }, 1000); });

} }

  1. 2.
  2. 修改testrequest组件
  3. ```typescript
  4. import { Component, OnInit } from '@angular/core';
  5. import { RequestService } from '../../services/request.service';
  6. @Component({
  7. selector: 'app-testreqesut',
  8. templateUrl: './testreqesut.component.html',
  9. styleUrls: ['./testreqesut.component.less']
  10. })
  11. export class TestreqesutComponent implements OnInit {
  12. constructor(public requestService:RequestService) { }
  13. ngOnInit(): void {
  14. let requestData = this.requestService.getData();
  15. console.log(requestData);
  16. // 通过Promise的then方法进行调用
  17. this.requestService.getCallbackData().then((data) => {
  18. console.log(data);
  19. });
  20. }
  21. }

RxJS获取异步数据

写法类似Promise。

ES6的异步对象类型为Promise < == > RxJS中的异步对象类型为Observable;

ES6的Promise返回成功数据为resolve(数据) < == > RxJS中Observable返回成功数据为observe.next(数据);

ES6的Promise返回失败数据为reject(数据) < == > RxJS中Observable返回失败数据为observe.error(数据);

ES6中Promise的调用为then方法 < == > RxJS中Observable的调用为subscribe方法;

RxJS可以中途撤回、发射多个值、提供了多种工具函数等

获取异步数据

  1. 修改request服务 ```typescript import { Injectable } from ‘@angular/core’;

// 引入rsjs的Observable // Angular中已经自带了RxJS,无需另外安装 import { Observable } from ‘rxjs’

@Injectable({ providedIn: ‘root’ }) export class RequestService {

constructor() { }

getData() { return ‘this is service data’; }

// 返回一个Promise对象,外部通过getCallbackData().then(回调函数(name))进行调用 getCallbackData() { return new Promise((resolve, reject) => { setTimeout(() => { let name = ‘张三—-Promise’; resolve(name); // 失败时调用 reject(‘失败了’); }, 1000); }); }

  1. // 返回一个Observable对象,外部通过getRxjsData().subscribe(回调函数(name)) 进行调用

getRxjsData() { return new Observable((observe) => { setTimeout(() => { let name = ‘张三—-observe’; observe.next(name); // 失败时调用:observe.error(‘失败了’); }, 1000); }); } }

  1. 2.
  2. 修改testrequest组件
  3. ```typescript
  4. import { Component, OnInit } from '@angular/core';
  5. import { RequestService } from '../../services/request.service';
  6. @Component({
  7. selector: 'app-testreqesut',
  8. templateUrl: './testreqesut.component.html',
  9. styleUrls: ['./testreqesut.component.less']
  10. })
  11. export class TestreqesutComponent implements OnInit {
  12. constructor(public requestService:RequestService) { }
  13. ngOnInit(): void {
  14. let requestData = this.requestService.getData();
  15. console.log(requestData);
  16. // 通过Promise的then方法进行调用
  17. this.requestService.getCallbackData().then((data) => {
  18. console.log(data);
  19. });
  20. // 通过Observable的subscribe方法调用
  21. this.requestService.getRxjsData().subscribe((data) => {
  22. console.log(data);
  23. });
  24. }
  25. }

撤回订阅(中途撤回)

Promise创建之后,动作时无法撤回的。Observable不一样,动作可以通过unsubscribe()方法进行中途撤回

  1. 修改request服务,setTimeOut时间修改为3秒 ```typescript import { Injectable } from ‘@angular/core’; import { Observable } from ‘rxjs’

@Injectable({ providedIn: ‘root’ }) export class RequestService {

constructor() { }

  1. // 设置为3秒后执行

getRxjsData() { return new Observable((observe) => { setTimeout(() => { let name = ‘张三—-observe’; observe.next(name); }, 3000); }); } }

  1. 2.
  2. 修改testrequest组件,设置为获取异步对象之后1秒钟(request3秒方法还未执行)便取消订阅
  3. ```typescript
  4. import { Component, OnInit } from '@angular/core';
  5. import { RequestService } from '../../services/request.service';
  6. @Component({
  7. selector: 'app-testreqesut',
  8. templateUrl: './testreqesut.component.html',
  9. styleUrls: ['./testreqesut.component.less']
  10. })
  11. export class TestreqesutComponent implements OnInit {
  12. constructor(public requestService:RequestService) { }
  13. ngOnInit(): void {
  14. // 将subscribe方法的返回结果赋值给sub变量
  15. let sub = this.requestService.getRxjsData().subscribe((data) => {
  16. console.log(data);
  17. });
  18. // 设置1秒钟后,sub变量调用unsubscribe取消订阅
  19. setTimeout(() => {
  20. sub.unsubscribe();
  21. }, 1000);
  22. }
  23. }

订阅后多次执行

Promise不能多次执行,对于Promise来说,最终结果要么resolve(兑现),要么reject(拒绝),而且都只能触发一次,同一个Promise对象上多次调用resolve方法会抛出异常。

Observable可以不断地触发下一个值,就像next()这个方法名字所暗示的一样。

  1. 修改request服务,setTimeout改成setInterval(),实现多次执行observe.next方法 ```typescript import { Injectable } from ‘@angular/core’;

import { Observable } from ‘rxjs’

@Injectable({ providedIn: ‘root’ }) export class RequestService {

constructor() { }

  1. // setTimeOut改成setInterval,多次执行observe.next

getRxjsIntervalData() { let count = 0; return new Observable((observe) => { setInterval(() => { count++; observe.next(count); }, 3000); }); } }

  1. 2.
  2. 修改testrequest组件,调用该服务
  3. ```typescript
  4. import { Component, OnInit } from '@angular/core';
  5. import { RequestService } from '../../services/request.service';
  6. @Component({
  7. selector: 'app-testreqesut',
  8. templateUrl: './testreqesut.component.html',
  9. styleUrls: ['./testreqesut.component.less']
  10. })
  11. export class TestreqesutComponent implements OnInit {
  12. constructor(public requestService:RequestService) { }
  13. ngOnInit(): void {
  14. this.requestService.getRxjsIntervalData().subscribe((data) => {
  15. console.log(data);
  16. });
  17. }
  18. }

RxJS工具函数

Angular6之后使用以前的RxJS方法,必须安装rxjs-compat模块才可以使用map、filter方法。

Angular6之后官方使用的是RxJS6的新特性,所以官方给出了一个暂时延缓不需要修改rxjs代码的方法:

  1. npm install rxjs-compat

本地使用Angular8、9,不需要另外安装rxjs-compat

示例代码:

  1. 编写request服务 ```typescript import { Injectable } from ‘@angular/core’; import { Observable } from ‘rxjs’;

// 加入需要用到的rxjs的工具函数操作符,例如map、filter等 import { filter,map } from ‘rxjs/operators’;

@Injectable({ providedIn: ‘root’ }) export class RequestService {

constructor() { }

  1. // 每隔2秒输出一个数,持续输出

getRxjsIntervalData() { let count = 0; return new Observable((observe) => { setInterval(() => { count++; observe.next(count); }, 2000); }); } }

  1. 2.
  2. 编写testrequest组件
  3. ```typescript
  4. import { Component, OnInit } from '@angular/core';
  5. import { RequestService } from '../../services/request.service';
  6. import { filter, map } from 'rxjs/operators';
  7. @Component({
  8. selector: 'app-testreqesut',
  9. templateUrl: './testreqesut.component.html',
  10. styleUrls: ['./testreqesut.component.less']
  11. })
  12. export class TestreqesutComponent implements OnInit {
  13. constructor(public requestService:RequestService) { }
  14. ngOnInit(): void {
  15. // 获取observe.next返回的操作流
  16. let stream = this.requestService.getRxjsIntervalData();
  17. // 为流添加pipe管道,在管道内加入map、filter方法
  18. stream
  19. .pipe(
  20. // 仅输出偶数
  21. filter((value:number) => {
  22. if(value%2 == 0) {
  23. return true;
  24. }
  25. }),
  26. // 将value转换为value的平方进行输出
  27. map((value:number) => value**2)
  28. )
  29. .subscribe((data) => {
  30. console.log(data);
  31. });
  32. }
  33. }