参见 https://www.cnblogs.com/austinspark-jessylu/p/8043726.html
场景
任务中心:定时调度执行N多任务。
需要有个单机分发机器来生成任务列表并分发任务,怎么保证单机执行?
采用缓存
基本思路:利用缓存中存储一个特定的key来是否抢到了锁。利用缓存的版本机制,同一个版本只能存储一次数据,否则返回失败。
自己的场景实践—存在问题(见代码中描述)
1、每台机器先获取锁
- 先查询是否tair value是否为空
- 为空,直接put进去,抢到了锁
- 不为空,则判断是否锁超时(因为是定时调度,保证本次调度内单机执行,下次调度重新抢锁)
- 如果超时,则重新put新值进去,相当于抢锁
- 否则,未抢到锁,相当于其他机器抢到了锁去执行
2、抢到锁 执行业务逻辑
3、记得释放锁
@Override
public void createInstanceByScheduler() throws Exception {
/** 0 获取tair锁 */
boolean tairFlag = middleWareService.updateCurrentHost();
if (!tairFlag) {
// 锁获取失败直接退出
throw new Exception("tairLock has been uesd");
}
try {
/** 1 拉取需要执行的任务 */
List<CommonTaskMainInfo> taskList = queryTaskToExecute();
if (CollectionUtils.isEmpty(taskList)) { return; }
/** 2 放入PriorityQueue中 */
PriorityBlockingQueue taskQueue = new PriorityBlockingQueue(taskList.size());
taskList.forEach(taskMainInfo -> taskQueue.put(taskMainInfo));
/** 3 异步线程消费PriorityQueue,生成instance,放入msgBroker */
consumeQueue(taskQueue, taskList);
/** 4 更新task调度信息 */
updateExecuteInfoAndSave(taskList);
} catch (Exception e) {
throw e;
} finally {
/** 5 归还tair锁 */
middleWareService.clearCurrentHost();
}
}
@Override
public ScheduleHost queryCurrentHost() {
Object result = tairCacheClient.getObject(TAIR_KEY);
if (result == null) {
return null;
} else {
return (ScheduleHost) result;
}
}
@Override
public boolean updateCurrentHost() {
ScheduleHost currentHost = queryCurrentHost();
// 以下的写法存在问题,多机器并发执行,需要双重判空
if (currentHost != null && !currentHost.isEmpty()) {
long scheduleTime = currentHost.getScheduleTime().getTime();
long runTime = new Date().getTime() - scheduleTime;
// 如果tair锁被占用超过3分钟没有归还,将被清空
if (runTime < EXPIRED_TIME) {
LogUtil.info(LOGGER, "[helios] updateCurrentHost error, currentHost={0}", currentHost);
return false;
}
}
ScheduleHost host = new ScheduleHost(new Date(), EnvUtil.getHostName());
tairCacheClient.putObject(TAIR_KEY, host);
return true;
}
@Override
public void clearCurrentHost() {
tairCacheClient.putObject(TAIR_KEY, new ScheduleHost());
}
业界标准实现
2和0分别为版本号和过期时间,0代表不过期
采用DB
原理:利用数据库的特性 —— 唯一键,同一个键 只有一条记录能插入成功,来抢锁
// 锁类型/场景
private int type;
// 唯一键标识
private String lockInfo;
DataBaseLockDO lock = new DataBaseLockDO();
lock.setType(2);
lock.setLockInfo("feedbackReport#"
+ reportDO.getPlatform()
+ "#"
+ reportDO.getBizIndex()
+ "#"
+ reportDO.getSubTypeIndex()
+ "#"
+ DateUtil.formatDate(new Date(currentTime),
DateUtil.DEFAULT_HOUR_STYLE_NOSPACE));
// 能插入成功才继续
if (!this.dataBaseLockDAO.insert(lock)) {
return;
}