前言

协议介绍

HTTP是一个客户端(用户)和服务端(网站)之间请求和应答的标准,通常使用TCP协议。通过使用网页浏览器、网络爬虫或者其它的工具,客户端发起一个HTTP请求到服务器上指定端口(默认端口为80)。我们称这个客户端为用户代理程序(user agent)。应答的服务器上存储着一些资源,比如HTML文件和图像。我们称这个应答服务器为源服务器(origin server)。在用户代理和源服务器中间可能存在多个“中间层”,比如代理服务器、网关或者隧道。
尽管TCP/IP协议是互联网上最流行的应用,但是在HTTP协议中并没有规定它必须使用或它支持的层。事实上HTTP可以在任何互联网协议或其他网络上实现。HTTP假定其下层协议提供可靠的传输。因此,任何能够提供这种保证的协议都可以被其使用,所以其在TCP/IP协议族使用TCP作为其传输层。
通常,由HTTP客户端发起一个请求,创建一个到服务器指定端口(默认是80端口)的TCP连接。HTTP服务器则在那个端口监听客户端的请求。一旦收到请求,服务器会向客户端返回一个状态,比如”HTTP/1.1 200 OK”,以及返回的内容,如请求的文件、错误消息、或者其它信息
HTTP是可用于IoT应用程序的通用网络协议。您可以在此处找到有关HTTP的更多信息。HTTP协议是基于TCP的,并使用请求 - 响应模型。当然它的缺点也极为明显,HTTP对于嵌入式设备来说太重了,也不灵活。

协议特点

Thingsboard源码分析-HTTP连接处理 - 图1

  1. 支持客户/服务器模式。
  2. 简单快速: 客户向服务器请求服务时,只需传送请求方法和路径。请求方法常用的有GET、PUT、POST。每种方法规定了客户与服务器联系的类型不同。由于HTTP协议简单,使得HTTP服务器的程序规模小,因此通信速度很快。
  3. 灵活: HTTP允许传输任意类型的数据对象。正在传输的类型由Content-Type加以标记。
  4. 无连接:无连接的含义是限制每次连接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开连接。采用这种方式可以节省传输时间。
  5. 无状态:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。

    关联模块一览

    和HTTP设备传输协议关联的模块有Thingsboard HTTP Transport ServiceThingsboard HTTP Transport CommonThingsboard Server Queue components。前面这些名称大家可以看IDEA maven模块名称。
    HTTP Transport Service
    Thingsboard源码分析-HTTP连接处理 - 图2
    HTTP Transport Common
    Thingsboard源码分析-HTTP连接处理 - 图3
    Server Queue Components
    Thingsboard源码分析-HTTP连接处理 - 图4

    MQTT Transport Service

    org.thingsboard.server.http.ThingsboardHttpTransportApplication, HTTP服务启动类, 使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行。
    1. @SpringBootApplication
    2. @EnableAsync
    3. @ComponentScan({"org.thingsboard.server.http", "org.thingsboard.server.common", "org.thingsboard.server.transport.http", "org.thingsboard.server.queue"})
    4. public class ThingsboardHttpTransportApplication {
    5. private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
    6. private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-http-transport";
    7. public static void main(String[] args) {
    8. SpringApplication.run(ThingsboardHttpTransportApplication.class, updateArguments(args));
    9. }
    10. private static String[] updateArguments(String[] args) {
    11. if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
    12. String[] modifiedArgs = new String[args.length + 1];
    13. System.arraycopy(args, 0, modifiedArgs, 0, args.length);
    14. modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
    15. return modifiedArgs;
    16. }
    17. return args;
    18. }
    19. }Copy to clipboardErrorCopied
    第2-3行代码中,@EnableAsync注解使用来开启异步线程,@EnableScheduling注解使用来开启定时任务。
    第4行代码@ComponentScan({"org.thingsboard.server.http", "org.thingsboard.server.common", "org.thingsboard.server.transport.http", "org.thingsboard.server.queue"}): 扫描这些包下的所有使用@Component 的类,不管自动导入还是导出。
    第7-8行代码和updateArguments的作用是:启动时,使用 —spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。

    HTTP Transport Common

    Spring Boot框架

    Thingsboard的HTTP设备传输协议是基于Spring Boot
    Spring Boot 是 Spring 的子项目,正如其名字,提供 Spring 的引导( Boot )的功能。
    通过 Spring Boot ,我们开发者可以快速配置 Spring 项目,引入各种 Spring MVC、Spring Transaction、Spring AOP、MyBatis 等等框架,而无需不断重复编写繁重的 Spring 配置,降低了 Spring 的使用成本。

    犹记当年,Spring XML 为主的时代,大晚上各种搜索 Spring 的配置,苦不堪言。现在有了 Spring Boot 之后,生活真美好。

