pheanstalk 使用手册


前面一节,介绍了 pheanstalk 的接口文件,目的是为了更接近 pheanstalkd 的使用。

这同样是一个从 0 到 1 了解第三方包如何使用的过程,这是一个学习方法,在其它的包时,也有借鉴意义。

在过了一遍 contracts 之后,我们可以发现,pheanstalk 最接近用户层(开发者)的接口,就是 PheanstalkInterface 。
那么,使用手册,我们也通过解读 PheanstalkInterface 的具体实现 Pheanstalk 类来撰写。

建立连接

  1. include_once "vendor/autoload.php";
  2. $conn = \Pheanstalk\Pheanstalk::create('127.0.0.1',11300,10);

上面的 create 方法,第一个参数必填,需要传入目标 beanstalkd 服务器的 ip ,前提是,对于 beanstalkd server 而言,你的 ip (可能是内网,也可能是外网,也可能是本机),必须在它的监听列表中。
第二个参数是端口,可不填,默认为 11300 ,当然你可以修改。
第三个参数是连接超时时间,可不填,默认为 10 ,表示 10 秒之后还未连接成功就超时了。
要注意的是:此 create 方法只是返回一个配置好的 pheanstalk 实例,但并非是一个连接好的实例。
如何理解呢?
从实现上, pheanstalk 的流程是等到有指令了,也就是有 dispatch 指令后,才建立一个 socket 连接,而它与 beanstalkd 通信的底层,用的也是 socket 。
所以,如果你只写了 create 方法,并不能看出是否能够连接成功。
你还需要发送一个指令才能知道是否能够连接成功。
一旦发送过指令之后,Pheanstalk->connection->socket 就会指向一个 socket 资源。

关闭、重连

虽然,我们能看到 Pheanstalkd 类中,有 reconnect 方法,还有 $this->connection 的属性。
然而它们的访问权限是 private 。
这告诉我们,关闭和重连,不需要我们手动操作。
我们可以看到,这两个方法,只有在 pheanstalk 向 beanstalkd 发送指令时,会用到。
因为 PHP 使用 socket 连接时,socket 是有生命周期的,超出一定时间 socket 就会失效,此时,Pheanstalk 的 dispatch 方法就会自动重连。
我们知道,pheanstalk 的底层通信基于 php 的 socket ,那么,必然具备 socket 的特性,再深入的内容,可以参考 php 手册中的 socket 部分。

生产者操作

选择 tube

  1. $conn->useTube('myTube'); // 会返回 this ,因此支持链式操作

put job

  1. $job = $conn->put('this is my first job !');

这里传入的字符串,就是放到 beanstalkd 中 job 的 body
当然,关于 priority 、 delay 、 ttr 这些参数你可以在 body 后面传入,具体可参考方法接口。
put 会返回一个 Job 实例,如下:

  1. Pheanstalk\Job Object
  2. (
  3. [id:Pheanstalk\Job:private] => 1
  4. [data:Pheanstalk\Job:private] => this is my first job !
  5. )

链式操作

  1. $job = $conn->useTube('myTube')->put('this is my 2nd job !');

默认 Tube

我们知道,默认的 Tube 是 default ,如果我们不使用 useTube 的话,put 的 job 就会在 default tube 中。

消费者操作

接收 job

reserve

  1. $job = $conn->reserve(); // 默认情况会接收 default tube 的 job
  2. print_r($job); // 如果 reserve 没能得到 job ,就会一直阻塞在上面

上面返回的一定是 Job 实例。

reserveWithTimeOut

  1. try {
  2. $job2 = $conn->reserveWithTimeout(10); // 阻塞接收,10秒之后超时,就不再接收了
  3. print_r($job2);
  4. if ($job2 === null) {
  5. throw new Exception('超时了');
  6. }
  7. // 处理 job ...
  8. sleep(60);
  9. } catch (\Pheanstalk\Exception\DeadlineSoonException $e) {
  10. print_r('deadline soon' . $e->getMessage());
  11. } catch (Exception $e) {
  12. print_r($e->getMessage());
  13. }

注意:这个方法,超时的话会返回 null ,成功的话,会返回一个 Job 实例。
还有,上面代码片段中有一行 sleep ,这一行的作用,是做一个小实验。
当,job 被成功返回过来,但没有 sleep 这一行,你会发现,你连续运行两次该代码,返回的 job 是同一个。
理论上,我们应该接收到下一个 job ,因为这个 job 被接收了之后,状态就变成 reserved 而不再是 ready 。
那如何来测试 job 是否真的变成 reserved 了呢?
这就是 sleep 的用处,步骤如下:

  • 先往你的 beanstalkd 加入两个 job
  • 再在两个 cli 窗口运行两次含 sleep 的消费者代码

