简介

这个项目是我学校的一个学长带着我去做的,是老师从企业接的,一个,算是小型的商业项目。具体内容就不在这里讲的很详细了。同时,由于这个项目甲方还没验收,所以,这个项目也只是一个小 Demo,即未使用到EMQ的高级特性(像钩子,认证链,规则,数据桥接等等),但是等甲方验收通过后,这些高级特性一定会用到的。所以这个项目也会连载(但是间隔时间会长点,毕竟要和甲方和硬件经常”交流”😭)。所以本文就是这个系列的第一集,非常适合用以学习。在后续的开发中也是会本文也是主要分析关于物联网项目中MQTT的使用,以及用部分设计模式改造出一个适合多数情况下使用的小项目。

需要的技术储备

  • Spring Boot 这个项目他本质上还是个 Web项目,只是里面用到了一些 MQTT的知识。
  • 设计模式 这个技术储备是一定要有的,这个小Demo 的核心亮点就是运用了设计模式
  • MQTT MQTT的基础知识是需要提交了解一下的
  • 简单的了解 EMQ 在这个项目调试的过程中,我们会用到 EMQ 来模拟硬件的发/接 消息,后面也会用到EMQ的高级特性,所以还是了解一下为好

以上需要的技术储备,如果有哪个还是不熟悉的话,建议大家可以先去了解一下。

  • 设计模式 我推荐大家去看《大话设计模式》这本书,很适合萌新学
  • MQTT 和 EMQ 我自己以前有写过关于这方面的笔记,大家感兴趣也可以看一下,链接中都有

需要引入的包

  1. <!-- 用于实现 mqtt客户端-->
  2. <dependency>
  3. <groupId>org.eclipse.paho</groupId>
  4. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  5. <version>1.2.5</version>
  6. </dependency>
  7. <!-- 工具包,简化开发-->
  8. <dependency>
  9. <groupId>cn.hutool</groupId>
  10. <artifactId>hutool-all</artifactId>
  11. <version>5.7.17</version>
  12. </dependency>

整体架构设计

小型物联网项目Demo-Lora - 图1
整个系统可分为:

  1. MQTTReq生成中心:用于生成向硬件发送的MQTTReq对象,但是最后要转换为Json格式的发送给硬件
  2. MQTTProcess:这个是实现MQTT的整体流程,也是最重要的。
  3. 代理工厂:将硬件的消息解析后,生成对应的代理对象,去执行数据库的操作

下面我简单的介绍一下我这项目硬件那边简单的结构,但目的是为了便于理解代码的讲解,其实每个系统的硬件结构都是不同的,如果你硬件的那边结构已经确定,可以跳过介绍,直接看代码讲解就好

硬件结构简单介绍

硬件结构图

在这个硬件结构中,最主要的一条线就是 网关-节点-module,module 就是一个具体负责处理数据的结构
小型物联网项目Demo-Lora - 图2

命令

由于这个硬件系统是要根据具体的命令来执行对应的操作,所以我们给硬件传消息的适合,要把消息带上。
小型物联网项目Demo-Lora - 图3

MQTTReq生成中心

MQTTReq的组成

这个其实每个系统都不一样的,只要你和硬件那边沟通好就可以,那下面就用我的举个例子。
如果你跟硬件已经沟通好了,可以跳过这个环节
小型物联网项目Demo-Lora - 图4

  • adr:是地址,是要告诉硬件命令的传递过程,例如下面这个地址,id1,id2,id3就是命令要经过的节点id,最后一个节点就是要执行命令的节点,对于地址的构成有很多种形式,有id,ids,domain等

小型物联网项目Demo-Lora - 图5
小型物联网项目Demo-Lora - 图6

  • msgid:每个发送消息的id,保证每个发送的消息的id都是唯一的,然后硬件回应这个发送的消息的msgid 与发送的消息的msgid 是一样的,这样就可以互相匹配上
  • cmd:就是具体命令的对应id
  • type:type是用来告诉硬件这次操作是只读取硬件的消息,还是要更新硬件的消息
  • time:time就是发送消息的时间,这里用数组代替是因为格式化的时间格式传输会消耗更多的字节,这个硬件那边传输的适合消耗的字节越少越好
  • data:data就是要具体传输给硬件的数据了,有数据的话,就用键值对的格式传输,如果没有要传输的数据,用 value:-1代替就好

小型物联网项目Demo-Lora - 图7

  1. package com.xiancai.lora.MQTT.bean;
  2. import lombok.Builder;
  3. import lombok.Data;
  4. import java.util.List;
  5. import java.util.Map;
  6. @Data
  7. @Builder
  8. public class MQTTReq {
  9. /**
  10. 地址
  11. */
  12. private String adr;
  13. private String msgid;
  14. /**
  15. * 具体的命令
  16. */
  17. private Integer command;
  18. /**
  19. * 命令的类型,get,set
  20. */
  21. private String type;
  22. /**
  23. * 发送时的时间 格式[23,1,9,15,56,23]
  24. */
  25. private List<Integer> time;
  26. /**
  27. 数据,因为要用键值对的形式,选择map就好
  28. */
  29. private Map<String,Object> data;
  30. }

