windows环境搭建:

    1. 1.下载并安装erlang,下载地址:http://www.erlang.org/download
    2. 2.配置erlang环境变量信息
    3. 新增环境变量ERLANG_HOME=erlang的安装地址
    4. 将%ERLANG_HOME%\bin加入到path
    5. 3.下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html
    6. 注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang
    7. RabitMQ管理平台中心
    8. RabbitMQ 管理平台地址 http://127.0.0.1:15672
    9. 默认账号:guest/guest 用户可以自己创建新的账号
    10. Virtual Hosts:
    11. mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?
    12. RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每
    13. VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互
    14. 隔离的。exchangequeuemessage不能互通。
    15. 默认的端口15672rabbitmq管理平台端口号
    16. 默认的端口5672 rabbitmq消息中间内部通讯的端口
    17. 默认的端口号25672 rabbitmq集群的端口号

    linux环境搭建(Centos7):

    yum install epel-release
    yum install rabbitmq-server
    systemctl start rabbitmq-server
    rabbitmq-plugins enable rabbitmq_management
    
    
    添加用户
    rabbitmqctl add_user wn wn123  用户名 密码
    把用户设置为管理员
    rabbitmqctl set_user_tags wn administrator
    给用户赋予所有权限
    rabbitmqctl set_permissions -p / wn ".*" ".*" ".*"
    

    1、对比:
    ActiveMQ遵循JMS规范,但是迭代速度很慢,不怎么维护了,并且高并发性能不怎么好,有可能会丢失消息;
    RabbitMQ是Erlang语言编写的,继承了Erlang语言天生的为并发而生的性能,稳定性与安全性也有保障;但是不支持动态扩展;
    Kafaka依赖Zookeeper,可动态扩展节点,高性能高吞吐量、无限扩容、消息可以指定追溯;但是有着严格的顺序机制,不支持消息优先级;

    2、RabbitMQ支持多virtual host,不同团队可共用一个RabbitMQ,只要建立不同的virtual host就好了,可以进行相互隔离;互不影响;

    3、普通队列与公平队列(点对点)
    普通:

        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.6.5</version>
            </dependency>
        </dependencies>
    
    public class RabitMQConnection {
    
        public static Connection getConnection() throws IOException, TimeoutException {
            // 1.创建我们的连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2.设置我们的连接地址
            connectionFactory.setHost("127.0.0.1");
            // 3.设置我们的端口号
            connectionFactory.setPort(5672);
            // 4.设置账号和密码
            connectionFactory.setUsername("meite");
            connectionFactory.setPassword("meite");
            // 5.设置VirtualHost
            connectionFactory.setVirtualHost("/meite_rabbitmq");
            return connectionFactory.newConnection();
        }
    }
    
    public class Producer {
        private static final String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = newConnection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "test_yushengjun110";
            System.out.println("生产者发送消息:" + msg);
            // 4.发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            channel.close();
            newConnection.close();
        }
    
    }
    


    public class Customer {
        private static final String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("002");
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            // 2.获取通道
            Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                }
            };
            // 3.监听队列
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
        }
    
    }
    

    公平:work queue

    public class Producer {
        private static final String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = newConnection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            for (int i = 1; i <= 50; i++) {
                String msg = "test_yushengjun" + i;
                System.out.println("生产者发送消息:" + msg);
                // 4.发送消息
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            channel.close();
            newConnection.close();
        }
    
    }
    


    public class Customer1 {
        private static final String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("001");
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            // 2.获取通道
            final Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
    
                    } finally {
                        // 手动回执消息
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // 3.监听队列
            //设置应答模式,如果为true情况下,表示自动应答模式,false表示手动应答
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
        }
    
    }
    

    保证消息不丢失:
    通过confirm机制确认发送消息成功(向mqserver发送消息,如果接收到采用ack机制通知mqclient说明发送成功)

    public class Producer {
        private static final String QUEUE_NAME = "mayikt";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            System.out.println("生产者启动成功..");
            // 1.创建我们的连接
            Connection connection = RabitMQConnection.getConnection();
            // 2.创建我们通道
            Channel channel = connection.createChannel();
    //        for (int i = 0; i < 10; i++) {
            // 开启生产确认消息投递机制
            channel.confirmSelect();
            int i = 1;
            String msg = "每特教育第六期突破3万月薪" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            if (channel.waitForConfirms()) {
                System.out.println("生产者发送消息成功:" + msg);
            }
    
            channel.close();
            connection.close();
        }
    }
    

    消费者采用ack机制通知给mqserver才会给消费者发送下一条消息(消费者启动的时候会自动去mqserver拉去消息,如果后边有新消息则mqserver会会将消息自动推送给消费者)

    public class Consumer {
        private static final String QUEUE_NAME = "mayikt";
        // 业务操作等待时间
        private static int time = 1000;
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println(">>>消费者02启动:" + time);
            // 1.创建我们的连接
            Connection connection = RabitMQConnection.getConnection();
            // 2.创建我们通道
            final Channel channel = connection.createChannel();
            // MQ每次只会给消费者发送一条消息,必须返回ack之后才会继续发送消息给消费者
            channel.basicQos(1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费消息msg:" + msg);
                    try {
                        Thread.sleep(time);
                    } catch (Exception e) {
    
                    }
                    // 手动发送消息告诉给mq服务器端  从队列删除该消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 3.创建我们的监听的消息 auto Ack 默认自动签收  必须手动ack
            channel.basicConsume(QUEUE_NAME, false
                    , defaultConsumer);
    
        }
    }
    

    至于mq服务端怎样做保证消息的安全:
    在我们创建队列和交换机的时候都有一个属性Durability,值可以为Durable和Tmep;
    如果当我们的mqserver刚要做消息磁盘持久化的时候,服务器挂了,那我们可以在发送消息前搞个日志表记录下,然后消费者消费也搞个日志表,做个定时同步补发就好;