Spring Boot 提供了各种 Starter 启动器,提供标准化的默认配置。例如:

并且,Spring Boot 基本已经一统 Java 项目的开发,大量的开源项目都实现了其的 Starter 启动器。例如:

  • incubator-dubbo-spring-boot-project 启动器,可以快速配置 Dubbo 。
  • rocketmq-spring-boot-starter 启动器,可以快速配置 RocketMQ 。

    项目解读

    项目结构

    1. └── java
    2. └── org
    3. └── thingsboard
    4. └── server
    5. └── transport
    6. └── http
    7. ├── DeviceApiController.java //设备API接口类
    8. └── HttpTransportContext.java //设备HTTP传输协议上下文类Copy to clipboardErrorCopied

    引入依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-web</artifactId>
    4. </dependency>Copy to clipboardErrorCopied

    参数配置

    1. transport:
    2. http:
    3. enabled: "${HTTP_ENABLED:true}" //协议开关
    4. request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}" //请求超时设置
    5. Copy to clipboardErrorCopied

    DeviceApiController

    服务端请求属性值API

    GET /api/v1/{deviceToken}/attributes{?clientKeys,sharedKeys}
    入参定义
字段 类型 描述
deviceToken String 设备token信息
clientKeys String 设备属性键
sharedKeys String 共享属性键
  1. @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
  2. public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
  3. @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
  4. @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys,
  5. HttpServletRequest httpRequest) {
  6. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
  7. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  8. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  9. GetAttributeRequestMsg.Builder request = GetAttributeRequestMsg.newBuilder().setRequestId(0);
  10. List<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? Arrays.asList(clientKeys.split(",")) : null;
  11. List<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? Arrays.asList(sharedKeys.split(",")) : null;
  12. if (clientKeySet != null) {
  13. request.addAllClientAttributeNames(clientKeySet);
  14. }
  15. if (sharedKeySet != null) {
  16. request.addAllSharedAttributeNames(sharedKeySet);
  17. }
  18. TransportService transportService = transportContext.getTransportService();
  19. transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
  20. transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
  21. }));
  22. return responseWriter;
  23. }Copy to clipboardErrorCopied

第6行代码,新建DeferredResult对象,该对象用于API接口需要在指定时间内将异步操作的结果同步返回给前端时;指定时间为上面设置的6s。
第9行代码中,新建GetAttributeRequestMsg并设置请求编号为0。GetAttributeRequestMsg protobuf定义如下:

  1. message GetAttributeRequestMsg {
  2. int32 requestId = 1;
  3. repeated string clientAttributeNames = 2;
  4. repeated string sharedAttributeNames = 3;
  5. }Copy to clipboardErrorCopied

第10-11行代码中,判断clientKeys和sharedKeys是否为空,如果是,则设置为null, 反之以 , 分割,设置成List集合。
第12-17行代码中,判断clientKeySetsharedKeySet是否为null,如果不是则设置GetAttributeRequestMsg对象的相关属性名。
第19行代码中,同步设备信息到服务端中。
第20行代码中,处理设备属性请求相关逻辑。
第7-21行代码,该处理类为: org.thingsboard.server.common.transport@TransportService

  1. void process(ValidateDeviceTokenRequestMsg msg,
  2. TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);Copy to clipboardErrorCopied

ValidateDeviceTokenRequestMsg对象为protobuf生成的,文件定义如下:

  1. message ValidateDeviceTokenRequestMsg {
  2. string token = 1;
  3. }Copy to clipboardErrorCopied