RedisIdWorker

这个是生成唯一的消息id,具体怎么生成的就在这里不详细说了,这个黑马程序员的那个Redis课中有讲到,想要了解的同学可以看一下,当然,我以前也是记过对应的笔记的,链接放下面 了

我们再来回顾一下 地址的构成,可以由id组成,由ids组成,由domain组成,还可以混合组成。所以针对不同的地址,我们要进行不同的处理,但是他们的抽象处理步骤大体相同
传地址:

  1. 根据前端传来的节点id,往回找需要找的节点的对应属性(可能是节点id或ids或domain),然后把他们用对于的符号组合成字符串
  2. 然后用base64编码,不够16位的往前用 A 补够。

收地址:

  1. 先用base64解码
  2. 找到地址中最后一个节点的属性

大体上没什么区别,最主要的区别就在找节点的哪个属性,是id,ids还是domain。那既然除了选取属性外,其他的操作都相同,我们就可以用一个模板方法模式。

这个 AbstractAddressHandler就是一个模板,他里面封装了所有有关地址的方法,其中有的方法每个地址都一样,有的方法每个地址都不同,那么可以用abstarct,让他延迟到子类中实现

  1. package com.xiancai.lora.MQTT.util.address;
  2. import java.nio.charset.StandardCharsets;
  3. import java.util.Base64;
  4. import java.util.LinkedList;
  5. import java.util.Map;
  6. /**
  7. * 构造地址的模板方法
  8. */
  9. public abstract class AbstractAddressHandler {
  10. /**
  11. * 给硬件的地址
  12. * @param nodeId
  13. * @return
  14. */
  15. public String produceAddress(Integer nodeId){
  16. //先是把要所有的节点id找到
  17. LinkedList<String> address = findAddress(nodeId);
  18. //然后拼成串
  19. String normalAddress = combineAddress(address);
  20. //然后转Base64
  21. return toBase64(normalAddress);
  22. }
  23. /**
  24. * 解析硬件传来的地址,先把Base64的编码转换为正常格式,再进行拆分
  25. * @param address
  26. * @return
  27. */
  28. public abstract Map<String,String> parseHardWareAddress(String address);
  29. /**
  30. * 要给硬件的地址,要先找到前几个设备的id/ids/admin,
  31. * 因为最终执行命令的都是节点,所以我们只要找节点就可以了
  32. * 参数给的是最终的那个节点id,我们要往上找
  33. */
  34. public abstract LinkedList<String> findAddress(Integer nodeId);
  35. public abstract String combineAddress(LinkedList<String> addresses);
  36. /**
  37. * 转换为 Base64编码的格式
  38. * @param address
  39. * @return
  40. */
  41. public String toBase64(String address){
  42. String base64Address = Base64.getEncoder().encodeToString(address.getBytes(StandardCharsets.UTF_8));
  43. while (base64Address.length()<16){
  44. base64Address='A'+base64Address;
  45. }
  46. return base64Address;
  47. }
  48. /**
  49. * 将硬件传来的base64的地址转换为正常的
  50. * @param base64
  51. * @return
  52. */
  53. public String parseBase64(String base64){
  54. String removeAString = removeA(base64);
  55. return new String(Base64.getDecoder().decode(removeAString));
  56. }
  57. // AAAAABSADSDSDAAA
  58. //AAADSDSDASBAAAAAA
  59. //以不确定的长度的A作为前缀
  60. private String removeA(String normalAddress){
  61. if(normalAddress.charAt(0)!='A'){
  62. return normalAddress;
  63. }
  64. char[] chars = normalAddress.toCharArray();
  65. int i=0;
  66. for ( ; i < chars.length; i++) {
  67. if(chars[i]!='A') break;
  68. }
  69. String substring = normalAddress.substring(i);
  70. return substring;
  71. }
  72. }

