直接上生产者代码
package com.demo.RabbitMQ;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.util.HashMap;import java.util.Map;//消息队列hello world//生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。public class ProducerMQ {//定义队列名字private final static String QUEUE_NAME = "hello";@SuppressWarnings("ProducerMQ")public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();//设置链接地址connectionFactory.setHost( "127.0.0.1" );//创建一个连接Connection connection = connectionFactory.newConnection();//通过连接创建一个通信管道Channel channel = connection.createChannel();Map <String, Object> map = new HashMap();map.put( "AWK", "98K" );// 参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数channel.queueDeclare( QUEUE_NAME, false, false, false, null );//发送的消息String message = "Hello World";//转换成byte传出去map.put( "AWK", "AK47" );AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode( 2 ) // 传送方式.contentEncoding( "UTF-8" ) // 编码方式.expiration( "10000" ) // 过期时间.headers( map ) //自定义属性.build();//basicPublish 参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-路由的headers信息;参数四:消息主体channel.basicPublish( "", QUEUE_NAME, properties, message.getBytes() );//关闭通道channel.close();//关闭连接connection.close();}}
消费者(消息接受者)
package com.demo.RabbitMQ;import com.rabbitmq.client.*;import java.io.IOException;//接收者//生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。public class ConsumerMQ {//接受队列得名字private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost( "127.0.0.1" );//创建通信连接Connection connection = connectionFactory.newConnection();//建立通道Channel chanel = connection.createChannel();// 声明队列【参数说明:参数一:队列名称,参数二:// 是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】chanel.queueDeclare( QUEUE_NAME, true, false, false, null );// 第一种获取消息的方式 持续消息获取使用:basic.consume;单个消息获取使用:basic.getConsumer consumer = new DefaultConsumer( chanel ) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String( body );//接收到的路由的headers等System.out.println( "头部" + properties.getHeaders() );System.out.println( "接收到的消息" + message );}};// queue 所订阅的队列 autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false// callback接收到消息之后执行的回调方法chanel.basicConsume( QUEUE_NAME, true, consumer );// 第二种消息获取 单个消息获取采用GetResponse// @Param队列名称 Boolean autoAck 是否自动确认// while (true) {// GetResponse getResponse = chanel.basicGet( QUEUE_NAME, false );// String message = new String( getResponse.getBody(), "UTF-8" );// System.out.println( message );// UInt64 deliveryTag, 结果是否为多条数据// chanel.basicAck( getResponse.getEnvelope().getDeliveryTag(), true );// }//}}
在RabbitMQ管理台手动添加队列的时候选择持久化选择 Durable持久那么消费者端也必须设置持久否则会报如下错误

接受的时候会报错。

需设置消费者是否持久化 为true


