import lombok.experimental.UtilityClass;import lombok.extern.slf4j.Slf4j;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.function.Supplier;/*** 心跳管理器* <p>* 该类通常与类{@link SwitchBarrier}一起使用,心跳管理器管理业务的运行状态,而类{@link SwitchBarrier}管理对远端业务的访问* </p>* @author shizi* @since 2020-11-24 15:16:35*/@Slf4j@UtilityClasspublic class HeartBeanManager {/*** 心跳守护线程池*/private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), r -> {Thread thread = new Thread(r, "heart-daemon");thread.setDaemon(true);return thread;});/*** 心跳异常时候的阈值打印*/private final Integer HEARD_BEAN_PRINT_THRESHOLD = 6;/*** 打印日志频率阈值*/private Integer printLogThresholdNum = 0;/*** 业务的心跳任务*/private final Map<String, Pair<Supplier<Boolean>, Boolean>> heartTaskMap = new ConcurrentHashMap<>();static {scheduler.scheduleWithFixedDelay(HeartBeanManager::heartBeat, 5, 5, TimeUnit.SECONDS);}private static void heartBeat() {heartTaskMap.forEach((k, v) -> {try {v.getKey().get();serverRestore(k, v.getValue());v.setValue(true);} catch (Throwable e) {serverUnAvailable(k);v.setValue(false);}});}/*** 添加业务的心跳判断** @param bizName 业务名* @param healthCheck 业务的心跳检测url*/public void addHeartWatch(String bizName, Supplier<Boolean> healthCheck) {heartTaskMap.putIfAbsent(bizName, new Pair<>(healthCheck, true));}public void addHeartWatchAndRun(String bizName, Supplier<Boolean> healthCheck) {heartTaskMap.putIfAbsent(bizName, new Pair<>(healthCheck, true));handleRun(bizName);}/*** 手动运行* @param bizName 业务名*/public void handleRun(String bizName) {if (!heartTaskMap.containsKey(bizName)) {return;}Pair<Supplier<Boolean>, Boolean> healthPair = heartTaskMap.get(bizName);try {healthPair.getKey().get();serverRestore(bizName, healthPair.getValue());healthPair.setValue(true);} catch (Throwable e) {serverUnAvailable(bizName);healthPair.setValue(false);}}/*** 业务是否可用** @param bizName 业务名* @return true:业务可用,false:业务不可用*/public boolean isHealth(String bizName) {return SwitchBarrier.canCross(bizName);}private void serverRestore(String bizName, Boolean serverAvailable) {if (!serverAvailable) {log.info("服务【{}】心跳恢复", bizName);}printLogThresholdNum = 0;SwitchBarrier.allowCross(bizName);}private void serverUnAvailable(String bizName) {if (printLogThresholdNum <= 0) {log.error("服务【{}】心跳异常,url:{}", bizName, heartTaskMap.get(bizName).getKey());printLogThresholdNum = HEARD_BEAN_PRINT_THRESHOLD;} else {printLogThresholdNum--;}SwitchBarrier.forbidCross(bizName);}}
用法
// 启动健康检查任务HeartBeanManager.addHeartWatchAndRun(CLOUD_UPLOAD_JOB, () -> {String remoteUrl = cloudRemoteAddress();if (StringUtils.isEmpty(remoteUrl)) {log.info("异常:地址为空");return false;}NeoMap resultMap = HttpHelper.get(NeoMap.class, remoteUrl + CLOUD_HEALTH_CHECK);if (resultMap.containsKey("ok")) {return resultMap.getBoolean("ok");}return false;});