TransportServiceCallback是一个接口类,而上述的DeviceAuthCallback是其的实现类。具体代码如下:

  1. public interface TransportServiceCallback<T> {
  2. TransportServiceCallback<Void> EMPTY = new TransportServiceCallback<Void>() {
  3. @Override
  4. public void onSuccess(Void msg) {
  5. }
  6. @Override
  7. public void onError(Throwable e) {
  8. }
  9. };
  10. // 成功接口类
  11. void onSuccess(T msg);
  12. // 失败通用类
  13. void onError(Throwable e);
  14. }Copy to clipboardErrorCopied
  1. private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
  2. private final TransportContext transportContext;
  3. private final DeferredResult<ResponseEntity> responseWriter;
  4. private final Consumer<SessionInfoProto> onSuccess;
  5. DeviceAuthCallback(TransportContext transportContext, DeferredResult<ResponseEntity> responseWriter, Consumer<SessionInfoProto> onSuccess) {
  6. this.transportContext = transportContext;
  7. this.responseWriter = responseWriter;
  8. this.onSuccess = onSuccess;
  9. }
  10. @Override
  11. public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
  12. if (msg.hasDeviceInfo()) {
  13. UUID sessionId = UUID.randomUUID();
  14. DeviceInfoProto deviceInfoProto = msg.getDeviceInfo();
  15. SessionInfoProto sessionInfo = SessionInfoProto.newBuilder()
  16. .setNodeId(transportContext.getNodeId())
  17. .setTenantIdMSB(deviceInfoProto.getTenantIdMSB())
  18. .setTenantIdLSB(deviceInfoProto.getTenantIdLSB())
  19. .setDeviceIdMSB(deviceInfoProto.getDeviceIdMSB())
  20. .setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB())
  21. .setSessionIdMSB(sessionId.getMostSignificantBits())
  22. .setSessionIdLSB(sessionId.getLeastSignificantBits())
  23. .setDeviceName(msg.getDeviceInfo().getDeviceName())
  24. .setDeviceType(msg.getDeviceInfo().getDeviceType())
  25. .build();
  26. onSuccess.accept(sessionInfo);
  27. } else {
  28. responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
  29. }
  30. }
  31. @Override
  32. public void onError(Throwable e) {
  33. log.warn("Failed to process request", e);
  34. responseWriter.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
  35. }
  36. }Copy to clipboardErrorCopied

第6-10行代码为虚构函数, 入参分别为传输上下文: TransportContext transportContext、HTTP回复信息:DeferredResult<ResponseEntity> responseWriter和设备session信息。
设备Session信息protobuf定义如下:

  1. message SessionInfoProto {
  2. string nodeId = 1;
  3. int64 sessionIdMSB = 2;
  4. int64 sessionIdLSB = 3;
  5. int64 tenantIdMSB = 4;
  6. int64 tenantIdLSB = 5;
  7. int64 deviceIdMSB = 6;
  8. int64 deviceIdLSB = 7;
  9. string deviceName = 8;
  10. string deviceType = 9;
  11. int64 gwSessionIdMSB = 10;
  12. int64 gwSessionIdLSB = 11;
  13. }Copy to clipboardErrorCopied

ValidateDeviceCredentialsResponseMsg(认证设备资质回复信息) protobuf定义如下:

  1. message DeviceInfoProto {
  2. int64 tenantIdMSB = 1;
  3. int64 tenantIdLSB = 2;
  4. int64 deviceIdMSB = 3;
  5. int64 deviceIdLSB = 4;
  6. string deviceName = 5;
  7. string deviceType = 6;
  8. string additionalInfo = 7;
  9. }
  10. message ValidateDeviceCredentialsResponseMsg {
  11. DeviceInfoProto deviceInfo = 1;
  12. string credentialsBody = 2;
  13. }Copy to clipboardErrorCopied

当回调函数成功后,第13-28行代码判断认证设备资质回复信息是否有设备信息,则新建设备Session信息并拼接相关信息。反之回复401(不被认证的)。
第34-38行代码中,当处理失败的话,打印异常失败原因并回复500(服务内部错误)。
上面这几个回调函数的执行顺序如下:
第一部分

  1. org.thingsboard.server.common.transport.TransportService
  2. void process(ValidateDeviceTokenRequestMsg msg,
  3. TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);Copy to clipboardErrorCopied

第二部分
第三部分


从服务器订阅属性更新API

GET /api/v1/{deviceToken}/attributes/updates{?timeout}
入参定义

