前言
Constrained Application Protocol(CoAP)是用于受约束设备的专用Internet应用程序协议,如RFC 7275所定义。它使那些称为“节点”的受约束设备能够使用类似协议与更广泛的Internet通信。CoAP被设计用于在相同约束网络(例如,低功率,有损网络)上的设备之间,设备与Internet上的一般节点之间以及在都通过Internet连接的不同约束网络上的设备之间使用。CoAP也正在通过其他机制使用,例如移动通信网络上的SMS。
下图是近几年的CoAP关注区域热力图:
具体表格如下:
自2015年5月15号到2020年5月15日统计值
关注区域 | 关注值 |
---|---|
中国 | 100 |
韩国 | 37 |
印度 | 35 |
关联模块一览
和CoAP设备传输协议关联的模块有Thingsboard CoAP Transport Service
、Thingsboard CoAP Transport Common
和Thingsboard Server Queue components
。前面这些名称大家可以看IDEA maven模块名称。
CoAP Transport Service
CoAP Transport Common
Server Queue Components
CoAP Transport Service
org.thingsboard.server.coap.ThingsboardCoapTransportApplication
, CoAP服务启动类,使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行。
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue"})
public class ThingsboardCoapTransportApplication {
private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-coap-transport";
public static void main(String[] args) {
SpringApplication.run(ThingsboardCoapTransportApplication.class, updateArguments(args));
}
private static String[] updateArguments(String[] args) {
if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
String[] modifiedArgs = new String[args.length + 1];
System.arraycopy(args, 0, modifiedArgs, 0, args.length);
modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
return modifiedArgs;
}
return args;
}
}Copy to clipboardErrorCopied
第2-3行代码中,@EnableAsync
注解使用来开启异步线程,@EnableScheduling
注解使用来开启定时任务。
第4行代码@ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue"})
: 扫描这些包下的所有使用@Component
的类,不管自动导入还是导出。
第7-8行代码和updateArguments
的作用是:启动时,使用 –spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。
CoAP Transport Common
Californium框架
GitHub项目地址: https://github.com/eclipse/californium
官网介绍: https://www.eclipse.org/californium/
- 基于Java的CoAP框架
Californium是一个强大的CoAP框架,目标是后端服务与小型物联网设备进行通信,当然大型物联网设备也适宜。它为RESTful Web服务提供了一个更方便的API,支持CoAP的所有特性。
- 标准兼容
Californium已经运行了IETF的代码,并通过了所有ETSI插件测试规范。
- 云原生
Californium具有可伸缩性的体系结构,并且性能优于高性能的HTTP服务器。CoAP的低开销允许使用一个服务实例处理数百万个IoT设备。灵活的并发模型允许实现最适合您的应用程序的任何东西。
引入依赖
CoAP Transport common通过引入californium 1.0.2版本的jar包对CoAP进行协议逻辑实现。
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId>
<version>1.0.2</version>
</dependency>Copy to clipboardErrorCopied
参数配置
transport:
# 本地CoAP传输协议参数
coap:
# 开启/关闭CoAP传输协议.
enabled: "${COAP_ENABLED:true}"
# 绑定地址
bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
# 绑定端口号
bind_port: "${COAP_BIND_PORT:5683}"
# 超时时间
timeout: "${COAP_TIMEOUT:10000}"Copy to clipboardErrorCopied
模块目录结构
首先我们看该模块下的目录结构:
.
└── java
└── org
└── thingsboard
└── server
└── transport
└── coap
├── CoapTransportContext.java CoAP传输协议上下文
├── CoapTransportResource.java CoAP传输协议资源类
├── CoapTransportService.java CoAP传输协议启动类
├── adaptors
│ ├── CoapTransportAdaptor.java CoAP协议传输适配器
│ └── JsonCoapAdaptor.java CoAP传输内容Json适配器
└── client
└── DeviceEmulator.java 设备仿真器
Copy to clipboardErrorCopied
CoAP传输协议逻辑实现
CoapTransportContext
@Slf4j
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.coap.enabled}'=='true')")
@Component
public class CoapTransportContext extends TransportContext {
@Getter
@Value("${transport.coap.bind_address}")
private String host;
@Getter
@Value("${transport.coap.bind_port}")
private Integer port;
@Getter
@Value("${transport.coap.timeout}")
private Long timeout;
@Getter
@Autowired
private CoapTransportAdaptor adaptor;
}Copy to clipboardErrorCopied
通过@Value
注解将配置文件中的transport.coap.bind_address
、transport.coap.bind_port
、transport.coap.timeout
取出来。
CoapTransportResource
@Service("CoapTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.coap.enabled}'=='true')")
@Slf4j
public class CoapTransportService {
private static final String V1 = "v1";
private static final String API = "api";
@Autowired
private CoapTransportContext coapTransportContext;
private CoapServer server;
@PostConstruct
public void init() throws UnknownHostException {
log.info("Starting CoAP transport...");
log.info("Starting CoAP transport server");
//初始化CoAP Server服务
this.server = new CoapServer();
//创建CoAP资源服务
createResources();
//绑定地址和端口号
InetAddress addr = InetAddress.getByName(coapTransportContext.getHost());
InetSocketAddress sockAddr = new InetSocketAddress(addr, coapTransportContext.getPort());
//为CoAP添加节点
server.addEndpoint(new CoapEndpoint(sockAddr));
//启动服务
server.start();
log.info("CoAP transport started!");
}
private void createResources() {
//创建CoAP资源
CoapResource api = new CoapResource(API);
api.add(new CoapTransportResource(coapTransportContext, V1));
server.add(api);
}
@PreDestroy
public void shutdown() {
log.info("Stopping CoAP transport!");
//优雅关闭CoAP服务
this.server.destroy();
log.info("CoAP transport stopped!");
}
}Copy to clipboardErrorCopied
第9-10行代码,通过@Autowired
注解将刚才写的CoAPTransportContext注入进来。
第12行代码,通过Californium创建CoAP Server服务。
第14-25行代码中逻辑依次为:
- 初始化CoAP Server服务
- 创建CoAP资源服务
- 绑定地址和端口号
- 为CoAP添加节点
- 启动服务
第39-45行代码中通过@PreDestroy
优雅的关闭CoAP服务。
CoapTransportResource
处理GET资源
@Override
public void handleGET(CoapExchange exchange) {
Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
if (!featureType.isPresent()) {
log.trace("Missing feature type parameter");
exchange.respond(ResponseCode.BAD_REQUEST);
} else if (featureType.get() == FeatureType.TELEMETRY) {
log.trace("Can't fetch/subscribe to timeseries updates");
exchange.respond(ResponseCode.BAD_REQUEST);
} else if (exchange.getRequestOptions().hasObserve()) {
processExchangeGetRequest(exchange, featureType.get());
} else if (featureType.get() == FeatureType.ATTRIBUTES) {
processRequest(exchange, SessionMsgType.GET_ATTRIBUTES_REQUEST);
} else {
log.trace("Invalid feature type parameter");
exchange.respond(ResponseCode.BAD_REQUEST);
}
}Copy to clipboardErrorCopied
第3行代码中,通过 getFeatureType(Request request)
获取Uri路径段字符串的第4个值,例如api/v1/123/telemetry
这个Uri路径段,取的就是telemetry这个值的大写。
private static final int FEATURE_TYPE_POSITION = 4;
private Optional<FeatureType> getFeatureType(Request request) {
//返回Uri路径段字符串的列表
List<String> uriPath = request.getOptions().getUriPath();
try {
//判断Uri路径段长度大小
if (uriPath.size() >= FEATURE_TYPE_POSITION) {
return Optional.of(FeatureType.valueOf(uriPath.get(FEATURE_TYPE_POSITION - 1).toUpperCase()));
}
} catch (RuntimeException e) {
log.warn("Failed to decode feature type: {}", uriPath);
}
return Optional.empty();
}Copy to clipboardErrorCopied
第4-9行代码中,如果featureType这个枚举为空、TELEMETRY和其他类型的错误的话,则返回128错误码。
第10-11行代码中,检查是否存在Observe选。如果是的话,进入processExchangeGetRequest(exchange, featureType.get())
方法。
第12-13行代码中,如果featrueType是ATTRIBUTES,进入processRequest(CoapExchange exchange, SessionMsgType type)
方法。
处理POST资源
@Override
public void handlePOST(CoapExchange exchange) {
//获取POST请求Uri路径的第四个请求参数。
Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
//如果请求参数为空时,记录日志并回复错误码。
if (!featureType.isPresent()) {
log.trace("Missing feature type parameter");
exchange.respond(ResponseCode.BAD_REQUEST);
} else {
switch (featureType.get()) {
case ATTRIBUTES:
processRequest(exchange, SessionMsgType.POST_ATTRIBUTES_REQUEST);
break;
case TELEMETRY:
processRequest(exchange, SessionMsgType.POST_TELEMETRY_REQUEST);
break;
case RPC:
Optional<Integer> requestId = getRequestId(exchange.advanced().getRequest());
if (requestId.isPresent()) {
processRequest(exchange, SessionMsgType.TO_DEVICE_RPC_RESPONSE);
} else {
processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST);
}
break;
case CLAIM:
processRequest(exchange, SessionMsgType.CLAIM_REQUEST);
break;
}
}
}Copy to clipboardErrorCopied
第4-8行代码,获取POST请求Uri路径的第四个请求参数。如果请求参数为空时,记录日志并回复错误码。
第10-27行代码中,判断请求参数的类型
- ATTRIBUTES(属性) :
processRequest(exchange, SessionMsgType.POST_ATTRIBUTES_REQUEST)
- TELEMETRY(遥测数据) :
processRequest(exchange, SessionMsgType.POST_TELEMETRY_REQUEST)
- RPC(远程调用)
- 服务端RPC : 如果Uri路径带有请求标识符,则
processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST)
- 客户端RPC:
processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST)
- 服务端RPC : 如果Uri路径带有请求标识符,则
- CLAIM(声明) :
processRequest(exchange, SessionMsgType.CLAIM_REQUEST)
processRequest(CoapExchange exchange, SessionMsgType type)
private void processRequest(CoapExchange exchange, SessionMsgType type) {
//记录请求的相关信息
log.trace("Processing {}", exchange.advanced().getRequest());
exchange.accept();
Exchange advanced = exchange.advanced();
Request request = advanced.getRequest();
//获取设备的AccessToken数据
Optional<DeviceTokenCredentials> credentials = decodeCredentials(request);
//如果设备AccessToken为空,则返回错误码
if (!credentials.isPresent()) {
exchange.respond(ResponseCode.BAD_REQUEST);
return;
}
transportService.process(TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(credentials.get().getCredentialsId()).build(),
new DeviceAuthCallback(transportContext, exchange, sessionInfo -> {
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
try {
//判断逻辑,并进入相应的处理类中去
switch (type) {
case POST_ATTRIBUTES_REQUEST:
...
//上报设备活跃
reportActivity(sessionId, sessionInfo);
break;
case POST_TELEMETRY_REQUEST:
...
reportActivity(sessionId, sessionInfo);
break;
case CLAIM_REQUEST:
...
case SUBSCRIBE_ATTRIBUTES_REQUEST:
...
break;
case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
...
break;
case SUBSCRIBE_RPC_COMMANDS_REQUEST:
...
break;
case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
...
break;
case TO_DEVICE_RPC_RESPONSE:
...
break;
case TO_SERVER_RPC_REQUEST:
...
break;
case GET_ATTRIBUTES_REQUEST:
...
break;
}
} catch (AdaptorException e) {
log.trace("[{}] Failed to decode message: ", sessionId, e);
exchange.respond(ResponseCode.BAD_REQUEST);
} catch (IllegalAccessException e) {
log.trace("[{}] Failed to process message: ", sessionId, e);
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR);
}
}));
}Copy to clipboardErrorCopied
第4-13行代码中,获取设备的AccessToken数据,如果设备AccessToken为空,则返回错误码。
第20-60行代码中,判断设备请求参数的逻辑,并进入相应的处理类中去。
第24行代码,通过reportActivity
上报设备的活跃数据。