1. RabbitMQ

MQ全称 Message Queue(消息队列) 是在消息的传输过程中保存消息的容器 多用于分布式系统之间进行通信

14. RabbitMQ - 图1

  • 优势

14. RabbitMQ - 图2

14. RabbitMQ - 图3

14. RabbitMQ - 图4

14. RabbitMQ - 图5

  • 劣势

14. RabbitMQ - 图6

2. 常用的MQ产品

14. RabbitMQ - 图7

3. RabbitMQ简介

AMQP 即 Advanced Message Queuing Protocol (高级消息队列协议) 是一个网络协议 是应用层协议的一个开放标准 为面向消息的中间件设计

14. RabbitMQ - 图8

14. RabbitMQ - 图9

3.1. JMS

JMS 即 Java 消息服务 应用程序接口 一个Java平台中关于面向中间件的API

4. 安装

  1. 安装erlang
  1. yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel
  2. wget http://erlang.org/download/otp_src_22.0.tar.gz
  3. tar -zxvf otp_src_22.0.tar.gz
  4. mv otp_src_22.0 /usr/local/
  5. cd /usr/local/otp_src_22.0/
  6. mkdir ../erlang
  7. ./configure --prefix=/usr/local/erlang
  8. make install
  9. ll /usr/local/erlang/bin
  10. echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile
  11. source /etc/profile
  12. erl
  13. halt().
  1. 安装RabbitMQ
  1. cd /root
  2. wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz
  3. yum install -y xz
  4. /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz
  5. tar -xvf rabbitmq-server-generic-unix-3.7.15.tar
  6. mv rabbitmq_server-3.7.15/ /usr/local/
  7. mv /usr/local/rabbitmq_server-3.7.15 rabbitmq
  8. echo 'export PATH=$PATH:/usr/local/rabbitmq/sbin' >> /etc/profile
  9. source /etc/profile
  1. 启动
  1. rabbitmq-server -detached
  2. rabbitmq-plugins enable rabbitmq_management #开启web插件
  3. rabbitmqctl stop #停止
  4. rabbitmqctl status #状态

默认账号密码:guest guest(这个账号只允许本机访问)

  1. firewall-cmd --zone=public --add-port=15672/tcp --permanent
  2. vim /usr/local/rabbitmq/ebin/rabbit.app #配置

访问http://192.168.130.124:15672/

5. 入门案例

5.1. 生产者

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.6.0</version>
  6. </dependency>
  7. </dependencies>
  8. <build>
  9. <plugins>
  10. <plugin>
  11. <groupId>org.apache.maven.plugins</groupId>
  12. <artifactId>maven-compiler-plugin</artifactId>
  13. <version>3.8.0</version>
  14. <configuration>
  15. <source>1.8</source>
  16. <target>1.8</target>
  17. </configuration>
  18. </plugin>
  19. </plugins>
  20. </build>
  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //5.创建队列queue
  14. /**
  15. * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  16. * queue 队列名称
  17. * durable 是否持久化 当mq重启之后还在
  18. * exclusive 是否独占,只能有一个消费者监听这个队列 当Connection关闭时是否删除队列
  19. * autoDelete 是否自动删除 当没有Consumer时 自动删除
  20. * arguments 参数
  21. *
  22. */
  23. //如果没有一个叫hello_world的队列 则自动创建
  24. channel.queueDeclare("hello_world",true,false,false,null);
  25. //6.发送消息
  26. /**
  27. * String var1, String var2, BasicProperties var3, byte[] var4
  28. * var1 交换机名称 简单模式下会使用默认的""
  29. * var2 路由名称
  30. * var3 配置信息
  31. * var4 发送消息数据
  32. */
  33. String body = "hello world";
  34. channel.basicPublish("","hello_world",null,body.getBytes(StandardCharsets.UTF_8));
  35. //7.释放资源
  36. channel.close();
  37. connection.close();

