ykkj-spring-boot-starter-activemq技术组件,基于 activemq 实现分布式消息队列:

  • 使用 JMS集成。

    1. 发布/消费

    1.2 实现源码

    消息的发布基于JmsMessagingTemplate 实现:

  • 注入framework.activemq.config.activemq.ProducerUnitl.java类,注入之后实现send方法即可实现发送消息。

  • 注入 org.springframework.jms.core.JmsMessagingTemplate实现 #convertAndSend()方法,发送消息。
  • 在方法上使用@JmsListener(destination=”${spring.activemq.topic-name}”, containerFactory=”topicListener”)注解,消费消息。

使用 framework/activemq/config/ActiveMQBeanConfiguration.java配置类,注入springboot环境 TopicConsumerListener监听器,初始化对应的消费者。如下图所示:
image.png

1.3 实战案例

1.3.1 引入依赖

在 ykkj-module-system-biz 【根据需求选择引用的模块】模块中,引入 ykkj-spring-boot-starter-activemq 技术组件。如下所示:

  1. <dependency>
  2. <groupId>com.ykkj.boot</groupId>
  3. <artifactId>ykkj-spring-boot-starter-activemq</artifactId>
  4. </dependency>

1.3.2 application-dev.yaml

在 application-xxx.yaml 的 spring下,加入 activemq配置项,代码如下图:
image.png

1.3.3 activemq.ProducerController

① 已经实现framework.activemq.controller.admin.activemq.ProducerController 的 接口类:
image.png
② 需要使用 /topic/sendTopic发送Topic消息。如下图所示:
image.png

1.3.4 前端页面默认会显示mqtt接口

以上配置完成之后打开前端页面的接口测试页面会显示mqtt的api测试接口。代码如下图:
image.png
点击发送之后可以看到成功字样
image.png
由于启动的时候程序默认订阅了mqtt服务则可以看到控制台回打印消息【@JmsListener实现订阅】
image.png

2. 使用介绍

使用mqtt需要在yaml配置文件的spring参数下进行如下配置

#配置activemq 服务器
  activemq:
    or-start: true     #是否启用mqtt服务
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    queue-name: active.queue   #队列名称
    topic-name: UNITY   #订阅的主题
    send-topic: UPDATA
    queue-topic: UPDATA
    pool:
      enabled: true
      max-connections: 10   #连接池最大连接数
      idle-timeout: 30000   #空闲的连接过期时间,默认为30秒
      max-session-per-connection: 10

在需要使用的类里注入ProducerUnitl类使用sendXXX等方法即可实现消息发送,当前类的所有方法如下

package com.ykkj.cdp.framework.activemq.config.activemq;

import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.util.Map;

@Component
@ConditionalOnExpression("${spring.activemq.or-start}")
public class ProducerUnitl {
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Autowired
    private Session session;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;
    @Value("${spring.activemq.send-topic}")
    private  String cachTopic;
    @Value("${spring.activemq.queue-topic}")
    private  String quneTopic;


    //lora告警下发流水号
    private static short liushuihao = 0;

    /**
     * 发送默认队列消息
     * @param str 消息内容
     * @return 发送结果
     */
    public String sendQueue(String str) {
        this.sendMessage(this.queue, str);
        return "success";
    }
    /**
     * 发送默认主题消息
     * @param str 消息内容
     * @return 发送结果
     */
    public String sendTopic(String str) {
        this.sendMessage(this.topic, str);
        return "success";
    }

    /**
     * 发送自定义的主题消息
     * @param topic 主题
     * @param str 消息内容
     * @return 发送结果
     */
    public String sendOtherTopic(String topic,String str) {
        this.sendMessage(new ActiveMQTopic(topic), str);
        return "success";
    }
    //解算缓冲
    public String sendCachTopic(String str) {

        this.sendMessage(new ActiveMQTopic(cachTopic), str);
        return "success";
    }



    //删除全部队列
    public String removeAllMsg() throws JMSException {
        Destination dest = new ActiveMQTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        Message message=new ActiveMQMapMessage();
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
        jmsMessagingTemplate.convertAndSend(dest,message);
        return "success";
    }
    //删除单一队列
    public  void removeScheduleMessage(Map<String, Object> param){
        try {
            Destination dest = new ActiveMQTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
            Message message=new ActiveMQMapMessage();
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, param.get("messageId").toString());
            jmsMessagingTemplate.convertAndSend(dest,message);

        } catch (Exception e) {
            // TODO: handle exception
        }
    }



    // 发送消息,destination是发送到的队列,message是待发送的消息
    private void sendMessage(Destination destination, final String message){

        jmsMessagingTemplate.convertAndSend(destination, message);
    }

}

在需要订阅消息的类方法上加入@JmsListener(destination=”${spring.activemq.topic-name}”, containerFactory=”topicListener”)进行消息订阅

@Component
@ConditionalOnExpression("${spring.activemq.or-start}")
public class TopicConsumerListener {
    //topic模式的消费者
    @JmsListener(destination="${spring.activemq.topic-name}", containerFactory="topicListener")
    public void readActiveQueue(String message) {
        System.out.println("topic接受到:" + message);
    }
}

注解中的 destination为订阅的主题/队列,containerFactory为订阅类型主题订阅(topicListener)/队列订阅(queueListener)
*在实现mqtt的类上最好加上@ConditionalOnExpression(“${spring.activemq.or-start}”)注解防止由于mqtt堵塞程序执行