Java SpringBoot MQTT apollo 订阅发布
对于项目,首先需要引入maven包:

pom.xml

  <!-- MQTT: Spring Integration stream + MQTT support.
       No <version> is given here, so the versions must come from the project's
       dependency management (e.g. the Spring Boot parent/BOM). -->
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
  </dependency>

这一步的目的是把 MQTT 相关的依赖引入到项目中。
接着是项目 yml 文件的配置;如果你使用的是 properties 文件,照葫芦画瓢改写成对应的键值即可:

application.yml

  # MQTT connection settings, bound to the MqttConfig class via the "mqtt" prefix
  mqtt:
    username: admin
    password: password
    host-url: tcp://127.0.0.1:8161 # address and port of your own broker; this must be changed
    clientID: test1 # changing this is optional, but different clients must not share an ID
    default-topic: home/garden/fountain # default topic
    timeout: 100
    keepalive: 100
  # Tomcat
  server:
    tomcat:
      uri-encoding: UTF-8
      max-threads: 1000
      min-spare-threads: 30
    port: 8088

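注意 host-url:它是 Apollo 服务器的 MQTT 连接地址(协议 + IP + 端口),需要改成你自己的。这里应填写 broker 的 TCP 监听端口(Apollo 默认配置下通常是 61613),而不是 Web 管理后台的端口。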
来到第三步,此时就是项目内的文件:

MqttConfig文件

  1. @Component
  2. @ConfigurationProperties("mqtt")
  3. @Setter
  4. @Getter
  5. public class MqttConfig {
  6. @Autowired
  7. private MqttPushClient mqttPushClient;
  8. /**
  9. * 用户名
  10. */
  11. // @Value("username")
  12. private String username;
  13. /**
  14. * 密码
  15. */
  16. private String password;
  17. /**
  18. * 连接地址
  19. */
  20. private String hostUrl;
  21. /**
  22. * 客户Id
  23. */
  24. private String clientID;
  25. /**
  26. * 默认连接话题
  27. */
  28. private String defaultTopic;
  29. /**
  30. * 超时时间
  31. */
  32. private int timeout;
  33. /**
  34. * 保持连接数
  35. */
  36. private int keepalive;
  37. @Bean
  38. public MqttPushClient getMqttPushClient() {
  39. System.out.println("hostUrl: "+ hostUrl);
  40. System.out.println("clientID: "+ clientID);
  41. System.out.println("username: "+ username);
  42. System.out.println("password: "+ password);
  43. System.out.println("timeout: "+timeout);
  44. System.out.println("keepalive: "+ keepalive);
  45. mqttPushClient.connect(hostUrl, clientID, username, password, timeout, keepalive);
  46. // 以/#结尾表示订阅所有以test开头的主题
  47. mqttPushClient.subscribe(defaultTopic, 0);
  48. return mqttPushClient;
  49. }
  50. }

这个类的作用是读取 yml 中 mqtt 前缀下的配置,并在创建 Bean 时用这些配置连接服务器、订阅默认主题。
第四步就是发布以及订阅等功能:

MqttPushClient

  1. @Component
  2. public class MqttPushClient {
  3. private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
  4. @Autowired
  5. private PushCallback pushCallback;
  6. private static MqttClient client;
  7. private static MqttClient getClient() {
  8. return client;
  9. }
  10. private static void setClient(MqttClient client) {
  11. MqttPushClient.client = client;
  12. }
  13. /**
  14. * 客户端连接
  15. *
  16. * @param host ip+端口
  17. * @param clientID 客户端Id
  18. * @param username 用户名
  19. * @param password 密码
  20. * @param timeout 超时时间
  21. * @param keepalive 保留数
  22. */
  23. public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
  24. MqttClient client;
  25. try {
  26. client = new MqttClient(host, clientID, new MemoryPersistence());
  27. MqttConnectOptions options = new MqttConnectOptions();
  28. options.setCleanSession(true);
  29. options.setUserName(username);
  30. options.setPassword(password.toCharArray());
  31. options.setConnectionTimeout(timeout);
  32. options.setKeepAliveInterval(keepalive);
  33. MqttPushClient.setClient(client);
  34. try {
  35. client.setCallback(pushCallback);
  36. client.connect(options);
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. /**
  45. * 发布
  46. *
  47. * @param qos 连接方式
  48. * @param retained 是否保留
  49. * @param topic 主题
  50. * @param pushMessage 消息体
  51. */
  52. public void publish(int qos, boolean retained, String topic, String pushMessage) {
  53. MqttMessage message = new MqttMessage();
  54. message.setQos(qos);
  55. message.setRetained(retained);
  56. message.setPayload(pushMessage.getBytes());
  57. MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
  58. if (null == mTopic) {
  59. logger.error("topic not exist");
  60. }
  61. MqttDeliveryToken token;
  62. try {
  63. token = mTopic.publish(message);
  64. token.waitForCompletion();
  65. } catch (MqttPersistenceException e) {
  66. e.printStackTrace();
  67. } catch (MqttException e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. /**
  72. * 订阅某个主题
  73. *
  74. * @param topic 主题
  75. * @param qos 连接方式
  76. */
  77. public void subscribe(String topic, int qos) {
  78. logger.info("开始订阅主题" + topic);
  79. try {
  80. MqttPushClient.getClient().subscribe(topic, qos);
  81. } catch (MqttException e) {
  82. e.printStackTrace();
  83. }
  84. }
  85. }

这个类封装了连接、发布消息和订阅主题的方法,更多功能可以在此基础上继续编写。
最后写一个测试接口,看看结果是否正确:

TestController

  1. @RestController
  2. @RequestMapping("/")
  3. public class TestController {
  4. @Autowired
  5. private MqttPushClient mqttPushClient;
  6. @GetMapping(value = "/publishTopic")
  7. public String publishTopic() {
  8. String topicString = "home/garden/fountain";
  9. mqttPushClient.publish(0, false, topicString, "测试一下发布消息");
  10. return "ok";
  11. }
  12. // 发送自定义消息内容(使用默认主题)
  13. @RequestMapping("/publishTopic/{data}")
  14. public String test1(@PathVariable("data") String data) {
  15. String topicString = "home/garden/fountain";
  16. mqttPushClient.publish(0,false,topicString, data);
  17. return "ok";
  18. }
  19. // 发送自定义消息内容,且指定主题
  20. @RequestMapping("/publishTopic/{topic}/{data}")
  21. public String test2(@PathVariable("topic") String topic, @PathVariable("data") String data) {
  22. mqttPushClient.publish(0,false,topic, data);
  23. return "ok";
  24. }
  25. }


可以先用 MQTT.fx 连接服务器并订阅对应的主题,再用 Postman 请求上面的接口发布消息,就能在 MQTT.fx 中看到最终的结果。