使用 guava-retrying

  • 使用 guava-retryingokhttp 来演示下重试机制
    • 别和我说 okhttp 自带重试机制,我不听

~~

如何实现?

  • 提供一个 retryer 对象和一个 callable 接口即可

    • retryer 设置发生重试的场景和重试的次数以及利用重试监听器进行补偿处理
    • callable 中是具体的重试业务

      场景

  • 提供一个接口,前两次请求会返回 404,后面将返回 200

  • 提供一个服务,会调用该接口
    • 如果发现调用出现 404,会启动重试机制
  • 就这么简单

simple coding

依赖

  1. <dependency>
  2. <groupId>org.projectlombok</groupId>
  3. <artifactId>lombok</artifactId>
  4. <optional>true</optional>
  5. </dependency>
  6. <!-- guava-retrying -->
  7. <dependency>
  8. <groupId>com.github.rholder</groupId>
  9. <artifactId>guava-retrying</artifactId>
  10. <version>2.0.0</version>
  11. </dependency>
  12. <!-- okhttp -->
  13. <dependency>
  14. <groupId>com.squareup.okhttp3</groupId>
  15. <artifactId>okhttp</artifactId>
  16. <version>4.3.1</version>
  17. </dependency>

OkHttpUtil

  • 简单封装下 OkHttpClient
  • 单独一个 OkHttpClient 实例内部会维护线程池的,所以单例化就好

    1. /**
    2. * OkHttp 工具类
    3. */
    4. @Slf4j
    5. public class OkHttpUtil {
    6. private OkHttpUtil() {
    7. }
    8. //////////////////////////////////////////////////////////////////////////
    9. // OkHttpClient Singleton
    10. //////////////////////////////////////////////////////////////////////////
    11. public static OkHttpClient getOKHttpClientInstance() {
    12. return LazyLoad.client;
    13. }
    14. private static class LazyLoad {
    15. private static final OkHttpClient client = new OkHttpClient();
    16. }
    17. //////////////////////////////////////////////////////////////////////////
    18. // util method
    19. //////////////////////////////////////////////////////////////////////////
    20. /**
    21. * <h2>执行post请求,注意要求目标接口返回响应头具有application/json </h2>
    22. *
    23. * @param url 请求地址
    24. * @param param 携带的参数实体类
    25. * @param headersParam 请求头参数
    26. * @param <T> 实体类类型
    27. * @return {@link Response} 响应
    28. * @throws IOException 执行请求时可能出现的异常
    29. */
    30. public static <T> Response doPost(String url, T param, Map<String, String> headersParam) throws IOException {
    31. MediaType contentType = MediaType.get(org.springframework.http.MediaType.APPLICATION_JSON_VALUE);
    32. RequestBody requestBody = RequestBody.create(JSON.toJSONString(param), contentType);
    33. // 构建 headers
    34. Headers headers = _buildHeaders(headersParam);
    35. // 构建 request
    36. Request request = new Request.Builder()
    37. .headers(headers)
    38. .url(url)
    39. .post(requestBody)
    40. .build();
    41. // 执行请求, 直接返回即可
    42. return getOKHttpClientInstance().newCall(request).execute();
    43. // sb了, 居然直接关掉流了。导致出现了 java.lang.IllegalStateException: closed
    44. // try (Response response = getOKHttpClientInstance().newCall(request).execute()) {
    45. // return response;
    46. // } catch (IOException e) {
    47. // log.error("请求url: {}时候发生了错误: {}", url, e.getLocalizedMessage(), e);
    48. // throw e;
    49. // }
    50. }
    51. /**
    52. * 填充 header 为 headers
    53. *
    54. * @param headersParam {@link Map} headers 头
    55. * @return {@link Headers} okHttp 的 headers 头
    56. */
    57. private static Headers _buildHeaders(Map<String, String> headersParam) {
    58. Headers.Builder headerBuilder = new Headers.Builder();
    59. for (Map.Entry<String, String> entry : headersParam.entrySet()) {
    60. headerBuilder.add(entry.getKey(), entry.getValue());
    61. }
    62. return headerBuilder.build();
    63. }
    64. }

vo

  • 参数实体
    1. /**
    2. * 为了简单就不做校验了
    3. */
    4. @AllArgsConstructor
    5. @NoArgsConstructor
    6. @Data
    7. public class UserVO {
    8. private String name;
    9. private Integer age;
    10. }

Democontroller

  • 总之就是前两次请求会返回 404,后续都是 200

    1. @Slf4j
    2. @RestController
    3. @RequestMapping("/test")
    4. public class DemoController {
    5. private static AtomicInteger count = new AtomicInteger(0);
    6. @RequestMapping(value = "/getData", method = RequestMethod.POST,
    7. consumes = MediaType.APPLICATION_JSON_VALUE)
    8. public void getData(@RequestBody UserVO userVO, @RequestHeader String token, HttpServletResponse response) throws IOException {
    9. // 超过 2次 返回成功
    10. count.getAndIncrement();
    11. log.info("当前count: {}", count.get());
    12. if (count.get() > 2) {
    13. response.setContentType("application/json;charset=utf-8");
    14. response.getOutputStream().write("呵呵哒".getBytes(StandardCharsets.UTF_8));
    15. // 该方式会直接设置 Content-Type 为 application/json;charset=ISO-8859-1
    16. // response.getWriter().write("呵呵哒");
    17. response.setStatus(HttpStatus.OK.value());
    18. return;
    19. }
    20. System.out.println(userVO);
    21. System.out.println(token);
    22. response.setStatus(HttpStatus.NOT_FOUND.value());
    23. return;
    24. }
    25. }

