连接器加载流程:
- 遍历tb_gateway.yaml配置文件【connectors】配置的连接器,每次遍历做如下操作:
- 如果连接器的类型不是grpc,获取连接器对应的实现类添加到TBGatewayService里名称为_implemented_connectors的字典中,字典的键值为连接器的type
- 连接器的持久化键的获取
- 加载连接器的配置文件解析到临时变量connector_conf中
- 添加连接器的配置信息到TBGatewayService里名称为connectors_configs的字典中
- 保存连接器的持久化键,连接器类型为grpc才会有持久化键
最终的connectors_configs配置数据格式如下:
{
"mqtt":[
{
"name":"MQTT Broker Connector",
"config":{
"mqtt-test.json":{
"name":"MQTT Broker Connector",
//mqtt-test.json配置文件里的相关的配置信息
"broker":{},
"mapping":{},
"connectRequests":{},
"disconnectRequests":{},
"attributeUpdates":{},
"serverSideRpc":{}
}
},
"config_updated":{
},
"config_file_path":"mqtt-test.json",
"grpc_key":None
}
]
}
代码:tb_gateway_service.py:
def _load_connectors(self):
self.connectors_configs = {}
# 加载连接器持久化键{"GRPC Connector 1":"AD5722c73E"}字典
connectors_persistent_keys = self.__load_persistent_connector_keys()
# 从配置文件读取连接器配置对象数组
if self.__config.get("connectors"):
# 遍历连接器
for connector in self.__config['connectors']:
try:
# 连接器持久化键
connector_persistent_key = None
# 如果当前连接器的类型是grpc但是grpc的管理器为None报错并且continue继续遍历下一个连接器,__grpc_manager在TBGatewayService构造器里构造
# self.__grpc_manager = TBGRPCServerManager(self, self.__grpc_config)
if connector['type'] == "grpc" and self.__grpc_manager is None:
log.error("Cannot load connector with name: %s and type grpc. GRPC server is disabled!", connector['name'])
continue
# 如果连接器的类型不是grpc
if connector['type'] != "grpc":
# 获取该类型的连接器的实现类
connector_class = TBModuleLoader.import_module(connector['type'],
self._default_connectors.get(connector['type'],
connector.get('class')))
# 添加连接器实现类到集合中,如{"mqtt":mqtt_connector_class}
self._implemented_connectors[connector['type']] = connector_class
# 如果连接器的类型是grpc,grpc连接器的属性实例,包含name、key、type、configuration:
# {name: GRPC Connector 1,key: auto,type: grpc,configuration: grpc_connector_1.json}
elif connector['type'] == "grpc":
# 如果连接器的key是auto
if connector.get('key') == "auto":
# 如果从配置文件获取到了该连接器的持久化键,就赋值
if connectors_persistent_keys and connectors_persistent_keys.get(connector['name']) is not None:
connector_persistent_key = connectors_persistent_keys[connector['name']]
# 没有获取到该连接器的持久化键,那么生成一个连接器持久化键,并且添加到连接器持久化键字典中,键名是连接器的名字
else:
connector_persistent_key = "".join(choice(hexdigits) for _ in range(10))
connectors_persistent_keys[connector['name']] = connector_persistent_key
# 连接器的类型不是auto那么久把连接器的key值作为该连接器的持久化键
else:
connector_persistent_key = connector['key']
log.info("Connector key for GRPC connector with name [%s] is: [%s]", connector['name'], connector_persistent_key)
# 得到连接器的配置文件路径
config_file_path = self._config_dir + connector['configuration']
connector_conf_file_data = ''
# 打开连接器的配置文件解析配置文件信息到json对象,解析失败报异常
with open(config_file_path, 'r', encoding="UTF-8") as conf_file:
connector_conf_file_data = conf_file.read()
connector_conf = connector_conf_file_data
try:
connector_conf = loads(connector_conf_file_data)
except JSONDecodeError as e:
log.debug(e)
log.warning("Cannot parse connector configuration as a JSON, it will be passed as a string.")
if not self.connectors_configs.get(connector['type']):
self.connectors_configs[connector['type']] = []
log.info("connectors_configs is %s", self.connectors_configs)
# connectors_configs is {'mqtt': []}"
# 如果连接器的类型不是grpc并且连机器的配置文件是字典类型的那么把连接器的配置数据字典添加name键,因为连接器的配置文件没有name键
if connector['type'] != 'grpc' and isinstance(connector_conf, dict):
connector_conf["name"] = connector['name']
# 添加连接器的配置信息到连接器配置字典中 {'mqtt': [{'name': 'MQTT Broker Connector', 'config': {'mqtt-test.json': {...
self.connectors_configs[connector['type']].append({"name": connector['name'],
"config": {connector['configuration']: connector_conf} if connector[
'type'] != 'grpc' else connector_conf,
"config_updated": stat(config_file_path),
"config_file_path": config_file_path,
"grpc_key": connector_persistent_key})
log.info("connectors_configs is %s", self.connectors_configs)
except Exception as e:
log.exception("Error on loading connector: %r", e)
# 保存连接器的持久化键,连接器类型为grpc才会有持久化键
if connectors_persistent_keys:
self.__save_persistent_keys(connectors_persistent_keys)
else:
log.error("Connectors - not found! Check your configuration!")
self.__init_remote_configuration(force=True)
log.info("Remote configuration is enabled forcibly!")