前言

Constrained Application Protocol(CoAP)是用于受约束设备的专用Internet应用程序协议,如RFC 7275所定义。它使那些称为“节点”的受约束设备能够使用类似协议与更广泛的Internet通信。CoAP被设计用于在相同约束网络(例如,低功率,有损网络)上的设备之间,设备与Internet上的一般节点之间以及在都通过Internet连接的不同约束网络上的设备之间使用。CoAP也正在通过其他机制使用,例如移动通信网络上的SMS。
Thingsboard源码分析-CoAP连接处理 - 图1
下图是近几年的CoAP关注区域热力图:
Thingsboard源码分析-CoAP连接处理 - 图2
具体表格如下:

自2015年5月15号到2020年5月15日统计值

关注区域 关注值
中国 100
韩国 37
印度 35

关联模块一览

和CoAP设备传输协议关联的模块有Thingsboard CoAP Transport ServiceThingsboard CoAP Transport CommonThingsboard Server Queue components。前面这些名称大家可以看IDEA maven模块名称。
CoAP Transport Service
Thingsboard源码分析-CoAP连接处理 - 图3
CoAP Transport Common
Thingsboard源码分析-CoAP连接处理 - 图4
Server Queue Components
Thingsboard源码分析-CoAP连接处理 - 图5

CoAP Transport Service

org.thingsboard.server.coap.ThingsboardCoapTransportApplication, CoAP服务启动类,使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行。

  1. @SpringBootConfiguration
  2. @EnableAsync
  3. @EnableScheduling
  4. @ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue"})
  5. public class ThingsboardCoapTransportApplication {
  6. private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
  7. private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-coap-transport";
  8. public static void main(String[] args) {
  9. SpringApplication.run(ThingsboardCoapTransportApplication.class, updateArguments(args));
  10. }
  11. private static String[] updateArguments(String[] args) {
  12. if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
  13. String[] modifiedArgs = new String[args.length + 1];
  14. System.arraycopy(args, 0, modifiedArgs, 0, args.length);
  15. modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
  16. return modifiedArgs;
  17. }
  18. return args;
  19. }
  20. }Copy to clipboardErrorCopied

第2-3行代码中,@EnableAsync注解使用来开启异步线程,@EnableScheduling注解使用来开启定时任务。
第4行代码@ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue"}): 扫描这些包下的所有使用@Component 的类,不管自动导入还是导出。
第7-8行代码和updateArguments的作用是:启动时,使用 –spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。

CoAP Transport Common

Californium框架

GitHub项目地址: https://github.com/eclipse/californium
官网介绍: https://www.eclipse.org/californium/

  • 基于Java的CoAP框架

Californium是一个强大的CoAP框架,目标是后端服务与小型物联网设备进行通信,当然大型物联网设备也适宜。它为RESTful Web服务提供了一个更方便的API,支持CoAP的所有特性。

  • 标准兼容

Californium已经运行了IETF的代码,并通过了所有ETSI插件测试规范。

  • 云原生

Californium具有可伸缩性的体系结构,并且性能优于高性能的HTTP服务器。CoAP的低开销允许使用一个服务实例处理数百万个IoT设备。灵活的并发模型允许实现最适合您的应用程序的任何东西。

引入依赖

CoAP Transport common通过引入californium 1.0.2版本的jar包对CoAP进行协议逻辑实现。

  1. <dependency>
  2. <groupId>org.eclipse.californium</groupId>
  3. <artifactId>californium-core</artifactId>
  4. <version>1.0.2</version>
  5. </dependency>Copy to clipboardErrorCopied

参数配置

  1. transport:
  2. # 本地CoAP传输协议参数
  3. coap:
  4. # 开启/关闭CoAP传输协议.
  5. enabled: "${COAP_ENABLED:true}"
  6. # 绑定地址
  7. bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}"
  8. # 绑定端口号
  9. bind_port: "${COAP_BIND_PORT:5683}"
  10. # 超时时间
  11. timeout: "${COAP_TIMEOUT:10000}"Copy to clipboardErrorCopied

模块目录结构

首先我们看该模块下的目录结构:

  1. .
  2. └── java
  3. └── org
  4. └── thingsboard
  5. └── server
  6. └── transport
  7. └── coap
  8. ├── CoapTransportContext.java CoAP传输协议上下文
  9. ├── CoapTransportResource.java CoAP传输协议资源类
  10. ├── CoapTransportService.java CoAP传输协议启动类
  11. ├── adaptors
  12. ├── CoapTransportAdaptor.java CoAP协议传输适配器
  13. └── JsonCoapAdaptor.java CoAP传输内容Json适配器
  14. └── client
  15. └── DeviceEmulator.java 设备仿真器
  16. Copy to clipboardErrorCopied

CoAP传输协议逻辑实现

