try/catch对于异步处理派不上用场(虽然能用 calblack 或 promise 的方式来处理,仍旧存在诸多缺点),RxJS要求用一种全新的思维方式来处理异常错误处理。
1. 异常处理不可避免
异常通常来自不可预期的外部环境变化,是真实应用开发中必须考虑的方面。
2. 异常处理的难点
2.1 try/catch只支持同步运算
下面例子中,同步的catch中是捕捉不到setTimeout中异步抛出的error的。
const invalidJsonString = 'Not Found';
try {
setTimeout(() => {
JSON.parse(invalidJsonString);
}, 1000);
} catch (error) {
console.log('catch error');
}
2.2 回调函数的局限
可以通过传递一个回调函数,在会发生的位置捕错误,并且进行处理。它的问题是,让代码可读性差,一旦有多个异步调用,相互之间有依赖关系,就会出现嵌套地狱的问题。
const invalidJsonString = 'Not Found';
const delayParse = (jsonString, callback) => {
setTimeout(() => {
try {
const result = JSON.parse(jsonString);
callback(null, result);
} catch (error) {
callback(error);
}
}, 1000);
};
delayParse(invalidJsonString, (error, result) => {
if (error) {
console.log('catch error: ', error);
return;
}
console.log(result);
});
2.3 Promise的异常处理
如下按照Promise的方法改写后,代码的可读性好了很多。但是它有一个缺点,不能充实。Promise对象一旦进入成功 或 失败,就不能再改变状态。也就没法进行诸如 fetch(url).retry(3)
的调用。还有第二个缺点是,如果没有任何catch函数,那么真的发生错误就丢失掉了。
fetch('https://api.github.com/repos/ReactiveX/rxjs')
.then((response) => {
console.log('got response: ', response)
})
.catch((error) => {
console.log('catch error: ', error);
});
3. RxJS的异常处理
函数如果抛出错误,就会改变外部状态,那么这个函数就不是纯函数。在RxJS中,错误一场和数据一样,会沿着数据流向下游动,最后触发Observer的error方法,一个管道中的错误不会影响其他管道。
如果所有都需要error来处理,Observer要处理的事情就太多,通常会通过以下两种方式将错误处理掉:
- 恢复(recover),让运算继续下去,操作符有 catch
- 重试(retry),重新尝试之前发生错误的操作,操作符有 retry、retryWhen
重试和恢复晚安配合使用,重试若干次数后,用恢复的方法继续运算。
3.1 catch
❓ 因为没有执行环境,对此代码的输出能预判到 1 2 3 8,为什么没有5不是很理解。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
const throwOnUnluckyNumber = value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
};
const source$ = Observable.range(1, 5);
const error$ = source$.map(throwOnUnluckyNumber)
const catch$ = error$.catch((err, caught$) => Observable.of(8));
catch$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);
3.2 retry
只有重试以后可能成功的操作,才有必要重试,下面例子只是为了演示。另外retry要限定重试的次数,并且因为不一定成功,还需要对于失败的场景做处理,即结合catch使用。
下面例子中会再遇到异常后立即重试3次,输出结果为1 2 3 1 2 3 1 2 3 8。虽然它相较Promise多了重试的能力,不过它的重试是失败后,马上开始重拾,不能指定延迟时间(服务挂掉通常不是瞬间能恢复好的)。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/retry';
import 'rxjs/add/operator/catch';
const throwOnUnluckyNumber = value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
};
const source$ = Observable.range(1, 5);
const error$ = source$.map(throwOnUnluckyNumber);
const retry$ = error$.retry(2);
const catch$ = retry$.catch(err => Observable.of(8));
catch$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);
3.3 retryWhen
retryWhen的参数是一个函数,哈数的第一个参数是$结尾的Observable对象,当它吐出一个错误时候,会充当notifier,如下例,只是延迟1秒后在重试一次数据获取。下面4小节列举了更丰富的使用方法。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/retryWhen';
import 'rxjs/add/operator/catch';
const throwOnUnluckyNumber = value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
};
const source$ = Observable.range(1, 5);
const error$ = source$.map(throwOnUnluckyNumber);
const retryWhen$ = error$.retryWhen(err$ => Observable.interval(1000));
retryWhen$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);
3.3.1 延时重试
可以直接使用 $err.delay(1000)制定在1秒后重试。
3.3.2 用retryWhen实现retry
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/scan';
import 'rxjs/add/operator/retryWhen';
import 'rxjs/add/operator/catch';
const throwOnUnluckyNumber = value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
};
Observable.prototype.retry = function (maxCount) {
return this.retryWhen(err$ => {
return err$.scan((errorCount, err) => {
if (errorCount >= maxCount) {
throw err;
}
return errorCount + 1;
}, 0)
});
};
const source$ = Observable.range(1, 10);
const error$ = source$.map(throwOnUnluckyNumber);
const catch$ = error$.retry(2)
.catch(err => Observable.of(8));
catch$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);
3.3.3 延时间并有上限的重试
输出结果为:1 2 3 等待1s 1 2 3 err
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/scan';
import 'rxjs/add/operator/retryWhen';
import 'rxjs/add/operator/catch';
const throwOnUnluckyNumber = value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
};
Observable.prototype.retryWithDelay = function(maxCount, delayMilliseconds) {
return this.retryWhen(err$ => {
return err$.scan((errorCount, err) => {
if (errorCount >= maxCount) {
throw err;
}
return errorCount + 1;
}, 0).delay(delayMilliseconds);
});
};
const source$ = Observable.range(1, 10);
const error$ = source$.map(throwOnUnluckyNumber);
const retry$ = error$.retryWithDelay(2, 1000);
retry$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);
3.3.4 延伸递增重试
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/timer';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/delayWhen';
import 'rxjs/add/operator/zip';
import 'rxjs/add/operator/scan';
import 'rxjs/add/operator/retryWhen';
import 'rxjs/add/operator/catch';
Observable.prototype.retryWithExpotentialDelay = function (maxRetry, initialDelay) {
return this.retryWhen(
err$ => {
return err$.scan((errorCount, err) => {
if (errorCount >= maxRetry) {
throw err;
}
return errorCount + 1;
}, 0)
.delayWhen(errorCount => {
const delayTime = Math.pow(2, errorCount - 1) * initialDelay;
console.log('#delayTime: ', delayTime);
return Observable.timer(delayTime);
});
});
};
const source$ = Observable.range(1, 10);
const catch$ = source$.map(
value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
}
).retryWithExpotentialDelay(3, 100)
.catch(err => Observable.of('four'));
catch$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);
3.4 finally
finally是不论是否报错都会触发,它和do都不同在于,finally智慧发生一次作用,二do是对上游吐出的每个数据均执行。finally和do配合使用更,可以覆盖上游可能发生的所有事件。
4. 重试的本质
600毫秒吐出一个数字,2400毫秒时吐出4触发错误,触发之后1秒中重试。重试600毫秒输出0,然后1秒时又再次充实。因为重试都本质就是 取消订阅 + 重新订阅。
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/retryWhen';
import 'rxjs/add/operator/catch';
const throwOnUnluckyNumber = value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
};
const source$ = Observable.interval(600);
const error$ = source$.map(throwOnUnluckyNumber);
const retryWhen$ = error$.retryWhen(err$ => Observable.interval(1000));
retryWhen$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);
上面例子是cold的模式,下面模拟了一个hot模式的数据流,仍旧是为了说明,retry本质是订阅+取消订阅。
on subscribe value: 0 value: 1 value: 2 value: 3 on unsubscribe on subscribe value: 5 value: 6 value: 6 on unsubscribe
import {Observable} from 'rxjs/Observable';
import EventEmitter from 'events';
import 'rxjs/add/observable/range';
import 'rxjs/add/observable/interval';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/scan';
import 'rxjs/add/operator/retry';
import 'rxjs/add/operator/take';
const throwOnUnluckyNumber = value => {
if (value === 4) {
throw new Error('unlucky number 4');
}
return value;
};
const emitter = new EventEmitter();
Observable.interval(600).subscribe((value) => {
emitter.emit('tick', value);
});
const hotSource$ = Observable.create(observer => {
console.log('on subscribe');
const listener = (value) => observer.next(value);
emitter.on('tick', listener);
return () => {
console.log('on unsubscribe');
emitter.removeListener('tick', listener);
}
});
const error$ = hotSource$.map(throwOnUnluckyNumber);
const retry$ = error$.retry(1).take(7);
retry$.subscribe(
value => console.log('value: ', value),
err => console.log('error: ', err),
() => console.log('complete')
);