1. import lombok.experimental.UtilityClass;
    2. import lombok.extern.slf4j.Slf4j;
    3. import java.util.Map;
    4. import java.util.concurrent.ConcurrentHashMap;
    5. import java.util.concurrent.ScheduledThreadPoolExecutor;
    6. import java.util.concurrent.TimeUnit;
    7. import java.util.function.Supplier;
    8. /**
    9. * 心跳管理器
    10. * <p>
    11. * 该类通常与类{@link SwitchBarrier}一起使用,心跳管理器管理业务的运行状态,而类{@link SwitchBarrier}管理对远端业务的访问
    12. * </p>
    13. * @author shizi
    14. * @since 2020-11-24 15:16:35
    15. */
    16. @Slf4j
    17. @UtilityClass
    18. public class HeartBeanManager {
    19. /**
    20. * 心跳守护线程池
    21. */
    22. private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), r -> {
    23. Thread thread = new Thread(r, "heart-daemon");
    24. thread.setDaemon(true);
    25. return thread;
    26. });
    27. /**
    28. * 心跳异常时候的阈值打印
    29. */
    30. private final Integer HEARD_BEAN_PRINT_THRESHOLD = 6;
    31. /**
    32. * 打印日志频率阈值
    33. */
    34. private Integer printLogThresholdNum = 0;
    35. /**
    36. * 业务的心跳任务
    37. */
    38. private final Map<String, Pair<Supplier<Boolean>, Boolean>> heartTaskMap = new ConcurrentHashMap<>();
    39. static {
    40. scheduler.scheduleWithFixedDelay(HeartBeanManager::heartBeat, 5, 5, TimeUnit.SECONDS);
    41. }
    42. private static void heartBeat() {
    43. heartTaskMap.forEach((k, v) -> {
    44. try {
    45. v.getKey().get();
    46. serverRestore(k, v.getValue());
    47. v.setValue(true);
    48. } catch (Throwable e) {
    49. serverUnAvailable(k);
    50. v.setValue(false);
    51. }
    52. });
    53. }
    54. /**
    55. * 添加业务的心跳判断
    56. *
    57. * @param bizName 业务名
    58. * @param healthCheck 业务的心跳检测url
    59. */
    60. public void addHeartWatch(String bizName, Supplier<Boolean> healthCheck) {
    61. heartTaskMap.putIfAbsent(bizName, new Pair<>(healthCheck, true));
    62. }
    63. public void addHeartWatchAndRun(String bizName, Supplier<Boolean> healthCheck) {
    64. heartTaskMap.putIfAbsent(bizName, new Pair<>(healthCheck, true));
    65. handleRun(bizName);
    66. }
    67. /**
    68. * 手动运行
    69. * @param bizName 业务名
    70. */
    71. public void handleRun(String bizName) {
    72. if (!heartTaskMap.containsKey(bizName)) {
    73. return;
    74. }
    75. Pair<Supplier<Boolean>, Boolean> healthPair = heartTaskMap.get(bizName);
    76. try {
    77. healthPair.getKey().get();
    78. serverRestore(bizName, healthPair.getValue());
    79. healthPair.setValue(true);
    80. } catch (Throwable e) {
    81. serverUnAvailable(bizName);
    82. healthPair.setValue(false);
    83. }
    84. }
    85. /**
    86. * 业务是否可用
    87. *
    88. * @param bizName 业务名
    89. * @return true:业务可用,false:业务不可用
    90. */
    91. public boolean isHealth(String bizName) {
    92. return SwitchBarrier.canCross(bizName);
    93. }
    94. private void serverRestore(String bizName, Boolean serverAvailable) {
    95. if (!serverAvailable) {
    96. log.info("服务【{}】心跳恢复", bizName);
    97. }
    98. printLogThresholdNum = 0;
    99. SwitchBarrier.allowCross(bizName);
    100. }
    101. private void serverUnAvailable(String bizName) {
    102. if (printLogThresholdNum <= 0) {
    103. log.error("服务【{}】心跳异常,url:{}", bizName, heartTaskMap.get(bizName).getKey());
    104. printLogThresholdNum = HEARD_BEAN_PRINT_THRESHOLD;
    105. } else {
    106. printLogThresholdNum--;
    107. }
    108. SwitchBarrier.forbidCross(bizName);
    109. }
    110. }

    用法

    1. // 启动健康检查任务
    2. HeartBeanManager.addHeartWatchAndRun(CLOUD_UPLOAD_JOB, () -> {
    3. String remoteUrl = cloudRemoteAddress();
    4. if (StringUtils.isEmpty(remoteUrl)) {
    5. log.info("异常:地址为空");
    6. return false;
    7. }
    8. NeoMap resultMap = HttpHelper.get(NeoMap.class, remoteUrl + CLOUD_HEALTH_CHECK);
    9. if (resultMap.containsKey("ok")) {
    10. return resultMap.getBoolean("ok");
    11. }
    12. return false;
    13. });