前言
有群友提出疑问:active不起作用,明明断开了,却仍然显示true?
在webui->设备列表->设备详细信息->属性->服务端属性
可以看到每个设备的服务端属性都有下列五个:activeinactivityAlarmTimelastActivityTimelastConnectTimelastDisconnectTime
按照字面意思进行理解。active 可以作为设备是否在线的依据。
但细心的朋友发现,我设备断开连接了,active字段依然为True,active不起作用呀?
如果能再细心一点就会发现,设备断开,lastDisconnectTime字段就会立刻更新为断开时间戳。设备断开连接,tb平台是可以感知到的。
但是tb的active属性的设计并不是代表这层含义。
下面开始根据配置和源码分析active的判断逻辑。不想看分析过程也可以直接到文章最后看结论。
配置
thingsboard.yml 分别可以配置:
- 默认不活动超时时间/秒 600秒
- 默认状态检查间隔/秒 60秒
是否持久化到遥测
state:# 应该比transport.sessions.report_timeout大defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:32}"defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:10}"persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"
源码
DefaultDeviceStateService引入了上面的配置,并且开启了一个线程循环执行updateState方法@Service@TbCoreComponent@Slf4jpublic class DefaultDeviceStateService implements DeviceStateService {@Value("${state.defaultInactivityTimeoutInSec}")@Getterprivate long defaultInactivityTimeoutInSec;@Value("${state.defaultStateCheckIntervalInSec}")@Getterprivate int defaultStateCheckIntervalInSec;@Value("${state.persistToTelemetry:false}")@Getterprivate boolean persistToTelemetry;private ListeningScheduledExecutorService queueExecutor;@PostConstructpublic void init() {//由于没有锁,应该始终是单线程的。queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state")));/** 调用guava队列执行器,按照固定速率执行updateState方法.* 初始化延迟:小于等于defaultStateCheckIntervalInSec的随机数* 固定速率执行:defaultStateCheckIntervalInSec* 单位:秒*/queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);}....}
updateState方法 主要看注释部分 ```java private void updateState() { //当前时间 long ts = System.currentTimeMillis(); Set
deviceIds = new HashSet<>(deviceStates.keySet()); for (DeviceId deviceId : deviceIds) { //getOrFetchDeviceStateData方法中也有对active字段的赋值操作DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);if (stateData != null) {DeviceState state = stateData.getState();//active字段赋值//判断方法:当前时间 < (最后活动时间 + 不活动超时时间)state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {state.setLastInactivityAlarmTime(ts);pushRuleEngineMessage(stateData, INACTIVITY_EVENT);save(deviceId, INACTIVITY_ALARM_TIME, ts);save(deviceId, ACTIVITY_STATE, state.isActive());}} else {log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);deviceStates.remove(deviceId);deviceLastReportedActivity.remove(deviceId);deviceLastSavedActivity.remove(deviceId);}
} }
//设备状态定义
@Data
@Builder
public class DeviceState {
private boolean active;
private long lastConnectTime;
private long lastActivityTime;
private long lastDisconnectTime;
private long lastInactivityAlarmTime;
private long inactivityTimeout;
}
//获取或提取设备状态数据
private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
//获取设备状态
DeviceStateData deviceStateData = deviceStates.get(deviceId);
//获取不到则提取
if (deviceStateData == null) {
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
if (device != null) {
try {
//提取设备状态
deviceStateData = fetchDeviceState(device).get();
deviceStates.putIfAbsent(deviceId, deviceStateData);
} catch (InterruptedException | ExecutionException e) {
log.debug(“[{}] Failed to fetch device state!”, deviceId, e);
}
}
}
return deviceStateData;
}
//提取设备状态
private ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
return Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
} else {
ListenableFuture
> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
return Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
}
}
//提取设备状态
private , DeviceStateData> extractDeviceStateData(Device device) {
return new Function
, DeviceStateData>() {
@Nullable
@Override
public DeviceStateData apply(@Nullable List
在`DeviceStateService`的每个事件方法中都会对active进行判断赋值```javapublic interface DeviceStateService extends ApplicationListener<PartitionChangeEvent> {void onDeviceAdded(Device device);void onDeviceUpdated(Device device);void onDeviceDeleted(Device device);void onDeviceConnect(DeviceId deviceId);void onDeviceActivity(DeviceId deviceId, long lastReportedActivityTime);void onDeviceDisconnect(DeviceId deviceId);void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback bytes);}//代码篇幅长,有省略@Service@TbCoreComponent@Slf4jpublic class DefaultDeviceStateService implements DeviceStateService {@Overridepublic void onDeviceConnect(DeviceId deviceId) {DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);...}@Overridepublic void onDeviceActivity(DeviceId deviceId, long lastReportedActivity) {...if (!state.isActive()) {state.setActive(true);...}...}@Overridepublic void onDeviceDisconnect(DeviceId deviceId) {DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);...}@Overridepublic void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {...state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());...}@Overridepublic void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback callback) {try {...if (proto.getDeleted()) {...} else {...if (device != null) {if (proto.getAdded()) {Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {...}, MoreExecutors.directExecutor());} else if (proto.getUpdated()) {DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());...}} else {...}}}}}
结论
active = 当前时间 < (最后活动时间 + 不活动超时时间)
默认active = 当前时间 < (最后活动时间 + 5分钟)
其他
下面内容感谢柴宁大佬。语雀主页https://www.yuque.com/chaining
/api/plugins/telemetry/{entityType}/{entityId}/values/attributes{?keys}
curl -X GET --header 'Accept: application/json' --header 'X-Authorization: bearer {替换为jwtToken}' 'http://localhost:8080/api/plugins/telemetry/DEVICE/f498d600-8c63-11eb-bcb2-a918c92ac538/values/attributes?keys=active'



