前言
有群友提出疑问:active不起作用,明明断开了,却仍然显示true?
在webui->设备列表->设备详细信息->属性->服务端属性
可以看到每个设备的服务端属性都有下列五个:active
inactivityAlarmTime
lastActivityTime
lastConnectTime
lastDisconnectTime
按照字面意思进行理解。active
可以作为设备是否在线的依据。
但细心的朋友发现,我设备断开连接了,active字段依然为True,active不起作用呀?
如果能再细心一点就会发现,设备断开,lastDisconnectTime
字段就会立刻更新为断开时间戳。设备断开连接,tb平台是可以感知到的。
但是tb的active属性的设计并不是代表这层含义。
下面开始根据配置和源码分析active的判断逻辑。不想看分析过程也可以直接到文章最后看结论。
配置
thingsboard.yml
分别可以配置:
- 默认不活动超时时间/秒 600秒
- 默认状态检查间隔/秒 60秒
是否持久化到遥测
state:
# 应该比transport.sessions.report_timeout大
defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:32}"
defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:10}"
persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"
源码
DefaultDeviceStateService
引入了上面的配置,并且开启了一个线程循环执行updateState方法@Service
@TbCoreComponent
@Slf4j
public class DefaultDeviceStateService implements DeviceStateService {
@Value("${state.defaultInactivityTimeoutInSec}")
@Getter
private long defaultInactivityTimeoutInSec;
@Value("${state.defaultStateCheckIntervalInSec}")
@Getter
private int defaultStateCheckIntervalInSec;
@Value("${state.persistToTelemetry:false}")
@Getter
private boolean persistToTelemetry;
private ListeningScheduledExecutorService queueExecutor;
@PostConstruct
public void init() {
//由于没有锁,应该始终是单线程的。
queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state")));
/*
* 调用guava队列执行器,按照固定速率执行updateState方法.
* 初始化延迟:小于等于defaultStateCheckIntervalInSec的随机数
* 固定速率执行:defaultStateCheckIntervalInSec
* 单位:秒
*/
queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
}
....
}
updateState方法 主要看注释部分 ```java private void updateState() { //当前时间 long ts = System.currentTimeMillis(); Set
deviceIds = new HashSet<>(deviceStates.keySet()); for (DeviceId deviceId : deviceIds) { //getOrFetchDeviceStateData方法中也有对active字段的赋值操作
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
//active字段赋值
//判断方法:当前时间 < (最后活动时间 + 不活动超时时间)
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
state.setLastInactivityAlarmTime(ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
save(deviceId, ACTIVITY_STATE, state.isActive());
}
} else {
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
deviceStates.remove(deviceId);
deviceLastReportedActivity.remove(deviceId);
deviceLastSavedActivity.remove(deviceId);
}
} }
//设备状态定义
@Data
@Builder
public class DeviceState {
private boolean active;
private long lastConnectTime;
private long lastActivityTime;
private long lastDisconnectTime;
private long lastInactivityAlarmTime;
private long inactivityTimeout;
}
//获取或提取设备状态数据
private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
//获取设备状态
DeviceStateData deviceStateData = deviceStates.get(deviceId);
//获取不到则提取
if (deviceStateData == null) {
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
if (device != null) {
try {
//提取设备状态
deviceStateData = fetchDeviceState(device).get();
deviceStates.putIfAbsent(deviceId, deviceStateData);
} catch (InterruptedException | ExecutionException e) {
log.debug(“[{}] Failed to fetch device state!”, deviceId, e);
}
}
}
return deviceStateData;
}
//提取设备状态
private ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
return Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
} else {
ListenableFuture
> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
return Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
}
}
//提取设备状态
private , DeviceStateData> extractDeviceStateData(Device device) {
return new Function
, DeviceStateData>() {
@Nullable
@Override
public DeviceStateData apply(@Nullable List
在`DeviceStateService`的每个事件方法中都会对active进行判断赋值
```java
public interface DeviceStateService extends ApplicationListener<PartitionChangeEvent> {
void onDeviceAdded(Device device);
void onDeviceUpdated(Device device);
void onDeviceDeleted(Device device);
void onDeviceConnect(DeviceId deviceId);
void onDeviceActivity(DeviceId deviceId, long lastReportedActivityTime);
void onDeviceDisconnect(DeviceId deviceId);
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback bytes);
}
//代码篇幅长,有省略
@Service
@TbCoreComponent
@Slf4j
public class DefaultDeviceStateService implements DeviceStateService {
@Override
public void onDeviceConnect(DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
...
}
@Override
public void onDeviceActivity(DeviceId deviceId, long lastReportedActivity) {
...
if (!state.isActive()) {
state.setActive(true);
...
}
...
}
@Override
public void onDeviceDisconnect(DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
...
}
@Override
public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
...
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
...
}
@Override
public void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback callback) {
try {
...
if (proto.getDeleted()) {
...
} else {
...
if (device != null) {
if (proto.getAdded()) {
Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
...
}, MoreExecutors.directExecutor());
} else if (proto.getUpdated()) {
DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
...
}
} else {
...
}
}
}
}
}
结论
active = 当前时间 < (最后活动时间 + 不活动超时时间)
默认active = 当前时间 < (最后活动时间 + 5分钟)
其他
下面内容感谢柴宁大佬。语雀主页https://www.yuque.com/chaining
/api/plugins/telemetry/{entityType}/{entityId}/values/attributes{?keys}
curl -X GET --header 'Accept: application/json' --header 'X-Authorization: bearer {替换为jwtToken}' 'http://localhost:8080/api/plugins/telemetry/DEVICE/f498d600-8c63-11eb-bcb2-a918c92ac538/values/attributes?keys=active'