连接到系统总线
busbus = dbus.SystemBus()
获得目前机器上有的适配器,改了一个函数,把busbus传进去,会返回一个列表,里面每个都是一个对象,他的顺序是倒着的
dbus 可以导出对象供其他应用程序使用,要与远程对象进行交互需要使用代理对象get_object获取代理对象
dbus 的接口是一组相关的方法和信号,使用 dbus.Interface(远程对象, 接口名称) 获得
GetManagedObjects() 可以获得所有对象和属性

  1. def find_adapter(bus):
  2. remote_om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
  3. objects = remote_om.GetManagedObjects()
  4. adapters = []
  5. for o, props in objects.items():
  6. if "org.bluez.GattManager1" in props.keys():
  7. adapters.append(o)
  8. print(adapters)
  9. if len(adapters) > 0:
  10. return adapters
  11. return None

image.png
这样就把适配器选出来了

  1. adapter_obj = bus.get_object("org.bluez", adapter)
  2. adapter_props = dbus.Interface(adapter_obj, "org.freedesktop.DBus.Properties")
  3. adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

这里就涉及到 bluez 的 api 了,在 adapter-api 处定义了
#3 这个是打开适配器,效果应该是hciconfig hci0 up
image.png

Advertising

开头附 bluez 定义好的广播类

  1. class Advertisement(dbus.service.Object):
  2. PATH_BASE = "/org/bluez/example/advertisement"
  3. def __init__(self, bus, index, advertising_type):
  4. self.path = self.PATH_BASE + str(index)
  5. self.bus = bus
  6. self.ad_type = advertising_type
  7. self.service_uuids = None
  8. self.manufacturer_data = None
  9. self.solicit_uuids = None
  10. self.service_data = None
  11. self.local_name = None
  12. self.include_tx_power = None
  13. self.data = None
  14. dbus.service.Object.__init__(self, bus, self.path)
  15. def get_properties(self):
  16. properties = dict()
  17. properties["Type"] = self.ad_type
  18. if self.service_uuids is not None:
  19. properties["ServiceUUIDs"] = dbus.Array(self.service_uuids, signature="s")
  20. if self.solicit_uuids is not None:
  21. properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids, signature="s")
  22. if self.manufacturer_data is not None:
  23. properties["ManufacturerData"] = dbus.Dictionary(
  24. self.manufacturer_data, signature="qv"
  25. )
  26. if self.service_data is not None:
  27. properties["ServiceData"] = dbus.Dictionary(
  28. self.service_data, signature="sv"
  29. )
  30. if self.local_name is not None:
  31. properties["LocalName"] = dbus.String(self.local_name)
  32. if self.include_tx_power is not None:
  33. properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)
  34. if self.data is not None:
  35. properties["Data"] = dbus.Dictionary(self.data, signature="yv")
  36. return {LE_ADVERTISEMENT_IFACE: properties}
  37. def get_path(self):
  38. return dbus.ObjectPath(self.path)
  39. def add_service_uuid(self, uuid):
  40. if not self.service_uuids:
  41. self.service_uuids = []
  42. self.service_uuids.append(uuid)
  43. def add_solicit_uuid(self, uuid):
  44. if not self.solicit_uuids:
  45. self.solicit_uuids = []
  46. self.solicit_uuids.append(uuid)
  47. def add_manufacturer_data(self, manuf_code, data):
  48. if not self.manufacturer_data:
  49. self.manufacturer_data = dbus.Dictionary({}, signature="qv")
  50. self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y")
  51. def add_service_data(self, uuid, data):
  52. if not self.service_data:
  53. self.service_data = dbus.Dictionary({}, signature="sv")
  54. self.service_data[uuid] = dbus.Array(data, signature="y")
  55. def add_local_name(self, name):
  56. if not self.local_name:
  57. self.local_name = ""
  58. self.local_name = dbus.String(name)
  59. def add_data(self, ad_type, data):
  60. if not self.data:
  61. self.data = dbus.Dictionary({}, signature="yv")
  62. self.data[ad_type] = dbus.Array(data, signature="y")
  63. @dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")
  64. def GetAll(self, interface):
  65. logger.info("GetAll")
  66. if interface != LE_ADVERTISEMENT_IFACE:
  67. raise InvalidArgsException()
  68. logger.info("returning props")
  69. return self.get_properties()[LE_ADVERTISEMENT_IFACE]
  70. @dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature="", out_signature="")
  71. def Release(self):
  72. logger.info("%s: Released!" % self.path)

获取到 Advertising 的管理的接口

  1. ad_manager = dbus.Interface(adapter_obj, "org.bluez.LEAdvertisingManager1")

然后先使用已经定义好的广播类,去创建一个广播对象

  1. advertisement = Advertisement(bus, 0, "peripheral")
  2. advertisement.add_manufacturer_data(0x038f, [0x11, 0x22],) #这里下面就是设置广播数据了
  3. advertisement.add_service_uuid("9fdc9c81-fffe-51a1-e511-5a38c414c2f9")
  4. advertisement.add_local_name("BLUE-F045DAF3F9CC")
  5. advertisement.include_tx_power = True

然后 RegisterAdvertisement 注册和使能广播接口,需要告诉 bluez 我们的 advertisement 在哪里,并且设置应答或错误处理函数

  1. mainloop = MainLoop()
  2. ad_manager.RegisterAdvertisement(advertisement.get_path(),{},reply_handler=register_ad_cb,error_handler=register_ad_error_cb,)
  3. mainloop.run()

成功克隆出一个可以欺骗 APP 的广播包(他检测的是 service_uuid)
image.png
image.png

GATT

首先注册 GATT 的管理接口

  1. service_manager = dbus.Interface(adapter_obj, "org.bluez.GattManager1")

定义一个 application,定义一个 service,给 service 添加 characteristic 然后把 service 给 application

  1. app = Application(bus)
  2. service = Service(bus, 2, "12634d89-d598-4874-8e86-7d042ee07ba7", True)
  3. service.add_characteristic(TestCharacteristic(bus, 0, "12634d89-d598-4874-8e86-7d042ee07ba7", ["encrypt-read", "encrypt-write"], service))
  4. app.add_service(service)

为了方便,直接定义了一个 TestCharacteristic 类,继承自官方示例中的 Characteristic 类

  1. class TestCharacteristic(Characteristic):
  2. def __init__(self, bus, index, uuid, flags, service):
  3. Characteristic.__init__(
  4. self, bus, index, uuid, flags, service,
  5. )
  6. self.value = [0xFF]
  7. def ReadValue(self, options):
  8. return self.value
  9. def WriteValue(self, value, options):
  10. self.value = value

最后注册 GATT,这样就可以成功识别到特性了,读写这类的属性也可以显示出来

  1. service_manager.RegisterApplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=[register_app_error_cb],)

image.png
接下来解决数据收发的问题

问题备忘

如何检测外设是否可连接(connectable)
广播报文如何定义可连接与不可连接
gatt 如何设置不配对仅连接,以及如何设置各种配对的 IO 能力

参考:https://github.com/PunchThrough/espresso-ble