pheanstalk 简单案例

前言

上一小节,我们介绍了 pheanstalk 的使用。
这一小节,我们使用 pheanstalk 来完成一个小功能。

需求设计

pheanstalk 简单案例 - 图1
主要是一个注册功能,当用户注册成功之后,会接收到两条信息,分别是邮件和短信。
这里的邮件和短信,我们以消息队列来实现,这样可以加快服务端对用户端的响应,提高体验。

实现规划

1、以 php + mysql + beanstalkd 为基础架构
2、使用 pheanstalk 操作 beanstalkd、使用 medoo 操作 mysql
3、使用 composer 作为包管理工具
4、有三个文件:

  • register.php 负责接收注册请求、用户信息入库、生成短信任务和邮件任务
  • sms.php 短信任务消费者
  • email.php 邮件任务消费者

    代码实现

    包安装

    1. composer require pda/pheanstalk
    2. composer require catfan/medoo

    demo.sql

    1. CREATE TABLE `user` (
    2. `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    3. `username` varchar(50) COLLATE utf8_bin NOT NULL,
    4. `email` varchar(100) COLLATE utf8_bin NOT NULL,
    5. `phone` char(11) COLLATE utf8_bin NOT NULL,
    6. `password` varchar(255) COLLATE utf8_bin NOT NULL,
    7. PRIMARY KEY (`id`),
    8. KEY `user_username` (`username`)
    9. ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin

    register.php

    1. <?php
    2. // +----------------------------------------------------------------------
    3. // | register.php
    4. // +----------------------------------------------------------------------
    5. // | Description: 注册
    6. // +----------------------------------------------------------------------
    7. // | Time: 2021/1/1 下午1:47
    8. // +----------------------------------------------------------------------
    9. // | Author: Object,一季樱花落雨殇<1647762341@qq.com>
    10. // +----------------------------------------------------------------------
    11. require_once "../vendor/autoload.php";
    12. use Medoo\Medoo;
    13. use Pheanstalk\Pheanstalk;
    14. // demo 只为演示如何使用 pheanstalk ,所以不做精细化处理,如注入、加密、字段设计等,只能用于简单测试
    15. $param = getopt('u:p:e:m:');
    16. $username = $param['u'];
    17. $password = $param['p'];
    18. $email = $param['e'];
    19. $phone = $param['m'];
    20. $db = new Medoo([
    21. 'database_type' => 'mysql',
    22. 'database_name' => 'test',
    23. 'server' => 'mysql',
    24. 'username' => 'root',
    25. 'password' => 'root',
    26. // [optional]
    27. 'charset' => 'utf8',
    28. 'port' => 3306,
    29. ]);
    30. $alreadyIn = $db->has('user', [
    31. 'OR' => [
    32. 'username' => $username,
    33. 'email' => $email,
    34. 'phone' => $phone,
    35. ]
    36. ]);
    37. if ($alreadyIn) {
    38. echo '用户已存在';
    39. die();
    40. }
    41. // 插入用户
    42. $user = [
    43. 'username' => $username,
    44. 'email' => $email,
    45. 'phone' => $phone,
    46. 'password' => $password
    47. ];
    48. $db->insert('user', $user);
    49. $user['id'] = $db->id();
    50. $jobData = json_encode($user);
    51. $conn = Pheanstalk::create('beanstalkd', 11300, 10);
    52. $conn->useTube('register_sms');
    53. $smsJob = $conn->put($jobData);
    54. $conn->useTube('register_email');
    55. $emailJob = $conn->put($jobData);
    56. echo '注册成功:<br>';
    57. print_r($user);
    58. print_r($smsJob);
    59. print_r($emailJob);

    sms.php

    1. <?php
    2. // +----------------------------------------------------------------------
    3. // | sms.php
    4. // +----------------------------------------------------------------------
    5. // | Description:
    6. // +----------------------------------------------------------------------
    7. // | Time: 2021/1/1 下午1:47
    8. // +----------------------------------------------------------------------
    9. // | Author: Object,一季樱花落雨殇<1647762341@qq.com>
    10. // +----------------------------------------------------------------------
    11. include_once "../vendor/autoload.php";
    12. use Pheanstalk\Pheanstalk;
    13. $conn = Pheanstalk::create('beanstalkd', 11300, 10);
    14. $conn->watchOnly('register_sms');
    15. function sendSms($user)
    16. {
    17. return random_int(0, 1); // 取随机数,模拟发送成功与失败
    18. }
    19. while (1) {
    20. try {
    21. $job = $conn->reserveWithTimeout(1);
    22. if ($job === null) {
    23. throw new Exception('没有任务');
    24. }
    25. // 发送邮件
    26. if (sendSms($job->getData())) {
    27. // 处理成功
    28. $conn->delete($job);
    29. } else {
    30. // 处理失败
    31. $conn->release();
    32. }
    33. } catch (Exception $e) {
    34. print_r($e->getMessage());
    35. die();
    36. }
    37. echo "欢迎短信发送成功<br>";
    38. usleep(500000);
    39. }

    email.php

    1. <?php
    2. // +----------------------------------------------------------------------
    3. // | email.php
    4. // +----------------------------------------------------------------------
    5. // | Description:
    6. // +----------------------------------------------------------------------
    7. // | Time: 2021/1/1 下午1:47
    8. // +----------------------------------------------------------------------
    9. // | Author: Object,一季樱花落雨殇<1647762341@qq.com>
    10. // +----------------------------------------------------------------------
    11. include_once "../vendor/autoload.php";
    12. use Pheanstalk\Pheanstalk;
    13. $conn = Pheanstalk::create('beanstalkd', 11300, 10);
    14. $conn->watchOnly('register_email');
    15. function sendEmail($user)
    16. {
    17. return random_int(0, 1); // 取随机数,模拟发送成功与失败
    18. }
    19. while (1) {
    20. try {
    21. $job = $conn->reserve();
    22. if ($job === null) {
    23. throw new Exception('没有任务');
    24. }
    25. // 发送邮件
    26. if (sendEmail($job->getData())) {
    27. // 处理成功
    28. $conn->delete($job);
    29. } else {
    30. // 处理失败
    31. $conn->release();
    32. }
    33. } catch (Exception $e) {
    34. print_r($e->getMessage());
    35. die();
    36. }
    37. echo "欢迎邮件发送成功<br>";
    38. usleep(500000); // 每 500ms 接收 job
    39. }

    测试

    先确保,你的 php 脚本能正确地连接到 mysql 和 beanstalkd
    然后,打开三个 shell 窗口,每个窗口按顺序运行一条命令:
    1. php sms.php
    1. php email.php
    1. php register.php -u username -p password -e email@qq.com -m 15866668888
    其中,sms.php 和 email.php 两个窗口,会阻塞运行。
    每当有运行过 register.php 之后,三个窗口就分别会有输出。
    大致如下:
    pheanstalk 简单案例 - 图2
    pheanstalk 简单案例 - 图3
    pheanstalk 简单案例 - 图4