Custom connector implementation

Connectors are Gateway components that connect to external system or directly to devices. Gateway has many built-in connectors (e.g. MQTT, OPC-UA server, Modbus, BLE, etc). Once connected, connector is either poll data from those systems or subscribe to updates. Poll vs subscribe depends on the protocol capabilities.

Main goal of the custom connector is opportunity to connect to any device with any protocol. Connectors are written in Python language.

We will demonstrate how to create custom connector by example. Let’s assume we want our connector to connect to serial port on your device and read the data. Connector will be able to push data to device over serial. We will call this connector SerialConnector.
Please see step-by-step guide how we have added SerialConnector to the Gateway.
You can create your custom connector, based on this example.

Notate: The gateway already contains this connector, you can find it in the extensions folder
**
Let’s assume our serial devices push UTF-8 encoded strings like this:

  1. 48\r2430947595\n

where 48 is humidity, \r is delimiter between values 2430947595 is device serial number and messages are separated by \n symbol.

Step 1. Define SerialConnector configuration

At first, we need create configuration file for our serial connector. Let’s create file in the config folder (In the folder with tb_gateway.yaml file.)

  1. touch custom_serial.json

After this we need add some configuration for this connector in file.

Example of custom connector configuration file. Press to expand.

  1. {
  2. "name": "Custom serial connector",
  3. "devices": [
  4. {
  5. "name": "CustomSerialDevice1",
  6. "port": "/dev/ttyUSB0",
  7. "baudrate": 9600,
  8. "converter": "CustomSerialUplinkConverter",
  9. "telemetry": [
  10. {
  11. "type": "byte",
  12. "key": "humidity",
  13. "untilDelimiter": "\r"
  14. }
  15. ],
  16. "attributes":[
  17. {
  18. "key": "SerialNumber",
  19. "type": "string",
  20. "fromByte": 4,
  21. "toByte": -1
  22. }
  23. ],
  24. "attributeUpdates": [
  25. {
  26. "attributeOnThingsBoard": "attr1",
  27. "stringToDevice": "value = ${attr1}\n"
  28. }
  29. ]
  30. }
  31. ]
  32. }

In this file we write the configuration that we will use in the connector code.

Parameters in the configuration:

  1. “name” - connector name, it should be like a connector name in the tb_gateway.yaml file. Uses by the gateway to find correct connector for saved devices.
  2. “devices” - array with devices configuration (We can provide more that one device.)

In the “devices” array configuration file has devices json objects with configuration parameters for this device.

Parameters in device object:

  1. “name” - name of the device for ThingsBoard instance.
  2. “port” - port for the device.
  3. “baudrate” - port baudrate for connection to device.
    Notate: You can also use parameters from a configuration for serial port such as parity, stop bits, etc.
    You can read more about serial port parameters here.
  4. “converter” - class name of converter that we will use for the serial connector.
  5. “telemetry” - objects array, with a configuration for processing data from device, data processed with configuration in this section will be interpreted as device telemetries.
  6. “attributes” - objects array, with a configuration for processing data from device, data processed with configuration in this section will be interpreted as device attributes.
  7. “attributesUpdates” - objects array with a configuration for processing attribute update request from ThingsBoard.

Step 2. Locate extensions folder

Connector file should being placed in extensions folder that depends on type of installation:
If you install the gateway as daemon:

  1. /var/lib/thingsboard_gateway/extensions

For installation using pip:

Installation command Path Description
sudo pip3 install thingsboard-gateway /usr/lib/python3/site-packages/thingsboard_gateway/extensions Package installed on system layer, for every user.
pip3 install thingsboard-gateway /usr/local/lib/python3/dist-packages/thingsboard-gateway Package installed only for current user.

Step 3. Define Connector Implementation

We need create a folder and file for our connector class.
We have a folder “serial” in extensions folder and file “custom_serial_connector.py”. After this, we write connector class in the connector file and override some methods of parent class. List of methods.

