try/catch对于异步处理派不上用场(虽然能用 calblack 或 promise 的方式来处理,仍旧存在诸多缺点),RxJS要求用一种全新的思维方式来处理异常错误处理。
1. 异常处理不可避免
异常通常来自不可预期的外部环境变化,是真实应用开发中必须考虑的方面。
2. 异常处理的难点
2.1 try/catch只支持同步运算
下面例子中,同步的catch中是捕捉不到setTimeout中异步抛出的error的。
const invalidJsonString = 'Not Found';try {setTimeout(() => {JSON.parse(invalidJsonString);}, 1000);} catch (error) {console.log('catch error');}
2.2 回调函数的局限
可以通过传递一个回调函数,在会发生的位置捕错误,并且进行处理。它的问题是,让代码可读性差,一旦有多个异步调用,相互之间有依赖关系,就会出现嵌套地狱的问题。
const invalidJsonString = 'Not Found';const delayParse = (jsonString, callback) => {setTimeout(() => {try {const result = JSON.parse(jsonString);callback(null, result);} catch (error) {callback(error);}}, 1000);};delayParse(invalidJsonString, (error, result) => {if (error) {console.log('catch error: ', error);return;}console.log(result);});
2.3 Promise的异常处理
如下按照Promise的方法改写后,代码的可读性好了很多。但是它有一个缺点,不能充实。Promise对象一旦进入成功 或 失败,就不能再改变状态。也就没法进行诸如 fetch(url).retry(3)的调用。还有第二个缺点是,如果没有任何catch函数,那么真的发生错误就丢失掉了。
fetch('https://api.github.com/repos/ReactiveX/rxjs').then((response) => {console.log('got response: ', response)}).catch((error) => {console.log('catch error: ', error);});
3. RxJS的异常处理
函数如果抛出错误,就会改变外部状态,那么这个函数就不是纯函数。在RxJS中,错误一场和数据一样,会沿着数据流向下游动,最后触发Observer的error方法,一个管道中的错误不会影响其他管道。
如果所有都需要error来处理,Observer要处理的事情就太多,通常会通过以下两种方式将错误处理掉:
- 恢复(recover),让运算继续下去,操作符有 catch
- 重试(retry),重新尝试之前发生错误的操作,操作符有 retry、retryWhen
重试和恢复晚安配合使用,重试若干次数后,用恢复的方法继续运算。
3.1 catch
❓ 因为没有执行环境,对此代码的输出能预判到 1 2 3 8,为什么没有5不是很理解。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/range';import 'rxjs/add/observable/of';import 'rxjs/add/operator/map';import 'rxjs/add/operator/catch';const throwOnUnluckyNumber = value => {if (value === 4) {throw new Error('unlucky number 4');}return value;};const source$ = Observable.range(1, 5);const error$ = source$.map(throwOnUnluckyNumber)const catch$ = error$.catch((err, caught$) => Observable.of(8));catch$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
3.2 retry
只有重试以后可能成功的操作,才有必要重试,下面例子只是为了演示。另外retry要限定重试的次数,并且因为不一定成功,还需要对于失败的场景做处理,即结合catch使用。
下面例子中会再遇到异常后立即重试3次,输出结果为1 2 3 1 2 3 1 2 3 8。虽然它相较Promise多了重试的能力,不过它的重试是失败后,马上开始重拾,不能指定延迟时间(服务挂掉通常不是瞬间能恢复好的)。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/range';import 'rxjs/add/observable/of';import 'rxjs/add/observable/throw';import 'rxjs/add/operator/map';import 'rxjs/add/operator/retry';import 'rxjs/add/operator/catch';const throwOnUnluckyNumber = value => {if (value === 4) {throw new Error('unlucky number 4');}return value;};const source$ = Observable.range(1, 5);const error$ = source$.map(throwOnUnluckyNumber);const retry$ = error$.retry(2);const catch$ = retry$.catch(err => Observable.of(8));catch$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
3.3 retryWhen
retryWhen的参数是一个函数,哈数的第一个参数是$结尾的Observable对象,当它吐出一个错误时候,会充当notifier,如下例,只是延迟1秒后在重试一次数据获取。下面4小节列举了更丰富的使用方法。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/range';import 'rxjs/add/observable/interval';import 'rxjs/add/observable/of';import 'rxjs/add/observable/throw';import 'rxjs/add/operator/map';import 'rxjs/add/operator/retryWhen';import 'rxjs/add/operator/catch';const throwOnUnluckyNumber = value => {if (value === 4) {throw new Error('unlucky number 4');}return value;};const source$ = Observable.range(1, 5);const error$ = source$.map(throwOnUnluckyNumber);const retryWhen$ = error$.retryWhen(err$ => Observable.interval(1000));retryWhen$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
3.3.1 延时重试
可以直接使用 $err.delay(1000)制定在1秒后重试。
3.3.2 用retryWhen实现retry
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/range';import 'rxjs/add/observable/of';import 'rxjs/add/observable/throw';import 'rxjs/add/operator/map';import 'rxjs/add/operator/delay';import 'rxjs/add/operator/scan';import 'rxjs/add/operator/retryWhen';import 'rxjs/add/operator/catch';const throwOnUnluckyNumber = value => {if (value === 4) {throw new Error('unlucky number 4');}return value;};Observable.prototype.retry = function (maxCount) {return this.retryWhen(err$ => {return err$.scan((errorCount, err) => {if (errorCount >= maxCount) {throw err;}return errorCount + 1;}, 0)});};const source$ = Observable.range(1, 10);const error$ = source$.map(throwOnUnluckyNumber);const catch$ = error$.retry(2).catch(err => Observable.of(8));catch$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
3.3.3 延时间并有上限的重试
输出结果为:1 2 3 等待1s 1 2 3 err
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/range';import 'rxjs/add/observable/of';import 'rxjs/add/observable/throw';import 'rxjs/add/operator/map';import 'rxjs/add/operator/delay';import 'rxjs/add/operator/scan';import 'rxjs/add/operator/retryWhen';import 'rxjs/add/operator/catch';const throwOnUnluckyNumber = value => {if (value === 4) {throw new Error('unlucky number 4');}return value;};Observable.prototype.retryWithDelay = function(maxCount, delayMilliseconds) {return this.retryWhen(err$ => {return err$.scan((errorCount, err) => {if (errorCount >= maxCount) {throw err;}return errorCount + 1;}, 0).delay(delayMilliseconds);});};const source$ = Observable.range(1, 10);const error$ = source$.map(throwOnUnluckyNumber);const retry$ = error$.retryWithDelay(2, 1000);retry$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
3.3.4 延伸递增重试
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/range';import 'rxjs/add/observable/of';import 'rxjs/add/observable/timer';import 'rxjs/add/observable/throw';import 'rxjs/add/operator/map';import 'rxjs/add/operator/delayWhen';import 'rxjs/add/operator/zip';import 'rxjs/add/operator/scan';import 'rxjs/add/operator/retryWhen';import 'rxjs/add/operator/catch';Observable.prototype.retryWithExpotentialDelay = function (maxRetry, initialDelay) {return this.retryWhen(err$ => {return err$.scan((errorCount, err) => {if (errorCount >= maxRetry) {throw err;}return errorCount + 1;}, 0).delayWhen(errorCount => {const delayTime = Math.pow(2, errorCount - 1) * initialDelay;console.log('#delayTime: ', delayTime);return Observable.timer(delayTime);});});};const source$ = Observable.range(1, 10);const catch$ = source$.map(value => {if (value === 4) {throw new Error('unlucky number 4');}return value;}).retryWithExpotentialDelay(3, 100).catch(err => Observable.of('four'));catch$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
3.4 finally
finally是不论是否报错都会触发,它和do都不同在于,finally智慧发生一次作用,二do是对上游吐出的每个数据均执行。finally和do配合使用更,可以覆盖上游可能发生的所有事件。
4. 重试的本质
600毫秒吐出一个数字,2400毫秒时吐出4触发错误,触发之后1秒中重试。重试600毫秒输出0,然后1秒时又再次充实。因为重试都本质就是 取消订阅 + 重新订阅。
import {Observable} from 'rxjs/Observable';import 'rxjs/add/observable/range';import 'rxjs/add/observable/interval';import 'rxjs/add/observable/of';import 'rxjs/add/observable/throw';import 'rxjs/add/operator/map';import 'rxjs/add/operator/retryWhen';import 'rxjs/add/operator/catch';const throwOnUnluckyNumber = value => {if (value === 4) {throw new Error('unlucky number 4');}return value;};const source$ = Observable.interval(600);const error$ = source$.map(throwOnUnluckyNumber);const retryWhen$ = error$.retryWhen(err$ => Observable.interval(1000));retryWhen$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
上面例子是cold的模式,下面模拟了一个hot模式的数据流,仍旧是为了说明,retry本质是订阅+取消订阅。
on subscribe value: 0 value: 1 value: 2 value: 3 on unsubscribe on subscribe value: 5 value: 6 value: 6 on unsubscribe
import {Observable} from 'rxjs/Observable';import EventEmitter from 'events';import 'rxjs/add/observable/range';import 'rxjs/add/observable/interval';import 'rxjs/add/observable/of';import 'rxjs/add/observable/throw';import 'rxjs/add/operator/map';import 'rxjs/add/operator/delay';import 'rxjs/add/operator/scan';import 'rxjs/add/operator/retry';import 'rxjs/add/operator/take';const throwOnUnluckyNumber = value => {if (value === 4) {throw new Error('unlucky number 4');}return value;};const emitter = new EventEmitter();Observable.interval(600).subscribe((value) => {emitter.emit('tick', value);});const hotSource$ = Observable.create(observer => {console.log('on subscribe');const listener = (value) => observer.next(value);emitter.on('tick', listener);return () => {console.log('on unsubscribe');emitter.removeListener('tick', listener);}});const error$ = hotSource$.map(throwOnUnluckyNumber);const retry$ = error$.retry(1).take(7);retry$.subscribe(value => console.log('value: ', value),err => console.log('error: ', err),() => console.log('complete'));
