RabbitMQ 的实现方式可参考:https://www.yuque.com/nashihuakai/han7d5/odk884
创建延迟消息结构
@Getter@Setter@NoArgsConstructorpublic class DelayMessage implements Delayed {private long id;private long executeTime; // 执行时间private Class<?> clazz;public DelayMessage(int id, long executeTime, Class<?> clazz) {this.id = id;this.executeTime = executeTime;this.clazz = clazz;}// 获取延迟时间@Overridepublic long getDelay(@NotNull TimeUnit unit) {return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);}// 任务比较@Overridepublic int compareTo(@NotNull Delayed delayed) {DelayMessage msg = (DelayMessage) delayed;return Long.compare(this.id, msg.id);}}
开启线程执行队列
@Component@Transactionalpublic class AssociationTaskRunner implements ApplicationRunner {private static final Logger logger = LoggerFactory.getLogger(AssociationTaskRunner.class);@Autowiredprivate GuessService guessService;// 延迟队列DelayQueue<DelayMessage> delayQueue = new DelayQueue<>();@Overridepublic void run(ApplicationArguments args) throws Exception {ExecutorService pool = Executors.newSingleThreadExecutor();pool.execute(() -> {for (; ; ) {try {if (Thread.currentThread().isInterrupted()) {logger.warn("[Execute Daily Guess] end of running by thread interrupted");break;}DelayMessage message = delayQueue.take();String currentDate = DateUtils.getLocalDateTimeFormat(new Date(), "yyyy-MM-dd HH:mm:ss");// 每日竞猜开奖if (Guess.class.equals(message.getClazz())) {executeDailyGuess();} else {continue;}logger.info("[Execute Daily Guess] running succeed by [id = {}, clazz = {}, executeDate = {}]", message.getId(), message.getClazz().getName(), currentDate);} catch (InterruptedException ex) {ex.printStackTrace();Thread.currentThread().interrupt();logger.warn("[Execute Daily Guess] running failed by [thread = {}, ex = {}]", Thread.currentThread().getName(), ex.getMessage());}}});}/*** 执行每日竞猜开奖*/private void executeDailyGuess() {// TODO}}