Example of custom connector file. Press to expand.

  1. """Import libraries"""
  2. import serial
  3. import time
  4. from threading import Thread
  5. from random import choice
  6. from string import ascii_lowercase
  7. from thingsboard_gateway.connectors.connector import Connector, log # Import base class for connector and logger
  8. from thingsboard_gateway.tb_utility.tb_utility import TBUtility
  9. class CustomSerialConnector(Thread, Connector): # Define a connector class, it should inherit from "Connector" class.
  10. def __init__(self, gateway, config, connector_type):
  11. super().__init__() # Initialize parents classes
  12. self.statistics = {'MessagesReceived': 0,
  13. 'MessagesSent': 0} # Dictionary, will save information about count received and sent messages.
  14. self.__config = config # Save configuration from the configuration file.
  15. self.__gateway = gateway # Save gateway object, we will use some gateway methods for adding devices and saving data from them.
  16. self.__connector_type = connector_type # Saving type for connector, need for loading converter
  17. self.setName(self.__config.get("name",
  18. "Custom %s connector " % self.get_name() + ''.join(choice(ascii_lowercase) for _ in range(5)))) # get from the configuration or create name for logs.
  19. log.info("Starting Custom %s connector", self.get_name()) # Send message to logger
  20. self.daemon = True # Set self thread as daemon
  21. self.stopped = True # Service variable for check state
  22. self.connected = False # Service variable for check connection to device
  23. self.devices = {} # Dictionary with devices, will contain devices configurations, converters for devices and serial port objects
  24. self.load_converters() # Call function to load converters and save it into devices dictionary
  25. self.__connect_to_devices() # Call function for connect to devices
  26. log.info('Custom connector %s initialization success.', self.get_name()) # Message to logger
  27. log.info("Devices in configuration file found: %s ", '\n'.join(device for device in self.devices)) # Message to logger
  28. def __connect_to_devices(self): # Function for opening connection and connecting to devices
  29. for device in self.devices:
  30. try: # Start error handler
  31. connection_start = time.time()
  32. if self.devices[device].get("serial") is None \
  33. or self.devices[device]["serial"] is None \
  34. or not self.devices[device]["serial"].isOpen(): # Connect only if serial not available earlier or it is closed.
  35. self.devices[device]["serial"] = None
  36. while self.devices[device]["serial"] is None or not self.devices[device]["serial"].isOpen(): # Try connect
  37. '''connection to serial port with parameters from configuration file or default'''
  38. self.devices[device]["serial"] = serial.Serial(
  39. port=self.__config.get('port', '/dev/ttyUSB0'),
  40. baudrate=self.__config.get('baudrate', 9600),
  41. bytesize=self.__config.get('bytesize', serial.EIGHTBITS),
  42. parity=self.__config.get('parity', serial.PARITY_NONE),
  43. stopbits=self.__config.get('stopbits', serial.STOPBITS_ONE),
  44. timeout=self.__config.get('timeout', 1),
  45. xonxoff=self.__config.get('xonxoff', False),
  46. rtscts=self.__config.get('rtscts', False),
  47. write_timeout=self.__config.get('write_timeout', None),
  48. dsrdtr=self.__config.get('dsrdtr', False),
  49. inter_byte_timeout=self.__config.get('inter_byte_timeout', None),
  50. exclusive=self.__config.get('exclusive', None)
  51. )
  52. time.sleep(.1)
  53. if time.time() - connection_start > 10: # Break connection try if it setting up for 10 seconds
  54. log.error("Connection refused per timeout for device %s", self.devices[device]["device_config"].get("name"))
  55. break
  56. except serial.serialutil.SerialException:
  57. log.error("Port %s for device %s - not found", self.__config.get('port', '/dev/ttyUSB0'), device)
  58. time.sleep(10)
  59. except Exception as e:
  60. log.exception(e)
  61. time.sleep(10)
  62. else: # if no exception handled - add device and change connection state
  63. self.__gateway.add_device(self.devices[device]["device_config"]["name"], {"connector": self})
  64. self.connected = True
  65. def open(self): # Function called by gateway on start
  66. self.stopped = False
  67. self.start()
  68. def get_name(self): # Function used for logging, sending data and statistic
  69. return self.name
  70. def is_connected(self): # Function for checking connection state
  71. return self.connected
  72. def load_converters(self): # Function for search a converter and save it.
  73. devices_config = self.__config.get('devices')
  74. try:
  75. if devices_config is not None:
  76. for device_config in devices_config:
  77. if device_config.get('converter') is not None:
  78. converter = TBUtility.check_and_import(self.__connector_type, device_config['converter'])
  79. self.devices[device_config['name']] = {'converter': converter(device_config),
  80. 'device_config': device_config}
  81. else:
  82. log.error('Converter configuration for the custom connector %s -- not found, please check your configuration file.', self.get_name())
  83. else:
  84. log.error('Section "devices" in the configuration not found. A custom connector %s has being stopped.', self.get_name())
  85. self.close()
  86. except Exception as e:
  87. log.exception(e)
  88. def run(self): # Main loop of thread
  89. try:
  90. while True:
  91. for device in self.devices:
  92. serial = self.devices[device]["serial"]
  93. ch = b''
  94. data_from_device = b''
  95. while ch != b'\n':
  96. try:
  97. try:
  98. ch = serial.read(1) # Reading data from serial
  99. except AttributeError as e:
  100. if serial is None:
  101. self.__connect_to_devices() # if port not found - try to connect to it
  102. raise e
  103. data_from_device = data_from_device + ch
  104. except Exception as e:
  105. log.exception(e)
  106. break
  107. try:
  108. converted_data = self.devices[device]['converter'].convert(self.devices[device]['device_config'], data_from_device)
  109. self.__gateway.send_to_storage(self.get_name(), converted_data)
  110. time.sleep(.1)
  111. except Exception as e:
  112. log.exception(e)
  113. self.close()
  114. raise e
  115. if not self.connected:
  116. break
  117. except Exception as e:
  118. log.exception(e)
  119. def close(self): # Close connect function, usually used if exception handled in gateway main loop or in connector main loop
  120. self.stopped = True
  121. for device in self.devices:
  122. self.__gateway.del_device(self.devices[device])
  123. if self.devices[device]['serial'].isOpen():
  124. self.devices[device]['serial'].close()
  125. def on_attributes_update(self, content): # Function used for processing attribute update requests from ThingsBoard
  126. log.debug(content)
  127. if self.devices.get(content["device"]) is not None: # checking - is device in configuration?
  128. device_config = self.devices[content["device"]].get("device_config")
  129. if device_config is not None:
  130. log.debug(device_config)
  131. if device_config.get("attributeUpdates") is not None:
  132. requests = device_config["attributeUpdates"] # getting configuration for attribute requests
  133. for request in requests:
  134. attribute = request.get("attributeOnThingsBoard")
  135. log.debug(attribute)
  136. if attribute is not None and attribute in content["data"]:
  137. try:
  138. value = content["data"][attribute] # get value from content
  139. str_to_send = str(request["stringToDevice"].replace("${" + attribute + "}", str(value))).encode("UTF-8") # form a string to send to device
  140. self.devices[content["device"]]["serial"].write(str_to_send) # send string to device
  141. log.debug("Attribute update request to device %s : %s", content["device"], str_to_send)
  142. time.sleep(.01)
  143. except Exception as e:
  144. log.exception(e)
  145. def server_side_rpc_handler(self, content):
  146. pass

