介绍

:::tips image.png

基础的消息队列模型包括的三个角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息 :::

    使用

    发布消息

    1. @SpringBootTest
    2. public class MyTest {
    3. @Test
    4. public void test() throws IOException, TimeoutException {
    5. //构建连接工厂对象
    6. ConnectionFactory factory = new ConnectionFactory();
    7. //设置连接参数:主机名、端口号、vhost、用户名、密码
    8. factory.setHost("RabbitMQ服务的IP");
    9. factory.setPort(RabbitMQ服务的端口号);
    10. factory.setVirtualHost("/");
    11. factory.setUsername("用户名");
    12. factory.setPassword("密码");
    13. //创建连接对象
    14. Connection connection = factory.newConnection();
    15. //创建连接通道
    16. Channel channel = connection.createChannel();
    17. //创建队列
    18. String queueName = "simple.queue";
    19. channel.queueDeclare(queueName, false, false, false, null);
    20. //发送消息
    21. String message = "这是一条测试消息";
    22. channel.basicPublish("", queueName, null, message.getBytes());
    23. System.out.println("消息发送成功:" + message);
    24. //关闭连接
    25. channel.close();
    26. connection.close();
    27. }
    28. }

    订阅消息

    1. @SpringBootTest
    2. public class MyTest {
    3. public void test() throws IOException, TimeoutException {
    4. //构建连接工厂对象
    5. ConnectionFactory factory = new ConnectionFactory();
    6. //设置连接参数:主机名、端口号、vhost、用户名、密码
    7. factory.setHost("RabbitMQ服务的IP");
    8. factory.setPort(RabbitMQ服务的端口号);
    9. factory.setVirtualHost("/");
    10. factory.setUsername("用户名");
    11. factory.setPassword("密码");
    12. //创建连接对象
    13. Connection connection = factory.newConnection();
    14. //创建连接通道
    15. Channel channel = connection.createChannel();
    16. //创建队列
    17. String queueName = "simple.queue";
    18. channel.queueDeclare(queueName, false, false, false, null);
    19. //订阅消息
    20. channel.basicConsume(queueName, true, new DefaultConsumer(channel){
    21. @Override
    22. public void handleDelivery(String consumerTag, Envelope envelope,
    23. AMQP.BasicProperties properties, byte[] body) throws IOException {
    24. //处理消息
    25. String message = new String(body);
    26. System.out.println("接收到消息:" + message);
    27. }
    28. });
    29. System.out.println("等待接收消息");
    30. }
    31. }