消息生产者
依赖引入
<dependency><groupId>com.cjkj</groupId><artifactId>cjkj-mq-spring-boot-starter</artifactId><version>1.5.0</version></dependency>
配置文件
配置文件中配置需要用到的所有MQ信息[队列,交换机,路由key],支持配置多个
配置的时候注意{}对象中的key必须写正确,交换机对应exchange, 路由键对应routingKey, 队列对应queueName
#业务代码, 交换机, 路由键, 消息队列queues:messageQueues: [{bizCode: 101, exchange: modules_message_service, routingKey: message.push, queueName: modules.message.push}]
定义MQ消息信息
定义交换机和对应路由key封装的常量比如:
PUSH_MESSAGE("modules_message_service", "message.push");定义业务对应的队列,这里以激光推送队列为例:在内部类MessageQueueConstants中添加
public static final String QUEUE_MESSAGE_PUSH = "modules.message.push";@Slf4jpublic enum EnumMessageTopics {/*** 交换机和对应路由key封装的常量*/PUSH_MESSAGE("modules_message_service", "message.push");/*** 交换机*/private final String systemCode;/*** 主题*/private final String topic;EnumMessageTopics(final String systemCode, final String topic) {this.systemCode = systemCode;this.topic = topic;}public String getFullTopicName() {return systemCode + ":" + topic;}public String getSystemCode() {return systemCode;}public String getTopic() {return topic;}/*** 平台 Queues*/public static class MessageQueueConstants {/*** 激光推送信息队列*/public static final String QUEUE_MESSAGE_PUSH = "modules.message.push";}}
发送消息
```java @Autowired private MessageSender messageSender;
@Override
public void pushMessage(PushMessageReq pushMessageReq) {
final List
<a name="qXSEt"></a>## 消费者<a name="noNmh"></a>## 定义回调方法- 回调方法上添加@RabbitHandler和@RabbitListener注解- 在@RabbitListener注解中指定队列```java@RabbitHandler@RabbitListener(queues = EnumMessageTopics.MessageQueueConstants.QUEUE_MESSAGE_PUSH)private void pushMessageFunc(Message message, Channel channel) {TEASPOON.execute(() -> {final DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");final String topic = message.getMessageProperties().getReceivedRoutingKey();String jsonMessage = new String(message.getBody(), StandardCharsets.UTF_8);log.info(dtf2.format(LocalDateTime.now()) + "收到 Topic: [{}],消息 [{}]", topic, jsonMessage);final PushMessageEntity pushMessageEntity;try {pushMessageEntity = JsonUtils.fromJson(jsonMessage, PushMessageEntity.class);// 手动ack应答// 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了// 否则消息服务器以为这条消息没处理掉 后续还会在发,true确认所有消费者获得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("消息消费成功:id:{}", message.getMessageProperties().getDeliveryTag());if (pushMessageEntity != null) {this.sendMessage(pushMessageEntity);}} catch (Exception e) {log.error("消息处理失败:id:{}", message.getMessageProperties().getDeliveryTag());}});}