这是一个用 id 作为地址的一个地址处理器,还可以有用ids作为地址的处理器,用domain的,用混合的等等,只要继承了上面的那个模板后就可以生成很多的地址处理器。

  1. package com.xiancai.lora.MQTT.util.address;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.xiancai.lora.model.entity.Node;
  4. import com.xiancai.lora.service.NodeService;
  5. import com.xiancai.lora.service.impl.NodeServiceImpl;
  6. import org.springframework.context.ApplicationContext;
  7. import org.springframework.stereotype.Component;
  8. import javax.annotation.PostConstruct;
  9. import javax.annotation.Resource;
  10. import java.util.*;
  11. @Component
  12. public class NormalAddressHandler extends AbstractAddressHandler {
  13. //因为这里的NormalAddress是反射new出来的,是用newInstance默认构造器造出来的,所以这个类为被
  14. @Resource
  15. private NodeService nodeService;
  16. @Override
  17. public LinkedList<String> findAddress(Integer nodeId) {
  18. LinkedList<String> addressQueue = new LinkedList<>();
  19. addressQueue.add(nodeId+"");
  20. Integer loraId = nodeService.getById(nodeId).getLoraId();
  21. List<String> list=new ArrayList<>();
  22. process(loraId,list);
  23. addressQueue.addAll(list);
  24. return addressQueue;
  25. }
  26. /**
  27. * 递归得到地址
  28. * @param loraId
  29. * @param list
  30. */
  31. private void process(Integer loraId,List<String> list){
  32. if(list.size()==2||loraId==-1){
  33. return;
  34. }
  35. //拿出来后面的id,找到这个id的节点的lora_id,看他的上一级节点是谁
  36. Node node = nodeService.getOne(new QueryWrapper<Node>().eq("is_lora", loraId));
  37. Integer beforeAddress = node.getId();
  38. list.add(beforeAddress + "");
  39. process(node.getLoraId(),list);
  40. }
  41. @Override
  42. public String combineAddress(LinkedList<String> addresses) {
  43. StringBuilder s= new StringBuilder();
  44. while (!addresses.isEmpty()){
  45. s.append("#").append(addresses.pollLast());
  46. }
  47. return s.toString();
  48. }
  49. @Override
  50. public Map<String, String> parseHardWareAddress(String Base634Address) {
  51. //先是把Base64转换为普通字符串
  52. String address = parseBase64(Base634Address);
  53. String[] split = address.split("#");
  54. Map<String,String> map=new HashMap<>();
  55. map.put("id",split[split.length-1]);
  56. return map;
  57. }
  58. }

那现在各个地址处理器有了,怎么可以根据不同的情况选取不同的地址处理器呢?那这个就要用到策略模式了。
策略模式简单来说就是定义很多的策略,然后用一个类来针对不同的情况来实现不同的策略
那刚才我们创建的很多的地址处理器,都是策略,现在我们只用再实现一个类来实现分配策略就好了
分配策略有简单来说有两种方式,一种是静态的,用个switch或者map来分配,一种是动态的,用反射
对于静态来说,会有点耦合,我们要不断的修改用来分配策略的类,当策略很多的策略的时候,swtich和map也会显得有点臃肿,所以,下面我们就用反射来实现。

  1. package com.xiancai.lora.MQTT.util.address.context;
  2. import com.xiancai.lora.MQTT.util.BeanFactory;
  3. import com.xiancai.lora.MQTT.util.address.AbstractAddressHandler;
  4. import com.xiancai.lora.enums.MQTT.MQTTAddress;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.Resource;
  8. import java.util.Map;
  9. import static com.xiancai.lora.constant.UsuStatus.ADDRESS_REFLECT_PREFIX;
  10. import static com.xiancai.lora.constant.UsuStatus.ADDRESS_REFLECT_SUFFIX;
  11. @Component
  12. public class AddressHandlerContext {
  13. private AbstractAddressHandler addressHandler;
  14. @Autowired
  15. private MQTTAddress mqttAddress;
  16. @Resource
  17. private BeanFactory beanFactory;
  18. public void produceAddressHandler(String symbol){
  19. try {
  20. String classPath=ADDRESS_REFLECT_PREFIX + mqttAddress.getClassPath(symbol) + ADDRESS_REFLECT_SUFFIX;
  21. Class<?> aClass = Class.forName(classPath);
  22. addressHandler =(AbstractAddressHandler) beanFactory.getApplicationContext().getBean(aClass);
  23. } catch (Exception e) {
  24. throw new RuntimeException("地址处理器创建异常"+e.getMessage());
  25. }
  26. }
  27. /**
  28. * 给硬件的地址
  29. * @param nodeId
  30. * @return
  31. */
  32. public String produceAddress(Integer nodeId){
  33. return addressHandler.produceAddress(nodeId);
  34. }
  35. /**
  36. * 解析硬件传来的地址,先把Base64的编码转换为正常格式,再进行拆分
  37. * @param address
  38. * @return
  39. */
  40. public Map<String, String> parseHardWareAddress(String address){
  41. return addressHandler.parseHardWareAddress(address);
  42. }
  43. }
  1. package com.xiancai.lora.MQTT.util;
  2. import org.springframework.beans.BeansException;
  3. import org.springframework.context.ApplicationContext;
  4. import org.springframework.context.ApplicationContextAware;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class BeanFactory implements ApplicationContextAware {
  8. private ApplicationContext applicationContext;
  9. @Override
  10. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  11. this.applicationContext=applicationContext;
  12. }
  13. public ApplicationContext getApplicationContext() {
  14. return applicationContext;
  15. }
  16. }
  1. package com.xiancai.lora.MQTT.util.res;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import com.xiancai.lora.exception.BusinessException;
  4. import org.springframework.stereotype.Component;
  5. import java.nio.charset.StandardCharsets;
  6. import java.time.LocalDateTime;
  7. import java.time.format.DateTimeFormatter;
  8. import java.util.ArrayList;
  9. import java.util.Base64;
  10. import java.util.List;
  11. import java.util.Map;
  12. @Component
  13. public class MQTTReqProperty {
  14. /**
  15. * 解析时间
  16. * @return
  17. */
  18. public List<Integer> parseTime(){
  19. LocalDateTime now = LocalDateTime.now();
  20. String date = now.format(DateTimeFormatter.ofPattern("yy,MM,dd,HH,mm,ss"));
  21. ArrayList<Integer> list = new ArrayList<>();
  22. for (String s : date.split(",")) {
  23. list.add(Integer.parseInt(s));
  24. }
  25. return list;
  26. }
  27. /**
  28. * 解析地址
  29. */
  30. /**
  31. * 解析 data,无论是什么 DTO 对象,我们都只要把他转成字符串就行了
  32. */
  33. public Map<String,Object> parseData(Object data){
  34. Map<String, Object> stringObjectMap = BeanUtil.beanToMap(data);
  35. if(stringObjectMap.isEmpty()){
  36. throw new BusinessException("ss",123,"ss");
  37. }
  38. return stringObjectMap;
  39. }
  40. }


