前言

有群友提出疑问:active不起作用,明明断开了,却仍然显示true?

在webui->设备列表->设备详细信息->属性->服务端属性
可以看到每个设备的服务端属性都有下列五个:
active
inactivityAlarmTime
lastActivityTime
lastConnectTime
lastDisconnectTime
按照字面意思理解,active 似乎可以作为设备是否在线(是否保持连接)的依据。
但细心的朋友发现,我设备断开连接了,active字段依然为True,active不起作用呀?
如果能再细心一点就会发现,设备断开,lastDisconnectTime字段就会立刻更新为断开时间戳。设备断开连接,tb平台是可以感知到的。
但是 tb 中 active 属性的设计含义并不是“设备当前是否保持连接”,而是“设备在最近一段时间内是否有过活动”。
下面开始根据配置和源码分析active的判断逻辑。不想看分析过程也可以直接到文章最后看结论。

配置

thingsboard.yml 分别可以配置:

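  • 默认不活动超时时间(defaultInactivityTimeoutInSec,单位:秒):600秒(注意:下方示例配置中该项的默认值为 32 秒,请以实际使用的 thingsboard.yml 为准)
  • 默认状态检查间隔(defaultStateCheckIntervalInSec,单位:秒):60秒(下方示例配置中为 10 秒)
  • 是否持久化到遥测(persistToTelemetry,下方示例配置中默认为 false)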

    1. state:
    2. # 应该比transport.sessions.report_timeout大
    3. defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:32}"
    4. defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:10}"
    5. persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"

    下面开始查看上面的配置是否和active属性有关

    源码

    DefaultDeviceStateService 通过 @Value 注入了上面的配置,并且在初始化时启动了一个单线程定时任务,每隔 defaultStateCheckIntervalInSec 秒执行一次 updateState 方法

    1. @Service
    2. @TbCoreComponent
    3. @Slf4j
    4. public class DefaultDeviceStateService implements DeviceStateService {
    5. @Value("${state.defaultInactivityTimeoutInSec}")
    6. @Getter
    7. private long defaultInactivityTimeoutInSec;
    8. @Value("${state.defaultStateCheckIntervalInSec}")
    9. @Getter
    10. private int defaultStateCheckIntervalInSec;
    11. @Value("${state.persistToTelemetry:false}")
    12. @Getter
    13. private boolean persistToTelemetry;
    14. private ListeningScheduledExecutorService queueExecutor;
    15. @PostConstruct
    16. public void init() {
    17. //由于没有锁,应该始终是单线程的。
    18. queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state")));
    19. /*
    20. * 调用guava队列执行器,按照固定速率执行updateState方法.
    21. * 初始化延迟:[0, defaultStateCheckIntervalInSec) 范围内的随机整数(不包含 defaultStateCheckIntervalInSec 本身)
    22. * 固定速率执行:defaultStateCheckIntervalInSec
    23. * 单位:秒
    24. */
    25. queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
    26. }
    27. ....
    28. }

    updateState方法,主要看注释部分:

    ```java
    private void updateState() {
        //当前时间
        long ts = System.currentTimeMillis();
        Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
        for (DeviceId deviceId : deviceIds) {

    1. //getOrFetchDeviceStateData方法中也有对active字段的赋值操作
    2. DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
    3. if (stateData != null) {
    4. DeviceState state = stateData.getState();
    5. //active字段赋值
    6. //判断方法:当前时间 < (最后活动时间 + 不活动超时时间)
    7. state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
    8. if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
    9. state.setLastInactivityAlarmTime(ts);
    10. pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
    11. save(deviceId, INACTIVITY_ALARM_TIME, ts);
    12. save(deviceId, ACTIVITY_STATE, state.isActive());
    13. }
    14. } else {
    15. log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
    16. deviceStates.remove(deviceId);
    17. deviceLastReportedActivity.remove(deviceId);
    18. deviceLastSavedActivity.remove(deviceId);
    19. }

    } }

//设备状态定义
@Data
@Builder
public class DeviceState {
    private boolean active;
    private long lastConnectTime;
    private long lastActivityTime;
    private long lastDisconnectTime;
    private long lastInactivityAlarmTime;
    private long inactivityTimeout;
}

//获取或提取设备状态数据
private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
    //获取设备状态
    DeviceStateData deviceStateData = deviceStates.get(deviceId);
    //获取不到则提取
    if (deviceStateData == null) {
        Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
        if (device != null) {
            try {
                //提取设备状态
                deviceStateData = fetchDeviceState(device).get();
                deviceStates.putIfAbsent(deviceId, deviceStateData);
            } catch (InterruptedException | ExecutionException e) {
                log.debug("[{}] Failed to fetch device state!", deviceId, e);
            }
        }
    }
    return deviceStateData;
}

//提取设备状态
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
    //无论是否持久化到遥测数据,都会调用extractDeviceStateData提取设备状态数据的方法
    if (persistToTelemetry) {
        ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
        return Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
    } else {
        ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
        return Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
    }
}

//提取设备状态
private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {
    return new Function<List<T>, DeviceStateData>() {
        @Nullable
        @Override
        public DeviceStateData apply(@Nullable List<T> data) {
            try {
                long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
                long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
                long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
                //判断方法:当前时间 < (最后活动时间 + 不活动超时时间)
                boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout;
                DeviceState deviceState = DeviceState.builder()
                        //active字段赋值
                        .active(active)
                        .lastConnectTime(getEntryValue(data, LAST_CONNECT_TIME, 0L))
                        .lastDisconnectTime(getEntryValue(data, LAST_DISCONNECT_TIME, 0L))
                        .lastActivityTime(lastActivityTime)
                        .lastInactivityAlarmTime(inactivityAlarmTime)
                        .inactivityTimeout(inactivityTimeout)
                        .build();
                TbMsgMetaData md = new TbMsgMetaData();
                md.putValue("deviceName", device.getName());
                md.putValue("deviceType", device.getType());
                return DeviceStateData.builder()
                        .tenantId(device.getTenantId())
                        .deviceId(device.getId())
                        .deviceCreationTime(device.getCreatedTime())
                        .metaData(md)
                        .state(deviceState).build();
            } catch (Exception e) {
                log.warn("[{}] Failed to fetch device state data", device.getId(), e);
                throw new RuntimeException(e);
            }
        }
    };
}

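  1. `DeviceStateService`的事件方法中也会对active进行判断赋值:有的直接调用 setActive(如 onDeviceActivity、onDeviceInactivityTimeoutUpdate),有的通过 getOrFetchDeviceStateData 提取设备状态时间接赋值(如 onDeviceConnect、onDeviceDisconnect)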
  2. ```java
  3. public interface DeviceStateService extends ApplicationListener<PartitionChangeEvent> {
  4. void onDeviceAdded(Device device);
  5. void onDeviceUpdated(Device device);
  6. void onDeviceDeleted(Device device);
  7. void onDeviceConnect(DeviceId deviceId);
  8. void onDeviceActivity(DeviceId deviceId, long lastReportedActivityTime);
  9. void onDeviceDisconnect(DeviceId deviceId);
  10. void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
  11. void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback bytes);
  12. }
  13. //代码篇幅长,有省略
  14. @Service
  15. @TbCoreComponent
  16. @Slf4j
  17. public class DefaultDeviceStateService implements DeviceStateService {
  18. @Override
  19. public void onDeviceConnect(DeviceId deviceId) {
  20. DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
  21. ...
  22. }
  23. @Override
  24. public void onDeviceActivity(DeviceId deviceId, long lastReportedActivity) {
  25. ...
  26. if (!state.isActive()) {
  27. state.setActive(true);
  28. ...
  29. }
  30. ...
  31. }
  32. @Override
  33. public void onDeviceDisconnect(DeviceId deviceId) {
  34. DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
  35. ...
  36. }
  37. @Override
  38. public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
  39. ...
  40. state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
  41. ...
  42. }
  43. @Override
  44. public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback callback) {
  45. try {
  46. ...
  47. if (proto.getDeleted()) {
  48. ...
  49. } else {
  50. ...
  51. if (device != null) {
  52. if (proto.getAdded()) {
  53. Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
  54. ...
  55. }, MoreExecutors.directExecutor());
  56. } else if (proto.getUpdated()) {
  57. DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
  58. ...
  59. }
  60. } else {
  61. ...
  62. }
  63. }
  64. }
  65. }
  66. }

结论

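active = 当前时间 < (最后活动时间 + 不活动超时时间)
默认情况下(设备没有单独设置不活动超时时间时),不活动超时时间取 state.defaultInactivityTimeoutInSec 的配置值,即:默认active = 当前时间 < (最后活动时间 + defaultInactivityTimeoutInSec 秒)。例如配置为 600 秒时就是 10 分钟,按上文示例配置则是 32 秒。
因此,设备断开连接后 active 不会立刻变为 false:只有当距离最后活动时间超过不活动超时时间,并且定时执行的 updateState 完成下一次状态检查之后,active 才会被置为 false。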

其他

下面内容感谢柴宁大佬。语雀主页https://www.yuque.com/chaining

Rest API获取单个设备的活动属性active

/api/plugins/telemetry/{entityType}/{entityId}/values/attributes{?keys}

  1. curl -X GET --header 'Accept: application/json' --header 'X-Authorization: bearer {替换为jwtToken}' 'http://localhost:8080/api/plugins/telemetry/DEVICE/f498d600-8c63-11eb-bcb2-a918c92ac538/values/attributes?keys=active'

image.png
image.png
复仇者联盟点赞.gif