4.1 创建消息生产者工程


创建模块02-rabbitmq-springboot-send

image.png
image.png
image.png
配置模块02-rabbitmq-springboot-send的application.properties文件添加对RabbitMQ的集成

  1. #配置RabbitMQ链接信息
  2. #配置RabbitMQ服务器的IP地址
  3. spring.rabbitmq.host=192.168.222.128
  4. #配置RabbitMQ服务器的端口
  5. spring.rabbitmq.port=5672
  6. #配置RabbitMQ服务器的访问账号
  7. spring.rabbitmq.username=root
  8. #配置RabbitMQ服务器的访问密码
  9. spring.rabbitmq.password=root

4.2 创建消息接收者工程

创建模块02-rabbitmq-springboot-receive

image.png
image.png
image.png
配置模块02-rabbitmq-springboot-receive的application.properties文件添加对RabbitMQ的集成

  1. #配置RabbitMQ链接信息
  2. #配置RabbitMQ服务器的IP地址
  3. spring.rabbitmq.host=192.168.222.128
  4. #配置RabbitMQ服务器的端口
  5. spring.rabbitmq.port=5672
  6. #配置RabbitMQ服务器的访问账号
  7. spring.rabbitmq.username=root
  8. #配置RabbitMQ服务器的访问密码
  9. spring.rabbitmq.password=root

4.3 Direct模式消息发送和接收

4.3.1 编写Direct模式的消息发送

在02-rabbitmq-springboot-send模块中创建类,com.bjpowernode.direct.Send

  1. @Service
  2. public class Send {
  3. //自动注入Amqp的模板对象
  4. @Resource
  5. private AmqpTemplate template;
  6. public void send(){
  7. //发送消息到队列
  8. //参数 1 为消息存放的交换机名称 (需要事前创建)
  9. //参数 2 为RoutingKey,接收者需要根据这个key精准接收消息
  10. //参数 3 为具体存入队列中的消息数据
  11. template.convertAndSend("BootDirectExchange","BootRouting","SpringBootDirect");
  12. }
  13. }

创建Amqp配置类com.bjpowernode.rabbitmq.config.AmqpConfig

  1. @Configuration
  2. public class AmqpConfig {
  3. //@Bean 用于模拟Spring配置文件中的<bean>标签,用于创建名字为
  4. // BootDirectExchange的交换机
  5. @Bean
  6. public DirectExchange myChange(){
  7. return new DirectExchange("BootDirectExchange");
  8. }
  9. }

运行测试Direct消息发送,编写Application.java类

  1. @SpringBootApplication
  2. public class Application {
  3. public static void main(String[] args) {
  4. ApplicationContext ac= SpringApplication.run(Application.class, args);
  5. Send send= (Send) ac.getBean("send");
  6. send.send();
  7. }
  8. }

4.3.2 编写Direct模式的消息接收

在02-rabbitmq-springboot-receive模块中创建类,com.bjpowernode.direct.Receive

  1. @Service
  2. public class Receive {
  3. //@RabbitListener注解用于标记当前方法为消息监听方法,可以监听某个队列,当队列中有新消息则自动完成接收,
  4. @RabbitListener(queues ="myQueueDirect")
  5. public void receive(String message){
  6. System.out.println("Boot的Direct消息----"+message);
  7. }
  8. }

创建Amqp配置类com.bjpowernode.rabbitmq.config.AmqpConfig

  1. @Configuration
  2. public class AmqpConfig {
  3. //创建一个名字为myQueueDirect的队列
  4. @Bean
  5. public Queue queue(){
  6. return new Queue("myQueueDirect");
  7. }
  8. //创建一个名字为BootDirectExchange的交换机
  9. @Bean
  10. public Exchange myChange(){
  11. return new DirectExchange("BootDirectExchange");
  12. }
  13. //将队列绑定到交换机
  14. @Bean("binding")
  15. //参数1 为自定义队列对象,参数名queue为自定义队列Bean 的id
  16. //参数 2 为自定义的交换机,参数名myChange 为自定义交换机Bean 的id
  17. public Binding binding(Queue queue,Exchange myChange){
  18. // 将队列绑定到交换机,参数BootRouting为RoutingKey
  19. return BindingBuilder.bind(queue).to(myChange).with("BootRouting ").noargs();
  20. }
  21. }

运行测试Receive消息接收,编写Application.java类

  1. @SpringBootApplication
  2. public class Application {
  3. public static void main(String[] args) {
  4. SpringApplication.run(Application.class, args);
  5. }
  6. }

4.4 Fanout模式消息发送和接收

4.4.1 编写Fanout模式的消息发送

在02-rabbitmq-springboot-send模块中创建类,com.bjpowernode.fanout.Send

  1. @Service
  2. public class Send {
  3. //自动注入Amqp的模板对象
  4. @Resource
  5. private AmqpTemplate template;
  6. public void fanoutSend(){
  7. //发送消息
  8. //参数 1 为交换机名称
  9. //参数 2 为Routingkey ,由于Fanout不需要绑定RoutingKey因此可以为空
  10. //参数 3 为具体的消息内容
  11. template.convertAndSend("BootFanoutExchange","","SpringBootFanout");
  12. }
  13. }

