python-bluezero icon indicating copy to clipboard operation
python-bluezero copied to clipboard

Wait notification functionality feature

Open vovagorodok opened this issue 1 year ago • 2 comments

Will be nice to have functionality like tx_char.wait_for_notify() in order to have possibility to write simple scripts of serial communication that works like:

tx_char.wait_for_notify()
value = tx_char.value

I have this issue in my ArduinoBleOTA library for firmware uploading over ble: https://github.com/vovagorodok/ArduinoBleOTA/blob/7fb9e079cf54e02b0701d117754e14ebe5db34cf/tools/bluezero_uploader.py#L146

vovagorodok avatar Feb 04 '24 11:02 vovagorodok

Apologies for the slow response on this.

I see you have been offered a way forward with the Bleak library https://github.com/hbldh/bleak/issues/1501#issuecomment-1925807205 so is this still of interest?

I thinking you are looking for a solution that doesn't use an event loop and blocks until the notification happens.

I've put the following quick example together

from dataclasses import dataclass
from typing import Optional
from gi.repository import GLib
from bluezero.central import Central

adapter_addr = "xx:xx:xx:xx:xx:xx"
peripheral_addr = "xx:xx:xx:xx:xx:xx"
batt_srvc_uuid = "0000180f-0000-1000-8000-00805f9b34fb"
batt_char_uuid = "00002a19-0000-1000-8000-00805f9b34fb"


@dataclass
class Battery:
    battery_value: Optional[int] = None
    notification_happened: bool = False

    def on_battery_value_changed(self, iface, changed_props, invalidated_props):
        print(iface, changed_props, invalidated_props)
        if "Value" in changed_props:
            self.battery_value = int.from_bytes(changed_props["Value"], "little")
            self.notification_happened = True

    def wait_on_notification(self):
        main_context = GLib.MainContext.default()
        self.notification_happened = False
        while not self.notification_happened:
            main_context.iteration(False)
        return self.battery_value


def main():
    batt_level = Battery()

    batt_device = Central(adapter_addr=adapter_addr, device_addr=peripheral_addr)

    batt_characteristic = batt_device.add_characteristic(batt_srvc_uuid, batt_char_uuid)

    batt_device.connect()
    batt_characteristic.add_characteristic_cb(batt_level.on_battery_value_changed)
    batt_characteristic.start_notify()

    returned_value = batt_level.wait_on_notification()
    batt_characteristic.stop_notify()
    batt_device.disconnect()
    print(f"Battery value is: {returned_value}")


if __name__ == "__main__":
    main()

I probably need to think more about this (especially if it was to be added to the library), but it might give you a way forward to experiment and report back if it is what you are looking for.

I did this experiment with the BLE Peripheral Simulator

ukBaz avatar Feb 07 '24 08:02 ukBaz

I see you have been offered a way forward with the Bleak library https://github.com/hbldh/bleak/issues/1501#issuecomment-1925807205 so is this still of interest?

Yes, I have provided two uploader scripts in order to have user possibility to chose one:

  • bluezero: https://github.com/vovagorodok/ArduinoBleOTA/blob/main/tools/bluezero_uploader.py
  • bleak: https://github.com/vovagorodok/ArduinoBleOTA/blob/main/tools/uploader.py

Bleak is by default because supports other platform than Linux.

I thinking you are looking for a solution that doesn't use an event loop and blocks until the notification happens.

Yes

I've put the following quick example together

Added:

class NotifiedReader:
    _value = None

    def callback(self, iface, changed_props, invalidated_props):
        print(changed_props)
        if "Value" in changed_props:
            self._value = changed_props['Value']

    def read_value(self):
        main_context = gi.repository.GLib.MainContext.default()
        while self._value is None:
            main_context.iteration(True)
        value = self._value
        self._value = None
        return value

...


    reader = NotifiedReader()
    tx_char.add_characteristic_cb(reader.callback)

    rx_char.value = int_to_u8_bytes(BEGIN) + int_to_u32_bytes(firmware_len)
    begin_resp = handleBeginResponse(reader.read_value())

But callback was not called first time and we stuck on reader.read_value(). When reader.read_value() was changed to tx_char.value than callbeck was called, but we stuck on the next reader.read_value()

I probably need to think more about this (especially if it was to be added to the library), but it might give you a way forward to experiment and report back if it is what you are looking for.

Will be nice to have such functionality in library in future if it's possible

vovagorodok avatar Mar 14 '24 13:03 vovagorodok