前言
协议介绍
HTTP是一个客户端(用户)和服务端(网站)之间请求和应答的标准,通常使用TCP协议。通过使用网页浏览器、网络爬虫或者其它的工具,客户端发起一个HTTP请求到服务器上指定端口(默认端口为80)。我们称这个客户端为用户代理程序(user agent)。应答的服务器上存储着一些资源,比如HTML文件和图像。我们称这个应答服务器为源服务器(origin server)。在用户代理和源服务器中间可能存在多个“中间层”,比如代理服务器、网关或者隧道。
尽管TCP/IP协议是互联网上最流行的应用,但是在HTTP协议中并没有规定它必须使用或它支持的层。事实上HTTP可以在任何互联网协议或其他网络上实现。HTTP假定其下层协议提供可靠的传输。因此,任何能够提供这种保证的协议都可以被其使用,所以其在TCP/IP协议族使用TCP作为其传输层。
通常,由HTTP客户端发起一个请求,创建一个到服务器指定端口(默认是80端口)的TCP连接。HTTP服务器则在那个端口监听客户端的请求。一旦收到请求,服务器会向客户端返回一个状态,比如”HTTP/1.1 200 OK”,以及返回的内容,如请求的文件、错误消息、或者其它信息
HTTP是可用于IoT应用程序的通用网络协议。您可以在此处找到有关HTTP的更多信息。HTTP协议是基于TCP的,并使用请求 - 响应模型。当然它的缺点也极为明显,HTTP对于嵌入式设备来说太重了,也不灵活。
协议特点
- 支持客户/服务器模式。
- 简单快速: 客户向服务器请求服务时,只需传送请求方法和路径。请求方法常用的有GET、PUT、POST。每种方法规定了客户与服务器联系的类型不同。由于HTTP协议简单,使得HTTP服务器的程序规模小,因此通信速度很快。
- 灵活: HTTP允许传输任意类型的数据对象。正在传输的类型由Content-Type加以标记。
- 无连接:无连接的含义是限制每次连接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开连接。采用这种方式可以节省传输时间。
- 无状态:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。
关联模块一览
和HTTP设备传输协议关联的模块有Thingsboard HTTP Transport Service
、Thingsboard HTTP Transport Common
和Thingsboard Server Queue components
。前面这些名称大家可以看IDEA maven模块名称。
HTTP Transport Service
HTTP Transport Common
Server Queue Components
MQTT Transport Service
org.thingsboard.server.http.ThingsboardHttpTransportApplication
, HTTP服务启动类, 使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行。
第2-3行代码中,@SpringBootApplication
@EnableAsync
@ComponentScan({"org.thingsboard.server.http", "org.thingsboard.server.common", "org.thingsboard.server.transport.http", "org.thingsboard.server.queue"})
public class ThingsboardHttpTransportApplication {
private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-http-transport";
public static void main(String[] args) {
SpringApplication.run(ThingsboardHttpTransportApplication.class, updateArguments(args));
}
private static String[] updateArguments(String[] args) {
if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
String[] modifiedArgs = new String[args.length + 1];
System.arraycopy(args, 0, modifiedArgs, 0, args.length);
modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
return modifiedArgs;
}
return args;
}
}Copy to clipboardErrorCopied
@EnableAsync
注解使用来开启异步线程,@EnableScheduling
注解使用来开启定时任务。
第4行代码@ComponentScan({"org.thingsboard.server.http", "org.thingsboard.server.common", "org.thingsboard.server.transport.http", "org.thingsboard.server.queue"})
: 扫描这些包下的所有使用@Component
的类,不管自动导入还是导出。
第7-8行代码和updateArguments
的作用是:启动时,使用 —spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。HTTP Transport Common
Spring Boot框架
Thingsboard的HTTP设备传输协议是基于Spring Boot。
Spring Boot 是 Spring 的子项目,正如其名字,提供 Spring 的引导( Boot )的功能。
通过 Spring Boot ,我们开发者可以快速配置 Spring 项目,引入各种 Spring MVC、Spring Transaction、Spring AOP、MyBatis 等等框架,而无需不断重复编写繁重的 Spring 配置,降低了 Spring 的使用成本。犹记当年,Spring XML 为主的时代,大晚上各种搜索 Spring 的配置,苦不堪言。现在有了 Spring Boot 之后,生活真美好。
Spring Boot 提供了各种 Starter 启动器,提供标准化的默认配置。例如:
spring-boot-starter-web
启动器,可以快速配置 Spring MVC 。mybatis-spring-boot-starter
启动器,可以快速配置 MyBatis 。
并且,Spring Boot 基本已经一统 Java 项目的开发,大量的开源项目都实现了其的 Starter 启动器。例如:
incubator-dubbo-spring-boot-project
启动器,可以快速配置 Dubbo 。rocketmq-spring-boot-starter
启动器,可以快速配置 RocketMQ 。项目解读
项目结构
└── java
└── org
└── thingsboard
└── server
└── transport
└── http
├── DeviceApiController.java //设备API接口类
└── HttpTransportContext.java //设备HTTP传输协议上下文类Copy to clipboardErrorCopied
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>Copy to clipboardErrorCopied
参数配置
transport:
http:
enabled: "${HTTP_ENABLED:true}" //协议开关
request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}" //请求超时设置
Copy to clipboardErrorCopied
DeviceApiController
服务端请求属性值API
GET /api/v1/{deviceToken}/attributes{?clientKeys,sharedKeys}
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | 设备token信息 |
clientKeys | String | 设备属性键 |
sharedKeys | String | 共享属性键 |
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
@RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
GetAttributeRequestMsg.Builder request = GetAttributeRequestMsg.newBuilder().setRequestId(0);
List<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? Arrays.asList(clientKeys.split(",")) : null;
List<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? Arrays.asList(sharedKeys.split(",")) : null;
if (clientKeySet != null) {
request.addAllClientAttributeNames(clientKeySet);
}
if (sharedKeySet != null) {
request.addAllSharedAttributeNames(sharedKeySet);
}
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
}));
return responseWriter;
}Copy to clipboardErrorCopied
第6行代码,新建DeferredResult对象,该对象用于API接口需要在指定时间内将异步操作的结果同步返回给前端时;指定时间为上面设置的6s。
第9行代码中,新建GetAttributeRequestMsg
并设置请求编号为0。GetAttributeRequestMsg
protobuf定义如下:
message GetAttributeRequestMsg {
int32 requestId = 1;
repeated string clientAttributeNames = 2;
repeated string sharedAttributeNames = 3;
}Copy to clipboardErrorCopied
第10-11行代码中,判断clientKeys和sharedKeys是否为空,如果是,则设置为null, 反之以 , 分割,设置成List集合。
第12-17行代码中,判断clientKeySet
和sharedKeySet
是否为null,如果不是则设置GetAttributeRequestMsg
对象的相关属性名。
第19行代码中,同步设备信息到服务端中。
第20行代码中,处理设备属性请求相关逻辑。
第7-21行代码,该处理类为: org.thingsboard.server.common.transport@TransportService
void process(ValidateDeviceTokenRequestMsg msg,
TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);Copy to clipboardErrorCopied
该ValidateDeviceTokenRequestMsg
对象为protobuf生成的,文件定义如下:
message ValidateDeviceTokenRequestMsg {
string token = 1;
}Copy to clipboardErrorCopied
TransportServiceCallback
是一个接口类,而上述的DeviceAuthCallback
是其的实现类。具体代码如下:
public interface TransportServiceCallback<T> {
TransportServiceCallback<Void> EMPTY = new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void msg) {
}
@Override
public void onError(Throwable e) {
}
};
// 成功接口类
void onSuccess(T msg);
// 失败通用类
void onError(Throwable e);
}Copy to clipboardErrorCopied
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
private final TransportContext transportContext;
private final DeferredResult<ResponseEntity> responseWriter;
private final Consumer<SessionInfoProto> onSuccess;
DeviceAuthCallback(TransportContext transportContext, DeferredResult<ResponseEntity> responseWriter, Consumer<SessionInfoProto> onSuccess) {
this.transportContext = transportContext;
this.responseWriter = responseWriter;
this.onSuccess = onSuccess;
}
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
if (msg.hasDeviceInfo()) {
UUID sessionId = UUID.randomUUID();
DeviceInfoProto deviceInfoProto = msg.getDeviceInfo();
SessionInfoProto sessionInfo = SessionInfoProto.newBuilder()
.setNodeId(transportContext.getNodeId())
.setTenantIdMSB(deviceInfoProto.getTenantIdMSB())
.setTenantIdLSB(deviceInfoProto.getTenantIdLSB())
.setDeviceIdMSB(deviceInfoProto.getDeviceIdMSB())
.setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceName(msg.getDeviceInfo().getDeviceName())
.setDeviceType(msg.getDeviceInfo().getDeviceType())
.build();
onSuccess.accept(sessionInfo);
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
}
@Override
public void onError(Throwable e) {
log.warn("Failed to process request", e);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
}Copy to clipboardErrorCopied
第6-10行代码为虚构函数, 入参分别为传输上下文: TransportContext transportContext
、HTTP回复信息:DeferredResult<ResponseEntity> responseWriter
和设备session信息。
设备Session信息protobuf定义如下:
message SessionInfoProto {
string nodeId = 1;
int64 sessionIdMSB = 2;
int64 sessionIdLSB = 3;
int64 tenantIdMSB = 4;
int64 tenantIdLSB = 5;
int64 deviceIdMSB = 6;
int64 deviceIdLSB = 7;
string deviceName = 8;
string deviceType = 9;
int64 gwSessionIdMSB = 10;
int64 gwSessionIdLSB = 11;
}Copy to clipboardErrorCopied
ValidateDeviceCredentialsResponseMsg(认证设备资质回复信息) protobuf定义如下:
message DeviceInfoProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
string deviceName = 5;
string deviceType = 6;
string additionalInfo = 7;
}
message ValidateDeviceCredentialsResponseMsg {
DeviceInfoProto deviceInfo = 1;
string credentialsBody = 2;
}Copy to clipboardErrorCopied
当回调函数成功后,第13-28行代码判断认证设备资质回复信息是否有设备信息,则新建设备Session信息并拼接相关信息。反之回复401(不被认证的)。
第34-38行代码中,当处理失败的话,打印异常失败原因并回复500(服务内部错误)。
上面这几个回调函数的执行顺序如下:
第一部分
org.thingsboard.server.common.transport.TransportService
void process(ValidateDeviceTokenRequestMsg msg,
TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);Copy to clipboardErrorCopied
第二部分
第三部分
从服务器订阅属性更新API
GET /api/v1/{deviceToken}/attributes/updates{?timeout}
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | 设备token信息 |
timeout | long | 超时时间 |
@RequestMapping(value = "/{deviceToken}/attributes/updates", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> subscribeToAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
transportService.process(sessionInfo, SubscribeToAttributeUpdatesMsg.getDefaultInstance(),
new SessionCloseOnErrorCallback(transportService, sessionInfo));
}));
return responseWriter;
}Copy to clipboardErrorCopied
第9行代码, 更新设备活动状态(既然上报数据代码肯定存活),发送成功后,调回回调,向设备返回回复消息。
第10行代码,处理订阅服务端属性的请求并回调。
属性更新发布到服务器API
POST /api/v1/{deviceToken}/attributes
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | 设备token信息 |
json | String | 设备属性json |
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
//解析并转换设备属性json,进行逻辑操作
transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
new HttpOkCallback(responseWriter));
reportActivity(sessionInfo);
}));
return responseWriter;
}Copy to clipboardErrorCopied
第5-9行代码更新设备属性的并把相关内容发送到规则引擎中。让我们跟下去这个代码:
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
//匹配限流策略
if (checkLimits(sessionInfo, msg, callback)) {
//立即上报设备活动状态
reportActivityInternal(sessionInfo);
//获取租户编号
TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
//获取设备编号
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
//解析并转换设备更新属性消息里的属性键值对
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
//组装消息元数据
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType());
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, metaData, gson.toJson(json));
//发送消息到规则引擎相关的逻辑
sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
}
}Copy to clipboardErrorCopied
第10行代码,上报设备活动信息并更新。
遥测上传API
POST /api/v1/{deviceToken}/telemetry
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | 设备token |
json | String | 设备遥测json |
@RequestMapping(value = "/{deviceToken}/telemetry", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postTelemetry(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
new HttpOkCallback(responseWriter));
reportActivity(sessionInfo);
}));
return responseWriter;
}Copy to clipboardErrorCopied
第8行,处理遥测设备的消息并上报设备活动状态。我们跟下去:
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
//匹配限流策略
if (checkLimits(sessionInfo, msg, callback)) {
//立即上报设备活动状态
reportActivityInternal(sessionInfo);
//获取租户编号
TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
//获取设备编号
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), callback);
//便利键值列表
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
//组装消息元数据
TbMsgMetaData metaData = new TbMsgMetaData();
//设置设备名称
metaData.putValue("deviceName", sessionInfo.getDeviceName());
//设置设备类型
metaData.putValue("deviceType", sessionInfo.getDeviceType());
metaData.putValue("ts", tsKv.getTs() + "");
//解析并转换设备更新属性消息里的属性键值对
JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, gson.toJson(json));
//发送消息到规则引擎相关的逻辑
sendToRuleEngine(tenantId, tbMsg, packCallback);
}
}
}Copy to clipboardErrorCopied
声明设备API
POST /api/v1/{deviceToken}/claim
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | 设备token |
json | String | 声明json(非必填) |
@RequestMapping(value = "/{deviceToken}/claim", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> claimDevice(@PathVariable("deviceToken") String deviceToken,
@RequestBody(required = false) String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
transportService.process(sessionInfo, JsonConverter.convertToClaimDeviceProto(deviceId, json),
new HttpOkCallback(responseWriter));
}));
return responseWriter;
}Copy to clipboardErrorCopied
第9行代码,处理声明设备逻辑。相关代码如下:
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
//匹配限流策略
if (checkLimits(sessionInfo, msg, callback)) {
//立即上报设备活动状态
reportActivityInternal(sessionInfo);
//消息发送设备Actor
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setClaimDevice(msg).build(), callback);
}
}Copy to clipboardErrorCopied
服务端订阅RPC命令API
GET /api/v1/{deviceToken}/rpc{?timeout}
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | 设备token信息 |
timeout | long | 超时 |
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> subscribeToCommands(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "timeout", required = false, defaultValue = "0") long timeout,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter),
timeout == 0 ? transportContext.getDefaultTimeout() : timeout);
transportService.process(sessionInfo, SubscribeToRPCMsg.getDefaultInstance(),
new SessionCloseOnErrorCallback(transportService, sessionInfo));
}));
return responseWriter;
}Copy to clipboardErrorCopied
第9行代码, 更新设备活动状态(既然上报数据代码肯定存活),发送成功后,调回回调,向设备返回回复消息。
第12-14行代码,处理服务端订阅rpc命令:
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
//匹配限流策略
if (checkLimits(sessionInfo, msg, callback)) {
//组装消息元数据
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
//消息发送设备Actor
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscribeToRPC(msg).build(), callback);
}
}Copy to clipboardErrorCopied
服务端回复RPC命令API
POST /api/v1/{deviceToken}/rpc/{requestId}
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | |
requestId | long |
@RequestMapping(value = "/{deviceToken}/rpc/{requestId}", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> replyToCommand(@PathVariable("deviceToken") String deviceToken,
@PathVariable("requestId") Integer requestId,
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
TransportService transportService = transportContext.getTransportService();
transportService.process(sessionInfo, ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(json).build(), new HttpOkCallback(responseWriter));
}));
return responseWriter;
}Copy to clipboardErrorCopied
第9行代码,处理服务端回复RPC命令逻辑,具体逻辑如下:
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
//匹配限流策略
if (checkLimits(sessionInfo, msg, callback)) {
//立即上报设备活动状态
reportActivityInternal(sessionInfo);
//消息发送设备Actor
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setToDeviceRPCCallResponse(msg).build(), callback);
}
}Copy to clipboardErrorCopied
客户端RPC命令API
POST /api/v1/{deviceToken}/rpc
入参定义
字段 | 类型 | 描述 |
---|---|---|
deviceToken | String | |
json | String |
@RequestMapping(value = "/{deviceToken}/rpc", method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postRpcRequest(@PathVariable("deviceToken") String deviceToken,
@RequestBody String json, HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
JsonObject request = new JsonParser().parse(json).getAsJsonObject();
TransportService transportService = transportContext.getTransportService();
transportService.registerSyncSession(sessionInfo, new HttpSessionListener(responseWriter), transportContext.getDefaultTimeout());
transportService.process(sessionInfo, ToServerRpcRequestMsg.newBuilder().setRequestId(0)
.setMethodName(request.get("method").getAsString())
.setParams(request.get("params").toString()).build(),
new SessionCloseOnErrorCallback(transportService, sessionInfo));
}));
return responseWriter;
}Copy to clipboardErrorCopied
第9行代码, 更新设备活动状态(既然上报数据代码肯定存活),发送成功后,调回回调,向设备返回回复消息。
第10-14行代码,处理客户端rpc命令相关逻辑。