try/catch对于异步处理派不上用场(虽然能用 calblack 或 promise 的方式来处理,仍旧存在诸多缺点),RxJS要求用一种全新的思维方式来处理异常错误处理。

1. 异常处理不可避免

异常通常来自不可预期的外部环境变化,是真实应用开发中必须考虑的方面。

2. 异常处理的难点

2.1 try/catch只支持同步运算

下面例子中,同步的catch中是捕捉不到setTimeout中异步抛出的error的。

  1. const invalidJsonString = 'Not Found';
  2. try {
  3. setTimeout(() => {
  4. JSON.parse(invalidJsonString);
  5. }, 1000);
  6. } catch (error) {
  7. console.log('catch error');
  8. }

2.2 回调函数的局限

可以通过传递一个回调函数,在会发生的位置捕错误,并且进行处理。它的问题是,让代码可读性差,一旦有多个异步调用,相互之间有依赖关系,就会出现嵌套地狱的问题。

  1. const invalidJsonString = 'Not Found';
  2. const delayParse = (jsonString, callback) => {
  3. setTimeout(() => {
  4. try {
  5. const result = JSON.parse(jsonString);
  6. callback(null, result);
  7. } catch (error) {
  8. callback(error);
  9. }
  10. }, 1000);
  11. };
  12. delayParse(invalidJsonString, (error, result) => {
  13. if (error) {
  14. console.log('catch error: ', error);
  15. return;
  16. }
  17. console.log(result);
  18. });

2.3 Promise的异常处理

如下按照Promise的方法改写后,代码的可读性好了很多。但是它有一个缺点,不能充实。Promise对象一旦进入成功 或 失败,就不能再改变状态。也就没法进行诸如 fetch(url).retry(3)的调用。还有第二个缺点是,如果没有任何catch函数,那么真的发生错误就丢失掉了。

  1. fetch('https://api.github.com/repos/ReactiveX/rxjs')
  2. .then((response) => {
  3. console.log('got response: ', response)
  4. })
  5. .catch((error) => {
  6. console.log('catch error: ', error);
  7. });

3. RxJS的异常处理

函数如果抛出错误,就会改变外部状态,那么这个函数就不是纯函数。在RxJS中,错误一场和数据一样,会沿着数据流向下游动,最后触发Observer的error方法,一个管道中的错误不会影响其他管道。

如果所有都需要error来处理,Observer要处理的事情就太多,通常会通过以下两种方式将错误处理掉:

  • 恢复(recover),让运算继续下去,操作符有 catch
  • 重试(retry),重新尝试之前发生错误的操作,操作符有 retry、retryWhen

重试和恢复晚安配合使用,重试若干次数后,用恢复的方法继续运算。

3.1 catch

❓ 因为没有执行环境,对此代码的输出能预判到 1 2 3 8,为什么没有5不是很理解。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/range';
  3. import 'rxjs/add/observable/of';
  4. import 'rxjs/add/operator/map';
  5. import 'rxjs/add/operator/catch';
  6. const throwOnUnluckyNumber = value => {
  7. if (value === 4) {
  8. throw new Error('unlucky number 4');
  9. }
  10. return value;
  11. };
  12. const source$ = Observable.range(1, 5);
  13. const error$ = source$.map(throwOnUnluckyNumber)
  14. const catch$ = error$.catch((err, caught$) => Observable.of(8));
  15. catch$.subscribe(
  16. value => console.log('value: ', value),
  17. err => console.log('error: ', err),
  18. () => console.log('complete')
  19. );

3.2 retry

只有重试以后可能成功的操作,才有必要重试,下面例子只是为了演示。另外retry要限定重试的次数,并且因为不一定成功,还需要对于失败的场景做处理,即结合catch使用。

下面例子中会再遇到异常后立即重试3次,输出结果为1 2 3 1 2 3 1 2 3 8。虽然它相较Promise多了重试的能力,不过它的重试是失败后,马上开始重拾,不能指定延迟时间(服务挂掉通常不是瞬间能恢复好的)。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/range';
  3. import 'rxjs/add/observable/of';
  4. import 'rxjs/add/observable/throw';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/retry';
  7. import 'rxjs/add/operator/catch';
  8. const throwOnUnluckyNumber = value => {
  9. if (value === 4) {
  10. throw new Error('unlucky number 4');
  11. }
  12. return value;
  13. };
  14. const source$ = Observable.range(1, 5);
  15. const error$ = source$.map(throwOnUnluckyNumber);
  16. const retry$ = error$.retry(2);
  17. const catch$ = retry$.catch(err => Observable.of(8));
  18. catch$.subscribe(
  19. value => console.log('value: ', value),
  20. err => console.log('error: ', err),
  21. () => console.log('complete')
  22. );