CoapTransportContext

  1. @Slf4j
  2. @ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.coap.enabled}'=='true')")
  3. @Component
  4. public class CoapTransportContext extends TransportContext {
  5. @Getter
  6. @Value("${transport.coap.bind_address}")
  7. private String host;
  8. @Getter
  9. @Value("${transport.coap.bind_port}")
  10. private Integer port;
  11. @Getter
  12. @Value("${transport.coap.timeout}")
  13. private Long timeout;
  14. @Getter
  15. @Autowired
  16. private CoapTransportAdaptor adaptor;
  17. }Copy to clipboardErrorCopied

通过@Value注解将配置文件中的transport.coap.bind_addresstransport.coap.bind_porttransport.coap.timeout取出来。

CoapTransportResource

  1. @Service("CoapTransportService")
  2. @ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.coap.enabled}'=='true')")
  3. @Slf4j
  4. public class CoapTransportService {
  5. private static final String V1 = "v1";
  6. private static final String API = "api";
  7. @Autowired
  8. private CoapTransportContext coapTransportContext;
  9. private CoapServer server;
  10. @PostConstruct
  11. public void init() throws UnknownHostException {
  12. log.info("Starting CoAP transport...");
  13. log.info("Starting CoAP transport server");
  14. //初始化CoAP Server服务
  15. this.server = new CoapServer();
  16. //创建CoAP资源服务
  17. createResources();
  18. //绑定地址和端口号
  19. InetAddress addr = InetAddress.getByName(coapTransportContext.getHost());
  20. InetSocketAddress sockAddr = new InetSocketAddress(addr, coapTransportContext.getPort());
  21. //为CoAP添加节点
  22. server.addEndpoint(new CoapEndpoint(sockAddr));
  23. //启动服务
  24. server.start();
  25. log.info("CoAP transport started!");
  26. }
  27. private void createResources() {
  28. //创建CoAP资源
  29. CoapResource api = new CoapResource(API);
  30. api.add(new CoapTransportResource(coapTransportContext, V1));
  31. server.add(api);
  32. }
  33. @PreDestroy
  34. public void shutdown() {
  35. log.info("Stopping CoAP transport!");
  36. //优雅关闭CoAP服务
  37. this.server.destroy();
  38. log.info("CoAP transport stopped!");
  39. }
  40. }Copy to clipboardErrorCopied

第9-10行代码,通过@Autowired注解将刚才写的CoAPTransportContext注入进来。
第12行代码,通过Californium创建CoAP Server服务。
第14-25行代码中逻辑依次为:

  1. 初始化CoAP Server服务
  2. 创建CoAP资源服务
  3. 绑定地址和端口号
  4. 为CoAP添加节点
  5. 启动服务

第39-45行代码中通过@PreDestroy优雅的关闭CoAP服务。

CoapTransportResource

处理GET资源

  1. @Override
  2. public void handleGET(CoapExchange exchange) {
  3. Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
  4. if (!featureType.isPresent()) {
  5. log.trace("Missing feature type parameter");
  6. exchange.respond(ResponseCode.BAD_REQUEST);
  7. } else if (featureType.get() == FeatureType.TELEMETRY) {
  8. log.trace("Can't fetch/subscribe to timeseries updates");
  9. exchange.respond(ResponseCode.BAD_REQUEST);
  10. } else if (exchange.getRequestOptions().hasObserve()) {
  11. processExchangeGetRequest(exchange, featureType.get());
  12. } else if (featureType.get() == FeatureType.ATTRIBUTES) {
  13. processRequest(exchange, SessionMsgType.GET_ATTRIBUTES_REQUEST);
  14. } else {
  15. log.trace("Invalid feature type parameter");
  16. exchange.respond(ResponseCode.BAD_REQUEST);
  17. }
  18. }Copy to clipboardErrorCopied

第3行代码中,通过 getFeatureType(Request request)获取Uri路径段字符串的第4个值,例如api/v1/123/telemetry这个Uri路径段,取的就是telemetry这个值的大写。

  1. private static final int FEATURE_TYPE_POSITION = 4;
  2. private Optional<FeatureType> getFeatureType(Request request) {
  3. //返回Uri路径段字符串的列表
  4. List<String> uriPath = request.getOptions().getUriPath();
  5. try {
  6. //判断Uri路径段长度大小
  7. if (uriPath.size() >= FEATURE_TYPE_POSITION) {
  8. return Optional.of(FeatureType.valueOf(uriPath.get(FEATURE_TYPE_POSITION - 1).toUpperCase()));
  9. }
  10. } catch (RuntimeException e) {
  11. log.warn("Failed to decode feature type: {}", uriPath);
  12. }
  13. return Optional.empty();
  14. }Copy to clipboardErrorCopied

