需求

  • 异步执行任务
  • 支持定时执行
  • 支持取消任务
  • 保障快速执行

技术背景

  • 基于redis实现
  • php

实现

基于redis的 sorted set + hash,实现定时执行任务的Demo

sorted set 介绍

  • redis有序集合,且不允许重复的成员,不同的是每个元素都会关联一个double类型的分数
  • redis正是通过分数来为集合中的成员进行从小到大的排序,有序集合的成员是唯一的,但分数(score)却可以重复

思路

  • 使用sortset类型,将member[成员] =【任务标识】,score[分数] =【定时时间戳】
  • 使用hash类型,将【任务标识】对应的任务数据JSON存到hash中 key =【任务标识】,value =【任务数据JSON】
  • 解决及时消耗,可以运行多个进程进行并行执行

Redis实战之实现定时执行任务 - 图1

php实现代码DEMO

  1. <?php
  2. class DoTest
  3. {
  4. private const YIELD_KEY = 'yield:list';
  5. private const YIELD_DATA_KEY = 'yield:data';
  6. public function run()
  7. {
  8. $bbj = RedisClient::instance()->bbj();
  9. //获取排序(低到高)中第一个task_id
  10. $data = $bbj->zRange(self::YIELD_KEY, 0, 0, true);
  11. if (empty($data)) {
  12. echo "无数据" . PHP_EOL;
  13. return null;
  14. }
  15. $mem = array_keys($data)[0];
  16. $ts = array_values($data)[0];
  17. $now = time();
  18. //校验是否到时
  19. if ($ts > $now) {
  20. echo "还未到时间,无需操作" . PHP_EOL;;
  21. return null;
  22. }
  23. //移除集合,多进程并执行到时任务(只能被成功移除一次)
  24. $row = $bbj->zRem(self::YIELD_KEY, $mem);
  25. if (empty($row)) {
  26. echo "已经被剔除" . PHP_EOL;;
  27. return false;
  28. }
  29. //获取当前要执行的任务数据JSON
  30. $dataJson = $bbj->hGet(self::YIELD_DATA_KEY, $mem);
  31. //todo 执行定时任务业务逻辑
  32. var_export($data);
  33. var_export($dataJson);
  34. //使用完后删除任务数据JSON
  35. $bbj->hdel(self::YIELD_DATA_KEY, $mem);
  36. return true;
  37. }
  38. public function add(int $time, $i, string $content)
  39. {
  40. $data = [
  41. 'msg' => $content . $time
  42. ];
  43. $bbj = RedisClient::instance()->bbj();
  44. $dataJson = json_encode($data);
  45. $taskId = $time . '_' . $i;
  46. $isSc = $bbj->zAdd(self::YIELD_KEY, $time, $taskId);
  47. if ($isSc) $isSc = $bbj->hSet(self::YIELD_DATA_KEY, $taskId, $dataJson);
  48. var_export($isSc);
  49. }
  50. public function addFeature($i)
  51. {
  52. $time = Carbon::now()->addMinute()->timestamp;
  53. $this->add($time, $i, '未来执行内容');
  54. }
  55. public function addCurrent($i)
  56. {
  57. $time = time();
  58. $this->add($time, $i, '马上执行内容');
  59. }
  60. /**
  61. * 取消定时任务,根据任务ID
  62. * @param $taskId
  63. */
  64. public function removeYield($taskId)
  65. {
  66. $bbj = RedisClient::instance()->bbj();
  67. $bbj->zRem(self::YIELD_KEY, $taskId);
  68. $bbj->hDel(self::YIELD_DATA_KEY, $taskId);
  69. }
  70. }

监控定时任务队列

<?php
$dt = new DoTest();
while (true) {
    $rt = $dt->run();
    if (is_null($rt)) {
        sleep(1);
    }
}

添加当前执行和未来执行任务

<?php
$dt = new DoTest();
while (true) {
     for ($i = 0; $i < 100000; $i++) {
        // 添加立即执行当前任务
        $dt->addCurrent($i);
        // 添加待执行未来任务
        $dt->addFeature($i);
    }
}

取消定时任务

<?php
$dt = new DoTest();
$dt->removeYield('taskId');

解决场景

  • 定时短信发送/email发送
  • 定时执行??任务