简单模式
一个生产者发送消息,一个消费者接收消息,关系是一对一。
pom引入
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency></dependencies>
producer (生产者)
package com.hikktn.simple;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Producer* @Description 生产者* @Author lisonglin* @Date 2021/4/5 21:08* @Version 1.0*/public class Producer {private static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");Connection connection = null;Channel channel = null;try {// 创建连接connection = factory.newConnection();// 创建通道channel = connection.createChannel();// 声明简单队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "你好,rabbitmq!";/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("提供者发送消息:" + message);} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 关闭资源try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
consumer(消费者)
package com.hikktn.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class Consumer {private static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");Connection connection = null;Channel channel = null;try {// 创建连接connection = factory.newConnection();// 创建通道channel = connection.createChannel();//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key为:" + envelope.getRoutingKey());//交换机System.out.println("交换机为:" + envelope.getExchange());//消息idSystem.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME, true, consumer);}catch (IOException e){e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}}
测试 
RabbitMQ在线网站查看消息

Java接收到消息
RabbitMQ消费完消息
工作模式
一个生产者发送消息,多个消费者平均接收消息,相当于轮询处理消息。
生产者
package com.hikktn.simple;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Producer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:08* @Version 1.0*/public class ProducerWork {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");Connection connection = null;Channel channel = null;try {// 创建连接connection = factory.newConnection();// 创建通道channel = connection.createChannel();// 声明简单队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 0; i <= 10; i++) {String message = "你好,rabbitmq!"+i;/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("提供者发送消息:" + message + i);}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 关闭资源try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
将连接配置共同到一个工具类中:
package com.hikktn.util;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/*** @ClassName ConnectionUtil* @Description TODO* @Author lisonglin* @Date 2021/4/7 1:07* @Version 1.0*/public class ConnectionUtil {public static Connection getConnection() throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}}
consumer(消费者1)
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerWork1 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();//一次只能接收并处理一个消息channel.basicQos(1);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 测试ack// int i = 1 / 0;// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME, false, consumer);}}
consumer(消费者2)
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.oracle.jrockit.jfr.Producer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerWork2 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道Channel channel = connection.createChannel();// 创建队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);//一次只能接收并处理一个消息channel.basicQos(1);//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));// 自动ack(接收消息)channel.basicAck(envelope.getDeliveryTag(), false);}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME, false, consumer);}}
测试
消费者1接收到消息:

消费者2接收到消息:
生产者发送消息:
订阅模式
一个生产者发送消息,所有消费者接收消息。
生产者
package com.hikktn.simple;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Producer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:08* @Version 1.0*/public class ProducerPublish {//交换机名称private static final String FANOUT_EXCHAGE = "fanout_exchange";private static final String QUEUE_NAME_1 = "publish_fanout_queue_1";private static final String QUEUE_NAME_2 = "publish_fanout_queue_2";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");Connection connection = null;Channel channel = null;try {// 创建连接connection = factory.newConnection();// 创建通道channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/// 提供方可以不需要创建队列和交换机,由消费者处理// channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);// channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);//队列绑定交换机// channel.queueBind(QUEUE_NAME_1, FANOUT_EXCHAGE, "");// channel.queueBind(QUEUE_NAME_2, FANOUT_EXCHAGE, "");for (int i = 2; i <= 5; i++) {// 发送信息String message = "567你好;小兔子!发布订阅模式--" + i;// 声明事务channel.txSelect();/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(FANOUT_EXCHAGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());// 提交事务channel.txCommit();System.out.println("已发送消息:" + message);}} catch (Exception e1) {try {// 回滚channel.txRollback();} catch (IOException e) {e.printStackTrace();}} finally {// 关闭资源try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
消费者1
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.oracle.jrockit.jfr.Producer;import com.rabbitmq.client.*;import java.io.IOException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerPublish1 {private static final String FANOUT_EXCHAGE = "fanout_exchange";private static final String QUEUE_NAME_1 = "publish_fanout_queue_1";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);//队列绑定交换机channel.queueBind(QUEUE_NAME_1, FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME_1, false, consumer);}}
消费者2
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerPublish2 {private static final String FANOUT_EXCHAGE = "fanout_exchange";private static final String QUEUE_NAME_2 = "publish_fanout_queue_2";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);//队列绑定交换机channel.queueBind(QUEUE_NAME_2, FANOUT_EXCHAGE, "");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));channel.basicAck(envelope.getDeliveryTag(), true);}};//监听消息/*** queue :队列名称* autoAck :是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 设置true后,回调方法不会调用,需要重新启动后,回调方法才会调用* callback:消息接收到后回调*/channel.basicConsume(QUEUE_NAME_2, false, consumer);}}
路由模式
一个生产者发送消息,同时绑定了消息的key,而多个消费者想要接收,必须拥有这个消息key,否则无法接收到消息
生产者
package com.hikktn.simple;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Producer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:08* @Version 1.0*/public class ProducerRouting {//交换机名称private static final String DIRECT_EXCHAGE = "direct_exchange";private static final String QUEUE_NAME_1 = "routing_direct_queue_1";private static final String QUEUE_NAME_2 = "routing_direct_queue_2";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");Connection connection = null;Channel channel = null;try {// 创建连接connection = factory.newConnection();// 创建通道channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/// 提供方可以不需要创建队列和交换机,由消费者处理// channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);// channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);//队列绑定交换机// channel.queueBind(QUEUE_NAME_1, DIRECT_EXCHAGE, "insert");// channel.queueBind(QUEUE_NAME_2, DIRECT_EXCHAGE, "update");for (int i = 1; i <= 5; i++) {// 发送信息String messageUpdate = "你好,小兔子!路由模式;routing key 为 update " + i;String messageInsert = "你好,小兔子!路由模式;routing key 为 insert " + i;// 声明事务channel.txSelect();/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(DIRECT_EXCHAGE, "update", MessageProperties.PERSISTENT_TEXT_PLAIN, messageUpdate.getBytes());channel.basicPublish(DIRECT_EXCHAGE, "insert", MessageProperties.PERSISTENT_TEXT_PLAIN, messageInsert.getBytes());System.out.println("提供者发送的消息:" + messageUpdate);System.out.println("提供者发送的消息:" + messageInsert);// 提交事务channel.txCommit();}} catch (Exception e1) {try {// 回滚channel.txRollback();} catch (IOException e) {e.printStackTrace();}} finally {// 关闭资源try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
消费者1
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerRouting1 {private static final String DIRECT_EXCHAGE = "direct_exchange";private static final String QUEUE_NAME_1 = "routing_direct_queue_1";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);//队列绑定交换机channel.queueBind(QUEUE_NAME_1, DIRECT_EXCHAGE, "insert");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME_1, false, consumer);}}
消费者2
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerRouting2 {private static final String DIRECT_EXCHAGE = "direct_exchange";private static final String QUEUE_NAME_2 = "routing_direct_queue_2";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);//队列绑定交换机channel.queueBind(QUEUE_NAME_2, DIRECT_EXCHAGE, "update");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME_2, false, consumer);}}
通配符模式
两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配一个单词
生产者
package com.hikktn.simple;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @ClassName Producer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:08* @Version 1.0*/public class ProducerTopics {//交换机名称private static final String TOPIC_EXCHAGE = "topic_exchange";private static final String QUEUE_NAME_1 = "topic_queue_1";private static final String QUEUE_NAME_2 = "topic_queue_2";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 主机地址;默认为 localhostfactory.setHost("192.168.135.143");// 虚拟主机名称;默认为 /factory.setVirtualHost("/itcast");// 连接用户名;默认为guestfactory.setUsername("hikktn");// 连接密码;默认为guestfactory.setPassword("hikktn");Connection connection = null;Channel channel = null;try {// 创建连接connection = factory.newConnection();// 创建通道channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/// 提供方可以不需要创建队列和交换机,由消费者处理// channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);// channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);//队列绑定交换机// channel.queueBind(QUEUE_NAME_1, DIRECT_EXCHAGE, "insert");// channel.queueBind(QUEUE_NAME_2, DIRECT_EXCHAGE, "update");// 发送信息String messageUpdate = "你好,小兔子! key 为 hikktn.update ";String messageInsert = "你好,小兔子! key 为 hikktn.insert ";// 声明事务channel.txSelect();/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(TOPIC_EXCHAGE, "hikktn.update", MessageProperties.PERSISTENT_TEXT_PLAIN, messageUpdate.getBytes());channel.basicPublish(TOPIC_EXCHAGE, "hikktn.insert.123", MessageProperties.PERSISTENT_TEXT_PLAIN, messageInsert.getBytes());System.out.println("提供者发送的消息:" + messageUpdate);System.out.println("提供者发送的消息:" + messageInsert);// 提交事务channel.txCommit();} catch (Exception e1) {try {// 回滚channel.txRollback();} catch (IOException e) {e.printStackTrace();}} finally {// 关闭资源try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
消费者1
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerTopics1 {private static final String TOPIC_EXCHAGE = "topic_exchange";private static final String QUEUE_NAME_1 = "topic_queue_1";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);//队列绑定交换机channel.queueBind(QUEUE_NAME_1, TOPIC_EXCHAGE, "*.update");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME_1, false, consumer);}}
消费者2
package com.hikktn.consumer;import com.hikktn.util.ConnectionUtil;import com.rabbitmq.client.*;import java.io.IOException;/*** @ClassName Consumer* @Description TODO* @Author lisonglin* @Date 2021/4/5 21:42* @Version 1.0*/public class ConsumerTopics2 {private static final String TOPIC_EXCHAGE = "topic_exchange";private static final String QUEUE_NAME_2 = "topic_queue_2";public static void main(String[] args) throws Exception {// 创建连接工厂Connection connection = ConnectionUtil.getConnection();// 创建通道final Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 声明(创建)队列/*** 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);//队列绑定交换机channel.queueBind(QUEUE_NAME_2, TOPIC_EXCHAGE, "hikktn.#");//创建消费者;并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// //路由key// System.out.println("路由key为:" + envelope.getRoutingKey());// //交换机// System.out.println("交换机为:" + envelope.getExchange());// //消息id// System.out.println("消息id为:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};//监听消息/*** 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_NAME_2, false, consumer);}}









生产者发送消息