MQTTReq生成中心

  • MQTTReq生成中心,主要用到的设计模式是门面模式也叫外观模式(即 将生产MQTTReq各个熟悉的类都聚合到这个 MQTTReqFacade 外观类中,使得MQTTReq的生成可以更加方便的生成)
  • 这里可能有的朋友会想建造者模式。我这里不用是因为 我的MQTTTReq不会因为硬件的结构而发生太大的变化,只是 地址会变化,如果很多属性都会根据硬件的结构发生变化,那用建造者模式还蛮不错的。这里如果变化不是很大的话,我觉得没有太大的必要去用建造者模式,用了我反而还觉得可能有点浪费空间~~~。
  • 在这个外观类中,聚合了 redisIdWorker ,mqttReqProperty,addressHandlerContext。来分别装配MQTTReq。其实这里的优势我感觉已经体现了一点点,当MQTTReq的属性更多的时候,那门面模式的优势就很明显了。
  1. package com.xiancai.lora.MQTT.util.res;
  2. import com.xiancai.lora.MQTT.bean.MQTTReq;
  3. import com.xiancai.lora.MQTT.util.address.context.AddressHandlerContext;
  4. import com.xiancai.lora.MQTT.util.res.MQTTReqProperty;
  5. import com.xiancai.lora.utils.RedisIdWorker;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.Resource;
  8. import java.util.List;
  9. import java.util.Map;
  10. /**
  11. * 门面模式的外观类,用来装配一个MQTTReq
  12. */
  13. @Component
  14. public class MQTTReqFacade {
  15. /**
  16. * id生成器
  17. */
  18. @Resource
  19. private RedisIdWorker redisIdWorker;
  20. @Resource
  21. private MQTTReqProperty mqttReqProperty;
  22. /**
  23. * 生成对应的地址
  24. */
  25. @Resource
  26. private AddressHandlerContext addressHandlerContext;
  27. /**
  28. * 在这里构建的时候
  29. * @param data
  30. * @param symbol
  31. * @param commandId
  32. * @param type
  33. * @return
  34. */
  35. public MQTTReq combineMQTTReq(Object data,String symbol,Integer commandId,String type){
  36. //准备对应的addressHandler
  37. addressHandlerContext.produceAddressHandler(symbol);
  38. //解析对应的data
  39. Map<String, Object> dataMap = mqttReqProperty.parseData(data);
  40. if (dataMap.size()==0){
  41. dataMap.put("value",-1);
  42. }
  43. //生成对应的地址
  44. String address = addressHandlerContext.produceAddress((Integer) dataMap.remove("nodeId"));
  45. //生成消息id
  46. String messageId = redisIdWorker.nextId("command")+"";
  47. List<Integer> time = mqttReqProperty.parseTime();
  48. //生成对应的MQTTReq
  49. MQTTReq req = MQTTReq.builder().msgid(messageId)
  50. .command(commandId)
  51. .type(type).adr(address).data(dataMap).time(time).build();
  52. return req;
  53. }
  54. public Map<String,String> parseHardWareAddress(String address){
  55. return addressHandlerContext.parseHardWareAddress(address);
  56. }
  57. }

MQTTProcess

这个类是最重要的类,MQTT所有的操作都封装到这个类当中

客户端收到消息后,存到Redis中

