RabbitMQ的基本结构

image-20210717162752376.png
RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

    RabbitMQ消息模型

    image-20210717163332646.png

    入门案例

    简单队列模式的模型图:

    image-20210717163434647.png
    官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue

  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

    案例结构

    image.png

    publisher实现

    ```java package cn.itcast.mq.helloworld;

public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost(在15672端口RabbitMQ控制面板Admin管理里面可以设置,相当于给每个用户分配一个空间每个空间是独立的)、用户名、密码 factory.setHost(“192.168.150.101”); factory.setPort(5672); factory.setVirtualHost(“/“); factory.setUsername(“itcast”); factory.setPassword(“123321”); // 1.2.建立连接 Connection connection = factory.newConnection();

  1. // 2.创建通道Channel
  2. Channel channel = connection.createChannel();
  3. // 3.创建队列
  4. String queueName = "simple.queue";
  5. channel.queueDeclare(queueName, false, false, false, null);
  6. // 4.发送消息
  7. String message = "hello, rabbitmq!";
  8. channel.basicPublish("", queueName, null, message.getBytes());
  9. System.out.println("发送消息成功:【" + message + "】");
  10. // 5.关闭通道和连接
  11. channel.close();
  12. connection.close();
  13. }

}

  1. <a name="blcpm"></a>
  2. ## consumer实现
  3. ```java
  4. package cn.itcast.mq.helloworld;
  5. public class ConsumerTest {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. // 1.建立连接
  8. ConnectionFactory factory = new ConnectionFactory();
  9. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
  10. factory.setHost("192.168.150.101");
  11. factory.setPort(5672);
  12. factory.setVirtualHost("/");
  13. factory.setUsername("itcast");
  14. factory.setPassword("123321");
  15. // 1.2.建立连接
  16. Connection connection = factory.newConnection();
  17. // 2.创建通道Channel
  18. Channel channel = connection.createChannel();
  19. // 3.创建队列
  20. String queueName = "simple.queue";
  21. channel.queueDeclare(queueName, false, false, false, null);
  22. // 4.订阅消息
  23. channel.basicConsume(queueName, true, new DefaultConsumer(channel){
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope,
  26. AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. // 5.处理消息
  28. String message = new String(body);
  29. System.out.println("接收到消息:【" + message + "】");
  30. }
  31. });
  32. System.out.println("等待接收消息。。。。");
  33. }
  34. }