首先需要声明消息队列以及交换机绑定 还有死信交换机,如果不需要定时任务 则不需要死信交换机
创建配置类, 加上注解@Configuration
@Configuration
public class RabbitMQConfig {
public static final String EX_A = "exchangeA";
public static final String EX_B = "exchangeB";
public static final String Q_A = "queueA";
public static final String Q_B = "queueB";
public static final String ROUTING_KEY_A = "exa-qa";
public static final String ROUTING_KEY_B = "exb-qb";
@Bean(EX_A)
public Exchange EX_A(){
return ExchangeBuilder.fanoutExchange(EX_A).durable(true).build();
}
@Bean(EX_B)
public Exchange EX_B(){
return ExchangeBuilder.fanoutExchange(EX_B).durable(true).build();
}
@Bean(Q_A)
public Queue Q_A(){
Queue queue = QueueBuilder.durable(Q_A).build();
queue.addArgument("x-dead-letter-exchange",EX_B);
return queue;
}
@Bean(Q_B)
public Queue Q_B(){
Queue queue = QueueBuilder.durable(Q_B).build();
return queue;
}
@Bean
public Binding binding_A(){
return BindingBuilder.bind(Q_A()).to(EX_A()).with(ROUTING_KEY_A).noargs();
}
@Bean
public Binding binding_B(){
return BindingBuilder.bind(Q_B()).to(EX_B()).with(ROUTING_KEY_B).noargs();
}
}
发送消息使用RabbitTemplate 延迟消息需要设置死信交换机.
import com.heima.taskdemo.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.text.SimpleDateFormat;
import java.util.Date;
@RestController
public class WmNewsPublishProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/publish")
public String publish(){
MessageProperties messageProperties = new MessageProperties();
// 设置过期时间,单位:毫秒
messageProperties.setExpiration("5000");
//将需要发送的转为byts 放入Message 如果是对象可以 byte[] bytes = JSONObject.toJSONBytes(wmNews);
Message message = new Message("hello world".getBytes(),messageProperties);
//发送时选择交换机以及队列 还有message
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_A,RabbitMQConfig.ROUTING_KEY_A,message);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送时间【"+simpleDateFormat.format(new Date())+"】");
return "OK";
}
}
使用 @RabbitListener(queues = RabbitMQConfig.Q_B) 接收消息,需要声明消息队列
@Component
public class PublishListener {
@RabbitListener(queues = RabbitMQConfig.Q_B)
public void publish(Message message){
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收到消息【"+simpleDateFormat.format(new Date())+"】"+new String(message.getBody()));
}
}