你会发现这个情况,如下图:
pheanstalk 使用手册 - 图1
pheanstalk 使用手册 - 图2
它们分别接收了两个 job 。

这里不要用 web 测试,用 cli 才能更直观地看到效果。

这个现象,也很好理解:

  • reserve 成功之后,你的 php 进程还在于 beanstalkd 保持着 socket 连接,此连接不销毁,beanstalkd 都会为你冻结此 job 。
  • 一旦 php 进程销毁(执行结束),beanstalkd 没有接收到其他对 job 的操作,自然就回到 ready 队列中了

    删除 job

    1. $conn->delete($job2);
    这个方法没有返回值,但如果出错的话,会抛出异常。
    一般在 job 被消费者处理完毕之后才调用 delete 方法。

    释放 reserved job

    1. $conn->release($job,$priority,$delay);
    此方法没有返回值,如果出错会抛出异常。
    此方法可以将 reserved job 重新放回 ready 队列中,一般在消费者逻辑处理失败后,才使用这个方法。

    预留

    1. $conn->bury($job,$priority);
    此方法没有返回值,如果出错会抛出异常。
    此方法可以将一个 job 操作为 buried 状态,比如当你的消费者接收到这个 job 时,对 job 进行了一系列检查,经检查,发现这个 job 还不能被消费,此时可以将 job 操作为 buried ,直到有客户端对这个 job 发送了 kick 指令,才会被再次 reserve 。

    延迟 ttr

    1. $conn->touch($job);
    此方法没有返回值,如果出错会抛出异常。

    添加监听的管道

    1. $conn->watch('test')->watch('sms');
    此方法返回 $conn 本身,可链式操作,每次运行,都会为 watch list 添加一个 tube 。

    删除监听的管道

    1. $conn->ignore('email')->ignore('sms');
    此方法返回 $conn 本身,可链式操作,每次运行,都会从 watch list 中移除一个 tube 。

    默认,watch list 中会有一个 default ,所以,当你执行 watch 时,default 仍然在 watch list 中。

仅监听一个管道

  1. $conn->watchOnly('sms');

此方法返回 $conn 本身,可链式操作。
通过此快捷方法,可以一次性移除 watch list 中所有的 tube ,除了此方法传入的 tube 以外。

其他命令

单纯获取 job

  1. $conn->peek(new \Pheanstalk\Job(1,'')); // 根据 id 获取 job ,id 在 beanstalkd 是唯一的,不论处于什么 tube
  2. $conn->useTube('order')->peekReady(); // 从 order tube 中获取排在最前面的 job(这个顺序,同 reserve 的顺序)
  3. $conn->peekDelayed(); // 从 default tube 中获取(最)即将变成 ready 的 delayed job
  4. $conn->peekBuried(); // 返回下一个 buried job

这里的 peekReady 、 peekDelayed 、 peekBuried 虽然在译文解释中,提到「下一个」,但它并非是一个指针,当你调用一起 peekReady ,下一次再调用 peekReady 就自动将指针移动到「下一个」。如果你不手动将当前得到的 Job 操作为其他状态,或者延长延迟时间等,你多次调用 peek 命令,他将返回同一个 Job。

批量操作 job 为 ready

  1. $conn->useTube('order');
  2. $kicked = $conn->kick(10);
  3. print_r($kicked);

上面的代码片段,是针对 order tube 操作 10 条 buried 或 delayed 为 ready 。
具体可以看 beanstalkd 译文中的 kick 命令。

操作指定 job 为 ready

  1. $conn->useTube('sms')->kickJob($job);

此方法没有返回值。

获取 job 的统计信息

  1. $job = $conn->reserve();
  2. $res = $conn->statsJob($job);
  3. print_r($res);

此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。
$res 如下:

  1. Pheanstalk\Response\ArrayResponse Object
  2. (
  3. [name:Pheanstalk\Response\ArrayResponse:private] => OK
  4. [storage:ArrayObject:private] => Array
  5. (
  6. [id] => 2
  7. [tube] => default
  8. [state] => reserved
  9. [pri] => 1
  10. [age] => 92865
  11. [delay] => 0
  12. [ttr] => 60
  13. [time-left] => 59
  14. [file] => 0
  15. [reserves] => 8
  16. [timeouts] => 0
  17. [releases] => 0
  18. [buries] => 0
  19. [kicks] => 0
  20. )
  21. )