5.2. 消费者

与生产者坐标一致

  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //5.创建队列queue
  14. /**
  15. * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
  16. * queue 队列名称
  17. * durable 是否持久化 当mq重启之后还在
  18. * exclusive 是否独占,只能有一个消费者监听这个队列 当Connection关闭时是否删除队列
  19. * autoDelete 是否自动删除 当没有Consumer时 自动删除
  20. * arguments 参数
  21. *
  22. */
  23. //如果没有一个叫hello_world的队列 则自动创建
  24. channel.queueDeclare("hello_world",true,false,false,null);
  25. //6.接受消息
  26. Consumer consumer = new DefaultConsumer(channel){
  27. //回调方法 当收到消息后 会执行该方法
  28. /**
  29. *
  30. * @param consumerTag 标识
  31. * @param envelope 获取一些信息 交换机 路由key
  32. * @param properties 配置信息
  33. * @param body 数据
  34. * @throws IOException
  35. */
  36. @Override
  37. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  38. System.out.println(consumerTag);
  39. System.out.println(envelope.getExchange());
  40. System.out.println(envelope.getRoutingKey());
  41. System.out.println(properties);
  42. System.out.println(new String(body));
  43. }
  44. };
  45. /**
  46. * String var1, DeliverCallback var2, CancelCallback var3
  47. * queue 队列名称
  48. * autoAck 是否自动确认
  49. * callback 回调对象
  50. */
  51. channel.basicConsume("hello_world",true,consumer);

6. Work queues 工作队列模式

14. RabbitMQ - 图10

多个消费者共同消费一个队列中的消息

对于任务过重或者任务较多情况使用工作队列可以提高任务处理的速度

生产者生成多条消息 而消费者轮流切换接受

Work queues 代码与生产者 消费者没有太大区别 只是生产者在频道中发送多条 多个消费者轮流接受消息

7. Pub/Sub 订阅模式

14. RabbitMQ - 图11

X为交换机 生产者发送消息给交换机 而交换机转发消息 有三种模式

  • Fanout 广播模式 将消息交给所有绑定到交换机的队列
  • Direct 定向 把消息交给符合指定 routing key 的队列
  • Topic 通配符 把消息交给符合 routing pattern (路由模式)

生产者

  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //5.创建交换机
  14. /**
  15. * String var1, BuiltinExchangeType var2, boolean var3, boolean var4, boolean var5, Map<String, Object> var6
  16. * exchange 交换机名称
  17. * type 交换机类型 枚举 DIRECT("direct")定向 FANOUT("fanout")扇形(广播) TOPIC("topic")通配符 HEADERS("headers") 参数匹配
  18. * durable 是否持久化
  19. * autoDelete 自动删除
  20. * internal 内部使用 一般为false
  21. * arguments 参数
  22. */
  23. String exchangeName = "test_fanout";
  24. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
  25. //6.创建队列
  26. String queue1Name = "test_fanout_queue1";
  27. String queue2Name = "test_fanout_queue2";
  28. channel.queueDeclare(queue1Name, true, false, false, null);
  29. channel.queueDeclare(queue2Name, true, false, false, null);
  30. //7.绑定队列和交换机
  31. /** String queue, String exchange, String routingKey
  32. * queue 队列名称
  33. * exchange 交换机名称
  34. * routingKey 路由键绑定规则 如果交换机类型为FANOUT 则routingKey为""
  35. */
  36. channel.queueBind(queue1Name, exchangeName, "");
  37. channel.queueBind(queue2Name, exchangeName, "");
  38. //8.发送消息
  39. String body = "日志信息:";
  40. channel.basicPublish(exchangeName, "", null, body.getBytes(StandardCharsets.UTF_8));
  41. //9.释放资源
  42. channel.close();
  43. connection.close();