第4-9行代码中,如果featureType这个枚举为空、TELEMETRY和其他类型的错误的话,则返回128错误码。
第10-11行代码中,检查是否存在Observe选。如果是的话,进入processExchangeGetRequest(exchange, featureType.get())方法。
第12-13行代码中,如果featrueType是ATTRIBUTES,进入processRequest(CoapExchange exchange, SessionMsgType type)方法。
处理POST资源

  1. @Override
  2. public void handlePOST(CoapExchange exchange) {
  3. //获取POST请求Uri路径的第四个请求参数。
  4. Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest());
  5. //如果请求参数为空时,记录日志并回复错误码。
  6. if (!featureType.isPresent()) {
  7. log.trace("Missing feature type parameter");
  8. exchange.respond(ResponseCode.BAD_REQUEST);
  9. } else {
  10. switch (featureType.get()) {
  11. case ATTRIBUTES:
  12. processRequest(exchange, SessionMsgType.POST_ATTRIBUTES_REQUEST);
  13. break;
  14. case TELEMETRY:
  15. processRequest(exchange, SessionMsgType.POST_TELEMETRY_REQUEST);
  16. break;
  17. case RPC:
  18. Optional<Integer> requestId = getRequestId(exchange.advanced().getRequest());
  19. if (requestId.isPresent()) {
  20. processRequest(exchange, SessionMsgType.TO_DEVICE_RPC_RESPONSE);
  21. } else {
  22. processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST);
  23. }
  24. break;
  25. case CLAIM:
  26. processRequest(exchange, SessionMsgType.CLAIM_REQUEST);
  27. break;
  28. }
  29. }
  30. }Copy to clipboardErrorCopied

第4-8行代码,获取POST请求Uri路径的第四个请求参数。如果请求参数为空时,记录日志并回复错误码。
第10-27行代码中,判断请求参数的类型

  1. ATTRIBUTES(属性) : processRequest(exchange, SessionMsgType.POST_ATTRIBUTES_REQUEST)
  2. TELEMETRY(遥测数据) : processRequest(exchange, SessionMsgType.POST_TELEMETRY_REQUEST)
  3. RPC(远程调用)
    • 服务端RPC : 如果Uri路径带有请求标识符,则processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST)
    • 客户端RPC: processRequest(exchange, SessionMsgType.TO_SERVER_RPC_REQUEST)
  4. CLAIM(声明) : processRequest(exchange, SessionMsgType.CLAIM_REQUEST)

processRequest(CoapExchange exchange, SessionMsgType type)

  1. private void processRequest(CoapExchange exchange, SessionMsgType type) {
  2. //记录请求的相关信息
  3. log.trace("Processing {}", exchange.advanced().getRequest());
  4. exchange.accept();
  5. Exchange advanced = exchange.advanced();
  6. Request request = advanced.getRequest();
  7. //获取设备的AccessToken数据
  8. Optional<DeviceTokenCredentials> credentials = decodeCredentials(request);
  9. //如果设备AccessToken为空,则返回错误码
  10. if (!credentials.isPresent()) {
  11. exchange.respond(ResponseCode.BAD_REQUEST);
  12. return;
  13. }
  14. transportService.process(TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(credentials.get().getCredentialsId()).build(),
  15. new DeviceAuthCallback(transportContext, exchange, sessionInfo -> {
  16. UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  17. try {
  18. //判断逻辑,并进入相应的处理类中去
  19. switch (type) {
  20. case POST_ATTRIBUTES_REQUEST:
  21. ...
  22. //上报设备活跃
  23. reportActivity(sessionId, sessionInfo);
  24. break;
  25. case POST_TELEMETRY_REQUEST:
  26. ...
  27. reportActivity(sessionId, sessionInfo);
  28. break;
  29. case CLAIM_REQUEST:
  30. ...
  31. case SUBSCRIBE_ATTRIBUTES_REQUEST:
  32. ...
  33. break;
  34. case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
  35. ...
  36. break;
  37. case SUBSCRIBE_RPC_COMMANDS_REQUEST:
  38. ...
  39. break;
  40. case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
  41. ...
  42. break;
  43. case TO_DEVICE_RPC_RESPONSE:
  44. ...
  45. break;
  46. case TO_SERVER_RPC_REQUEST:
  47. ...
  48. break;
  49. case GET_ATTRIBUTES_REQUEST:
  50. ...
  51. break;
  52. }
  53. } catch (AdaptorException e) {
  54. log.trace("[{}] Failed to decode message: ", sessionId, e);
  55. exchange.respond(ResponseCode.BAD_REQUEST);
  56. } catch (IllegalAccessException e) {
  57. log.trace("[{}] Failed to process message: ", sessionId, e);
  58. exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR);
  59. }
  60. }));
  61. }Copy to clipboardErrorCopied

第4-13行代码中,获取设备的AccessToken数据,如果设备AccessToken为空,则返回错误码。
第20-60行代码中,判断设备请求参数的逻辑,并进入相应的处理类中去。
第24行代码,通过reportActivity上报设备的活跃数据。