参考地址:
- https://blog.csdn.net/foxbamboo/article/details/104672862/
- https://blog.csdn.net/weixin_42906256/article/details/108910180?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-10.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-10.control
地址:https://www.codenong.com/jsfe8d22ac6810/
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
import cn.hutool.core.util.RandomUtil;import cn.hutool.json.JSONUtil;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.builder.SpringApplicationBuilder;import org.springframework.context.ConfigurableApplicationContext;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;@SpringBootApplicationpublic class DemoMqttApplication {public static void main(String[] args) {ConfigurableApplicationContext context = new SpringApplicationBuilder(DemoMqttApplication.class).run(args);new Thread(() -> {while (true) {Map<String, Object> dataMap = new HashMap<>() {{put("temperature", RandomUtil.randomInt(-50, 50));}};String payload = JSONUtil.toJsonStr(dataMap);MqttConsumer.publish("v1/devices/me/telemetry", payload);try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}
/*** 获取配置信息**/public class PropertiesUtil {public static String MQTT_HOST = "tcp://127.0.0.1:1883";public static String MQTT_CLIENT_ID = "mqtt_client_id_001";public static String MQTT_USER_NAME = "27Gu6FlV5CzoZRO5bae2";public static String MQTT_PASSWORD="";public static String MQTT_TOPIC = "v1/devices/me/rpc/request/+";public static Integer MQTT_TIMEOUT = 60;public static Integer MQTT_KEEP_ALIVE = 60;}
import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.springframework.boot.ApplicationArguments;import org.springframework.boot.ApplicationRunner;import org.springframework.stereotype.Component;@Slf4j@Componentpublic class MqttConsumer implements ApplicationRunner {private static MqttClient client;@Overridepublic void run(ApplicationArguments args) {log.info("初始化并启动mqtt......");this.connect();}/*** 连接mqtt服务器*/private void connect() {try {// 1 创建客户端getClient();// 2 设置配置MqttConnectOptions options = getOptions();String[] topic = {PropertiesUtil.MQTT_TOPIC};// 3 消息发布质量int[] qos = getQos(topic.length);// 4 最后设置create(options, topic, qos);} catch (Exception e) {log.error("mqtt连接异常:" + e);}}/*** 创建客户端 --- 1 ---*/public void getClient() {try {if (null == client) {client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());}log.info("--创建mqtt客户端");} catch (Exception e) {log.error("创建mqtt客户端异常:" + e);}}/*** 生成配置对象,用户名,密码等 --- 2 ---*/public MqttConnectOptions getOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(PropertiesUtil.MQTT_USER_NAME);options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());// 设置超时时间options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);// 设置会话心跳时间options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);// 是否清除sessionoptions.setCleanSession(false);log.info("--生成mqtt配置对象");return options;}/*** qos --- 3 ---*/public int[] getQos(int length) {int[] qos = new int[length];for (int i = 0; i < length; i++) {/*** MQTT协议中有三种消息发布服务质量:** QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。* QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。* QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大*/qos[i] = 1;}log.info("--设置消息发布质量");return qos;}/*** 装在各种实例和订阅主题 --- 4 ---*/public void create(MqttConnectOptions options, String[] topic, int[] qos) {try {client.setCallback(new MqttConsumerCallback(client, options, topic, qos));log.info("--添加回调处理类");client.connect(options);} catch (Exception e) {log.info("装载实例或订阅主题异常:" + e);}}/*** 订阅某个主题** @param topic* @param qos*/public void subscribe(String topic, int qos) {try {log.info("topic:" + topic);client.subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}/*** 发布,非持久化* <p>* qos根据文档设置为1** @param topic* @param msg*/public static void publish(String topic, String msg) {publish(1, false, topic, msg);}/*** 发布*/public static void publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = client.getTopic(topic);if (null == mTopic) {log.error("topic:" + topic + " 不存在");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();if (!token.isComplete()) {log.info("消息发送成功");}} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}}
import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.*;import java.util.Arrays;/*** mqtt回调处理类*/@Slf4jpublic class MqttConsumerCallback implements MqttCallbackExtended {private MqttClient client;private MqttConnectOptions options;private String[] topic;private int[] qos;public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {this.client = client;this.options = options;this.topic = topic;this.qos = qos;}/*** 断开重连*/@Overridepublic void connectionLost(Throwable cause) {log.info("MQTT连接断开,发起重连......");try {if (null != client && !client.isConnected()) {client.reconnect();log.error("尝试重新连接");} else {client.connect(options);log.error("尝试建立新连接");}} catch (Exception e) {e.printStackTrace();}}/*** 接收到消息调用令牌中调用*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {//log.info("deliveryComplete---------" + Arrays.toString(topic));}/*** 消息处理*/@Overridepublic void messageArrived(String topic, MqttMessage message) {try {String msg = new String(message.getPayload());log.info("收到topic:" + topic + " 消息:" + msg);} catch (Exception e) {log.info("处理mqtt消息异常:" + e);}}/*** mqtt连接后订阅主题*/@Overridepublic void connectComplete(boolean b, String s) {try {if (null != topic && null != qos) {if (client.isConnected()) {client.subscribe(topic, qos);log.info("mqtt连接成功,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);log.info("--订阅主题::" + Arrays.toString(topic));} else {log.info("mqtt连接失败,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);}}} catch (Exception e) {log.info("mqtt订阅主题异常:" + e);}}}
2021-07-19 17:32:39.939 INFO 14688 --- [ main] com.example.MqttConsumer : 初始化并启动mqtt......2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --创建mqtt客户端2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --生成mqtt配置对象2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --设置消息发布质量2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --添加回调处理类2021-07-19 17:32:40.415 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : mqtt连接成功,客户端ID:mqtt_client_id_0012021-07-19 17:32:40.415 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : --订阅主题::[v1/devices/me/rpc/request/+]2021-07-19 17:32:46.825 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : 收到topic:v1/devices/me/rpc/request/0 消息:{"method":"setValue","params":67.04}2021-07-19 17:32:47.208 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : 收到topic:v1/devices/me/rpc/request/1 消息:{"method":"setValue","params":89.46}2021-07-19 17:32:47.845 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : 收到topic:v1/devices/me/rpc/request/2 消息:{"method":"setValue","params":75.91}