多个消费者绑定不同的队列

  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //6.接受消息
  14. String queue1Name = "test_fanout_queue1";
  15. String queue2Name = "test_fanout_queue2";
  16. Consumer consumer = new DefaultConsumer(channel){
  17. //回调方法 当收到消息后 会执行该方法
  18. /**
  19. *
  20. * @param consumerTag 标识
  21. * @param envelope 获取一些信息 交换机 路由key
  22. * @param properties 配置信息
  23. * @param body 数据
  24. * @throws IOException
  25. */
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. // System.out.println(consumerTag);
  29. // System.out.println(envelope.getExchange());
  30. // System.out.println(envelope.getRoutingKey());
  31. // System.out.println(properties);
  32. System.out.println(new String(body));
  33. System.out.println("第一个消费者");
  34. }
  35. };
  36. /**
  37. * String var1, DeliverCallback var2, CancelCallback var3
  38. * queue 队列名称
  39. * autoAck 是否自动确认
  40. * callback 回调对象
  41. */
  42. channel.basicConsume(queue1Name,true,consumer);

8. Routing 路由模式

14. RabbitMQ - 图12

生产者发送不同key的消息给交换机 而交换机根据队列的key转发消息给有标识的队列

  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //5.创建交换机
  14. /**
  15. * String var1, BuiltinExchangeType var2, boolean var3, boolean var4, boolean var5, Map<String, Object> var6
  16. * exchange 交换机名称
  17. * type 交换机类型 枚举 DIRECT("direct")定向 FANOUT("fanout")扇形(广播) TOPIC("topic")通配符 HEADERS("headers") 参数匹配
  18. * durable 是否持久化
  19. * autoDelete 自动删除
  20. * internal 内部使用 一般为false
  21. * arguments 参数
  22. */
  23. String exchangeName = "test_direct";
  24. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
  25. //6.创建队列
  26. String queue1Name = "test_direct_queue1";
  27. String queue2Name = "test_direct_queue2";
  28. channel.queueDeclare(queue1Name, true, false, false, null);
  29. channel.queueDeclare(queue2Name, true, false, false, null);
  30. //7.绑定队列和交换机
  31. /** String queue, String exchange, String routingKey
  32. * queue 队列名称
  33. * exchange 交换机名称
  34. * routingKey 路由键绑定规则 如果交换机类型为FANOUT 则routingKey为""
  35. */
  36. //队列1的绑定
  37. channel.queueBind(queue1Name, exchangeName, "error");
  38. //队列2的绑定
  39. channel.queueBind(queue2Name, exchangeName, "info");
  40. channel.queueBind(queue2Name, exchangeName, "error");
  41. channel.queueBind(queue2Name, exchangeName, "warning");
  42. //8.发送消息
  43. String body = "日志信息:";
  44. //队列1只接受error消息 而队列2所有类型都接受
  45. channel.basicPublish(exchangeName, "info", null, body.getBytes(StandardCharsets.UTF_8));
  46. //9.释放资源
  47. channel.close();
  48. connection.close();

消费者

  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //6.接受消息
  14. String queue1Name = "test_direct_queue1";
  15. String queue2Name = "test_direct_queue2";
  16. Consumer consumer = new DefaultConsumer(channel){
  17. //回调方法 当收到消息后 会执行该方法
  18. /**
  19. *
  20. * @param consumerTag 标识
  21. * @param envelope 获取一些信息 交换机 路由key
  22. * @param properties 配置信息
  23. * @param body 数据
  24. * @throws IOException
  25. */
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. // System.out.println(consumerTag);
  29. // System.out.println(envelope.getExchange());
  30. // System.out.println(envelope.getRoutingKey());
  31. // System.out.println(properties);
  32. System.out.println(new String(body));
  33. System.out.println("队列1 存储到数据库");
  34. }
  35. };
  36. /**
  37. * String var1, DeliverCallback var2, CancelCallback var3
  38. * queue 队列名称
  39. * autoAck 是否自动确认
  40. * callback 回调对象
  41. */
  42. channel.basicConsume(queue1Name,true,consumer);

