参见 https://www.cnblogs.com/austinspark-jessylu/p/8043726.html

场景

任务中心:定时调度执行N多任务。

需要有个单机分发机器来生成任务列表并分发任务,怎么保证单机执行?

采用缓存

基本思路:利用缓存中存储一个特定的key来是否抢到了锁。利用缓存的版本机制,同一个版本只能存储一次数据,否则返回失败。

自己的场景实践—存在问题(见代码中描述)

1、每台机器先获取锁

  • 先查询是否tair value是否为空
  • 为空,直接put进去,抢到了锁
  • 不为空,则判断是否锁超时(因为是定时调度,保证本次调度内单机执行,下次调度重新抢锁)
    • 如果超时,则重新put新值进去,相当于抢锁
    • 否则,未抢到锁,相当于其他机器抢到了锁去执行

2、抢到锁 执行业务逻辑

3、记得释放锁

  1. @Override
  2. public void createInstanceByScheduler() throws Exception {
  3. /** 0 获取tair锁 */
  4. boolean tairFlag = middleWareService.updateCurrentHost();
  5. if (!tairFlag) {
  6. // 锁获取失败直接退出
  7. throw new Exception("tairLock has been uesd");
  8. }
  9. try {
  10. /** 1 拉取需要执行的任务 */
  11. List<CommonTaskMainInfo> taskList = queryTaskToExecute();
  12. if (CollectionUtils.isEmpty(taskList)) { return; }
  13. /** 2 放入PriorityQueue中 */
  14. PriorityBlockingQueue taskQueue = new PriorityBlockingQueue(taskList.size());
  15. taskList.forEach(taskMainInfo -> taskQueue.put(taskMainInfo));
  16. /** 3 异步线程消费PriorityQueue,生成instance,放入msgBroker */
  17. consumeQueue(taskQueue, taskList);
  18. /** 4 更新task调度信息 */
  19. updateExecuteInfoAndSave(taskList);
  20. } catch (Exception e) {
  21. throw e;
  22. } finally {
  23. /** 5 归还tair锁 */
  24. middleWareService.clearCurrentHost();
  25. }
  26. }
  27. @Override
  28. public ScheduleHost queryCurrentHost() {
  29. Object result = tairCacheClient.getObject(TAIR_KEY);
  30. if (result == null) {
  31. return null;
  32. } else {
  33. return (ScheduleHost) result;
  34. }
  35. }
  36. @Override
  37. public boolean updateCurrentHost() {
  38. ScheduleHost currentHost = queryCurrentHost();
  39. // 以下的写法存在问题,多机器并发执行,需要双重判空
  40. if (currentHost != null && !currentHost.isEmpty()) {
  41. long scheduleTime = currentHost.getScheduleTime().getTime();
  42. long runTime = new Date().getTime() - scheduleTime;
  43. // 如果tair锁被占用超过3分钟没有归还,将被清空
  44. if (runTime < EXPIRED_TIME) {
  45. LogUtil.info(LOGGER, "[helios] updateCurrentHost error, currentHost={0}", currentHost);
  46. return false;
  47. }
  48. }
  49. ScheduleHost host = new ScheduleHost(new Date(), EnvUtil.getHostName());
  50. tairCacheClient.putObject(TAIR_KEY, host);
  51. return true;
  52. }
  53. @Override
  54. public void clearCurrentHost() {
  55. tairCacheClient.putObject(TAIR_KEY, new ScheduleHost());
  56. }

业界标准实现

image.png

2和0分别为版本号和过期时间,0代表不过期

采用DB

原理:利用数据库的特性 —— 唯一键,同一个键 只有一条记录能插入成功,来抢锁

  1. // 锁类型/场景
  2. private int type;
  3. // 唯一键标识
  4. private String lockInfo;
  5. DataBaseLockDO lock = new DataBaseLockDO();
  6. lock.setType(2);
  7. lock.setLockInfo("feedbackReport#"
  8. + reportDO.getPlatform()
  9. + "#"
  10. + reportDO.getBizIndex()
  11. + "#"
  12. + reportDO.getSubTypeIndex()
  13. + "#"
  14. + DateUtil.formatDate(new Date(currentTime),
  15. DateUtil.DEFAULT_HOUR_STYLE_NOSPACE));
  16. // 能插入成功才继续
  17. if (!this.dataBaseLockDAO.insert(lock)) {
  18. return;
  19. }