1.MQTT框架,长时间运行容易掉线,而且还重连不上 (Springboot+MQTT)?

原因:使用相同的clientID
方案:全局使用的clientID保证唯一性,可以采用UUID等方式
原因: 当前用户没有Topic权限
方案:如果配置有acl权限,则查询当前登录mqtt用户是否具有订阅对应topic的权限,无权时也会造成一直频繁断线重连
原因:在回调函数内进行业务处理遇到异常并没有捕获
方案:在可能出现异常的语句块,进行try-catch捕获

  1. /**
  2. * subscribe订阅后得到的消息会执行到这里
  3. */
  4. @Override
  5. public void messageArrived(String topic, MqttMessage message) {
  6. String msg = new String(message.getPayload());
  7. try {
  8. //此处可能因为收到的消息不合法,会造成JSON转化异常,若异常未捕获,会导致MQTT客户端掉线
  9. JSONObject jsonObject = JSON.parseObject(msg);
  10. String gwId = String.valueOf(jsonObject.get("gwId"));
  11. } catch (JSONException e) {
  12. log.error("JSON Format Parsing Exception : {}", msg);
  13. }
  14. }

jmqtt-master.zip
iot-mqtt-server-master.zip

2.es里的数据导入另一个es?
答:reindexm,es有tool的。

https://gitee.com/ichiva/mybatis-sync-es

核心代码

同步处理器接口

  1. package com.gzwl.interceptor;
  2. /**
  3. * es 同步处理器
  4. */
  5. public interface SynEsHandler {
  6. void handler(Object parameter);
  7. }

拦截器

  1. package com.gzwl.interceptor;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.ibatis.executor.Executor;
  4. import org.apache.ibatis.mapping.MappedStatement;
  5. import org.apache.ibatis.plugin.*;
  6. import org.springframework.stereotype.Component;
  7. import java.util.*;
  8. /**
  9. * es 同步拦截器
  10. *
  11. * 配置拦截器 @Signature
  12. * 前置拦截器 type = Executor.class
  13. * 只有写入才需要拦截 method = "update"
  14. * 拦截方法的参数 args = {MappedStatement.class, Object.class}
  15. */
  16. @Slf4j
  17. @Component
  18. @Intercepts({
  19. @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
  20. public class SyncEsInterceptor implements Interceptor {
  21. private Map<String,SynEsHandler> handlerMap = new HashMap<>();
  22. /**
  23. * 拦截器在sql执行成功后同步到es,
  24. * 如果同步失败抛出异常,保证数据一致性
  25. *
  26. * @param invocation
  27. * @return
  28. * @throws Throwable
  29. */
  30. @Override
  31. public Object intercept(Invocation invocation) throws Throwable {
  32. Object res = invocation.proceed();
  33. Object[] args = invocation.getArgs();
  34. if (args.length >= 2) {
  35. MappedStatement mappedStatement = (MappedStatement) args[0];
  36. //拦截到的方法,也就是注册的key
  37. String key = mappedStatement.getId();
  38. log.debug("拦截到的方法 key=",key);
  39. SynEsHandler synEsHandler = handlerMap.get(key);
  40. if(null != synEsHandler){
  41. try {
  42. synEsHandler.handler(args[1]);
  43. }catch (Exception e){
  44. //包装异常
  45. throw new SyncEsException(e);
  46. }
  47. }else {
  48. log.debug("没有处理的key={}",key);
  49. }
  50. }
  51. return res;
  52. }
  53. @Override
  54. public Object plugin(Object o) {
  55. if(o instanceof Executor) return Plugin.wrap(o, this);
  56. else return o;
  57. }
  58. @Override
  59. public void setProperties(Properties properties) {
  60. }
  61. /**
  62. * 注册同步处理器
  63. * @param key
  64. * @param parameterHandler
  65. */
  66. public void regHandler(String key,SynEsHandler parameterHandler){
  67. handlerMap.put(key,parameterHandler);
  68. }
  69. }

实现处理接口

  1. package com.gzwl.interceptor.impl;
  2. import com.gzwl.entity.Employee;
  3. import com.gzwl.interceptor.SynEsHandler;
  4. import com.gzwl.interceptor.SyncEsInterceptor;
  5. import org.apache.ibatis.binding.MapperMethod;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. class EmployeeMapperHandler implements SynEsHandler {
  10. @Autowired
  11. public EmployeeMapperHandler(SyncEsInterceptor syncEsInterceptor){
  12. //将自己注册到拦截器中,每个方法可用是独立的handler,也可用注册多个方法的处理器
  13. syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.insert",this);
  14. syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.update",this);
  15. }
  16. @Override
  17. public void handler(Object parameter) {
  18. Employee entity;
  19. if(parameter instanceof Employee){
  20. //insert 可用直接拦截到实体类
  21. entity = (Employee) parameter;
  22. }else {
  23. //update 方法需要特殊处理
  24. entity = (Employee) ((MapperMethod.ParamMap) parameter).get("param1");
  25. }
  26. //在这里编写es的同步逻辑
  27. }
  28. }

完整代码:https://gitee.com/ichiva/mybatis-sync-es
注意:这个方案也是有致命缺陷的,就是在es执行完成后,程序还有可能让数据库回滚,从而造成数据不一致。