Step 4. Define Converter Implementation

The purpose of the converter is to convert data from devices to the ThingsBoard format. Converters written in Python language. We should create a custom converter file “custom_serial_converter.py” in the extension folder, you can find extension folder location in Step 2

Example of custom converter file. Press to expand.

  1. from thingsboard_gateway.connectors.converter import Converter, log # Import base class for the converter and log ("converter.log" in logs directory).
  2. class CustomSerialUplinkConverter(Converter): # Definition of class.
  3. def __init__(self, config): # Initialization method
  4. self.__config = config # Saving configuration to object variable
  5. self.result_dict = {
  6. 'deviceName': config.get('name', 'CustomSerialDevice'),
  7. 'deviceType': config.get('deviceType', 'default'),
  8. 'attributes': [],
  9. 'telemetry': []
  10. } # template for a result dictionary.
  11. def convert(self, config, data: bytes): # Method for conversion data from device format to ThingsBoard format.
  12. keys = ['attributes', 'telemetry'] # Array used for looking data for data processing.
  13. for key in keys: # Data processing loop for parameters in keys array.
  14. self.result_dict[key] = [] # Clean old data.
  15. if self.__config.get(key) is not None: # Checking the parameter from the keys in the config.
  16. for config_object in self.__config.get(key): # The loop for checking whether there is data that interests us.
  17. data_to_convert = data # data for conversion.
  18. if config_object.get('untilDelimiter') is not None: # Checking some parameter from configuration file.
  19. data_to_convert = data.split(config_object.get('untilDelimiter').encode('UTF-8'))[0] # if "utilDelimiter" parameter in configuration file - get data from incoming data to delimiter position in received string.
  20. if config_object.get('fromDelimiter') is not None: # Checking some parameter from configuration file.
  21. data_to_convert = data.split(config_object.get('fromDelimiter').encode('UTF-8'))[1] # if "fromDelimiter" parameter in configuration file - get data from incoming data from delimiter position in received string.
  22. if config_object.get('toByte') is not None: # Checking some parameter from configuration file.
  23. to_byte = config_object.get('toByte') # # if "toByte" parameter in configuration file - get data from incoming data to byte number from a parameter "toByte" in configuration file.
  24. if to_byte == -1: # Checking some parameter from configuration file.
  25. to_byte = len(data) - 1 # If parameter == -1 - we will take data to the end.
  26. data_to_convert = data_to_convert[:to_byte] # saving data to variable for sending
  27. if config_object.get('fromByte') is not None: # Checking some parameter from configuration file
  28. from_byte = config_object.get('fromByte') # if "fromByte" parameter in configuration file - get data from incoming data from byte number from a parameter "fromByte" in configuration file.
  29. data_to_convert = data_to_convert[from_byte:] # saving data to variable for sending.
  30. converted_data = {config_object['key']: data_to_convert.decode('UTF-8')} # Adding data from temporary variable to result string.
  31. self.result_dict[key].append(converted_data) # Append result string to result dictionary.
  32. return self.result_dict # returning result dictionary after all iterations.