字段 类型 描述
deviceToken String 设备token信息
timeout long 超时时间
  1. @RequestMapping(value = "/{deviceToken}/attributes/updates", method = RequestMethod.GET, produces = "application/json")
  2. public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
  3. @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
  4. HttpServletRequest httpRequest) {
  5. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
  6. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  7. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  8. TransportService transportService = transportContext.getTransportService();
  9. transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
  10. timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
  11. transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(),
  12. new SessionCloseOnErrorCallback(transportService, sessionInfo));
  13. }));
  14. return responseWriter;
  15. }Copy to clipboardErrorCopied

第9行代码, 更新设备活动状态(既然上报数据代码肯定存活),发送成功后,调回回调,向设备返回回复消息。
第10行代码,处理订阅服务端属性的请求并回调。


属性更新发布到服务器API

POST /api/v1/{deviceToken}/attributes
入参定义

字段 类型 描述
deviceToken String 设备token信息
json String 设备属性json
  1. @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST)
  2. public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
  3. @RequestBody String json, HttpServletRequest request) {
  4. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
  5. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  6. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  7. TransportService transportService = transportContext.getTransportService();
  8. //解析并转换设备属性json,进行逻辑操作
  9. transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
  10. new HttpOkCallback(responseWriter));
  11. reportActivity(sessionInfo);
  12. }));
  13. return responseWriter;
  14. }Copy to clipboardErrorCopied

第5-9行代码更新设备属性的并把相关内容发送到规则引擎中。让我们跟下去这个代码:

  1. @Override
  2. public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  3. //匹配限流策略
  4. if (checkLimits(sessionInfo, msg, callback)) {
  5. //立即上报设备活动状态
  6. reportActivityInternal(sessionInfo);
  7. //获取租户编号
  8. TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
  9. //获取设备编号
  10. DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
  11. //解析并转换设备更新属性消息里的属性键值对
  12. JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
  13. //组装消息元数据
  14. TbMsgMetaData metaData = new TbMsgMetaData();
  15. metaData.putValue("deviceName", sessionInfo.getDeviceName());
  16. metaData.putValue("deviceType", sessionInfo.getDeviceType());
  17. TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, gson.toJson(json));
  18. //发送消息到规则引擎相关的逻辑
  19. sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
  20. }
  21. }Copy to clipboardErrorCopied

第10行代码,上报设备活动信息并更新。


遥测上传API

POST /api/v1/{deviceToken}/telemetry
入参定义

字段 类型 描述
deviceToken String 设备token
json String 设备遥测json
  1. @RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST)
  2. public DeferredResult<ResponseEntity> postTelemetry(@PathVariable("deviceToken") String deviceToken,
  3. @RequestBody String json, HttpServletRequest request) {
  4. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
  5. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  6. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  7. TransportService transportService = transportContext.getTransportService();
  8. transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
  9. new HttpOkCallback(responseWriter));
  10. reportActivity(sessionInfo);
  11. }));
  12. return responseWriter;
  13. }Copy to clipboardErrorCopied

第8行,处理遥测设备的消息并上报设备活动状态。我们跟下去:

  1. @Override
  2. public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  3. //匹配限流策略
  4. if (checkLimits(sessionInfo, msg, callback)) {
  5. //立即上报设备活动状态
  6. reportActivityInternal(sessionInfo);
  7. //获取租户编号
  8. TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
  9. //获取设备编号
  10. DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
  11. MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), callback);
  12. //便利键值列表
  13. for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
  14. //组装消息元数据
  15. TbMsgMetaData metaData = new TbMsgMetaData();
  16. //设置设备名称
  17. metaData.putValue("deviceName", sessionInfo.getDeviceName());
  18. //设置设备类型
  19. metaData.putValue("deviceType", sessionInfo.getDeviceType());
  20. metaData.putValue("ts", tsKv.getTs() + "");
  21. //解析并转换设备更新属性消息里的属性键值对
  22. JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
  23. TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, gson.toJson(json));
  24. //发送消息到规则引擎相关的逻辑
  25. sendToRuleEngine(tenantId, tbMsg, packCallback);
  26. }
  27. }
  28. }Copy to clipboardErrorCopied

声明设备API

POST /api/v1/{deviceToken}/claim
入参定义