在架构图中,我们也看到了客户端收到消息后,要存到Redis中,这个操作是在写MQTT客户端的回调函数中的
messageArrived函数中实现的。

  1. /**
  2. * 应用收到消息后触发的回调
  3. * @param topic
  4. * @param mqttMessage
  5. * @throws Exception
  6. */
  7. @Override
  8. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  9. String message = new String(mqttMessage.getPayload());
  10. String messageId = (String) JSONUtil.parseObj(message).get("msgid");
  11. log.info("订阅者订阅到了消息,topic={},messageid={},qos={},payload={}",
  12. topic,
  13. mqttMessage.getId(),
  14. mqttMessage.getQos(),
  15. message
  16. );
  17. stringRedisTemplate.opsForValue().set(MQTT_MESSAGE+messageId,message);
  18. }

具体的流程

这个也用到了门面模式(外观模式),每一个流程就不说了,就简单的梳理一下整体的流程

  1. 发消息:就是调用 MQTTReqFacade生成消息。然后发布,返回消息id
  2. 收消息:根据拿到的消息id,去redis中找对应的消息。

这里解释一下为什么要多加一层 Redis。一开始我也没加Redis,又开了一个独立线程单独去处理,但是最后还要返回给前端信息,如果发布完消息后就去返回给前端信息,如果硬件那边出现问题,对于用户来讲效果不是很好。所以这里就不是很适合单独开一个线程去处理。

  1. 解析拿到的硬件消息,这里硬件返回的rcmd 与发生的cmd一样,adr也是一样,这里为什么是根据硬件的消息去操作,是因为,硬件那里有一些命令是自动的(比如说自动上报数据之类的)。这个是不需要我们发送消息的。所以就统一用硬件的消息来处理
  2. 解析完消息后,操作数据库

最后我们把这几个流程封装一个方法中,对外界开放就好了。

  1. package com.xiancai.lora.MQTT.util.process;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.util.StrUtil;
  4. import cn.hutool.json.JSONUtil;
  5. import com.xiancai.lora.MQTT.bean.MQTTReq;
  6. import com.xiancai.lora.MQTT.bean.MQTTRes;
  7. import com.xiancai.lora.MQTT.client.EmqClient;
  8. import com.xiancai.lora.MQTT.publish.properties.MqttProperties;
  9. import com.xiancai.lora.MQTT.service.context.MQTTServiceContext;
  10. import com.xiancai.lora.MQTT.util.address.context.AddressHandlerContext;
  11. import com.xiancai.lora.MQTT.util.res.MQTTReqFacade;
  12. import com.xiancai.lora.enums.QosEnum;
  13. import com.xiancai.lora.exception.BusinessException;
  14. import com.xiancai.lora.service.CommandService;
  15. import com.xiancai.lora.utils.Result;
  16. import com.xiancai.lora.utils.wrong.check.CheckHardWareWrong;
  17. import org.springframework.data.redis.core.StringRedisTemplate;
  18. import org.springframework.stereotype.Component;
  19. import javax.annotation.Resource;
  20. import java.util.Map;
  21. import static com.xiancai.lora.constant.RedisConstants.MQTT_MESSAGE;
  22. import static com.xiancai.lora.enums.MQTT.MQTTReqType.MQTT_REQ_GET;
  23. /**
  24. * MQTT整体流程
  25. */
  26. @Component
  27. public class MQTTProcess {
  28. @Resource
  29. private MQTTReqFacade mqttReqFacade;
  30. @Resource
  31. private StringRedisTemplate stringRedisTemplate;
  32. @Resource
  33. private CommandService commandService;
  34. @Resource
  35. private CheckHardWareWrong checkHardWareWrong;
  36. @Resource
  37. private EmqClient emqClient;
  38. @Resource
  39. private MqttProperties mqttProperties;
  40. @Resource
  41. private MQTTServiceContext mqttServiceContext;
  42. /**
  43. * 准备MQTTReq
  44. *
  45. * @return
  46. */
  47. protected MQTTReq prepareMQTTReq(Object data, String symbol, Integer commandId, String type) {
  48. return mqttReqFacade.combineMQTTReq(data, symbol, commandId, type);
  49. }
  50. /**
  51. * 发布消息
  52. */
  53. public String publishMessage(Object data, String symbol, Integer commandId, String type) {
  54. //封装请求对象
  55. MQTTReq mqttReq = prepareMQTTReq(data, symbol, commandId, type);
  56. //发布消息
  57. emqClient.publish(mqttProperties.getWebTopic(), JSONUtil.toJsonPrettyStr(mqttReq), QosEnum.QOS0, false);
  58. return mqttReq.getMsgid();
  59. }
  60. /**
  61. * 接收消息
  62. */
  63. public String receiveMessage(String messageId) {
  64. int count = 1;
  65. while (count <= 5) {
  66. String jsonMessage = stringRedisTemplate.opsForValue().get(MQTT_MESSAGE + messageId);
  67. if (StrUtil.isNotBlank(jsonMessage)) return jsonMessage;
  68. try {
  69. //睡0.2秒再去查,不要查太频繁
  70. Thread.sleep(200);
  71. } catch (InterruptedException e) {
  72. throw new RuntimeException(e);
  73. }
  74. count++;
  75. }
  76. //5次还没得到消息,就直接返回null
  77. return null;
  78. }
  79. /**
  80. * 解析消息,判断错误,还是根据硬件的地址去找,因为后面会有数据自动上报,我们是不发消息的
  81. */
  82. public Map<String, Object> parseMessage(String jsonMessage) {
  83. //先检查传过来的信息是不是null
  84. checkHardWareWrong.checkJsonMessage(jsonMessage);
  85. //然后用json包将硬件发送的json信息转化为MQTTRes对象
  86. MQTTRes mqttRes = JSONUtil.toBean(jsonMessage, MQTTRes.class);
  87. //先判断res的状态
  88. String res = mqttRes.getRes();
  89. checkHardWareWrong.checkRes(mqttRes.getRes());
  90. //解析硬件传过来的地址,截取到的可能是id,ids,domain,所以我们用一个map标识一下
  91. Map<String, String> address = mqttReqFacade.parseHardWareAddress(mqttRes.getAdr());
  92. //获取硬件传来的信息
  93. Map<String, Object> data = mqttRes.getData();
  94. //获取硬件的命令,这里也是看硬件的,原因还是那个自动上报的那个东西
  95. Integer rcmd = mqttRes.getRcmd();
  96. String commandContent = commandService.getById(rcmd).getContent();
  97. checkHardWareWrong.checkString(commandContent, "未找到硬件传来的id为" + rcmd + "的命令");
  98. //将命令和地址都放进data中
  99. data.put("commandContent", commandContent);
  100. data.putAll(address);
  101. return data;
  102. }
  103. /**
  104. * 操作数据库
  105. */
  106. public Result executeDataBase(Map<String, Object> data) {
  107. //这里还是用一个remove方法,因为我们操作数据库是用不到这个键值对的
  108. return mqttServiceContext.executeService((String) data.remove("commandContent"), data);
  109. }
  110. /**
  111. * 总的MQTT的操作流程
  112. *
  113. * @param data
  114. * @param symbol
  115. * @param commandId
  116. * @return
  117. */
  118. public Result MQTTProcess(Object data, String symbol, Integer commandId, String type) {
  119. String messageId = publishMessage(data, symbol, commandId, type);
  120. String jsonMessage = receiveMessage(messageId);
  121. Map<String, Object> hardWareData = parseMessage(jsonMessage);
  122. return executeDataBase(hardWareData);
  123. }
  124. /**
  125. * 解析 data,无论是什么 DTO 对象,我们都只要把他转成Map就行了
  126. */
  127. public Map<String, Object> parseData(Object data) {
  128. Map<String, Object> stringObjectMap = BeanUtil.beanToMap(data);
  129. if (stringObjectMap.isEmpty()) {
  130. throw new BusinessException("ss", 123, "ss");
  131. }
  132. return stringObjectMap;
  133. }
  134. }