After processing 48\r2430947595\n we receive following dictionary:

  1. {
  2. "deviceName": "CustomSerialDevice1",
  3. "deviceType": "default",
  4. "attributes": [{"SerialNumber": "2430947595"}],
  5. "telemetry": [{"humidity":48}]
  6. }

This dictionary will be converted into json and gateway will send it to ThingsBoard instance.

Step 5. Include Connector into main Gateway configuration file

To add the serial connector to the gateway, we need add following lines into section connectors tb_gateway.yaml file.

  1. -
  2. name: Custom Serial Connector
  3. type: serial
  4. configuration: custom_serial.json
  5. class: CustomSerialConnector

where: name - connector name type - folder name in extensions, with connector file configuration - connector configuration file in folder with tbgateway.yaml file _class - connector class name in connector file in extensions

Step 6. Run the IoT gateway

To run the gateway you should execute following command, it depends on type of installation:

  • If you install the IoT gateway as daemon, you should restart it with following command to apply changes to the configuration:

    1. sudo systemctl restart thingsboard-gateway
  • If you install the IoT gateway as Python module, you should run it from the folder with tb_gateway.yaml (or change path to the tb_gateway.yaml file) with the following command to apply changes to the configuration:

    1. sudo python3 -c 'from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService; TBGatewayService("./tb_gateway.yaml")'

    You can check a status of the IoT Gateway by watch the logs in a folder that you provide in logs.conf file.
    Default location of logs folder for the daemon - “/var/log/thingsboard-gateway/”
    Default location of logs folder for Python module - “./logs/”

    Step 6. Check a result in the ThingsBoard instance

    To check the result, you should connect device, and go to the ThingsBoard UI into “Devices” tab.
    If device connected correctly and has sent some data, you will see device with a name - “CustomSerialDevice1”.
    To check the data - open device and go to the telemetry tab. You should see the telemetry from config (humidity) with some value 48 (Value from example, your value can be different).

Custom connector methods reference