3.3 retryWhen

retryWhen的参数是一个函数,哈数的第一个参数是$结尾的Observable对象,当它吐出一个错误时候,会充当notifier,如下例,只是延迟1秒后在重试一次数据获取。下面4小节列举了更丰富的使用方法。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/range';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/observable/of';
  5. import 'rxjs/add/observable/throw';
  6. import 'rxjs/add/operator/map';
  7. import 'rxjs/add/operator/retryWhen';
  8. import 'rxjs/add/operator/catch';
  9. const throwOnUnluckyNumber = value => {
  10. if (value === 4) {
  11. throw new Error('unlucky number 4');
  12. }
  13. return value;
  14. };
  15. const source$ = Observable.range(1, 5);
  16. const error$ = source$.map(throwOnUnluckyNumber);
  17. const retryWhen$ = error$.retryWhen(err$ => Observable.interval(1000));
  18. retryWhen$.subscribe(
  19. value => console.log('value: ', value),
  20. err => console.log('error: ', err),
  21. () => console.log('complete')
  22. );

3.3.1 延时重试

可以直接使用 $err.delay(1000)制定在1秒后重试。

3.3.2 用retryWhen实现retry

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/range';
  3. import 'rxjs/add/observable/of';
  4. import 'rxjs/add/observable/throw';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/delay';
  7. import 'rxjs/add/operator/scan';
  8. import 'rxjs/add/operator/retryWhen';
  9. import 'rxjs/add/operator/catch';
  10. const throwOnUnluckyNumber = value => {
  11. if (value === 4) {
  12. throw new Error('unlucky number 4');
  13. }
  14. return value;
  15. };
  16. Observable.prototype.retry = function (maxCount) {
  17. return this.retryWhen(err$ => {
  18. return err$.scan((errorCount, err) => {
  19. if (errorCount >= maxCount) {
  20. throw err;
  21. }
  22. return errorCount + 1;
  23. }, 0)
  24. });
  25. };
  26. const source$ = Observable.range(1, 10);
  27. const error$ = source$.map(throwOnUnluckyNumber);
  28. const catch$ = error$.retry(2)
  29. .catch(err => Observable.of(8));
  30. catch$.subscribe(
  31. value => console.log('value: ', value),
  32. err => console.log('error: ', err),
  33. () => console.log('complete')
  34. );

3.3.3 延时间并有上限的重试

输出结果为:1 2 3 等待1s 1 2 3 err

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/range';
  3. import 'rxjs/add/observable/of';
  4. import 'rxjs/add/observable/throw';
  5. import 'rxjs/add/operator/map';
  6. import 'rxjs/add/operator/delay';
  7. import 'rxjs/add/operator/scan';
  8. import 'rxjs/add/operator/retryWhen';
  9. import 'rxjs/add/operator/catch';
  10. const throwOnUnluckyNumber = value => {
  11. if (value === 4) {
  12. throw new Error('unlucky number 4');
  13. }
  14. return value;
  15. };
  16. Observable.prototype.retryWithDelay = function(maxCount, delayMilliseconds) {
  17. return this.retryWhen(err$ => {
  18. return err$.scan((errorCount, err) => {
  19. if (errorCount >= maxCount) {
  20. throw err;
  21. }
  22. return errorCount + 1;
  23. }, 0).delay(delayMilliseconds);
  24. });
  25. };
  26. const source$ = Observable.range(1, 10);
  27. const error$ = source$.map(throwOnUnluckyNumber);
  28. const retry$ = error$.retryWithDelay(2, 1000);
  29. retry$.subscribe(
  30. value => console.log('value: ', value),
  31. err => console.log('error: ', err),
  32. () => console.log('complete')
  33. );

