可以支持模糊匹配的路由key

特殊字符:
# 0个,一个或者多个
* 最少要一个

image.png


发送消息

image.png


实现

主题模式(Topics)

增加模糊路由,模糊匹配
*:一个
#:多个

image.png

生产者

  1. public class Producer {
  2. private static Logger log = LoggerFactory.getLogger(Producer.class);
  3. public static void main(String[] args) {
  4. // 步骤
  5. // 1. 创建连接工厂
  6. log.info("1. 创建连接工厂");
  7. ConnectionFactory connectionFactory = new ConnectionFactory();
  8. //配置相关配置项
  9. connectionFactory.setHost("112.74.175.76");
  10. connectionFactory.setPort(5672);
  11. connectionFactory.setUsername("admin");
  12. connectionFactory.setPassword("admin");
  13. connectionFactory.setVirtualHost("/");
  14. //声明连接和通道
  15. Connection connection = null;
  16. Channel channel = null;
  17. try {
  18. // 2. 创建连接 Connection
  19. log.info("2. 创建连接 Connection");
  20. connection = connectionFactory.newConnection("生产者");
  21. // 3. 通过连接获取通道Chanel
  22. log.info("3. 通过连接获取通道Chanel");
  23. channel = connection.createChannel();
  24. // 5. 准备消息
  25. log.info("5. 准备消息");
  26. String msg = "Hello topic-exchange "+System.currentTimeMillis();
  27. // 6. 准备交换机
  28. log.info("6. 准备交换机");
  29. String exchangeName = "topic-exchange";
  30. // 7. 定义路由key
  31. log.info("7. 定义路由key");
  32. String routingKey = "com.order.user.test";
  33. // 8. 定义交换机类型
  34. log.info("8. 定义交换机类型");
  35. String exchangeType = "topic";
  36. /**
  37. * @param 交换机
  38. * @param 队列名称/路由key
  39. * @param 属性配置
  40. * @param 发送消息内容
  41. */
  42. channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
  43. log.info("消息发送成功:{}",msg);
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. } finally {
  47. log.info("关闭连接");
  48. // 7. 关闭通道
  49. if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
  50. try {
  51. channel.close();
  52. } catch (IOException | TimeoutException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. // 8. 关闭连接
  57. if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
  58. try {
  59. connection.close();
  60. } catch (IOException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }
  65. }
  66. }

image.png


消费者

  1. public class Consumer {
  2. private static Logger log = LoggerFactory.getLogger(Consumer.class);
  3. private static Runnable runnable = new Runnable() {
  4. @Override
  5. public void run() {
  6. // 步骤
  7. // 1. 创建连接工厂
  8. log.info("1. 创建连接工厂");
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. //配置相关配置项
  11. connectionFactory.setHost("112.74.175.76");
  12. connectionFactory.setPort(5672);
  13. connectionFactory.setUsername("admin");
  14. connectionFactory.setPassword("admin");
  15. connectionFactory.setVirtualHost("/");
  16. //获取队列的名称
  17. final String queueName = Thread.currentThread().getName();
  18. //声明连接和通道
  19. Connection connection = null;
  20. Channel channel = null;
  21. try {
  22. // 2. 创建连接 Connection
  23. log.info("2. 创建连接 Connection");
  24. connection = connectionFactory.newConnection("消费者-1");
  25. // 3. 通过连接获取通道Chanel
  26. log.info("3. 通过连接获取通道Chanel");
  27. channel = connection.createChannel();
  28. // 4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
  29. //声明队列
  30. log.info("4. 通过通道接收消息");
  31. log.info("参数传递队列名称:{}", queueName);
  32. //5. 声明队列存储消息,如果不存在则被创建,rabbitmq不允许存在两个相同的队列名称
  33. log.info("5. 声明队列存储消息");
  34. //6. 定义接收消息的回调
  35. log.info("6. 定义接收消息的回调");
  36. Channel callbackChannel = channel;
  37. /**
  38. * @param queue 队列名称
  39. * @param durable 是否持久化
  40. * @param exclusive 是否排他,是否是私有的,如果为true,则会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
  41. * @param autoDelete 是否自动删除 当最后一个消费者断开连接之后是否自动删除消息
  42. * @param arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的生命周期
  43. */
  44. callbackChannel.basicConsume(queueName, true, new DeliverCallback() {
  45. @Override
  46. public void handle(String s, Delivery delivery) throws IOException {
  47. log.info(String.valueOf(delivery.getEnvelope().getDeliveryTag()));
  48. log.info("{}:收到的消息是:{}",queueName, new String(delivery.getBody(), StandardCharsets.UTF_8));
  49. }
  50. }, new CancelCallback() {
  51. @Override
  52. public void handle(String s) throws IOException {
  53. log.error("接受消息失败:{}", s);
  54. }
  55. });
  56. System.out.println("开始接收消息");
  57. //阻断执行
  58. // System.in.read();
  59. } catch (Exception e) {
  60. e.printStackTrace();
  61. } finally {
  62. log.info("关闭连接");
  63. // 7. 关闭通道
  64. if (ObjectUtil.isNotEmpty(channel) && channel.isOpen()) {
  65. try {
  66. channel.close();
  67. } catch (IOException | TimeoutException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. // 8. 关闭连接
  72. if (ObjectUtil.isNotEmpty(connection) && connection.isOpen()) {
  73. try {
  74. connection.close();
  75. } catch (IOException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. }
  80. }
  81. };
  82. public static void main(String[] args) {
  83. //采用多线程的方式进行创建,使用线程名作为队列名
  84. new Thread(runnable,"q1").start();
  85. new Thread(runnable,"q2").start();
  86. new Thread(runnable,"q3").start();
  87. new Thread(runnable,"q4").start();
  88. }
  89. }

image.png