工作队列相比简单队列,就是消费者变成了多个,生产者没有变化,
    增加,消息确认机制ACK,confirm机制,消费者在没有确认消息之前,可以处理消息的数量
    工作队列 - 图1
    生产者 send.php

    1. <?php
    2. //引入类库
    3. require_once __DIR__ . '/vendor/autoload.php';
    4. use PhpAmqpLib\Connection\AMQPStreamConnection;
    5. use PhpAmqpLib\Message\AMQPMessage;
    6. try {
    7. //创建一个连接
    8. $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/');
    9. //建立通道
    10. $channel = $connection->channel();
    11. //confirm推送成功方法
    12. $channel->set_ack_handler(
    13. function (AMQPMessage $message) {
    14. echo "发送成功确认: " . $message->body . PHP_EOL;
    15. }
    16. );
    17. //confirm推送失败方法
    18. $channel->set_nack_handler(
    19. function (AMQPMessage $message) {
    20. echo "发送失败确认: " . $message->body . PHP_EOL;
    21. }
    22. );
    23. //发布确认模式通道变为confirm模式
    24. $channel->confirm_select();
    25. //声明需要存储消息的队列,已经存在,可以不用声明
    26. $channel->queue_declare('work_list', false, false, false, false);
    27. //指定要发送的消息内容
    28. for ($i = 0; $i < 10; $i++) {
    29. $arrar['id'] = $i;
    30. $arrar['name'] = 'hdiewbewuwewehe';
    31. $json = json_encode($arrar);
    32. //消息持久 $msg = new AMQPMessage($json,array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
    33. $msg = new AMQPMessage($json);
    34. $channel->basic_publish($msg, '', 'work_list');
    35. // sleep(1);
    36. }
    37. /**
    38. *您不必在每条消息发送后等待挂起的acks。事实上,这样会更有效率
    39. *等待尽可能多的邮件被屏蔽。
    40. * 可以直接执行,也可以放在最后面
    41. */
    42. $channel->wait_for_pending_acks();
    43. // 监听成功或失败返回结束 成功/失败 => set_ack_handler/set_nack_handler
    44. // 关闭通道
    45. $channel->close();
    46. // 关闭连接
    47. $connection->close();
    48. echo '发送成功';
    49. } catch (Exception $e) {
    50. die($e->getMessage());
    51. }

    消费者 receive.php

    <?php
    //引入类库
    require_once __DIR__ . '/vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    //创建一个连接
    $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest', '/'); // 建立连接到RabbitMQ服务器
    //建立通道
    $channel = $connection->channel();
    //声明需要存储消息的队列,已经存在,可以不用声明
    //$channel->queue_declare('list_set_1', false, false, false, false);
    
    //消费者执行函数
    $callback = function ($msg) {
        echo ' 队列信息: ', $msg->body, "\n";
        sleep(1);
        //开启了自动确认ack,手动发送确认
        $msg->ack();
    };
    //当前消费者可以同时处理的消息数量
    $channel->basic_qos(null, 1, null);
    //消费消息->第四个参数 no_ack 是否自动确认ack
    $channel->basic_consume('work_list', '', false, false, false, false, $callback);
    //当 $channel 有回调时,我们的 $callback 函数都会传递给我们生产者发送的消息,(当队列有消息时,执行回调函数)
    while ($channel->is_open()) {
        $channel->wait();
    }
    
    // 关闭通道和连接
    //$channel->close();
    //$connection->close();