ykkj-spring-boot-starter-activemq技术组件,基于 activemq 实现分布式消息队列:
-
1. 发布/消费
1.2 实现源码
消息的发布基于JmsMessagingTemplate 实现:
注入framework.activemq.config.activemq.ProducerUnitl.java类,注入之后实现send方法即可实现发送消息。
- 注入 org.springframework.jms.core.JmsMessagingTemplate实现 #convertAndSend()方法,发送消息。
- 在方法上使用@JmsListener(destination=”${spring.activemq.topic-name}”, containerFactory=”topicListener”)注解,消费消息。
使用 framework/activemq/config/ActiveMQBeanConfiguration.java配置类,注入springboot环境 TopicConsumerListener监听器,初始化对应的消费者。如下图所示:
1.3 实战案例
1.3.1 引入依赖
在 ykkj-module-system-biz 【根据需求选择引用的模块】模块中,引入 ykkj-spring-boot-starter-activemq 技术组件。如下所示:
<dependency><groupId>com.ykkj.boot</groupId><artifactId>ykkj-spring-boot-starter-activemq</artifactId></dependency>
1.3.2 application-dev.yaml
在 application-xxx.yaml 的 spring下,加入 activemq配置项,代码如下图:
1.3.3 activemq.ProducerController
① 已经实现framework.activemq.controller.admin.activemq.ProducerController 的 接口类:
② 需要使用 /topic/sendTopic发送Topic消息。如下图所示:
1.3.4 前端页面默认会显示mqtt接口
以上配置完成之后打开前端页面的接口测试页面会显示mqtt的api测试接口。代码如下图:
点击发送之后可以看到成功字样
由于启动的时候程序默认订阅了mqtt服务则可以看到控制台回打印消息【@JmsListener实现订阅】
2. 使用介绍
使用mqtt需要在yaml配置文件的spring参数下进行如下配置
#配置activemq 服务器
activemq:
or-start: true #是否启用mqtt服务
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
queue-name: active.queue #队列名称
topic-name: UNITY #订阅的主题
send-topic: UPDATA
queue-topic: UPDATA
pool:
enabled: true
max-connections: 10 #连接池最大连接数
idle-timeout: 30000 #空闲的连接过期时间,默认为30秒
max-session-per-connection: 10
在需要使用的类里注入ProducerUnitl类使用sendXXX等方法即可实现消息发送,当前类的所有方法如下
package com.ykkj.cdp.framework.activemq.config.activemq;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.*;
import java.util.Map;
@Component
@ConditionalOnExpression("${spring.activemq.or-start}")
public class ProducerUnitl {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Session session;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@Value("${spring.activemq.send-topic}")
private String cachTopic;
@Value("${spring.activemq.queue-topic}")
private String quneTopic;
//lora告警下发流水号
private static short liushuihao = 0;
/**
* 发送默认队列消息
* @param str 消息内容
* @return 发送结果
*/
public String sendQueue(String str) {
this.sendMessage(this.queue, str);
return "success";
}
/**
* 发送默认主题消息
* @param str 消息内容
* @return 发送结果
*/
public String sendTopic(String str) {
this.sendMessage(this.topic, str);
return "success";
}
/**
* 发送自定义的主题消息
* @param topic 主题
* @param str 消息内容
* @return 发送结果
*/
public String sendOtherTopic(String topic,String str) {
this.sendMessage(new ActiveMQTopic(topic), str);
return "success";
}
//解算缓冲
public String sendCachTopic(String str) {
this.sendMessage(new ActiveMQTopic(cachTopic), str);
return "success";
}
//删除全部队列
public String removeAllMsg() throws JMSException {
Destination dest = new ActiveMQTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
Message message=new ActiveMQMapMessage();
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
jmsMessagingTemplate.convertAndSend(dest,message);
return "success";
}
//删除单一队列
public void removeScheduleMessage(Map<String, Object> param){
try {
Destination dest = new ActiveMQTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
Message message=new ActiveMQMapMessage();
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, param.get("messageId").toString());
jmsMessagingTemplate.convertAndSend(dest,message);
} catch (Exception e) {
// TODO: handle exception
}
}
// 发送消息,destination是发送到的队列,message是待发送的消息
private void sendMessage(Destination destination, final String message){
jmsMessagingTemplate.convertAndSend(destination, message);
}
}
在需要订阅消息的类方法上加入@JmsListener(destination=”${spring.activemq.topic-name}”, containerFactory=”topicListener”)进行消息订阅
@Component
@ConditionalOnExpression("${spring.activemq.or-start}")
public class TopicConsumerListener {
//topic模式的消费者
@JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
public void readActiveQueue(String message) {
System.out.println("topic接受到:" + message);
}
}
注解中的 destination为订阅的主题/队列,containerFactory为订阅类型主题订阅(topicListener)/队列订阅(queueListener)
*在实现mqtt的类上最好加上@ConditionalOnExpression(“${spring.activemq.or-start}”)注解防止由于mqtt堵塞程序执行
