直接上生产者代码

    1. package com.demo.RabbitMQ;
    2. import com.rabbitmq.client.AMQP;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. import java.util.HashMap;
    7. import java.util.Map;
    8. //消息队列hello world
    9. //生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。
    10. public class ProducerMQ {
    11. //定义队列名字
    12. private final static String QUEUE_NAME = "hello";
    13. @SuppressWarnings("ProducerMQ")
    14. public static void main(String[] args) throws Exception {
    15. //创建一个连接工厂
    16. ConnectionFactory connectionFactory = new ConnectionFactory();
    17. //设置链接地址
    18. connectionFactory.setHost( "127.0.0.1" );
    19. //创建一个连接
    20. Connection connection = connectionFactory.newConnection();
    21. //通过连接创建一个通信管道
    22. Channel channel = connection.createChannel();
    23. Map <String, Object> map = new HashMap();
    24. map.put( "AWK", "98K" );
    25. // 参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数
    26. channel.queueDeclare( QUEUE_NAME, false, false, false, null );
    27. //发送的消息
    28. String message = "Hello World";
    29. //转换成byte传出去
    30. map.put( "AWK", "AK47" );
    31. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    32. .deliveryMode( 2 ) // 传送方式
    33. .contentEncoding( "UTF-8" ) // 编码方式
    34. .expiration( "10000" ) // 过期时间
    35. .headers( map ) //自定义属性
    36. .build();
    37. //basicPublish 参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-路由的headers信息;参数四:消息主体
    38. channel.basicPublish( "", QUEUE_NAME, properties, message.getBytes() );
    39. //关闭通道
    40. channel.close();
    41. //关闭连接
    42. connection.close();
    43. }
    44. }

    消费者(消息接受者)

    1. package com.demo.RabbitMQ;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. //接收者
    5. //生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。
    6. public class ConsumerMQ {
    7. //接受队列得名字
    8. private final static String QUEUE_NAME = "hello";
    9. public static void main(String[] args) throws Exception {
    10. ConnectionFactory connectionFactory = new ConnectionFactory();
    11. connectionFactory.setHost( "127.0.0.1" );
    12. //创建通信连接
    13. Connection connection = connectionFactory.newConnection();
    14. //建立通道
    15. Channel chanel = connection.createChannel();
    16. // 声明队列【参数说明:参数一:队列名称,参数二:
    17. // 是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
    18. chanel.queueDeclare( QUEUE_NAME, true, false, false, null );
    19. // 第一种获取消息的方式 持续消息获取使用:basic.consume;单个消息获取使用:basic.get
    20. Consumer consumer = new DefaultConsumer( chanel ) {
    21. @Override
    22. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    23. String message = new String( body );
    24. //接收到的路由的headers等
    25. System.out.println( "头部" + properties.getHeaders() );
    26. System.out.println( "接收到的消息" + message );
    27. }
    28. };
    29. // queue 所订阅的队列 autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false
    30. // callback接收到消息之后执行的回调方法
    31. chanel.basicConsume( QUEUE_NAME, true, consumer );
    32. // 第二种消息获取 单个消息获取采用GetResponse
    33. // @Param队列名称 Boolean autoAck 是否自动确认
    34. // while (true) {
    35. // GetResponse getResponse = chanel.basicGet( QUEUE_NAME, false );
    36. // String message = new String( getResponse.getBody(), "UTF-8" );
    37. // System.out.println( message );
    38. // UInt64 deliveryTag, 结果是否为多条数据
    39. // chanel.basicAck( getResponse.getEnvelope().getDeliveryTag(), true );
    40. // }
    41. //
    42. }
    43. }

    在RabbitMQ管理台手动添加队列的时候选择持久化选择 Durable持久那么消费者端也必须设置持久否则会报如下错误
    (2)RabbitMQ - 图1
    接受的时候会报错。
    (2)RabbitMQ - 图2
    需设置消费者是否持久化 为true

    (2)RabbitMQ - 图3

    (2)RabbitMQ - 图4