字段 类型 描述
deviceToken String 设备token
json String 声明json(非必填)
  1. @RequestMapping(value = "/{deviceToken}/claim", method = RequestMethod.POST)
  2. public DeferredResult<ResponseEntity> claimDevice(@PathVariable("deviceToken") String deviceToken,
  3. @RequestBody(required = false) String json, HttpServletRequest request) {
  4. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
  5. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  6. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  7. TransportService transportService = transportContext.getTransportService();
  8. DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
  9. transportService.process(sessionInfo, JsonConverter.convertToClaimDeviceProto(deviceId, json),
  10. new HttpOkCallback(responseWriter));
  11. }));
  12. return responseWriter;
  13. }Copy to clipboardErrorCopied

第9行代码,处理声明设备逻辑。相关代码如下:

  1. @Override
  2. public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
  3. //匹配限流策略
  4. if (checkLimits(sessionInfo, msg, callback)) {
  5. //立即上报设备活动状态
  6. reportActivityInternal(sessionInfo);
  7. //消息发送设备Actor
  8. sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  9. .setClaimDevice(msg).build(), callback);
  10. }
  11. }Copy to clipboardErrorCopied

服务端订阅RPC命令API

GET /api/v1/{deviceToken}/rpc{?timeout}
入参定义

字段 类型 描述
deviceToken String 设备token信息
timeout long 超时
  1. @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
  2. public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
  3. @RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
  4. HttpServletRequest httpRequest) {
  5. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
  6. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  7. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  8. TransportService transportService = transportContext.getTransportService();
  9. transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
  10. timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
  11. transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(),
  12. new SessionCloseOnErrorCallback(transportService, sessionInfo));
  13. }));
  14. return responseWriter;
  15. }Copy to clipboardErrorCopied

第9行代码, 更新设备活动状态(既然上报数据代码肯定存活),发送成功后,调回回调,向设备返回回复消息。
第12-14行代码,处理服务端订阅rpc命令:

  1. @Override
  2. public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
  3. //匹配限流策略
  4. if (checkLimits(sessionInfo, msg, callback)) {
  5. //组装消息元数据
  6. SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
  7. sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
  8. //消息发送设备Actor
  9. sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  10. .setSubscribeToRPC(msg).build(), callback);
  11. }
  12. }Copy to clipboardErrorCopied

服务端回复RPC命令API

POST /api/v1/{deviceToken}/rpc/{requestId}
入参定义

字段 类型 描述
deviceToken String
requestId long
  1. @RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
  2. public DeferredResult<ResponseEntity> replyToCommand(@PathVariable("deviceToken") String deviceToken,
  3. @PathVariable("requestId") Integer requestId,
  4. @RequestBody String json, HttpServletRequest request) {
  5. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
  6. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  7. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  8. TransportService transportService = transportContext.getTransportService();
  9. transportService.process(sessionInfo, ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(json).build(), new HttpOkCallback(responseWriter));
  10. }));
  11. return responseWriter;
  12. }Copy to clipboardErrorCopied

第9行代码,处理服务端回复RPC命令逻辑,具体逻辑如下:

  1. @Override
  2. public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
  3. //匹配限流策略
  4. if (checkLimits(sessionInfo, msg, callback)) {
  5. //立即上报设备活动状态
  6. reportActivityInternal(sessionInfo);
  7. //消息发送设备Actor
  8. sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  9. .setToDeviceRPCCallResponse(msg).build(), callback);
  10. }
  11. }Copy to clipboardErrorCopied

客户端RPC命令API

POST /api/v1/{deviceToken}/rpc
入参定义

字段 类型 描述
deviceToken String
json String
  1. @RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.POST)
  2. public DeferredResult<ResponseEntity> postRpcRequest(@PathVariable("deviceToken") String deviceToken,
  3. @RequestBody String json, HttpServletRequest httpRequest) {
  4. DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
  5. transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
  6. new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
  7. JsonObject request = new JsonParser().parse(json).getAsJsonObject();
  8. TransportService transportService = transportContext.getTransportService();
  9. transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
  10. transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0)
  11. .setMethodName(request.get("method").getAsString())
  12. .setParams(request.get("params").toString()).build(),
  13. new SessionCloseOnErrorCallback(transportService, sessionInfo));
  14. }));
  15. return responseWriter;
  16. }Copy to clipboardErrorCopied

第9行代码, 更新设备活动状态(既然上报数据代码肯定存活),发送成功后,调回回调,向设备返回回复消息。
第10-14行代码,处理客户端rpc命令相关逻辑。