这里 rabbitmq安装就省略了。
注意下载和配置 erlang 就行

简介

RabbitMQ 是基于 AMQP 协议的一种消息队列,有的时候我们称之为消息中间件。
AMQP,即(Advanced Message Queuing Protocol),高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

几种常见的消息模型

下边我们来看看 rabbitmq 的几种消息模型
创建maven项目,导入相关依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.7.3</version>
  5. </dependency>

直连模式

image.png

这种模式下 是一个点对点的模式。

  • P:生产者,也就是要发送消息的一方
  • C:消费者,接受消息的一方
  • queue:图中红色方块区域,存放消息的队列,先进先出。生产者发送消息到队列,消费者从队列中拿消息。

这个模式下只能有一个 生产者和消费者。

无论在何种模式下
无论角色是生产者还是消费者,在发送和获取消息的时候,都要先获取一个 Connection 连接。
然后通过连接,创建一个 channel 对象,通过channel 对象来绑定一个 queue,然后进行消息的发送和获取。

所以实际上应该是这样的。
image.png

关于 channel 这个概念

我们完全可以使用 Connection 就能完成信道的工作,为什么还要引入信道呢?
试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是多个 TCP 连接。
然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。
RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源

感谢链接 https://www.cnblogs.com/eleven24/p/10326718.html

代码实现

生产者端:

public class Provider {

    //生产消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //首先这里肯定要先去 rabbitmq去创建一个链接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置 rabbitmq 的地址和端口号
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        //连接哪个虚拟主机
        connectionFactory.setVirtualHost("/ems");

        //设置访问虚拟机的用户名和密码
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("toor");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //因为在 rabbitmq 中我们都是通过 通道(channel)来获取和发送消息的
        Channel channel = connection.createChannel();

        //通道去绑定对应的消息队列。
        //这个队列会在不存在的情况下,自动创建一个队列。
        channel.queueDeclare("hello", //参数名
                true,   //队列是否持久化,持久化意味着重启后,队列仍然保存,但是队列中的数据会丢失,
                                // 如果没有持久化,重启后队列会跟着消失, 。
                                // 我们这里没必要,所以可以设置为 false,在项目中应该设为 true
                false,  //是否独占队列,true 意味着只能自己这个连接可用这个队列,其它连接不能
                false,  //在消息完成之后是否自动删除队列(注意!这里是在消费者完全断开连接之后才进行删除的)
                null);//附加参数

        channel.basicPublish("", //要发给交换机的名称,因为我们这里是简单模式
                "hello",        //要发给队列的名称
                MessageProperties.PERSISTENT_TEXT_PLAIN,//传递消息额外设置,我们可以在这里设置消息的持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
                "hello rabbitmq".getBytes() );//消息的具体内容,必须是byte数组

        //关闭channel
        channel.close();
        //关闭连接
        connection.close();

    }

}

因为以后也要重复获取和关闭连接,我们这里索性抽取一个 util 工具类

public class RabbitMqUtil {

    private static ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();

        //首先这里肯定要先去 rabbitmq去创建一个链接
        //设置 rabbitmq 的地址和端口号
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        //连接哪个虚拟主机
        connectionFactory.setVirtualHost("/ems");

        //设置访问虚拟机的用户名和密码
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("toor");
    }


    /**
     * 获取连接
     * @return
     */
    public static Connection getConnection() {
        //获取连接对象
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
            //因为在 rabbitmq 中我们都是通过 通道(channel)来获取和发送消息的
            return connection;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return null;
    }

    /**
     * 关闭连接
     * @param channel
     * @param connection
     * @return
     */
    public static boolean close(Channel channel, Connection connection) {
        try {
            //关闭channel
            if (channel != null) channel.close();
            //关闭连接
            if (connection != null) connection.close();
            return true;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return false;
    }

}

消费者端:

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = RabbitMqUtil.getConnection();

        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);

        channel.basicConsume(
                "hello", //消费哪个队列的消息
                true,//开启消息的确认机制
                new DefaultConsumer(channel){//消息消费时的回调接口

                    //处理消息的回调方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //前三个参数,咱们先不管,看看最后一个参数
                        //见名知意
                        String s = new String(body);
                        System.out.println("消费消息:::::" + s);
                    }
                });

        System.out.println("channel 准备关闭");

    }
}