RabbitMQ 的实现方式可参考:https://www.yuque.com/nashihuakai/han7d5/odk884

创建延迟消息结构

  1. @Getter
  2. @Setter
  3. @NoArgsConstructor
  4. public class DelayMessage implements Delayed {
  5. private long id;
  6. private long executeTime; // 执行时间
  7. private Class<?> clazz;
  8. public DelayMessage(int id, long executeTime, Class<?> clazz) {
  9. this.id = id;
  10. this.executeTime = executeTime;
  11. this.clazz = clazz;
  12. }
  13. // 获取延迟时间
  14. @Override
  15. public long getDelay(@NotNull TimeUnit unit) {
  16. return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
  17. }
  18. // 任务比较
  19. @Override
  20. public int compareTo(@NotNull Delayed delayed) {
  21. DelayMessage msg = (DelayMessage) delayed;
  22. return Long.compare(this.id, msg.id);
  23. }
  24. }

开启线程执行队列

  1. @Component
  2. @Transactional
  3. public class AssociationTaskRunner implements ApplicationRunner {
  4. private static final Logger logger = LoggerFactory.getLogger(AssociationTaskRunner.class);
  5. @Autowired
  6. private GuessService guessService;
  7. // 延迟队列
  8. DelayQueue<DelayMessage> delayQueue = new DelayQueue<>();
  9. @Override
  10. public void run(ApplicationArguments args) throws Exception {
  11. ExecutorService pool = Executors.newSingleThreadExecutor();
  12. pool.execute(() -> {
  13. for (; ; ) {
  14. try {
  15. if (Thread.currentThread().isInterrupted()) {
  16. logger.warn("[Execute Daily Guess] end of running by thread interrupted");
  17. break;
  18. }
  19. DelayMessage message = delayQueue.take();
  20. String currentDate = DateUtils.getLocalDateTimeFormat(new Date(), "yyyy-MM-dd HH:mm:ss");
  21. // 每日竞猜开奖
  22. if (Guess.class.equals(message.getClazz())) {
  23. executeDailyGuess();
  24. } else {
  25. continue;
  26. }
  27. logger.info("[Execute Daily Guess] running succeed by [id = {}, clazz = {}, executeDate = {}]", message.getId(), message.getClazz().getName(), currentDate);
  28. } catch (InterruptedException ex) {
  29. ex.printStackTrace();
  30. Thread.currentThread().interrupt();
  31. logger.warn("[Execute Daily Guess] running failed by [thread = {}, ex = {}]", Thread.currentThread().getName(), ex.getMessage());
  32. }
  33. }
  34. });
  35. }
  36. /**
  37. * 执行每日竞猜开奖
  38. */
  39. private void executeDailyGuess() {
  40. // TODO
  41. }
  42. }