代理工厂

到这里就差去数据库中操作数据了。但是我们这里并没有直接用 service 层来处理,是因为我们后面的阶段(现在没有实现)获得数据后不可能会直接去处理数据的,还有对数据进行检查处理,或者还有进行一些统计之类的事情。而且未来的命令会会很多很多,如果直接把 sevice对象来处理可能会对命令的处理的效率有一定的影响或者说对空间的一些浪费等。现在因为没有在代理中做更多的事情,现在优势不是很明显,等后期甲方通过后,再加一些功能,应该就可以看出来一些优势了。

Cglib代理

这里的代理方式我们选择Cglib代理的方式。一般可能用那个 jdk的方式来实现,但是那个需要被代理对象再去实现一个接口,如果被代理对象太多,那实现的接口也要很多。所以这里我们选择用Cglib代理来实现
Cglib代理是生成被代理对象的子类来实现代理的。被代理的类也不用再额外实现接口,比较方便。

  1. package com.xiancai.lora.MQTT.service.proxy;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import com.xiancai.lora.service.NodeService;
  4. import com.xiancai.lora.utils.Result;
  5. import org.springframework.cglib.proxy.MethodInterceptor;
  6. import org.aopalliance.intercept.MethodInvocation;
  7. import org.springframework.cglib.proxy.*;
  8. import org.springframework.stereotype.Component;
  9. import java.lang.reflect.Method;
  10. /**
  11. * Cglib代理,通过生成一个被代理对象的子类实现代理效果
  12. */
  13. @Component
  14. public class ProxyFactory implements MethodInterceptor {
  15. //维护一个目标对象
  16. private Object target;
  17. //传入一个被代理的对象
  18. public void produceProxy(Object target){
  19. this.target=target;
  20. }
  21. //返回一个代理对象,是target对象的代理对象
  22. public Object getProxyInstance(){
  23. //1.创建一个工具类
  24. Enhancer enhancer = new Enhancer();
  25. //2.设置父类
  26. enhancer.setSuperclass(target.getClass());
  27. //3.设置回调函数
  28. enhancer.setCallback(this);
  29. //4.创建子类对象,即代理对象
  30. return enhancer.create();
  31. }
  32. public void produceProxyByName(String classPath){
  33. }
  34. @Override
  35. public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
  36. return method.invoke(target,objects);
  37. }
  38. }