9. Topics 通配符模式

14. RabbitMQ - 图13

使用通配符和路由器转发 让队列更加灵活的接受对应的消息

*星号代表0个或多个单词

井号代表1个单词

生产者

  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //5.创建交换机
  14. /**
  15. * String var1, BuiltinExchangeType var2, boolean var3, boolean var4, boolean var5, Map<String, Object> var6
  16. * exchange 交换机名称
  17. * type 交换机类型 枚举 DIRECT("direct")定向 FANOUT("fanout")扇形(广播) TOPIC("topic")通配符 HEADERS("headers") 参数匹配
  18. * durable 是否持久化
  19. * autoDelete 自动删除
  20. * internal 内部使用 一般为false
  21. * arguments 参数
  22. */
  23. String exchangeName = "test_topics";
  24. channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
  25. //6.创建队列
  26. String queue1Name = "test_topics_queue1";
  27. String queue2Name = "test_topics_queue2";
  28. channel.queueDeclare(queue1Name, true, false, false, null);
  29. channel.queueDeclare(queue2Name, true, false, false, null);
  30. //7.绑定队列和交换机
  31. /** String queue, String exchange, String routingKey
  32. * queue 队列名称
  33. * exchange 交换机名称
  34. * routingKey 路由键绑定规则 如果交换机类型为FANOUT 则routingKey为""
  35. */
  36. //routing key 系统的名称.日志的级别
  37. channel.queueBind(queue1Name, exchangeName, "#.error"); //以.error结尾
  38. channel.queueBind(queue1Name, exchangeName, "order.*"); //以order.开头
  39. channel.queueBind(queue2Name, exchangeName, "*.*"); //队列2所有消息都可以接受到
  40. //8.发送消息
  41. String body = "日志信息:";
  42. channel.basicPublish(exchangeName, "goods.info", null, body.getBytes(StandardCharsets.UTF_8));
  43. //9.释放资源
  44. channel.close();
  45. connection.close();

消费者

  1. //1.创建连接工厂
  2. ConnectionFactory factory = new ConnectionFactory();
  3. //2.设置参数
  4. factory.setHost("192.168.130.124"); //ip
  5. factory.setPort(5672); //端口
  6. factory.setVirtualHost("/itcast"); //虚拟机 默认值
  7. factory.setUsername("iekr"); //用户名
  8. factory.setPassword("iekr"); //密码 默认值为guest
  9. //3.创建连接 connection
  10. Connection connection = factory.newConnection();
  11. //4.创建channel
  12. Channel channel = connection.createChannel();
  13. //6.接受消息
  14. String queue1Name = "test_topics_queue1";
  15. String queue2Name = "test_topics_queue2";
  16. Consumer consumer = new DefaultConsumer(channel){
  17. //回调方法 当收到消息后 会执行该方法
  18. /**
  19. *
  20. * @param consumerTag 标识
  21. * @param envelope 获取一些信息 交换机 路由key
  22. * @param properties 配置信息
  23. * @param body 数据
  24. * @throws IOException
  25. */
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. // System.out.println(consumerTag);
  29. // System.out.println(envelope.getExchange());
  30. // System.out.println(envelope.getRoutingKey());
  31. // System.out.println(properties);
  32. System.out.println(new String(body));
  33. System.out.println("队列1 存储到数据库");
  34. }
  35. };
  36. /**
  37. * String var1, DeliverCallback var2, CancelCallback var3
  38. * queue 队列名称
  39. * autoAck 是否自动确认
  40. * callback 回调对象
  41. */
  42. channel.basicConsume(queue1Name,true,consumer);

10. Spring 整合 RabbitMQ

