参见 https://www.cnblogs.com/austinspark-jessylu/p/8043726.html
场景
任务中心:定时调度执行N多任务。
需要有个单机分发机器来生成任务列表并分发任务,怎么保证单机执行?
采用缓存
基本思路:利用缓存中存储一个特定的key来是否抢到了锁。利用缓存的版本机制,同一个版本只能存储一次数据,否则返回失败。
自己的场景实践—存在问题(见代码中描述)
1、每台机器先获取锁
- 先查询是否tair value是否为空
- 为空,直接put进去,抢到了锁
- 不为空,则判断是否锁超时(因为是定时调度,保证本次调度内单机执行,下次调度重新抢锁)
- 如果超时,则重新put新值进去,相当于抢锁
- 否则,未抢到锁,相当于其他机器抢到了锁去执行
2、抢到锁 执行业务逻辑
3、记得释放锁
@Overridepublic void createInstanceByScheduler() throws Exception {/** 0 获取tair锁 */boolean tairFlag = middleWareService.updateCurrentHost();if (!tairFlag) {// 锁获取失败直接退出throw new Exception("tairLock has been uesd");}try {/** 1 拉取需要执行的任务 */List<CommonTaskMainInfo> taskList = queryTaskToExecute();if (CollectionUtils.isEmpty(taskList)) { return; }/** 2 放入PriorityQueue中 */PriorityBlockingQueue taskQueue = new PriorityBlockingQueue(taskList.size());taskList.forEach(taskMainInfo -> taskQueue.put(taskMainInfo));/** 3 异步线程消费PriorityQueue,生成instance,放入msgBroker */consumeQueue(taskQueue, taskList);/** 4 更新task调度信息 */updateExecuteInfoAndSave(taskList);} catch (Exception e) {throw e;} finally {/** 5 归还tair锁 */middleWareService.clearCurrentHost();}}@Overridepublic ScheduleHost queryCurrentHost() {Object result = tairCacheClient.getObject(TAIR_KEY);if (result == null) {return null;} else {return (ScheduleHost) result;}}@Overridepublic boolean updateCurrentHost() {ScheduleHost currentHost = queryCurrentHost();// 以下的写法存在问题,多机器并发执行,需要双重判空if (currentHost != null && !currentHost.isEmpty()) {long scheduleTime = currentHost.getScheduleTime().getTime();long runTime = new Date().getTime() - scheduleTime;// 如果tair锁被占用超过3分钟没有归还,将被清空if (runTime < EXPIRED_TIME) {LogUtil.info(LOGGER, "[helios] updateCurrentHost error, currentHost={0}", currentHost);return false;}}ScheduleHost host = new ScheduleHost(new Date(), EnvUtil.getHostName());tairCacheClient.putObject(TAIR_KEY, host);return true;}@Overridepublic void clearCurrentHost() {tairCacheClient.putObject(TAIR_KEY, new ScheduleHost());}
业界标准实现

2和0分别为版本号和过期时间,0代表不过期
采用DB
原理:利用数据库的特性 —— 唯一键,同一个键 只有一条记录能插入成功,来抢锁
// 锁类型/场景private int type;// 唯一键标识private String lockInfo;DataBaseLockDO lock = new DataBaseLockDO();lock.setType(2);lock.setLockInfo("feedbackReport#"+ reportDO.getPlatform()+ "#"+ reportDO.getBizIndex()+ "#"+ reportDO.getSubTypeIndex()+ "#"+ DateUtil.formatDate(new Date(currentTime),DateUtil.DEFAULT_HOUR_STYLE_NOSPACE));// 能插入成功才继续if (!this.dataBaseLockDAO.insert(lock)) {return;}