实现流程:我们解析完硬件发过来的消息后,会解析到一个 Map 对象,而map对象中有一个 commandContent的键。对应的值就是具体的命令。我们再来回顾一下命令的组成

image.png

前面的node,是操作的对象,而后面的是具体的操作。所以这里我又用了一个策略模式,因为现在命令具体的操作对象要么是node,要么是module。所以我这里用了两个类来各自封装有关node的数据库操作,已经有关module的数据库操作。
这里还有一个原因不直接把操作nodeService或moduleService,是因为多重代理的问题,mybatis-plus本身也是用代理来实现的,具体的原理,最后的参考文章有。

  1. package com.xiancai.lora.MQTT.service.proxy;
  2. import com.xiancai.lora.service.ModuleService;
  3. import com.xiancai.lora.utils.Result;
  4. import org.springframework.stereotype.Component;
  5. import javax.annotation.Resource;
  6. import java.util.Map;
  7. @Component
  8. public class ProxyModuleService {
  9. /**
  10. * 拿到的命令中
  11. */
  12. @Resource
  13. private ModuleService moduleService;
  14. public Result poweron(Map<String,Object> data){
  15. return moduleService.poweron(data);
  16. }
  17. public Result poweroff(Map<String,Object> data){
  18. return moduleService.poweroff(data);
  19. }
  20. }
  1. package com.xiancai.lora.MQTT.service.proxy;
  2. import com.xiancai.lora.model.entity.Node;
  3. import com.xiancai.lora.service.NodeService;
  4. import com.xiancai.lora.utils.Result;
  5. import org.springframework.stereotype.Service;
  6. import javax.annotation.Resource;
  7. import java.io.Serializable;
  8. import java.util.Map;
  9. @Service
  10. public class ProxyNodeService {
  11. @Resource
  12. private NodeService nodeService;
  13. public Result init(Map<String,Object> data){
  14. return nodeService.init(data);
  15. }
  16. public Node getById(Serializable id){
  17. return nodeService.getById(id);
  18. }
  19. public Result setDataUploadInterval(Map<String,Object> data){
  20. return nodeService.setDataUploadInterval(data);
  21. }
  22. public Result restart(Map<String,Object> data){
  23. return nodeService.restart(data);
  24. }
  25. public Result getLocation(Map<String,Object> data){
  26. return nodeService.getLocation(data);
  27. }
  28. }

这两个类也算是策略了。那我们现在还需要一个类来根据不同的情况分配策略,但是这个类有点特殊,刚刚我们是只建了两个策略,没有为每一个命令单独建一个策略(如果为每一个命令单独建一个策略,那就方便多了,但是命令太多了,只是Demo阶段就又将近30个命令,如果后续还要添加命令,那就要创建非常多的类。)。那么如果用反射的话,我们这里不仅要得到对应的对象,我们还要得到对应的方法,恰好我们可以用命令的后半段来作为方法名。代码如下

  1. package com.xiancai.lora.MQTT.service.context;
  2. import com.xiancai.lora.MQTT.service.proxy.ProxyFactory;
  3. import com.xiancai.lora.MQTT.service.proxy.ProxyModuleService;
  4. import com.xiancai.lora.MQTT.service.proxy.ProxyNodeService;
  5. import com.xiancai.lora.utils.Result;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.Resource;
  8. import java.util.Map;
  9. @Component
  10. public class MQTTServiceContext {
  11. @Resource
  12. private ProxyFactory proxyFactory;
  13. @Resource
  14. private ProxyNodeService proxyNodeService;
  15. @Resource
  16. private ProxyModuleService proxyModuleService;
  17. //利用反射去操作数据库
  18. //先是通过反射去调用proxyFactory的对应方法
  19. public Result executeService(String classPath, Map<String,Object> data){
  20. //先进来判断是不是不用去处理库,如果不用就直接返回。
  21. boolean isNoRes = isNoRes(data);
  22. if (isNoRes) return Result.success(true);
  23. Class[] paramTypes={Map.class};
  24. Object[] params={data};
  25. String[] split = classPath.split("_");
  26. if(split[0].equals("node")){
  27. proxyFactory.produceProxy(proxyNodeService);
  28. ProxyNodeService proxyInstance = (ProxyNodeService) proxyFactory.getProxyInstance();
  29. return (Result) CallMethod.call(classPath, paramTypes, params, proxyInstance);
  30. }else {
  31. proxyFactory.produceProxy(proxyModuleService);
  32. ProxyModuleService proxyInstance = (ProxyModuleService) proxyFactory.getProxyInstance();
  33. return (Result) CallMethod.call(classPath, paramTypes, params, proxyInstance);
  34. }
  35. }
  36. private boolean isNoRes(Map<String, Object> data){
  37. if(data.size()==2&&data.containsKey("value")&&data.get("value").equals(-1)){
  38. return true;
  39. }
  40. return false;
  41. }
  42. }

