简单模式

一个生产者发送消息,一个消费者接收消息,关系是一对一。
image.png
pom引入

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.6.0</version>
  6. </dependency>
  7. </dependencies>

producer (生产者)

  1. package com.hikktn.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * @ClassName Producer
  9. * @Description 生产者
  10. * @Author lisonglin
  11. * @Date 2021/4/5 21:08
  12. * @Version 1.0
  13. */
  14. public class Producer {
  15. private static final String QUEUE_NAME = "simple_queue";
  16. public static void main(String[] args) {
  17. // 创建连接工厂
  18. ConnectionFactory factory = new ConnectionFactory();
  19. // 主机地址;默认为 localhost
  20. factory.setHost("192.168.135.143");
  21. // 虚拟主机名称;默认为 /
  22. factory.setVirtualHost("/itcast");
  23. // 连接用户名;默认为guest
  24. factory.setUsername("hikktn");
  25. // 连接密码;默认为guest
  26. factory.setPassword("hikktn");
  27. Connection connection = null;
  28. Channel channel = null;
  29. try {
  30. // 创建连接
  31. connection = factory.newConnection();
  32. // 创建通道
  33. channel = connection.createChannel();
  34. // 声明简单队列
  35. /**
  36. * 参数1:队列名称
  37. * 参数2:是否定义持久化队列
  38. * 参数3:是否独占本次连接
  39. * 参数4:是否在不使用的时候自动删除队列
  40. * 参数5:队列其它参数
  41. */
  42. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  43. String message = "你好,rabbitmq!";
  44. /**
  45. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
  46. * 参数2:路由key,简单模式可以传递队列名称
  47. * 参数3:消息其它属性
  48. * 参数4:消息内容
  49. */
  50. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  51. System.out.println("提供者发送消息:" + message);
  52. } catch (IOException e) {
  53. e.printStackTrace();
  54. } catch (TimeoutException e) {
  55. e.printStackTrace();
  56. } finally {
  57. // 关闭资源
  58. try {
  59. channel.close();
  60. } catch (IOException e) {
  61. e.printStackTrace();
  62. } catch (TimeoutException e) {
  63. e.printStackTrace();
  64. }
  65. try {
  66. connection.close();
  67. } catch (IOException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }
  72. }

consumer(消费者)

  1. package com.hikktn.consumer;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * @ClassName Consumer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:42
  10. * @Version 1.0
  11. */
  12. public class Consumer {
  13. private static final String QUEUE_NAME = "simple_queue";
  14. public static void main(String[] args) {
  15. // 创建连接工厂
  16. ConnectionFactory factory = new ConnectionFactory();
  17. // 主机地址;默认为 localhost
  18. factory.setHost("192.168.135.143");
  19. // 虚拟主机名称;默认为 /
  20. factory.setVirtualHost("/itcast");
  21. // 连接用户名;默认为guest
  22. factory.setUsername("hikktn");
  23. // 连接密码;默认为guest
  24. factory.setPassword("hikktn");
  25. Connection connection = null;
  26. Channel channel = null;
  27. try {
  28. // 创建连接
  29. connection = factory.newConnection();
  30. // 创建通道
  31. channel = connection.createChannel();
  32. //创建消费者;并设置消息处理
  33. DefaultConsumer consumer = new DefaultConsumer(channel){
  34. @Override
  35. /**
  36. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  37. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  38. * properties 属性信息
  39. * body 消息
  40. */
  41. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. //路由key
  43. System.out.println("路由key为:" + envelope.getRoutingKey());
  44. //交换机
  45. System.out.println("交换机为:" + envelope.getExchange());
  46. //消息id
  47. System.out.println("消息id为:" + envelope.getDeliveryTag());
  48. //收到的消息
  49. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  50. }
  51. };
  52. //监听消息
  53. /**
  54. * 参数1:队列名称
  55. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  56. * 参数3:消息接收到后回调
  57. */
  58. channel.basicConsume(QUEUE_NAME, true, consumer);
  59. }catch (IOException e){
  60. e.printStackTrace();
  61. } catch (TimeoutException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. }

测试
image.pngimage.pngRabbitMQ在线网站查看消息
image.pngimage.png
Java接收到消息
image.png
RabbitMQ消费完消息
image.png

工作模式

一个生产者发送消息,多个消费者平均接收消息,相当于轮询处理消息。
image.png
生产者

  1. package com.hikktn.simple;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * @ClassName Producer
  9. * @Description TODO
  10. * @Author lisonglin
  11. * @Date 2021/4/5 21:08
  12. * @Version 1.0
  13. */
  14. public class ProducerWork {
  15. private static final String QUEUE_NAME = "work_queue";
  16. public static void main(String[] args) {
  17. // 创建连接工厂
  18. ConnectionFactory factory = new ConnectionFactory();
  19. // 主机地址;默认为 localhost
  20. factory.setHost("192.168.135.143");
  21. // 虚拟主机名称;默认为 /
  22. factory.setVirtualHost("/itcast");
  23. // 连接用户名;默认为guest
  24. factory.setUsername("hikktn");
  25. // 连接密码;默认为guest
  26. factory.setPassword("hikktn");
  27. Connection connection = null;
  28. Channel channel = null;
  29. try {
  30. // 创建连接
  31. connection = factory.newConnection();
  32. // 创建通道
  33. channel = connection.createChannel();
  34. // 声明简单队列
  35. /**
  36. * 参数1:队列名称
  37. * 参数2:是否定义持久化队列
  38. * 参数3:是否独占本次连接
  39. * 参数4:是否在不使用的时候自动删除队列
  40. * 参数5:队列其它参数
  41. */
  42. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  43. for (int i = 0; i <= 10; i++) {
  44. String message = "你好,rabbitmq!"+i;
  45. /**
  46. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
  47. * 参数2:路由key,简单模式可以传递队列名称
  48. * 参数3:消息其它属性
  49. * 参数4:消息内容
  50. */
  51. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  52. System.out.println("提供者发送消息:" + message + i);
  53. }
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. } catch (TimeoutException e) {
  57. e.printStackTrace();
  58. } finally {
  59. // 关闭资源
  60. try {
  61. channel.close();
  62. } catch (IOException e) {
  63. e.printStackTrace();
  64. } catch (TimeoutException e) {
  65. e.printStackTrace();
  66. }
  67. try {
  68. connection.close();
  69. } catch (IOException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. }
  74. }

将连接配置共同到一个工具类中:

  1. package com.hikktn.util;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. /**
  5. * @ClassName ConnectionUtil
  6. * @Description TODO
  7. * @Author lisonglin
  8. * @Date 2021/4/7 1:07
  9. * @Version 1.0
  10. */
  11. public class ConnectionUtil {
  12. public static Connection getConnection() throws Exception {
  13. // 创建连接工厂
  14. ConnectionFactory factory = new ConnectionFactory();
  15. // 主机地址;默认为 localhost
  16. factory.setHost("192.168.135.143");
  17. // 虚拟主机名称;默认为 /
  18. factory.setVirtualHost("/itcast");
  19. // 连接用户名;默认为guest
  20. factory.setUsername("hikktn");
  21. // 连接密码;默认为guest
  22. factory.setPassword("hikktn");
  23. // 通过工厂获取连接
  24. Connection connection = factory.newConnection();
  25. return connection;
  26. }
  27. }

consumer(消费者1)

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * @ClassName Consumer
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/5 21:42
  11. * @Version 1.0
  12. */
  13. public class ConsumerWork1 {
  14. private static final String QUEUE_NAME = "work_queue";
  15. public static void main(String[] args) throws Exception {
  16. // 创建连接工厂
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 创建通道
  19. final Channel channel = connection.createChannel();
  20. //一次只能接收并处理一个消息
  21. channel.basicQos(1);
  22. //创建消费者;并设置消息处理
  23. DefaultConsumer consumer = new DefaultConsumer(channel) {
  24. @Override
  25. /**
  26. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  27. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  28. * properties 属性信息
  29. * body 消息
  30. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  31. // 测试ack
  32. // int i = 1 / 0;
  33. // //路由key
  34. // System.out.println("路由key为:" + envelope.getRoutingKey());
  35. // //交换机
  36. // System.out.println("交换机为:" + envelope.getExchange());
  37. // //消息id
  38. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  39. //收到的消息
  40. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  41. channel.basicAck(envelope.getDeliveryTag(), false);
  42. }
  43. };
  44. //监听消息
  45. /**
  46. * 参数1:队列名称
  47. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  48. * 参数3:消息接收到后回调
  49. */
  50. channel.basicConsume(QUEUE_NAME, false, consumer);
  51. }
  52. }

consumer(消费者2)

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.oracle.jrockit.jfr.Producer;
  4. import com.rabbitmq.client.*;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * @ClassName Consumer
  9. * @Description TODO
  10. * @Author lisonglin
  11. * @Date 2021/4/5 21:42
  12. * @Version 1.0
  13. */
  14. public class ConsumerWork2 {
  15. private static final String QUEUE_NAME = "work_queue";
  16. public static void main(String[] args) throws Exception {
  17. // 创建连接工厂
  18. Connection connection = ConnectionUtil.getConnection();
  19. // 创建通道
  20. Channel channel = connection.createChannel();
  21. // 创建队列
  22. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  23. //一次只能接收并处理一个消息
  24. channel.basicQos(1);
  25. //创建消费者;并设置消息处理
  26. DefaultConsumer consumer = new DefaultConsumer(channel) {
  27. @Override
  28. /**
  29. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  30. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  31. * properties 属性信息
  32. * body 消息
  33. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  34. // //路由key
  35. // System.out.println("路由key为:" + envelope.getRoutingKey());
  36. // //交换机
  37. // System.out.println("交换机为:" + envelope.getExchange());
  38. // //消息id
  39. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  40. //收到的消息
  41. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  42. // 自动ack(接收消息)
  43. channel.basicAck(envelope.getDeliveryTag(), false);
  44. }
  45. };
  46. //监听消息
  47. /**
  48. * 参数1:队列名称
  49. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  50. * 参数3:消息接收到后回调
  51. */
  52. channel.basicConsume(QUEUE_NAME, false, consumer);
  53. }
  54. }

测试
消费者1接收到消息:
image.pngimage.png
消费者2接收到消息:
image.png
生产者发送消息:
image.png

订阅模式

一个生产者发送消息,所有消费者接收消息。
image.png
生产者

  1. package com.hikktn.simple;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * @ClassName Producer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:08
  10. * @Version 1.0
  11. */
  12. public class ProducerPublish {
  13. //交换机名称
  14. private static final String FANOUT_EXCHAGE = "fanout_exchange";
  15. private static final String QUEUE_NAME_1 = "publish_fanout_queue_1";
  16. private static final String QUEUE_NAME_2 = "publish_fanout_queue_2";
  17. public static void main(String[] args) {
  18. // 创建连接工厂
  19. ConnectionFactory factory = new ConnectionFactory();
  20. // 主机地址;默认为 localhost
  21. factory.setHost("192.168.135.143");
  22. // 虚拟主机名称;默认为 /
  23. factory.setVirtualHost("/itcast");
  24. // 连接用户名;默认为guest
  25. factory.setUsername("hikktn");
  26. // 连接密码;默认为guest
  27. factory.setPassword("hikktn");
  28. Connection connection = null;
  29. Channel channel = null;
  30. try {
  31. // 创建连接
  32. connection = factory.newConnection();
  33. // 创建通道
  34. channel = connection.createChannel();
  35. /**
  36. * 声明交换机
  37. * 参数1:交换机名称
  38. * 参数2:交换机类型,fanout、topic、direct、headers
  39. */
  40. channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  41. // 声明(创建)队列
  42. /**
  43. * 参数1:队列名称
  44. * 参数2:是否定义持久化队列
  45. * 参数3:是否独占本次连接
  46. * 参数4:是否在不使用的时候自动删除队列
  47. * 参数5:队列其它参数
  48. */
  49. // 提供方可以不需要创建队列和交换机,由消费者处理
  50. // channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
  51. // channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
  52. //队列绑定交换机
  53. // channel.queueBind(QUEUE_NAME_1, FANOUT_EXCHAGE, "");
  54. // channel.queueBind(QUEUE_NAME_2, FANOUT_EXCHAGE, "");
  55. for (int i = 2; i <= 5; i++) {
  56. // 发送信息
  57. String message = "567你好;小兔子!发布订阅模式--" + i;
  58. // 声明事务
  59. channel.txSelect();
  60. /**
  61. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
  62. * 参数2:路由key,简单模式可以传递队列名称
  63. * 参数3:消息其它属性
  64. * 参数4:消息内容
  65. */
  66. channel.basicPublish(FANOUT_EXCHAGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  67. // 提交事务
  68. channel.txCommit();
  69. System.out.println("已发送消息:" + message);
  70. }
  71. } catch (Exception e1) {
  72. try {
  73. // 回滚
  74. channel.txRollback();
  75. } catch (IOException e) {
  76. e.printStackTrace();
  77. }
  78. } finally {
  79. // 关闭资源
  80. try {
  81. channel.close();
  82. } catch (IOException e) {
  83. e.printStackTrace();
  84. } catch (TimeoutException e) {
  85. e.printStackTrace();
  86. }
  87. try {
  88. connection.close();
  89. } catch (IOException e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. }
  94. }

消费者1

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.oracle.jrockit.jfr.Producer;
  4. import com.rabbitmq.client.*;
  5. import java.io.IOException;
  6. /**
  7. * @ClassName Consumer
  8. * @Description TODO
  9. * @Author lisonglin
  10. * @Date 2021/4/5 21:42
  11. * @Version 1.0
  12. */
  13. public class ConsumerPublish1 {
  14. private static final String FANOUT_EXCHAGE = "fanout_exchange";
  15. private static final String QUEUE_NAME_1 = "publish_fanout_queue_1";
  16. public static void main(String[] args) throws Exception {
  17. // 创建连接工厂
  18. Connection connection = ConnectionUtil.getConnection();
  19. // 创建通道
  20. final Channel channel = connection.createChannel();
  21. //声明交换机
  22. channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  23. // 声明(创建)队列
  24. /**
  25. * 参数1:队列名称
  26. * 参数2:是否定义持久化队列
  27. * 参数3:是否独占本次连接
  28. * 参数4:是否在不使用的时候自动删除队列
  29. * 参数5:队列其它参数
  30. */
  31. channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
  32. //队列绑定交换机
  33. channel.queueBind(QUEUE_NAME_1, FANOUT_EXCHAGE, "");
  34. //创建消费者;并设置消息处理
  35. DefaultConsumer consumer = new DefaultConsumer(channel) {
  36. @Override
  37. /**
  38. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  39. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  40. * properties 属性信息
  41. * body 消息
  42. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  43. // //路由key
  44. // System.out.println("路由key为:" + envelope.getRoutingKey());
  45. // //交换机
  46. // System.out.println("交换机为:" + envelope.getExchange());
  47. // //消息id
  48. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  49. //收到的消息
  50. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  51. channel.basicAck(envelope.getDeliveryTag(), false);
  52. }
  53. };
  54. //监听消息
  55. /**
  56. * 参数1:队列名称
  57. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  58. * 参数3:消息接收到后回调
  59. */
  60. channel.basicConsume(QUEUE_NAME_1, false, consumer);
  61. }
  62. }

消费者2

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. /**
  6. * @ClassName Consumer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:42
  10. * @Version 1.0
  11. */
  12. public class ConsumerPublish2 {
  13. private static final String FANOUT_EXCHAGE = "fanout_exchange";
  14. private static final String QUEUE_NAME_2 = "publish_fanout_queue_2";
  15. public static void main(String[] args) throws Exception {
  16. // 创建连接工厂
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 创建通道
  19. final Channel channel = connection.createChannel();
  20. // 声明交换机
  21. channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
  22. // 声明(创建)队列
  23. /**
  24. * 参数1:队列名称
  25. * 参数2:是否定义持久化队列
  26. * 参数3:是否独占本次连接
  27. * 参数4:是否在不使用的时候自动删除队列
  28. * 参数5:队列其它参数
  29. */
  30. channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
  31. //队列绑定交换机
  32. channel.queueBind(QUEUE_NAME_2, FANOUT_EXCHAGE, "");
  33. //创建消费者;并设置消息处理
  34. DefaultConsumer consumer = new DefaultConsumer(channel) {
  35. @Override
  36. /**
  37. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  38. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  39. * properties 属性信息
  40. * body 消息
  41. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. // //路由key
  43. // System.out.println("路由key为:" + envelope.getRoutingKey());
  44. // //交换机
  45. // System.out.println("交换机为:" + envelope.getExchange());
  46. // //消息id
  47. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  48. //收到的消息
  49. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  50. channel.basicAck(envelope.getDeliveryTag(), true);
  51. }
  52. };
  53. //监听消息
  54. /**
  55. * queue :队列名称
  56. * autoAck :是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  57. * 设置true后,回调方法不会调用,需要重新启动后,回调方法才会调用
  58. * callback:消息接收到后回调
  59. */
  60. channel.basicConsume(QUEUE_NAME_2, false, consumer);
  61. }
  62. }

测试
生产者发送消息
image.png
消费者1接收消息image.png
image.png
消费者2接收消息
image.png

路由模式

image.png一个生产者发送消息,同时绑定了消息的key,而多个消费者想要接收,必须拥有这个消息key,否则无法接收到消息
image.png
生产者

  1. package com.hikktn.simple;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * @ClassName Producer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:08
  10. * @Version 1.0
  11. */
  12. public class ProducerRouting {
  13. //交换机名称
  14. private static final String DIRECT_EXCHAGE = "direct_exchange";
  15. private static final String QUEUE_NAME_1 = "routing_direct_queue_1";
  16. private static final String QUEUE_NAME_2 = "routing_direct_queue_2";
  17. public static void main(String[] args) {
  18. // 创建连接工厂
  19. ConnectionFactory factory = new ConnectionFactory();
  20. // 主机地址;默认为 localhost
  21. factory.setHost("192.168.135.143");
  22. // 虚拟主机名称;默认为 /
  23. factory.setVirtualHost("/itcast");
  24. // 连接用户名;默认为guest
  25. factory.setUsername("hikktn");
  26. // 连接密码;默认为guest
  27. factory.setPassword("hikktn");
  28. Connection connection = null;
  29. Channel channel = null;
  30. try {
  31. // 创建连接
  32. connection = factory.newConnection();
  33. // 创建通道
  34. channel = connection.createChannel();
  35. /**
  36. * 声明交换机
  37. * 参数1:交换机名称
  38. * 参数2:交换机类型,fanout、topic、direct、headers
  39. */
  40. channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  41. // 声明(创建)队列
  42. /**
  43. * 参数1:队列名称
  44. * 参数2:是否定义持久化队列
  45. * 参数3:是否独占本次连接
  46. * 参数4:是否在不使用的时候自动删除队列
  47. * 参数5:队列其它参数
  48. */
  49. // 提供方可以不需要创建队列和交换机,由消费者处理
  50. // channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
  51. // channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
  52. //队列绑定交换机
  53. // channel.queueBind(QUEUE_NAME_1, DIRECT_EXCHAGE, "insert");
  54. // channel.queueBind(QUEUE_NAME_2, DIRECT_EXCHAGE, "update");
  55. for (int i = 1; i <= 5; i++) {
  56. // 发送信息
  57. String messageUpdate = "你好,小兔子!路由模式;routing key 为 update " + i;
  58. String messageInsert = "你好,小兔子!路由模式;routing key 为 insert " + i;
  59. // 声明事务
  60. channel.txSelect();
  61. /**
  62. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
  63. * 参数2:路由key,简单模式可以传递队列名称
  64. * 参数3:消息其它属性
  65. * 参数4:消息内容
  66. */
  67. channel.basicPublish(DIRECT_EXCHAGE, "update", MessageProperties.PERSISTENT_TEXT_PLAIN, messageUpdate.getBytes());
  68. channel.basicPublish(DIRECT_EXCHAGE, "insert", MessageProperties.PERSISTENT_TEXT_PLAIN, messageInsert.getBytes());
  69. System.out.println("提供者发送的消息:" + messageUpdate);
  70. System.out.println("提供者发送的消息:" + messageInsert);
  71. // 提交事务
  72. channel.txCommit();
  73. }
  74. } catch (Exception e1) {
  75. try {
  76. // 回滚
  77. channel.txRollback();
  78. } catch (IOException e) {
  79. e.printStackTrace();
  80. }
  81. } finally {
  82. // 关闭资源
  83. try {
  84. channel.close();
  85. } catch (IOException e) {
  86. e.printStackTrace();
  87. } catch (TimeoutException e) {
  88. e.printStackTrace();
  89. }
  90. try {
  91. connection.close();
  92. } catch (IOException e) {
  93. e.printStackTrace();
  94. }
  95. }
  96. }
  97. }

消费者1

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. /**
  6. * @ClassName Consumer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:42
  10. * @Version 1.0
  11. */
  12. public class ConsumerRouting1 {
  13. private static final String DIRECT_EXCHAGE = "direct_exchange";
  14. private static final String QUEUE_NAME_1 = "routing_direct_queue_1";
  15. public static void main(String[] args) throws Exception {
  16. // 创建连接工厂
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 创建通道
  19. final Channel channel = connection.createChannel();
  20. //声明交换机
  21. channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  22. // 声明(创建)队列
  23. /**
  24. * 参数1:队列名称
  25. * 参数2:是否定义持久化队列
  26. * 参数3:是否独占本次连接
  27. * 参数4:是否在不使用的时候自动删除队列
  28. * 参数5:队列其它参数
  29. */
  30. channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
  31. //队列绑定交换机
  32. channel.queueBind(QUEUE_NAME_1, DIRECT_EXCHAGE, "insert");
  33. //创建消费者;并设置消息处理
  34. DefaultConsumer consumer = new DefaultConsumer(channel) {
  35. @Override
  36. /**
  37. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  38. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  39. * properties 属性信息
  40. * body 消息
  41. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. // //路由key
  43. // System.out.println("路由key为:" + envelope.getRoutingKey());
  44. // //交换机
  45. // System.out.println("交换机为:" + envelope.getExchange());
  46. // //消息id
  47. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  48. //收到的消息
  49. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  50. channel.basicAck(envelope.getDeliveryTag(), false);
  51. }
  52. };
  53. //监听消息
  54. /**
  55. * 参数1:队列名称
  56. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  57. * 参数3:消息接收到后回调
  58. */
  59. channel.basicConsume(QUEUE_NAME_1, false, consumer);
  60. }
  61. }

消费者2

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. /**
  6. * @ClassName Consumer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:42
  10. * @Version 1.0
  11. */
  12. public class ConsumerRouting2 {
  13. private static final String DIRECT_EXCHAGE = "direct_exchange";
  14. private static final String QUEUE_NAME_2 = "routing_direct_queue_2";
  15. public static void main(String[] args) throws Exception {
  16. // 创建连接工厂
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 创建通道
  19. final Channel channel = connection.createChannel();
  20. //声明交换机
  21. channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
  22. // 声明(创建)队列
  23. /**
  24. * 参数1:队列名称
  25. * 参数2:是否定义持久化队列
  26. * 参数3:是否独占本次连接
  27. * 参数4:是否在不使用的时候自动删除队列
  28. * 参数5:队列其它参数
  29. */
  30. channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
  31. //队列绑定交换机
  32. channel.queueBind(QUEUE_NAME_2, DIRECT_EXCHAGE, "update");
  33. //创建消费者;并设置消息处理
  34. DefaultConsumer consumer = new DefaultConsumer(channel) {
  35. @Override
  36. /**
  37. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  38. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  39. * properties 属性信息
  40. * body 消息
  41. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. // //路由key
  43. // System.out.println("路由key为:" + envelope.getRoutingKey());
  44. // //交换机
  45. // System.out.println("交换机为:" + envelope.getExchange());
  46. // //消息id
  47. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  48. //收到的消息
  49. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  50. channel.basicAck(envelope.getDeliveryTag(), false);
  51. }
  52. };
  53. //监听消息
  54. /**
  55. * 参数1:队列名称
  56. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  57. * 参数3:消息接收到后回调
  58. */
  59. channel.basicConsume(QUEUE_NAME_2, false, consumer);
  60. }
  61. }

image.png生产者发送消息
image.png
消费者1接收消息image.png
image.png
消费者2接收消息
image.png

通配符模式

image.png两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配一个单词
image.png
生产者

  1. package com.hikktn.simple;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * @ClassName Producer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:08
  10. * @Version 1.0
  11. */
  12. public class ProducerTopics {
  13. //交换机名称
  14. private static final String TOPIC_EXCHAGE = "topic_exchange";
  15. private static final String QUEUE_NAME_1 = "topic_queue_1";
  16. private static final String QUEUE_NAME_2 = "topic_queue_2";
  17. public static void main(String[] args) {
  18. // 创建连接工厂
  19. ConnectionFactory factory = new ConnectionFactory();
  20. // 主机地址;默认为 localhost
  21. factory.setHost("192.168.135.143");
  22. // 虚拟主机名称;默认为 /
  23. factory.setVirtualHost("/itcast");
  24. // 连接用户名;默认为guest
  25. factory.setUsername("hikktn");
  26. // 连接密码;默认为guest
  27. factory.setPassword("hikktn");
  28. Connection connection = null;
  29. Channel channel = null;
  30. try {
  31. // 创建连接
  32. connection = factory.newConnection();
  33. // 创建通道
  34. channel = connection.createChannel();
  35. /**
  36. * 声明交换机
  37. * 参数1:交换机名称
  38. * 参数2:交换机类型,fanout、topic、direct、headers
  39. */
  40. channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
  41. // 声明(创建)队列
  42. /**
  43. * 参数1:队列名称
  44. * 参数2:是否定义持久化队列
  45. * 参数3:是否独占本次连接
  46. * 参数4:是否在不使用的时候自动删除队列
  47. * 参数5:队列其它参数
  48. */
  49. // 提供方可以不需要创建队列和交换机,由消费者处理
  50. // channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
  51. // channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
  52. //队列绑定交换机
  53. // channel.queueBind(QUEUE_NAME_1, DIRECT_EXCHAGE, "insert");
  54. // channel.queueBind(QUEUE_NAME_2, DIRECT_EXCHAGE, "update");
  55. // 发送信息
  56. String messageUpdate = "你好,小兔子! key 为 hikktn.update ";
  57. String messageInsert = "你好,小兔子! key 为 hikktn.insert ";
  58. // 声明事务
  59. channel.txSelect();
  60. /**
  61. * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
  62. * 参数2:路由key,简单模式可以传递队列名称
  63. * 参数3:消息其它属性
  64. * 参数4:消息内容
  65. */
  66. channel.basicPublish(TOPIC_EXCHAGE, "hikktn.update", MessageProperties.PERSISTENT_TEXT_PLAIN, messageUpdate.getBytes());
  67. channel.basicPublish(TOPIC_EXCHAGE, "hikktn.insert.123", MessageProperties.PERSISTENT_TEXT_PLAIN, messageInsert.getBytes());
  68. System.out.println("提供者发送的消息:" + messageUpdate);
  69. System.out.println("提供者发送的消息:" + messageInsert);
  70. // 提交事务
  71. channel.txCommit();
  72. } catch (Exception e1) {
  73. try {
  74. // 回滚
  75. channel.txRollback();
  76. } catch (IOException e) {
  77. e.printStackTrace();
  78. }
  79. } finally {
  80. // 关闭资源
  81. try {
  82. channel.close();
  83. } catch (IOException e) {
  84. e.printStackTrace();
  85. } catch (TimeoutException e) {
  86. e.printStackTrace();
  87. }
  88. try {
  89. connection.close();
  90. } catch (IOException e) {
  91. e.printStackTrace();
  92. }
  93. }
  94. }
  95. }

消费者1

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. /**
  6. * @ClassName Consumer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:42
  10. * @Version 1.0
  11. */
  12. public class ConsumerTopics1 {
  13. private static final String TOPIC_EXCHAGE = "topic_exchange";
  14. private static final String QUEUE_NAME_1 = "topic_queue_1";
  15. public static void main(String[] args) throws Exception {
  16. // 创建连接工厂
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 创建通道
  19. final Channel channel = connection.createChannel();
  20. // 声明交换机
  21. channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
  22. // 声明(创建)队列
  23. /**
  24. * 参数1:队列名称
  25. * 参数2:是否定义持久化队列
  26. * 参数3:是否独占本次连接
  27. * 参数4:是否在不使用的时候自动删除队列
  28. * 参数5:队列其它参数
  29. */
  30. channel.queueDeclare(QUEUE_NAME_1, true, false, false, null);
  31. //队列绑定交换机
  32. channel.queueBind(QUEUE_NAME_1, TOPIC_EXCHAGE, "*.update");
  33. //创建消费者;并设置消息处理
  34. DefaultConsumer consumer = new DefaultConsumer(channel) {
  35. @Override
  36. /**
  37. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  38. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  39. * properties 属性信息
  40. * body 消息
  41. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. // //路由key
  43. // System.out.println("路由key为:" + envelope.getRoutingKey());
  44. // //交换机
  45. // System.out.println("交换机为:" + envelope.getExchange());
  46. // //消息id
  47. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  48. //收到的消息
  49. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  50. channel.basicAck(envelope.getDeliveryTag(), false);
  51. }
  52. };
  53. //监听消息
  54. /**
  55. * 参数1:队列名称
  56. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  57. * 参数3:消息接收到后回调
  58. */
  59. channel.basicConsume(QUEUE_NAME_1, false, consumer);
  60. }
  61. }

消费者2

  1. package com.hikktn.consumer;
  2. import com.hikktn.util.ConnectionUtil;
  3. import com.rabbitmq.client.*;
  4. import java.io.IOException;
  5. /**
  6. * @ClassName Consumer
  7. * @Description TODO
  8. * @Author lisonglin
  9. * @Date 2021/4/5 21:42
  10. * @Version 1.0
  11. */
  12. public class ConsumerTopics2 {
  13. private static final String TOPIC_EXCHAGE = "topic_exchange";
  14. private static final String QUEUE_NAME_2 = "topic_queue_2";
  15. public static void main(String[] args) throws Exception {
  16. // 创建连接工厂
  17. Connection connection = ConnectionUtil.getConnection();
  18. // 创建通道
  19. final Channel channel = connection.createChannel();
  20. //声明交换机
  21. channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
  22. // 声明(创建)队列
  23. /**
  24. * 参数1:队列名称
  25. * 参数2:是否定义持久化队列
  26. * 参数3:是否独占本次连接
  27. * 参数4:是否在不使用的时候自动删除队列
  28. * 参数5:队列其它参数
  29. */
  30. channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
  31. //队列绑定交换机
  32. channel.queueBind(QUEUE_NAME_2, TOPIC_EXCHAGE, "hikktn.#");
  33. //创建消费者;并设置消息处理
  34. DefaultConsumer consumer = new DefaultConsumer(channel) {
  35. @Override
  36. /**
  37. * consumerTag 消息者标签,在channel.basicConsume时候可以指定
  38. * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
  39. * properties 属性信息
  40. * body 消息
  41. */ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  42. // //路由key
  43. // System.out.println("路由key为:" + envelope.getRoutingKey());
  44. // //交换机
  45. // System.out.println("交换机为:" + envelope.getExchange());
  46. // //消息id
  47. // System.out.println("消息id为:" + envelope.getDeliveryTag());
  48. //收到的消息
  49. System.out.println("接收到的消息为:" + new String(body, "utf-8"));
  50. channel.basicAck(envelope.getDeliveryTag(), false);
  51. }
  52. };
  53. //监听消息
  54. /**
  55. * 参数1:队列名称
  56. * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认
  57. * 参数3:消息接收到后回调
  58. */
  59. channel.basicConsume(QUEUE_NAME_2, false, consumer);
  60. }
  61. }

image.png
image.png
image.png
image.png
image.png
image.png