连接到系统总线busbus = dbus.SystemBus()
获得目前机器上有的适配器,改了一个函数,把busbus传进去,会返回一个列表,里面每个都是一个对象,他的顺序是倒着的
dbus 可以导出对象供其他应用程序使用,要与远程对象进行交互需要使用代理对象get_object获取代理对象
dbus 的接口是一组相关的方法和信号,使用 dbus.Interface(远程对象, 接口名称) 获得
GetManagedObjects() 可以获得所有对象和属性
def find_adapter(bus):remote_om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")objects = remote_om.GetManagedObjects()adapters = []for o, props in objects.items():if "org.bluez.GattManager1" in props.keys():adapters.append(o)print(adapters)if len(adapters) > 0:return adaptersreturn None

这样就把适配器选出来了
adapter_obj = bus.get_object("org.bluez", adapter)adapter_props = dbus.Interface(adapter_obj, "org.freedesktop.DBus.Properties")adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
这里就涉及到 bluez 的 api 了,在 adapter-api 处定义了
#3 这个是打开适配器,效果应该是hciconfig hci0 up
Advertising
开头附 bluez 定义好的广播类
class Advertisement(dbus.service.Object):PATH_BASE = "/org/bluez/example/advertisement"def __init__(self, bus, index, advertising_type):self.path = self.PATH_BASE + str(index)self.bus = busself.ad_type = advertising_typeself.service_uuids = Noneself.manufacturer_data = Noneself.solicit_uuids = Noneself.service_data = Noneself.local_name = Noneself.include_tx_power = Noneself.data = Nonedbus.service.Object.__init__(self, bus, self.path)def get_properties(self):properties = dict()properties["Type"] = self.ad_typeif self.service_uuids is not None:properties["ServiceUUIDs"] = dbus.Array(self.service_uuids, signature="s")if self.solicit_uuids is not None:properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids, signature="s")if self.manufacturer_data is not None:properties["ManufacturerData"] = dbus.Dictionary(self.manufacturer_data, signature="qv")if self.service_data is not None:properties["ServiceData"] = dbus.Dictionary(self.service_data, signature="sv")if self.local_name is not None:properties["LocalName"] = dbus.String(self.local_name)if self.include_tx_power is not None:properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)if self.data is not None:properties["Data"] = dbus.Dictionary(self.data, signature="yv")return {LE_ADVERTISEMENT_IFACE: properties}def get_path(self):return dbus.ObjectPath(self.path)def add_service_uuid(self, uuid):if not self.service_uuids:self.service_uuids = []self.service_uuids.append(uuid)def add_solicit_uuid(self, uuid):if not self.solicit_uuids:self.solicit_uuids = []self.solicit_uuids.append(uuid)def add_manufacturer_data(self, manuf_code, data):if not self.manufacturer_data:self.manufacturer_data = dbus.Dictionary({}, signature="qv")self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y")def add_service_data(self, uuid, data):if not self.service_data:self.service_data = dbus.Dictionary({}, signature="sv")self.service_data[uuid] = dbus.Array(data, signature="y")def add_local_name(self, name):if not self.local_name:self.local_name = ""self.local_name = dbus.String(name)def add_data(self, ad_type, data):if not self.data:self.data = dbus.Dictionary({}, signature="yv")self.data[ad_type] = dbus.Array(data, signature="y")@dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")def GetAll(self, interface):logger.info("GetAll")if interface != LE_ADVERTISEMENT_IFACE:raise InvalidArgsException()logger.info("returning props")return self.get_properties()[LE_ADVERTISEMENT_IFACE]@dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature="", out_signature="")def Release(self):logger.info("%s: Released!" % self.path)
获取到 Advertising 的管理的接口
ad_manager = dbus.Interface(adapter_obj, "org.bluez.LEAdvertisingManager1")
然后先使用已经定义好的广播类,去创建一个广播对象
advertisement = Advertisement(bus, 0, "peripheral")advertisement.add_manufacturer_data(0x038f, [0x11, 0x22],) #这里下面就是设置广播数据了advertisement.add_service_uuid("9fdc9c81-fffe-51a1-e511-5a38c414c2f9")advertisement.add_local_name("BLUE-F045DAF3F9CC")advertisement.include_tx_power = True
然后 RegisterAdvertisement 注册和使能广播接口,需要告诉 bluez 我们的 advertisement 在哪里,并且设置应答或错误处理函数
mainloop = MainLoop()ad_manager.RegisterAdvertisement(advertisement.get_path(),{},reply_handler=register_ad_cb,error_handler=register_ad_error_cb,)mainloop.run()
成功克隆出一个可以欺骗 APP 的广播包(他检测的是 service_uuid)
GATT
首先注册 GATT 的管理接口
service_manager = dbus.Interface(adapter_obj, "org.bluez.GattManager1")
定义一个 application,定义一个 service,给 service 添加 characteristic 然后把 service 给 application
app = Application(bus)service = Service(bus, 2, "12634d89-d598-4874-8e86-7d042ee07ba7", True)service.add_characteristic(TestCharacteristic(bus, 0, "12634d89-d598-4874-8e86-7d042ee07ba7", ["encrypt-read", "encrypt-write"], service))app.add_service(service)
为了方便,直接定义了一个 TestCharacteristic 类,继承自官方示例中的 Characteristic 类
class TestCharacteristic(Characteristic):def __init__(self, bus, index, uuid, flags, service):Characteristic.__init__(self, bus, index, uuid, flags, service,)self.value = [0xFF]def ReadValue(self, options):return self.valuedef WriteValue(self, value, options):self.value = value
最后注册 GATT,这样就可以成功识别到特性了,读写这类的属性也可以显示出来
service_manager.RegisterApplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=[register_app_error_cb],)

接下来解决数据收发的问题
问题备忘
如何检测外设是否可连接(connectable)
广播报文如何定义可连接与不可连接
gatt 如何设置不配对仅连接,以及如何设置各种配对的 IO 能力
