参考地址:

    地址:https://www.codenong.com/jsfe8d22ac6810/

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-integration</artifactId>
    4. </dependency>
    5. <dependency>
    6. <groupId>org.springframework.integration</groupId>
    7. <artifactId>spring-integration-stream</artifactId>
    8. </dependency>
    9. <dependency>
    10. <groupId>org.springframework.integration</groupId>
    11. <artifactId>spring-integration-mqtt</artifactId>
    12. </dependency>
    1. import cn.hutool.core.util.RandomUtil;
    2. import cn.hutool.json.JSONUtil;
    3. import org.springframework.boot.autoconfigure.SpringBootApplication;
    4. import org.springframework.boot.builder.SpringApplicationBuilder;
    5. import org.springframework.context.ConfigurableApplicationContext;
    6. import java.util.HashMap;
    7. import java.util.Map;
    8. import java.util.concurrent.TimeUnit;
    9. @SpringBootApplication
    10. public class DemoMqttApplication {
    11. public static void main(String[] args) {
    12. ConfigurableApplicationContext context = new SpringApplicationBuilder(DemoMqttApplication.class)
    13. .run(args);
    14. new Thread(() -> {
    15. while (true) {
    16. Map<String, Object> dataMap = new HashMap<>() {{
    17. put("temperature", RandomUtil.randomInt(-50, 50));
    18. }};
    19. String payload = JSONUtil.toJsonStr(dataMap);
    20. MqttConsumer.publish("v1/devices/me/telemetry", payload);
    21. try {
    22. TimeUnit.SECONDS.sleep(5);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. }).start();
    28. }
    29. }
    /**
     * Holder for the MQTT connection settings used by the demo.
     *
     * <p>All values are hard-coded constants. NOTE(review): the user name looks like
     * a device access token — confirm, and load real credentials from external
     * configuration rather than committing them to source.
     */
    public class PropertiesUtil {

        /** Broker URI passed to the MQTT client constructor. */
        public static final String MQTT_HOST = "tcp://127.0.0.1:1883";

        /** Client identifier used when creating the MQTT client. */
        public static final String MQTT_CLIENT_ID = "mqtt_client_id_001";

        /** User name set on the connect options. */
        public static final String MQTT_USER_NAME = "27Gu6FlV5CzoZRO5bae2";

        /** Password set on the connect options (empty in this demo). */
        public static final String MQTT_PASSWORD = "";

        /** Topic filter subscribed to once the connection is established. */
        public static final String MQTT_TOPIC = "v1/devices/me/rpc/request/+";

        /** Value passed to {@code setConnectionTimeout}. */
        public static final Integer MQTT_TIMEOUT = 60;

        /** Value passed to {@code setKeepAliveInterval}. */
        public static final Integer MQTT_KEEP_ALIVE = 60;

        /** Constants holder; not meant to be instantiated. */
        private PropertiesUtil() {
        }
    }
    1. import lombok.extern.slf4j.Slf4j;
    2. import org.eclipse.paho.client.mqttv3.*;
    3. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    4. import org.springframework.boot.ApplicationArguments;
    5. import org.springframework.boot.ApplicationRunner;
    6. import org.springframework.stereotype.Component;
    7. @Slf4j
    8. @Component
    9. public class MqttConsumer implements ApplicationRunner {
    10. private static MqttClient client;
    11. @Override
    12. public void run(ApplicationArguments args) {
    13. log.info("初始化并启动mqtt......");
    14. this.connect();
    15. }
    16. /**
    17. * 连接mqtt服务器
    18. */
    19. private void connect() {
    20. try {
    21. // 1 创建客户端
    22. getClient();
    23. // 2 设置配置
    24. MqttConnectOptions options = getOptions();
    25. String[] topic = {PropertiesUtil.MQTT_TOPIC};
    26. // 3 消息发布质量
    27. int[] qos = getQos(topic.length);
    28. // 4 最后设置
    29. create(options, topic, qos);
    30. } catch (Exception e) {
    31. log.error("mqtt连接异常:" + e);
    32. }
    33. }
    34. /**
    35. * 创建客户端 --- 1 ---
    36. */
    37. public void getClient() {
    38. try {
    39. if (null == client) {
    40. client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENT_ID, new MemoryPersistence());
    41. }
    42. log.info("--创建mqtt客户端");
    43. } catch (Exception e) {
    44. log.error("创建mqtt客户端异常:" + e);
    45. }
    46. }
    47. /**
    48. * 生成配置对象,用户名,密码等 --- 2 ---
    49. */
    50. public MqttConnectOptions getOptions() {
    51. MqttConnectOptions options = new MqttConnectOptions();
    52. options.setUserName(PropertiesUtil.MQTT_USER_NAME);
    53. options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray());
    54. // 设置超时时间
    55. options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT);
    56. // 设置会话心跳时间
    57. options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE);
    58. // 是否清除session
    59. options.setCleanSession(false);
    60. log.info("--生成mqtt配置对象");
    61. return options;
    62. }
    63. /**
    64. * qos --- 3 ---
    65. */
    66. public int[] getQos(int length) {
    67. int[] qos = new int[length];
    68. for (int i = 0; i < length; i++) {
    69. /**
    70. * MQTT协议中有三种消息发布服务质量:
    71. *
    72. * QOS0: “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    73. * QOS1: “至少一次”,确保消息到达,但消息重复可能会发生。
    74. * QOS2: “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果,资源开销大
    75. */
    76. qos[i] = 1;
    77. }
    78. log.info("--设置消息发布质量");
    79. return qos;
    80. }
    81. /**
    82. * 装在各种实例和订阅主题 --- 4 ---
    83. */
    84. public void create(MqttConnectOptions options, String[] topic, int[] qos) {
    85. try {
    86. client.setCallback(new MqttConsumerCallback(client, options, topic, qos));
    87. log.info("--添加回调处理类");
    88. client.connect(options);
    89. } catch (Exception e) {
    90. log.info("装载实例或订阅主题异常:" + e);
    91. }
    92. }
    93. /**
    94. * 订阅某个主题
    95. *
    96. * @param topic
    97. * @param qos
    98. */
    99. public void subscribe(String topic, int qos) {
    100. try {
    101. log.info("topic:" + topic);
    102. client.subscribe(topic, qos);
    103. } catch (MqttException e) {
    104. e.printStackTrace();
    105. }
    106. }
    107. /**
    108. * 发布,非持久化
    109. * <p>
    110. * qos根据文档设置为1
    111. *
    112. * @param topic
    113. * @param msg
    114. */
    115. public static void publish(String topic, String msg) {
    116. publish(1, false, topic, msg);
    117. }
    118. /**
    119. * 发布
    120. */
    121. public static void publish(int qos, boolean retained, String topic, String pushMessage) {
    122. MqttMessage message = new MqttMessage();
    123. message.setQos(qos);
    124. message.setRetained(retained);
    125. message.setPayload(pushMessage.getBytes());
    126. MqttTopic mTopic = client.getTopic(topic);
    127. if (null == mTopic) {
    128. log.error("topic:" + topic + " 不存在");
    129. }
    130. MqttDeliveryToken token;
    131. try {
    132. token = mTopic.publish(message);
    133. token.waitForCompletion();
    134. if (!token.isComplete()) {
    135. log.info("消息发送成功");
    136. }
    137. } catch (MqttPersistenceException e) {
    138. e.printStackTrace();
    139. } catch (MqttException e) {
    140. e.printStackTrace();
    141. }
    142. }
    143. }
    1. import lombok.extern.slf4j.Slf4j;
    2. import org.eclipse.paho.client.mqttv3.*;
    3. import java.util.Arrays;
    4. /**
    5. * mqtt回调处理类
    6. */
    7. @Slf4j
    8. public class MqttConsumerCallback implements MqttCallbackExtended {
    9. private MqttClient client;
    10. private MqttConnectOptions options;
    11. private String[] topic;
    12. private int[] qos;
    13. public MqttConsumerCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
    14. this.client = client;
    15. this.options = options;
    16. this.topic = topic;
    17. this.qos = qos;
    18. }
    19. /**
    20. * 断开重连
    21. */
    22. @Override
    23. public void connectionLost(Throwable cause) {
    24. log.info("MQTT连接断开,发起重连......");
    25. try {
    26. if (null != client && !client.isConnected()) {
    27. client.reconnect();
    28. log.error("尝试重新连接");
    29. } else {
    30. client.connect(options);
    31. log.error("尝试建立新连接");
    32. }
    33. } catch (Exception e) {
    34. e.printStackTrace();
    35. }
    36. }
    37. /**
    38. * 接收到消息调用令牌中调用
    39. */
    40. @Override
    41. public void deliveryComplete(IMqttDeliveryToken token) {
    42. //log.info("deliveryComplete---------" + Arrays.toString(topic));
    43. }
    44. /**
    45. * 消息处理
    46. */
    47. @Override
    48. public void messageArrived(String topic, MqttMessage message) {
    49. try {
    50. String msg = new String(message.getPayload());
    51. log.info("收到topic:" + topic + " 消息:" + msg);
    52. } catch (Exception e) {
    53. log.info("处理mqtt消息异常:" + e);
    54. }
    55. }
    56. /**
    57. * mqtt连接后订阅主题
    58. */
    59. @Override
    60. public void connectComplete(boolean b, String s) {
    61. try {
    62. if (null != topic && null != qos) {
    63. if (client.isConnected()) {
    64. client.subscribe(topic, qos);
    65. log.info("mqtt连接成功,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
    66. log.info("--订阅主题::" + Arrays.toString(topic));
    67. } else {
    68. log.info("mqtt连接失败,客户端ID:" + PropertiesUtil.MQTT_CLIENT_ID);
    69. }
    70. }
    71. } catch (Exception e) {
    72. log.info("mqtt订阅主题异常:" + e);
    73. }
    74. }
    75. }
    1. 2021-07-19 17:32:39.939 INFO 14688 --- [ main] com.example.MqttConsumer : 初始化并启动mqtt......
    2. 2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --创建mqtt客户端
    3. 2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --生成mqtt配置对象
    4. 2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --设置消息发布质量
    5. 2021-07-19 17:32:39.956 INFO 14688 --- [ main] com.example.MqttConsumer : --添加回调处理类
    6. 2021-07-19 17:32:40.415 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : mqtt连接成功,客户端IDmqtt_client_id_001
    7. 2021-07-19 17:32:40.415 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : --订阅主题::[v1/devices/me/rpc/request/+]
    8. 2021-07-19 17:32:46.825 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : 收到topic:v1/devices/me/rpc/request/0 消息:{"method":"setValue","params":67.04}
    9. 2021-07-19 17:32:47.208 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : 收到topic:v1/devices/me/rpc/request/1 消息:{"method":"setValue","params":89.46}
    10. 2021-07-19 17:32:47.845 INFO 14688 --- [t_client_id_001] com.example.MqttConsumerCallback : 收到topic:v1/devices/me/rpc/request/2 消息:{"method":"setValue","params":75.91}