假如有这样一个场景,有上万条数据,需要分100条一批的分批次并行调用接口,假如接口返回失败,提示该批次失败并于2秒后重试, 假如重试又失败,提示4秒后重试,如果还是失败则8秒后重试,当重试3次后依然失败那么停止重试。 使用RxJS 简洁实现方式如下:
import { _http } from '@/http.service';
import {
debounceTime,
mergeMap,
map,
catchError,
retryWhen,
delayWhen,
} from 'rxjs/operators';
const limit = 100
const len = Math.ceil(this.lineIds.length / limit)
const list = []
let count = 0
for (let i = 0; i < len; i++) {
list.push(idList.slice(i * limit, (i + 1) * limit))
}
from(list) // 分批
.pipe(
mergeMap((data: any) => { // 拿到当前批次数据合并到当前流
let countLimit = 0
return _http // 发送请求,详见 随后贴出http.service
.post('/api/issued-task', {
data
})
.pipe(
map(({ response }: any) => {
const { code, msg } = response
if (code === 1 && countLimit < 3) {
countLimit++
this.alert( // 封装的提示信息方法,
`发送失败,${2 ** countLimit} 秒后自动重发,请稍等...`
)
throw new Error(msg)
}
return response
}),
retryWhen((errors: any) => // 重试
errors.pipe(delayWhen(() => timer(2 ** countLimit * 1000)))
//重试时间设置
),
catchError(({ message: msg }: any) => of({ code: 1, msg }))
// 捕获 整个请求过程中的错误信息
)
}),
catchError(({ message: msg }: any) => of({ code: 1, msg }))
// 捕获整个分批过程中的错误信息
)
.subscribe(({ code, msg }: any) => { // 每一批次的请求回调处理
count++
if (code !== 0) {
this.alert(msg)
return
}
if (count === len) { // 所有批次下发完成,成功和失败,具体逻辑自己控制
this.alert('任务下发成功!')
}
})
http.service 内容如下:
/*
* Created by yyccmmkk on 2019/9/11 15:12
* 36995800@163.com
*/
import { ajax } from 'rxjs/ajax'
import { of, race } from 'rxjs'
import { delay } from 'rxjs/operators'
const baseURL: string = process.env.VUE_APP_BASE_URL // 上传接口 baseUrl
const regExp = /\/index\/(.+)\/?/
const urlRegExp = /\/(tms|ofc)(?=\/)/
const basePath = process.env.VUE_APP_BASE_PATH // 路由
const mapBasePath = process.env.VUE_APP_MAP_BASE_PATH
const limitBasePath = process.env.VUE_APP_LIMIT_BASE_PATH
const match = window.location.href.match(regExp)
const axios = require('axios')
const token = sessionStorage.getItem('token') || (match && match[1]) || null
const instance = axios.create({
baseURL,
timeout: 50000,
headers: {
'Content-Type': 'application/json;charset=UTF-8',
token
}
})
// eslint-disable-next-line
class _http {
static cache: any = { source: { url: '', data: null, baseURL } }
static interceptors: any = {
// eslint-disable-next-line
beforeRequest(source: { url: string; data: any; baseURL: string }) {}
}
static post(url: string, data: any) {
const cache = _http.cache
cache.source = { url, data, baseURL }
_http.interceptors.beforeRequest(_http.cache.source)
return race(
of({ code: 408, msg: '请求超时!' }).pipe(delay(50000)),
ajax({
url: `${baseURL}${url}`,
method: 'POST',
async: true,
body: data,
headers: {
'Content-Type': 'application/json;charset=UTF-8',
token
}
})
)
}
}
// 本地联调时,不需要tms ofc
if (process.env.VAUE_APP_API_ENV === 'local') {
instance.interceptors.request.use(
(config: any) => {
const { url, baseURL } = config
// Do something before request is sent
config.url = url.replace(urlRegExp, '')
const match = url.match(urlRegExp)
if (match) {
config.baseURL =
match[1] === 'ofc' ? `${baseURL}:30210` : `${baseURL}:30250`
}
return config
},
(error: any) => {
// Do something with request error
return Promise.reject(error)
}
)
_http.interceptors.beforeRequest = (source: {
url: string
data: any
baseURL: string
}) => {
const { url } = source
const match = url.match(urlRegExp)
if (match) {
const temp = match[1] === 'tms' ? ':3030' : ':8080'
source.url = url.replace(urlRegExp, temp)
}
}
}
export default instance
export { baseURL, basePath, mapBasePath, limitBasePath }
export { instance as http, _http }