3.3.4 延伸递增重试

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/range';
  3. import 'rxjs/add/observable/of';
  4. import 'rxjs/add/observable/timer';
  5. import 'rxjs/add/observable/throw';
  6. import 'rxjs/add/operator/map';
  7. import 'rxjs/add/operator/delayWhen';
  8. import 'rxjs/add/operator/zip';
  9. import 'rxjs/add/operator/scan';
  10. import 'rxjs/add/operator/retryWhen';
  11. import 'rxjs/add/operator/catch';
  12. Observable.prototype.retryWithExpotentialDelay = function (maxRetry, initialDelay) {
  13. return this.retryWhen(
  14. err$ => {
  15. return err$.scan((errorCount, err) => {
  16. if (errorCount >= maxRetry) {
  17. throw err;
  18. }
  19. return errorCount + 1;
  20. }, 0)
  21. .delayWhen(errorCount => {
  22. const delayTime = Math.pow(2, errorCount - 1) * initialDelay;
  23. console.log('#delayTime: ', delayTime);
  24. return Observable.timer(delayTime);
  25. });
  26. });
  27. };
  28. const source$ = Observable.range(1, 10);
  29. const catch$ = source$.map(
  30. value => {
  31. if (value === 4) {
  32. throw new Error('unlucky number 4');
  33. }
  34. return value;
  35. }
  36. ).retryWithExpotentialDelay(3, 100)
  37. .catch(err => Observable.of('four'));
  38. catch$.subscribe(
  39. value => console.log('value: ', value),
  40. err => console.log('error: ', err),
  41. () => console.log('complete')
  42. );

3.4 finally

finally是不论是否报错都会触发,它和do都不同在于,finally智慧发生一次作用,二do是对上游吐出的每个数据均执行。finally和do配合使用更,可以覆盖上游可能发生的所有事件。

4. 重试的本质

600毫秒吐出一个数字,2400毫秒时吐出4触发错误,触发之后1秒中重试。重试600毫秒输出0,然后1秒时又再次充实。因为重试都本质就是 取消订阅 + 重新订阅。

  1. import {Observable} from 'rxjs/Observable';
  2. import 'rxjs/add/observable/range';
  3. import 'rxjs/add/observable/interval';
  4. import 'rxjs/add/observable/of';
  5. import 'rxjs/add/observable/throw';
  6. import 'rxjs/add/operator/map';
  7. import 'rxjs/add/operator/retryWhen';
  8. import 'rxjs/add/operator/catch';
  9. const throwOnUnluckyNumber = value => {
  10. if (value === 4) {
  11. throw new Error('unlucky number 4');
  12. }
  13. return value;
  14. };
  15. const source$ = Observable.interval(600);
  16. const error$ = source$.map(throwOnUnluckyNumber);
  17. const retryWhen$ = error$.retryWhen(err$ => Observable.interval(1000));
  18. retryWhen$.subscribe(
  19. value => console.log('value: ', value),
  20. err => console.log('error: ', err),
  21. () => console.log('complete')
  22. );

上面例子是cold的模式,下面模拟了一个hot模式的数据流,仍旧是为了说明,retry本质是订阅+取消订阅。

on subscribe value: 0 value: 1 value: 2 value: 3 on unsubscribe on subscribe value: 5 value: 6 value: 6 on unsubscribe

  1. import {Observable} from 'rxjs/Observable';
  2. import EventEmitter from 'events';
  3. import 'rxjs/add/observable/range';
  4. import 'rxjs/add/observable/interval';
  5. import 'rxjs/add/observable/of';
  6. import 'rxjs/add/observable/throw';
  7. import 'rxjs/add/operator/map';
  8. import 'rxjs/add/operator/delay';
  9. import 'rxjs/add/operator/scan';
  10. import 'rxjs/add/operator/retry';
  11. import 'rxjs/add/operator/take';
  12. const throwOnUnluckyNumber = value => {
  13. if (value === 4) {
  14. throw new Error('unlucky number 4');
  15. }
  16. return value;
  17. };
  18. const emitter = new EventEmitter();
  19. Observable.interval(600).subscribe((value) => {
  20. emitter.emit('tick', value);
  21. });
  22. const hotSource$ = Observable.create(observer => {
  23. console.log('on subscribe');
  24. const listener = (value) => observer.next(value);
  25. emitter.on('tick', listener);
  26. return () => {
  27. console.log('on unsubscribe');
  28. emitter.removeListener('tick', listener);
  29. }
  30. });
  31. const error$ = hotSource$.map(throwOnUnluckyNumber);
  32. const retry$ = error$.retry(1).take(7);
  33. retry$.subscribe(
  34. value => console.log('value: ', value),
  35. err => console.log('error: ', err),
  36. () => console.log('complete')
  37. );