1.7.2 task进程使用消息队列

1.7.2新增特性,可将task进程单独设置为消息队列。带来的好处是:

任务排队容量增加

在维持worker进程异步的前提下,task进程可使用消息队列提升任务排队的容量,unix sock受到缓存区尺寸限制,而消息队列不受限制,可以利用到操作系统所有的内存。 如你的机器有32G内存,如果是unix sock一般缓冲区只有8M。如果你的任务很多,会堆积在socket缓存区中。当超过缓冲区时就会无法再投递新的任务。 而消息队列,只要操作系统有剩余内存,那一直可以投递新的任务到队列中。

支持外部程序投递任务

当前的swoole使用Unix Socket,只允许程序内部进行通信。采用消息队列后,拿到消息队列的key。其他程序就可以向此队列投递数据了。

实例

  1. class SwooleTask
  2. {
  3. protected $queueId;
  4. protected $workerId;
  5. protected $taskId = 1;
  6. const SW_TASK_TMPFILE = 1; //tmp file
  7. const SW_TASK_SERIALIZE = 2; //php serialize
  8. const SW_TASK_NONBLOCK = 4; //task
  9. const SW_EVENT_TASK = 7;
  10. /**
  11. * SwooleTask constructor.
  12. * @param $key
  13. * @param int $workerId
  14. * @throws Exception
  15. */
  16. function __construct($key, $workerId = 0)
  17. {
  18. $this->queueId = msg_get_queue($key);
  19. if ($this->queueId === false)
  20. {
  21. throw new \Exception("msg_get_queue() failed.");
  22. }
  23. $this->workerId = $workerId;
  24. }
  25. protected static function pack($taskId, $data)
  26. {
  27. $flags = self::SW_TASK_NONBLOCK;
  28. $type = self::SW_EVENT_TASK;
  29. if (!is_string($data))
  30. {
  31. $data = serialize($data);
  32. $flags |= self::SW_TASK_SERIALIZE;
  33. }
  34. if (strlen($data) >= 8180)
  35. {
  36. $tmpFile = tempnam('/tmp/', 'swoole.task');
  37. file_put_contents($tmpFile, $data);
  38. $data = pack('l', strlen($data)) . $tmpFile . "\0";
  39. $flags |= self::SW_TASK_TMPFILE;
  40. $len = 128 + 24;
  41. }
  42. else
  43. {
  44. $len = strlen($data);
  45. }
  46. return pack('lSsCCS', $taskId, $len, 0, $type, 0, $flags) . $data;
  47. }
  48. function dispatch($data)
  49. {
  50. $taskId = $this->taskId++;
  51. if (!msg_send($this->queueId, 2, self::pack($taskId, $data), false))
  52. {
  53. return false;
  54. }
  55. else
  56. {
  57. return $taskId;
  58. }
  59. }
  60. }
  61. echo "Sending text to msg queue.\n";
  62. $task = new SwooleTask(0x70001002, 1);
  63. //普通字符串
  64. $task->dispatch("Hello from PHP!");

task进程是可以与swoole_server所有的客户端连接进行通信的,所以外部程序使用消息队列作为IPC,就可以与所有客户端连接进行通信。

使用方法

只需设置swoole_server::set参数即可。新增的参数如下:

  • task_ipc_mode => 1 | 2 | 3,默认为1就是普通的unix socket通信方式,2, 3就是使用消息队列模式。模式2和模式3的不同之处是,模式2支持定向投递,$serv->task($data, $task_worker_id) 这里可以指定投递到哪个task进程。模式3是完全争抢模式,task进程会争抢队列,将无法使用定向投递,即使指定了$task_worker_id,在模式3下也是无效的。

  • message_queue_key => 0x72000100 ,指定一个消息队列key。如果需要运行多个swoole_server的实例,务必指定。否则会发生数据错乱