RetryService

  • 需要一个 Retryer 重试器,定义如何重试的时机
    • 可以添加一个 RetryListener 重试监听器,用于进行补偿策略或者紧急提示
  • 执行重试,即 retryer.call(callable), 传入一个具有重试业务的 Callable ```java @Slf4j @Service public class RetryService { private static final int RETRY_TIMES = 3; /**

    • 重试监听器逻辑 */ private static RetryListener retryListener = new RetryListener() { @Override public void onRetry(Attempt attempt) {

      1. Long retryTimes = attempt.getAttemptNumber();
      2. log.info("重试时,监听器执行的逻辑,当前是第 {} 次重试", retryTimes);
      3. if (retryTimes == RETRY_TIMES) {
      4. log.error("重试次数已到,建议执行补偿策略");
      5. // TODO 补偿策略
      6. }

      } };

      // 重试器 private static Retryer retryer = RetryerBuilder.newBuilder()

      1. // 当返回结果为 NULL 时,执行重试
      2. .retryIfResult(Predicates.isNull())
      3. // 当抛出 RuntimeException 时,执行重试
      4. .retryIfRuntimeException()
      5. // 自定义当抛出哪种异常时,执行重试
      6. .retryIfExceptionOfType(IOException.class)
      7. // 2s 后执行重试
      8. .withWaitStrategy(WaitStrategies.fixedWait(2L, TimeUnit.SECONDS))
      9. // 重试三次后不再重试 (会抛出异常)
      10. // com.github.rholder.retry.RetryException: Retrying failed to complete successfully after 3 attempts.
      11. .withStopStrategy(StopStrategies.stopAfterAttempt(RETRY_TIMES))
      12. // 添加监听器
      13. .withRetryListener(retryListener)
      14. .build();
  1. /**
  2. * 开始执行重试
  3. *
  4. * @param callable
  5. */
  6. public static void startRetry(Callable callable) {
  7. try {
  8. retryer.call(callable);
  9. } catch (Exception e) {
  10. log.error("执行任务时出现异常: {}", e.getLocalizedMessage(), e);
  11. }
  12. }

}

  1. BusinessService<br />
  2. - 有个请求上述接口的服务,如果出现 `404` 就开始执行重试策略
  3. ```java
  4. public interface BusinessService {
  5. public String postSomething(UserVO userVO, boolean isRetry) throws IOException;
  6. }
  1. @Slf4j
  2. @Service
  3. public class BusinessServiceImpl implements BusinessService {
  4. private static final String url = "http://127.0.0.1:8080/test/getData";
  5. private static final String token = "randomToken";
  6. /**
  7. * post 目标地址
  8. *
  9. * @param userVO 要发送的值
  10. * @param isRetry 当前是否为重试阶段,首次调用应该设置为 false
  11. * @throws IOException HttpClient 发送请求时候报错
  12. */
  13. public String postSomething(UserVO userVO, boolean isRetry) throws IOException {
  14. // TODO: check userVO
  15. Map<String, String> headers = new HashMap<>(1);
  16. headers.put("token", "randomToken");
  17. // 注意用 try resource 对 response 进行关闭
  18. try (Response response = OkHttpUtil.doPost(url, userVO, headers)) {
  19. if (response.code() == 404) {
  20. // 当前非重试阶段,才可调用重试,避免重复调用重试
  21. if (!isRetry) {
  22. doPostRetry(userVO);
  23. }
  24. return null;
  25. }
  26. // 调用 ResponseBody#string() 内部会对 response 进行 close
  27. // https://blog.csdn.net/my_truelove/article/details/80133556
  28. String body = response.body().string();
  29. log.warn("成功时的值为: {}", body);
  30. return body;
  31. } catch (IOException e) {
  32. log.error("请求url: {}时候发生了错误: {}", url, e.getLocalizedMessage(), e);
  33. throw e;
  34. }
  35. }
  36. /**
  37. * 真正执行重试逻辑
  38. *
  39. * @param userVo
  40. */
  41. public void doPostRetry(UserVO userVo) {
  42. Callable<String> callable = new Callable<String>() {
  43. @Override
  44. public String call() throws Exception {
  45. return postSomething(userVo, true);
  46. }
  47. };
  48. RetryService.startRetry(callable);
  49. }
  50. }

调用

  1. @Autowired
  2. private BusinessService businessService;
  3. @Test
  4. public void testVA() throws IOException {
  5. UserVO vo = new UserVO();
  6. vo.setName("akarin");
  7. vo.setAge(20);
  8. businessService.postSomething(vo, false);
  9. }

测试

  • 先启动接口
  • 再启动调用

    打印

  • 接口打印

    1. 2020-01-15 10:47:06.381 INFO 6324 --- [nio-8080-exec-1] c.e.redisdemo.controller.DemoController : 当前count: 1
    2. UserVO(name=akarin, age=20)
    3. randomToken
    4. 2020-01-15 10:47:06.397 INFO 6324 --- [nio-8080-exec-2] c.e.redisdemo.controller.DemoController : 当前count: 2
    5. UserVO(name=akarin, age=20)
    6. randomToken
    7. 2020-01-15 10:47:08.408 INFO 6324 --- [nio-8080-exec-3] c.e.redisdemo.controller.DemoController : 当前count: 3
  • 测试打印

image.png

  • 重试调用两次,在第二次重试调用成功