获取 tube 统计信息

  1. $res = $conn->statsTube('default');
  2. print_r($res);

此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。
$res 如下:

  1. Pheanstalk\Response\ArrayResponse Object
  2. (
  3. [name:Pheanstalk\Response\ArrayResponse:private] => OK
  4. [storage:ArrayObject:private] => Array
  5. (
  6. [name] => default
  7. [current-jobs-urgent] => 1
  8. [current-jobs-ready] => 2
  9. [current-jobs-reserved] => 0
  10. [current-jobs-delayed] => 0
  11. [current-jobs-buried] => 0
  12. [total-jobs] => 2
  13. [current-using] => 1
  14. [current-watching] => 1
  15. [current-waiting] => 0
  16. [cmd-delete] => 0
  17. [cmd-pause-tube] => 0
  18. [pause] => 0
  19. [pause-time-left] => 0
  20. )
  21. )

获取 beanstalkd 服务器统计信息

  1. $res = $conn->stats();
  2. print_r($res);

此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。
$res 如下:

  1. Pheanstalk\Response\ArrayResponse Object
  2. (
  3. [name:Pheanstalk\Response\ArrayResponse:private] => OK
  4. [storage:ArrayObject:private] => Array
  5. (
  6. [current-jobs-urgent] => 1
  7. [current-jobs-ready] => 2
  8. [current-jobs-reserved] => 0
  9. [current-jobs-delayed] => 0
  10. [current-jobs-buried] => 0
  11. [cmd-put] => 2
  12. [cmd-peek] => 10
  13. [cmd-peek-ready] => 11
  14. [cmd-peek-delayed] => 10
  15. [cmd-peek-buried] => 10
  16. [cmd-reserve] => 2
  17. [cmd-reserve-with-timeout] => 16
  18. [cmd-delete] => 0
  19. [cmd-release] => 0
  20. [cmd-use] => 7
  21. [cmd-watch] => 4
  22. [cmd-ignore] => 4
  23. [cmd-bury] => 0
  24. [cmd-kick] => 4
  25. [cmd-touch] => 0
  26. [cmd-stats] => 1
  27. [cmd-stats-job] => 1
  28. [cmd-stats-tube] => 1
  29. [cmd-list-tubes] => 0
  30. [cmd-list-tube-used] => 0
  31. [cmd-list-tubes-watched] => 0
  32. [cmd-pause-tube] => 0
  33. [job-timeouts] => 0
  34. [total-jobs] => 2
  35. [max-job-size] => 65535
  36. [current-tubes] => 1
  37. [current-connections] => 1
  38. [current-producers] => 0
  39. [current-workers] => 0
  40. [current-waiting] => 0
  41. [total-connections] => 37
  42. [pid] => 1
  43. [version] => 1.10
  44. [rusage-utime] => 0.020000
  45. [rusage-stime] => 0.030000
  46. [uptime] => 96714
  47. [binlog-oldest-index] => 0
  48. [binlog-current-index] => 0
  49. [binlog-records-migrated] => 0
  50. [binlog-records-written] => 0
  51. [binlog-max-size] => 10485760
  52. [id] => a1e58a6bbd4c3b8b
  53. [hostname] => 91db88742cda
  54. )
  55. )

查看当前 tube 列表

  1. $res = $conn->listTubes();
  2. print_r($res);

此方法将返回一个数组,如下:

  1. Array
  2. (
  3. [0] => default
  4. [1] => myTube
  5. )

查看当前 use 的 tube 列表

  1. $res = $conn->listTubeUsed();
  2. print_r($res);

此方法返回一个字符串,为当前 used 的 tube 名。
注意,同一时间,只有一个 tube 会被 used 。

查看当前 watch list

  1. $res = $conn->listTubesWatched();
  2. print_r($res);

此方法将返回一个数组,如下:

  1. Array
  2. (
  3. [0] => default
  4. )

冻结 tube

  1. $conn->pauseTube('default',90);

此方法没有返回值,会将 tube 冻结 90 秒,冻结期间,消费者无法 reserve job ,如果 tube 冻结后,有客户端发送了 reserve 指令,则会阻塞,直到冻结结束,或 reserve time out 。