坐标

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework</groupId>
  4. <artifactId>spring-context</artifactId>
  5. <version>5.3.10</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.amqp</groupId>
  9. <artifactId>spring-rabbit</artifactId>
  10. <version>2.3.9</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>junit</groupId>
  14. <artifactId>junit</artifactId>
  15. <version>4.13</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.springframework</groupId>
  19. <artifactId>spring-test</artifactId>
  20. <version>5.3.10</version>
  21. </dependency>
  22. </dependencies>
  23. <build>
  24. <plugins>
  25. <plugin>
  26. <groupId>org.apache.maven.plugins</groupId>
  27. <artifactId>maven-compiler-plugin</artifactId>
  28. <version>3.8.1</version>
  29. <configuration>
  30. <source>1.8</source>
  31. <target>1.8</target>
  32. </configuration>
  33. </plugin>
  34. </plugins>
  35. </build>

rabbitmq.properties

  1. rabbitmq.host=192.168.130.124
  2. rabbitmq.port=5672
  3. rabbitmq.username=iekr
  4. rabbitmq.password=iekr
  5. rabbitmq.virtual-host=/itcast

5.1. 生产者

spring-rabbitmq-producer.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:p="http://www.springframework.org/schema/p"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  7. xsi:schemaLocation="
  8. http://www.springframework.org/schema/beans
  9. https://www.springframework.org/schema/beans/spring-beans.xsd
  10. http://www.springframework.org/schema/context
  11. https://www.springframework.org/schema/context/spring-context.xsd
  12. http://www.springframework.org/schema/rabbit
  13. https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  14. ">
  15. <!-- 加载配置文件-->
  16. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  17. <!-- 定义rabbitmq connectionFactory-->
  18. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  19. port="${rabbitmq.port}"
  20. username="${rabbitmq.username}"
  21. password="${rabbitmq.password}"
  22. virtual-host="${rabbitmq.virtual-host}"/>
  23. <!-- 定义管理交换机 队列-->
  24. <rabbit:admin connection-factory="connectionFactory"/>
  25. <!-- 定义持久化队列 不存在则自动创建 不绑定到交换机则绑定到默认交换机 默认交换机为direct 名字为"" 路由键位队列名称-->
  26. <rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
  27. <!-- 定义广播交换机中的持久化队列 不存在则自动创建-->
  28. <rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
  29. <rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
  30. <!-- 定义广播类型交换机 绑定上面两个队列-->
  31. <rabbit:fanout-exchange name="spring_fanout_exchange"
  32. id="spring_fanout_exchange"
  33. auto-declare="true">
  34. <rabbit:bindings>
  35. <rabbit:binding queue="spring_fanout_queue_1"/>
  36. <rabbit:binding queue="spring_fanout_queue_2"/>
  37. </rabbit:bindings>
  38. </rabbit:fanout-exchange>
  39. <!-- 通配符队列-->
  40. <rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
  41. <rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
  42. <rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
  43. <!-- 通配符定义-->
  44. <rabbit:topic-exchange name="spring_topic_exchange" id="spring_topic_exchange" auto-declare="true">
  45. <rabbit:bindings>
  46. <rabbit:binding pattern="heima.*" queue="spring_topic_queue_star"/>
  47. <rabbit:binding pattern="heima.#" queue="spring_topic_queue_well"/>
  48. <rabbit:binding pattern="itcast.#" queue="spring_topic_queue_well2"/>
  49. </rabbit:bindings>
  50. </rabbit:topic-exchange>
  51. <!-- 定义rabbitTemplate对象操作可以在代码中方便发送消息-->
  52. <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
  53. </beans>

test

  1. package com.itheima;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.test.context.ContextConfiguration;
  7. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  8. @RunWith(SpringJUnit4ClassRunner.class)
  9. @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
  10. public class ProducerTest {
  11. //注入
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. //1对1
  15. @Test
  16. public void testHelloWorld() {
  17. //发送消息
  18. rabbitTemplate.convertAndSend("spring_queue", "hello world spring ...");
  19. }
  20. //广播
  21. @Test
  22. public void testFanout() {
  23. //发送消息
  24. rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "spring fanout...");
  25. }
  26. //topic
  27. @Test
  28. public void testTopic() {
  29. //发送消息
  30. rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.123.456", "spring topic...");
  31. }
  32. }

