分发机制
参考:https://rabbitmq.com/getstarted.html

一定要掌握前五种

简单模式(Simple)

生产者 - 队列 - 消费者
image.png

导入依赖

  1. <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.12.0</version>
  6. </dependency>

定义生产者


/**
 * 生产者
 *
 * @author zukxu
 * CreateTime: 2021/5/18 0018 15:32
 */
public class Producer {
    private static Logger log = LoggerFactory.getLogger(Producer.class);

    public static void main(String[] args) {
        //    所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
        //    需要ip:port

        //    步骤
        //    1. 创建连接工厂
        log.info("1. 创建连接工厂");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置相关配置项
        connectionFactory.setHost("112.74.175.76");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //声明连接和通道
        Connection connection = null;
        Channel channel = null;
        try {
            //     2. 创建连接 Connection
            log.info("2. 创建连接 Connection");
            connection = connectionFactory.newConnection("生产者-1");

            //    3. 通过连接获取通道Chanel
            log.info("3. 通过连接获取通道Chanel");
            channel = connection.createChannel();

            //    4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
            //声明队列
            log.info("通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息");
            String queueName = "队列-1";
            /**
             * @param 队列名称
             * @param 是否持久化
             * @param 是否独占队列(排他性,一般不设置为排他)
             * @param 是否自动删除(最后一个消费者消费结束是否自动删除,一般不会自动删除)
             * @param 附加参数map
             *
             */
            channel.queueDeclare(queueName, false, false, false, null);

            //    5. 准备消息
            log.info("准备消息");
            String msg = "Hello World";

            //    6. 发送消息到queue
            log.info("6. 发送消息到queue");
            /**
             * @param 交换机
             * @param 队列名称
             * @param 消息持久化
             * @param 发送内容
             */
            channel.basicPublish("", queueName, null, msg.getBytes());

            log.info("消息发送成功:{}",msg);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            log.info("关闭连接");
            //    7. 关闭通道

            if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }

            //    8. 关闭连接
            if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

image.png

  • Rabbitmq

image.png

定义消费者

public class Consumer {
    private static Logger log = LoggerFactory.getLogger(Consumer.class);

    public static void main(String[] args) {
        //    所有中间件都是基于tcp/ip协议,rabbitmq基于AMQP协议
        //    需要ip:port

        //    步骤
        //    1. 创建连接工厂
        log.info("1. 创建连接工厂");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //配置相关配置项
        connectionFactory.setHost("112.74.175.76");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        //声明连接和通道
        Connection connection = null;
        Channel channel = null;
        try {
            //     2. 创建连接 Connection
            log.info("2. 创建连接 Connection");
            connection = connectionFactory.newConnection("消费者-1");

            //    3. 通过连接获取通道Chanel
            log.info("3. 通过连接获取通道Chanel");
            channel = connection.createChannel();

            //    4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
            //声明队列
            log.info("通过通道接收消息");

            String queueName = "队列-1";
            log.info("参数传递队列名称:{}", queueName);

            channel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    log.info("消息接收成功:{}", new String(delivery.getBody(), StandardCharsets.UTF_8));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    log.error("接受消息失败:{}", s);
                }
            });

            System.out.println("开始接收消息");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            log.info("关闭连接");
            //    7. 关闭通道

            if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {

                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
            }

            //    8. 关闭连接
            if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

查看消费过程

持久化和非持久化队列的区别就是 服务器重启和消息消费之后队列是否还存在
而且非持久化也会进行存盘,但是重启服务器之后会消失,持久化不会消失

  • 生成连接

image.png


  • 生成通道

image.png


  • 生成队列

image.png