You should implement following methods:
init – called on creating object (In example used for loading converters, saving data from configs to object variables and creating serial ports objects).
open – called on start connection to device with connector.
get_name – called to recieve name of connector.
is_connected – called to check the connection to devices.
run – Main method of thread, must contain an infinite loop and all calls to data receiving/processing functions.
close – method, that has being called when gateway stops and should contain processing of closing connection/ports etc.
on_attributes_update – gateway call it when receives AttributeUpdates request from ThingsBoard server to device with this connector.
server_side_rpc – gateway call it when receives ServerRpcRequest from ThingsBoard server.

init method

Parameters:

  1. def __init__(self, gateway, config, connector_type):

self – current object
gateway – gateway object (will being used for saving data)
config – dictionary with data from connector configuration file
connector_type – type of connector(Need for load converters for this connector type, from tb_gateway.yaml)
In example above, we used this method to initialize data with which we will work.

__connect_to_devices method
  1. def __connect_to_devices(self):

self – current object
Service method, used for connection to devices.

get_name method
  1. def get_name(self): # Function used for logging, sending data and statistic

self – current object
Method to get connector name.

is_connected method
  1. def is_connected(self): # Function for checking connection state

self – current object
Method for check current connection state.

load_converters method
  1. def load_converters(self): # Function for search a converter and save it.

self – current object
Method for loading converters for devices.

run method

Method from threading module, that being called after initializing of gateway.
Parameters:

  1. def run(self):

self – current object.
In example above we use this method for connection to devices, read data from them and run converter

close method

Method is being called when gateway stopping or catch a fatal exception in a main loop of gateway.

  1. def close(self):

self – current object.

on_attributes_update method

Method is being called when gateway receive AttributeUpdates request from ThingsBoard.

  1. def on_attributes_update_method(self, content):

self – current object.
content – dictionary with data from ThingsBoard server.
Example of content:

  1. {"device": "CustomSerialDevice1", "data": {"attr1": 25}}

If configuration in section attributesUpdates like following, connector will send string “value = 25\n” to device.

  1. "attributeUpdates": [
  2. {
  3. "attributeOnThingsBoard": "attr1",
  4. "stringToDevice": "value = ${attr1}\n"
  5. }
  6. ]

server_side_rpc_handler

Method is being called when gateway receive AttributeUpdates request from ThingsBoard.

  1. def server_side_rpc_handler(self, content):

self – current object.
content – dictionary with data from ThingsBoard server.
Example of content:

  1. {"device": "CustomSerialDevice1", "data": {"id": 1, "method": "toggle_gpio", "params": {"pin":1}}}

There are 2 types of rpc requests processing available - with response and without it.
After processing request you should just use following gateway method:

  1. self.__gateway.send_rpc_reply(device, req_id, content)

Where:
device - String with device name.
req_id - Id of RPC request from ThingsBoard
content - depends on type of rpc:

  • If without response:

    1. content = {"success": True}
  • If with response in content should be any dictionary with content that you want send to ThingsBoard as response.

    Custom converter methods reference

    You should implement following methods:
    init – called on creating object.
    convert – Method for conversion data from device format to ThingsBoard data format.

    init method

    Parameters:

    1. def __init__(self, config):

    self – current object.
    config – dictionary with data from connector configuration file.
    In the example used to save the converter configuration and create a template for the result’s dictionary with ThingsBoard data format

    convert method

    Method for conversion data from device format to ThingsBoard data format.
    Parameters:

    1. def convert(self, config, data):

    self – current object.
    config – configuration section for this device from connector configuration file.
    data – data from a device.
    This function should return dictionary in format like following:

    1. {
    2. "deviceName": "DEVICE_NAME",
    3. "deviceType": "DEVICE_TYPE",
    4. "attributes": [
    5. {"SOME_ATTRIBUTE_KEY":"SOME_ATTRIBUTE_VALUE"},
    6. {"SOME_ATTRIBUTE_KEY1":"SOME_ATTRIBUTE_VALUE1"}
    7. ],
    8. "telemetry": [
    9. {"SOME_TELEMETRY_KEY": "SOME_TELEMETRY_VALUE"},
    10. {"SOME_TELEMETRY_KEY1": "SOME_TELEMETRY_VALUE1"},
    11. {"SOME_TELEMETRY_KEY2": "SOME_TELEMETRY_VALUE2"}
    12. ]
    13. }

    Gateway will convert this data into json and send it to the ThingsBoard instance.