5.2. 消费者

spring-rabbitmq-consumer.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:p="http://www.springframework.org/schema/p"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  7. xsi:schemaLocation="
  8. http://www.springframework.org/schema/beans
  9. https://www.springframework.org/schema/beans/spring-beans.xsd
  10. http://www.springframework.org/schema/context
  11. https://www.springframework.org/schema/context/spring-context.xsd
  12. http://www.springframework.org/schema/rabbit
  13. https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  14. ">
  15. <!-- 加载配置文件-->
  16. <context:property-placeholder location="classpath:rabbitmq.properties"/>
  17. <!-- 定义rabbitmq connectionFactory-->
  18. <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
  19. port="${rabbitmq.port}"
  20. username="${rabbitmq.username}"
  21. password="${rabbitmq.password}"
  22. virtual-host="${rabbitmq.virtual-host}"/>
  23. <bean id="springQueueListener" class="com.itheima.rabbitmq.SpringQueueListener"/>
  24. <!-- <bean id="fanoutListener1" class="com.itheima.rabbitmq.FanoutListener1"/>-->
  25. <!-- <bean id="fanoutListener2" class="com.itheima.rabbitmq.FanoutListener2"/>-->
  26. <!-- <bean id="topicListenerStar" class="com.itheima.rabbitmq.TopicListenerStar"/>-->
  27. <!-- <bean id="topicListenerWell" class="com.itheima.rabbitmq.TopicListenerWell"/>-->
  28. <!-- <bean id="topicListenerWell2" class="com.itheima.rabbitmq.TopicListenerWell2"/>-->
  29. <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
  30. <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
  31. <!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue1"/>-->
  32. <!-- <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue2"/>-->
  33. <!-- <rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>-->
  34. <!-- <rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>-->
  35. <!-- <rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>-->
  36. </rabbit:listener-container>
  37. </beans>

消费者类 根据bean id 编写对应的类名 并 实现 MessageListener 重写 onMessage 方法

  1. package com.itheima.rabbitmq;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageListener;
  4. public class SpringQueueListener implements MessageListener {
  5. @Override
  6. public void onMessage(Message message) {
  7. System.out.println(new String(message.getBody()));
  8. }
  9. }

11. Spring Boot 整合 RabbitMQ

14. RabbitMQ - 图14

5.1. 生产者

绑定交换机和队列

  1. package com.example.springrabbitmqproducer.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. public static final String EXCHANGE_NAME = "boot_topic_exchange";
  9. public static final String QUEUE_NAME = "boot_queue";
  10. //1.交换机
  11. @Bean("bootExchange")
  12. public Exchange bootExchange() {
  13. //获取4种类型的交换机
  14. return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
  15. }
  16. //2.队列
  17. @Bean("bootQueue")
  18. public Queue bootQueue() {
  19. return QueueBuilder.durable(QUEUE_NAME).build();
  20. }
  21. //3.队列和交换机的绑定
  22. @Bean
  23. public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
  24. return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
  25. }
  26. }

test

  1. package com.example.springrabbitmqproducer;
  2. import com.example.springrabbitmqproducer.rabbitmq.config.RabbitMQConfig;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. @SpringBootTest
  8. class SpringRabbitmqProducerApplicationTests {
  9. //注入RabbitTemplate
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. @Test
  13. public void testSend(){
  14. rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello");
  15. }
  16. }

5.2. 消费者

  1. @Component
  2. public class RabbitMQListener {
  3. //监听指定队列
  4. @RabbitListener(queues = "boot_queue")
  5. public void ListenerQueue(Message message) {
  6. System.out.println(new String(message.getBody()));
  7. }
  8. }