CallMethod是根据命令,去执行对象的方法
这里我们是让传进来的对象调用方法,而不是用反射创建出的对象调用,还是因为反射创建出的对象没让
spring 管理。

  1. package com.xiancai.lora.MQTT.service.context;
  2. import cn.hutool.core.util.StrUtil;
  3. import com.xiancai.lora.utils.StringUtils;
  4. import java.lang.reflect.Method;
  5. import static com.xiancai.lora.constant.UsuStatus.REFLECT_PREFIX;
  6. import static com.xiancai.lora.constant.UsuStatus.REFLECT_SUFFIX;
  7. /**
  8. * 利用反射去调用方法
  9. */
  10. public class CallMethod {
  11. /**
  12. * 通过字符串串调用方法
  13. * @param classAndMethod 类名-方法名,通过此字符串调用类中的方法
  14. * @param paramTypes 方法类型列表(因为方法可能重载)
  15. * @param params 需要调用的方法的参数列表
  16. * @return
  17. */
  18. public static Object call(String classAndMethod,Class[] paramTypes,Object[] params,Object o){
  19. String[] args=classAndMethod.split("_",2);
  20. //要调用的类名
  21. String className=args[0];
  22. className=REFLECT_PREFIX+StringUtils.getMethodName(className)+REFLECT_SUFFIX;
  23. //要调用的方法名
  24. String method="";
  25. method=StrUtil.toCamelCase(args[1]);
  26. try {
  27. //加载类,参数是完整类名 //第一个参数是方法名,后面的参数指示方法的参数类型和个数
  28. Method newMethod=Class.forName(className).getMethod(method,paramTypes);
  29. //accessiable设为true表示忽略java语言访问检查(可访问private方法)
  30. //method.setAccessible(true);
  31. //第一个参数类实例(必须有对象才能调用非静态方法,如果是静态方法此参数可为null)
  32. //第二个是要传给方法的参数
  33. Object result=newMethod.invoke(o,params);
  34. return result;
  35. } catch (Exception e) {
  36. e.printStackTrace();
  37. }
  38. return null;
  39. }
  40. }

那到这里整个流程就已经结束了,接下来的就是要对于具体的命令,数据库采取不同的措施了。

流程梳理

为了更好的看到效果,我们把刚才写好的代码具体的用一下

  1. 这是一个Controller层的一个方法,我们在这个方法中只需要调用MQTTProcess对象的MQTTProcess方法就好了,很简洁吧,而且也很好理解,把需要的东西给mqttProcess这个对象,他就可以帮你完成整个流程,而且还会直接返回前端要的通用返回对象

小型物联网项目Demo-Lora - 图9

  1. 在MQTTProcess这个方法中,流程也很简单,就是发消息,收消息,解析消息,操作数据库,逻辑非常的清晰

小型物联网项目Demo-Lora - 图10

  1. 发布消息就是调用 MQTTReqFacade 这个门面类来封装MQTTReq,然后发布消息小型物联网项目Demo-Lora - 图11
  2. 接收消息和解析消息由于都是同样的操作,就直接在MQTTProcess中写了小型物联网项目Demo-Lora - 图12小型物联网项目Demo-Lora - 图13
  3. 然后将处理好的数据传给 MQTTServiceContext 这个类让他自己根据数据去找对应的数据库操作处理。这样在这个MQTTProcess这个类中逻辑非常清晰,那个类干那个类的事,那个方法处理那些操作。都互不影响
  4. MQTTServiceContext 他就负责根据传过来的对象,选择代理对象,已经方法,最后让代理对象去处理小型物联网项目Demo-Lora - 图14

总结

这个小 Demo 呢,理论上来讲应该是可以应对大多数的比较简单的物联网项目了,只要把这个 Demo 中的硬件结构换成自己的,再根据硬件结构调整一下类的关系就好了。对于我个人而言,这也算是第一次用设计模式改良代码。当时就是感觉当成自己写的MQTT的逻辑很复杂,而且非常耦合,基本上只要有一小小的改动,就要调好多代码。毕竟,这也是企业的项目,自己也想做的好一些,让自己的代码更有价值,于是便开始有了这个 Demo

参考的文章

下面是我在遇到问题时参考的文章
通过反射调用方法: https://blog.csdn.net/csdn_ljh/article/details/51502567
解决反射生成对象,spring注入无效:https://blog.csdn.net/qq_30023773/article/details/81035617
多重代理的问题:https://blog.csdn.net/weixin_45839894/article/details/110921243
代理模式的对比:https://blog.csdn.net/weixin_43829047/article/details/113885861