可以支持模糊匹配的路由key
特殊字符:
# 0个,一个或者多个
* 最少要一个

发送消息

实现
主题模式(Topics)
增加模糊路由,模糊匹配
*:一个
#:多个

生产者
public class Producer {private static Logger log = LoggerFactory.getLogger(Producer.class);public static void main(String[] args) {// 步骤// 1. 创建连接工厂log.info("1. 创建连接工厂");ConnectionFactory connectionFactory = new ConnectionFactory();//配置相关配置项connectionFactory.setHost("112.74.175.76");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");//声明连接和通道Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionlog.info("2. 创建连接 Connection");connection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道Chanellog.info("3. 通过连接获取通道Chanel");channel = connection.createChannel();// 5. 准备消息log.info("5. 准备消息");String msg = "Hello topic-exchange "+System.currentTimeMillis();// 6. 准备交换机log.info("6. 准备交换机");String exchangeName = "topic-exchange";// 7. 定义路由keylog.info("7. 定义路由key");String routingKey = "com.order.user.test";// 8. 定义交换机类型log.info("8. 定义交换机类型");String exchangeType = "topic";/*** @param 交换机* @param 队列名称/路由key* @param 属性配置* @param 发送消息内容*/channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());log.info("消息发送成功:{}",msg);} catch (Exception e) {e.printStackTrace();} finally {log.info("关闭连接");// 7. 关闭通道if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}// 8. 关闭连接if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}}

消费者
public class Consumer {private static Logger log = LoggerFactory.getLogger(Consumer.class);private static Runnable runnable = new Runnable() {@Overridepublic void run() {// 步骤// 1. 创建连接工厂log.info("1. 创建连接工厂");ConnectionFactory connectionFactory = new ConnectionFactory();//配置相关配置项connectionFactory.setHost("112.74.175.76");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");//获取队列的名称final String queueName = Thread.currentThread().getName();//声明连接和通道Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionlog.info("2. 创建连接 Connection");connection = connectionFactory.newConnection("消费者-1");// 3. 通过连接获取通道Chanellog.info("3. 通过连接获取通道Chanel");channel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息//声明队列log.info("4. 通过通道接收消息");log.info("参数传递队列名称:{}", queueName);//5. 声明队列存储消息,如果不存在则被创建,rabbitmq不允许存在两个相同的队列名称log.info("5. 声明队列存储消息");//6. 定义接收消息的回调log.info("6. 定义接收消息的回调");Channel callbackChannel = channel;/*** @param queue 队列名称* @param durable 是否持久化* @param exclusive 是否排他,是否是私有的,如果为true,则会对当前队列加锁,其他的通道不能访问,并且连接自动关闭* @param autoDelete 是否自动删除 当最后一个消费者断开连接之后是否自动删除消息* @param arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的生命周期*/callbackChannel.basicConsume(queueName, true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {log.info(String.valueOf(delivery.getEnvelope().getDeliveryTag()));log.info("{}:收到的消息是:{}",queueName, new String(delivery.getBody(), StandardCharsets.UTF_8));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {log.error("接受消息失败:{}", s);}});System.out.println("开始接收消息");//阻断执行// System.in.read();} catch (Exception e) {e.printStackTrace();} finally {log.info("关闭连接");// 7. 关闭通道if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}// 8. 关闭连接if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}};public static void main(String[] args) {//采用多线程的方式进行创建,使用线程名作为队列名new Thread(runnable,"q1").start();new Thread(runnable,"q2").start();new Thread(runnable,"q3").start();new Thread(runnable,"q4").start();}}

