1.MQTT框架,长时间运行容易掉线,而且还重连不上 (Springboot+MQTT)?
原因:使用相同的clientID
方案:全局使用的clientID保证唯一性,可以采用UUID等方式
原因: 当前用户没有Topic权限
方案:如果配置有acl权限,则查询当前登录mqtt用户是否具有订阅对应topic的权限,无权时也会造成一直频繁断线重连
原因:在回调函数内进行业务处理遇到异常并没有捕获
方案:在可能出现异常的语句块,进行try-catch捕获
/**
* subscribe订阅后得到的消息会执行到这里
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
String msg = new String(message.getPayload());
try {
//此处可能因为收到的消息不合法,会造成JSON转化异常,若异常未捕获,会导致MQTT客户端掉线
JSONObject jsonObject = JSON.parseObject(msg);
String gwId = String.valueOf(jsonObject.get("gwId"));
} catch (JSONException e) {
log.error("JSON Format Parsing Exception : {}", msg);
}
}
jmqtt-master.zip
iot-mqtt-server-master.zip
2.es里的数据导入另一个es?
答:reindexm,es有tool的。
https://gitee.com/ichiva/mybatis-sync-es
核心代码
同步处理器接口
package com.gzwl.interceptor;
/**
* es 同步处理器
*/
public interface SynEsHandler {
void handler(Object parameter);
}
拦截器
package com.gzwl.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.springframework.stereotype.Component;
import java.util.*;
/**
* es 同步拦截器
*
* 配置拦截器 @Signature
* 前置拦截器 type = Executor.class
* 只有写入才需要拦截 method = "update"
* 拦截方法的参数 args = {MappedStatement.class, Object.class}
*/
@Slf4j
@Component
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
public class SyncEsInterceptor implements Interceptor {
private Map<String,SynEsHandler> handlerMap = new HashMap<>();
/**
* 拦截器在sql执行成功后同步到es,
* 如果同步失败抛出异常,保证数据一致性
*
* @param invocation
* @return
* @throws Throwable
*/
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object res = invocation.proceed();
Object[] args = invocation.getArgs();
if (args.length >= 2) {
MappedStatement mappedStatement = (MappedStatement) args[0];
//拦截到的方法,也就是注册的key
String key = mappedStatement.getId();
log.debug("拦截到的方法 key=",key);
SynEsHandler synEsHandler = handlerMap.get(key);
if(null != synEsHandler){
try {
synEsHandler.handler(args[1]);
}catch (Exception e){
//包装异常
throw new SyncEsException(e);
}
}else {
log.debug("没有处理的key={}",key);
}
}
return res;
}
@Override
public Object plugin(Object o) {
if(o instanceof Executor) return Plugin.wrap(o, this);
else return o;
}
@Override
public void setProperties(Properties properties) {
}
/**
* 注册同步处理器
* @param key
* @param parameterHandler
*/
public void regHandler(String key,SynEsHandler parameterHandler){
handlerMap.put(key,parameterHandler);
}
}
实现处理接口
package com.gzwl.interceptor.impl;
import com.gzwl.entity.Employee;
import com.gzwl.interceptor.SynEsHandler;
import com.gzwl.interceptor.SyncEsInterceptor;
import org.apache.ibatis.binding.MapperMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
class EmployeeMapperHandler implements SynEsHandler {
@Autowired
public EmployeeMapperHandler(SyncEsInterceptor syncEsInterceptor){
//将自己注册到拦截器中,每个方法可用是独立的handler,也可用注册多个方法的处理器
syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.insert",this);
syncEsInterceptor.regHandler("com.gzwl.dao.EmployeeMapper.update",this);
}
@Override
public void handler(Object parameter) {
Employee entity;
if(parameter instanceof Employee){
//insert 可用直接拦截到实体类
entity = (Employee) parameter;
}else {
//update 方法需要特殊处理
entity = (Employee) ((MapperMethod.ParamMap) parameter).get("param1");
}
//在这里编写es的同步逻辑
}
}
完整代码:https://gitee.com/ichiva/mybatis-sync-es
注意:这个方案也是有致命缺陷的,就是在es执行完成后,程序还有可能让数据库回滚,从而造成数据不一致。