修改Amqp配置类com.bjpowernode.rabbitmq.config.AmqpConfig,增加以下内容

  1. //创建交换机
  2. @Bean
  3. public FanoutExchange fanoutExchange(){
  4. //创建一个基于Fanout的交换机 名字为BootFanoutExchange
  5. return new FanoutExchange("BootFanoutExchange");
  6. }

运行测试Direct消息发送,编写Application.java类

  1. @SpringBootApplication
  2. public class Application {
  3. public static void main(String[] args) {
  4. ApplicationContext ac= SpringApplication.run(Application.class, args);
  5. Send send= (Send) ac.getBean("send");
  6. send.fanoutSend();
  7. }
  8. }

4.4.2 编写Fanout模式的消息接收

在02-rabbitmq-springboot-receive模块中创建类,com.bjpowernode.fanout.Receove

  1. @Service
  2. public class Receive {
  3. @RabbitListener(queues ="fanoutQueue")
  4. public void fanoutReceive(String message){
  5. System.out.println("Boot的Fanout消息----"+message);
  6. }
  7. }

修改Amqp配置类com.bjpowernode.rabbitmq.config.AmqpConfig,增加以下内容
方式一:采取配置类的方式来实现

  1. //创建一个名字为 fanoutQueue的队列
  2. @Bean
  3. public Queue fanoutQueue(){
  4. return new Queue("fanoutQueue");
  5. }
  6. //创建一个名字为 BootFanoutExchange的交换机
  7. @Bean
  8. public FanoutExchange fanoutExchange(){
  9. return new FanoutExchange("BootFanoutExchange");
  10. }
  11. @Bean
  12. public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){
  13. //将队列绑定到指定的交换机上
  14. //参数1 为指定的队列对象
  15. //参数2 为指定的交换机对象
  16. return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
  17. }

方式二:采取注解的方式来实现

  1. @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = "fanoutExchange",type = "fanout"))})
  2. public void directRecive01(String message){
  3. System.out.println("directRecive01:" + message);
  4. }
  5. @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = "fanoutExchange",type = "fanout"))})
  6. public void directRecive02(String message){
  7. System.out.println("directRecive02:" + message);
  8. }
  9. @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = "fanoutExchange",type = "fanout"))})
  10. public void directRecive03(String message){
  11. System.out.println("directRecive03" + message);
  12. }

运行测试Receive消息接收,编写Application.java类

  1. @SpringBootApplication
  2. public class Application {
  3. public static void main(String[] args) {
  4. SpringApplication.run(Application.class, args);
  5. }
  6. }

4.5 Topic模式消息发送和接收

4.5.1 编写Topic模式消息发送

在02-rabbitmq-springboot-send模块中创建类,com.bjpowernode.topic.Send

  1. @Service
  2. public class Send {
  3. //自动注入Amqp的模板对象
  4. @Resource
  5. private AmqpTemplate template;
  6. public void topicSend(){
  7. //发送消息
  8. //参数 1 为交换机名称
  9. //参数 2 为Routingkey
  10. //参数 3 为具体的消息内容
  11. template.convertAndSend("BootTopicExchange","Boot.text","SpringBootTopic");
  12. }
  13. }

修改Amqp配置类com.bjpowernode.rabbitmq.config.AmqpConfig,增加以下内容

  1. //创建交换机
  2. @Bean
  3. public TopicExchange topicExchange(){
  4. return new TopicExchange("BootTopicExchange");
  5. }

运行测试Direct消息发送,编写Application.java类

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
      ApplicationContext ac= SpringApplication.run(Application.class, args);
      Send send= (Send) ac.getBean("send");
      send. topicSend ();
    }
}

4.5.2 编写Topic模式消息接收

在02-rabbitmq-springboot-receive模块中创建类,com.bjpowernode.topic.Receove

@Service
public class Receive {
    @RabbitListener(queues ="topicQueue")
    public void topicReceive(String message){
        System.out.println("Boot的Fanout消息111----"+message);
    }
    @RabbitListener(queues ="topicQueue2")
    public void fanoutReceive 02(String message){
        System.out.println("Boot的Fanout消息222----"+message);
    }
}

修改Amqp配置类com.bjpowernode.rabbitmq.config.AmqpConfig,增加以下内容

//创建交换机,
@Bean
public TopicExchange TopicExchange(){
//创建一个名为BootTopicExchange的Topic的交换机
    return new TopicExchange("BootTopicExchange");
}
//创建队列
@Bean
public Queue topicQueue(){
    return new Queue("topicQueue");
}
//创建队列
@Bean
public Queue topicQueue2(){
    return new Queue("topicQueue2");
}
//绑定队列到交换机
@Bean
public Binding  topicBinding(Queue topicQueue,TopicExchange topicExchange){
    //将队列绑定到指定交换机
//参数1 为指定队列对象
//参数2 为指定的交换机对象
//参数3 为RoutingKey的匹配规则,Boot.#表示 可以接收以Boot开头的任意子孙路径下的队列
Return BindingBuilder.bind(topicQueue).to(topicExchange).with("Boot.#");
}
@Bean
public Binding  topicBinding2(Queue topicQueue2,TopicExchange topicExchange){
    //将队列绑定到指定交换机
//参数1 为指定队列对象
//参数2 为指定的交换机对象
//参数3 为RoutingKey的匹配规则,#.test表示 可以接收以任意路径靠头的但是必须以test结尾的队列
    return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.text");
}

运行测试Receive消息接收,编写Application.java类

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}