pb_ble.bluezdbus

BlueZ-backed implementations of BLE observer and BLE broadcaster roles.

High level API:

Generic D-Bus bindings:

The D-Bus bindings are implemented with the dbus-fast library.

 1"""
 2BlueZ-backed implementations of BLE observer and BLE broadcaster roles.
 3
 4High level API:
 5
 6- `BlueZBroadcaster`: A generic BLE broadcaster.
 7- `BlueZPybricksObserver`: A specialised BLE observer for Pybricks messages.
 8
 9Generic D-Bus bindings:
10
11- `LEAdvertisement`: Service implementation of the [org.bluez.LEAdvertisement1][]
12    D-Bus interface and the following specialisations:
13  - `BroadcastAdvertisement`: A BLE broadcast advertisement.
14  - `PybricksBroadcastAdvertisement`: A Pybricks broadcast advertisement.
15- `LEAdvertisingManager`: Client implementation of the [org.bluez.LEAdvertisingManager1][] D-Bus interface.
16
17The D-Bus bindings are implemented with the [dbus-fast][] library.
18
19[org.bluez.LEAdvertisement1]:https://github.com/bluez/bluez/blob/5.75/doc/org.bluez.LEAdvertisement.rst
20[org.bluez.LEAdvertisingManager1]:https://github.com/bluez/bluez/blob/5.75/doc/org.bluez.LEAdvertisingManager.rst
21[dbus-fast]:https://github.com/Bluetooth-Devices/dbus-fast
22"""
23
24from .adapters import get_adapter, get_adapter_details
25from .advertisement import (
26    BroadcastAdvertisement,
27    Capability,
28    Feature,
29    Include,
30    LEAdvertisement,
31    LEAdvertisingManager,
32    PybricksBroadcastAdvertisement,
33    SecondaryChannel,
34    Type,
35)
36from .broadcaster import BlueZBroadcaster
37from .observer import BlueZPybricksObserver, ObservedAdvertisement
38
39__all__ = (
40    # High level objects
41    "BlueZBroadcaster",
42    "BlueZPybricksObserver",
43    "ObservedAdvertisement",
44    # D-Bus bindings
45    "LEAdvertisement",
46    "BroadcastAdvertisement",
47    "PybricksBroadcastAdvertisement",
48    "LEAdvertisingManager",
49    # D-Bus constants
50    "Type",
51    "Include",
52    "Capability",
53    "Feature",
54    "SecondaryChannel",
55)
class BlueZBroadcaster(contextlib.AbstractAsyncContextManager):
 24class BlueZBroadcaster(AbstractAsyncContextManager):
 25    """
 26    A BLE broadcaster backed by BlueZ.
 27    Supports multiple advertising sets in parallel.
 28
 29    The recommended use is as a context manager, which ensures that all
 30    registered broadcasts are stopped when exiting the context:
 31
 32    ```python
 33    bus = ...
 34    adapter = ...
 35    device_name = "my-computer"
 36
 37    async with BlueZBroadcaster(bus, adapter, device_name) as broadcaster:
 38        # Start broadcasting
 39        adv = BroadcastAdvertisement(device_name)
 40        await broadcaster.broadcast(adv)
 41        # Stop after 10 seconds
 42        await asyncio.sleep(10)
 43    ```
 44    """
 45
 46    def __init__(self, bus: MessageBus, adapter: ProxyObject, name: str):
 47        """
 48        Creates a new broadcaster.
 49
 50        :param bus: The message bus.
 51        :param adapter: The Bluetooth adapter.
 52        :param name: The name of this broadcaster.
 53        """
 54        self.bus: MessageBus = bus
 55        """The message bus used to connect to DBus."""
 56        self.adapter: ProxyObject = adapter
 57        """A DBus proxy object for the Bluetooth adapter to use for advertising."""
 58        self.name: str = name
 59        """The name of this broadcaster. Will be used as `local_name` in advertisements."""
 60        self.adv_manager: LEAdvertisingManager = LEAdvertisingManager(adapter)
 61        """The BlueZ advertising manager client."""
 62        self.path_namespace: str = f"/org/bluez/{self.name}"
 63        """Path prefix to use for DBus objects created by this broadcaster."""
 64        self.advertisements: dict[str, BroadcastAdvertisement] = {}
 65        """Active advertisements of this broadcaster."""
 66
 67    @overload
 68    async def stop_broadcast(self, adv: str): ...
 69    @overload
 70    async def stop_broadcast(self, adv: BroadcastAdvertisement): ...
 71    async def stop_broadcast(self, adv: str | BroadcastAdvertisement):
 72        """
 73        Stop broadcasting the given advertisement.
 74
 75        :param adv: The broadcast to stop. Takes either the D-Bus path of the
 76            advertisement, or a reference to the object.
 77        """
 78        path = adv.path if isinstance(adv, BroadcastAdvertisement) else adv
 79
 80        try:
 81            await self.adv_manager.unregister_advertisement(path)
 82        except DBusError:
 83            # Advertisement does not exist
 84            pass
 85        finally:
 86            self.bus.unexport(path)
 87            if path in self.advertisements:
 88                del self.advertisements[path]
 89
 90    async def stop(self):
 91        """
 92        Stops this broadcaster. Cleans up any active broadcasts.
 93        """
 94        await asyncio.gather(
 95            *[self.stop_broadcast(path) for path in self.advertisements.keys()]
 96        )
 97
 98    async def __aexit__(self, exc_type, exc, tb):
 99        await self.stop()
100
101    async def broadcast(self, adv: BroadcastAdvertisement):
102        """
103        Start broadcasting the given advertisement.
104
105        :param adv: The reference to the advertisement object.
106        :raises ValueError: If a D-Bus object already exists on the given path.
107        :raises DBusError: If the given advertisement is invalid, or is already
108            registered with BlueZ.
109        """
110        # TODO construct advertisement in here to ensure local_name
111        assert adv._local_name == self.name, f"{adv.name} != {self.name}"
112
113        # cleanup on release of advertisement
114        on_release = adv.on_release
115
116        def release_advertisement(path):
117            try:
118                self.bus.unexport(path)
119                if path in self.advertisements:
120                    del self.advertisements[path]
121            finally:
122                on_release(path)
123
124        adv.on_release = release_advertisement
125
126        log.info("Broadcasting: %s", adv)
127
128        try:
129            self.bus.export(adv.path, adv)
130        except ValueError:
131            # Already exported
132            raise
133
134        try:
135            await self.adv_manager.register_advertisement(adv)
136        except DBusError:
137            # org.bluez.Error.InvalidArguments
138            # org.bluez.Error.AlreadyExists
139            # org.bluez.Error.InvalidLength
140            # org.bluez.Error.NotPermitted
141            raise
142
143        self.advertisements[adv.path] = adv
144
145    def is_broadcasting(self, adv: BroadcastAdvertisement | None = None) -> bool:
146        """
147        Checks whether this broadcaster is active.
148
149        :param adv: The reference to the advertisement object to check,
150            defaults to None (check if any broadcast is active).
151        :return: True if the given (or any) broadcast is active.
152        """
153        if adv is not None:
154            return adv.path in self.advertisements
155        else:
156            return len(self.advertisements) > 0

A BLE broadcaster backed by BlueZ. Supports multiple advertising sets in parallel.

The recommended use is as a context manager, which ensures that all registered broadcasts are stopped when exiting the context:

bus = ...
adapter = ...
device_name = "my-computer"

async with BlueZBroadcaster(bus, adapter, device_name) as broadcaster:
    # Start broadcasting
    adv = BroadcastAdvertisement(device_name)
    await broadcaster.broadcast(adv)
    # Stop after 10 seconds
    await asyncio.sleep(10)
BlueZBroadcaster( bus: dbus_fast.aio.message_bus.MessageBus, adapter: dbus_fast.aio.proxy_object.ProxyObject, name: str)
46    def __init__(self, bus: MessageBus, adapter: ProxyObject, name: str):
47        """
48        Creates a new broadcaster.
49
50        :param bus: The message bus.
51        :param adapter: The Bluetooth adapter.
52        :param name: The name of this broadcaster.
53        """
54        self.bus: MessageBus = bus
55        """The message bus used to connect to DBus."""
56        self.adapter: ProxyObject = adapter
57        """A DBus proxy object for the Bluetooth adapter to use for advertising."""
58        self.name: str = name
59        """The name of this broadcaster. Will be used as `local_name` in advertisements."""
60        self.adv_manager: LEAdvertisingManager = LEAdvertisingManager(adapter)
61        """The BlueZ advertising manager client."""
62        self.path_namespace: str = f"/org/bluez/{self.name}"
63        """Path prefix to use for DBus objects created by this broadcaster."""
64        self.advertisements: dict[str, BroadcastAdvertisement] = {}
65        """Active advertisements of this broadcaster."""

Creates a new broadcaster.

Parameters
  • bus: The message bus.
  • adapter: The Bluetooth adapter.
  • name: The name of this broadcaster.
bus: dbus_fast.aio.message_bus.MessageBus

The message bus used to connect to DBus.

adapter: dbus_fast.aio.proxy_object.ProxyObject

A DBus proxy object for the Bluetooth adapter to use for advertising.

name: str

The name of this broadcaster. Will be used as local_name in advertisements.

adv_manager: LEAdvertisingManager

The BlueZ advertising manager client.

path_namespace: str

Path prefix to use for DBus objects created by this broadcaster.

advertisements: dict[str, BroadcastAdvertisement]

Active advertisements of this broadcaster.

async def stop_broadcast( self, adv: str | BroadcastAdvertisement):
71    async def stop_broadcast(self, adv: str | BroadcastAdvertisement):
72        """
73        Stop broadcasting the given advertisement.
74
75        :param adv: The broadcast to stop. Takes either the D-Bus path of the
76            advertisement, or a reference to the object.
77        """
78        path = adv.path if isinstance(adv, BroadcastAdvertisement) else adv
79
80        try:
81            await self.adv_manager.unregister_advertisement(path)
82        except DBusError:
83            # Advertisement does not exist
84            pass
85        finally:
86            self.bus.unexport(path)
87            if path in self.advertisements:
88                del self.advertisements[path]

Stop broadcasting the given advertisement.

Parameters
  • adv: The broadcast to stop. Takes either the D-Bus path of the advertisement, or a reference to the object.
async def stop(self):
90    async def stop(self):
91        """
92        Stops this broadcaster. Cleans up any active broadcasts.
93        """
94        await asyncio.gather(
95            *[self.stop_broadcast(path) for path in self.advertisements.keys()]
96        )

Stops this broadcaster. Cleans up any active broadcasts.

async def broadcast(self, adv: BroadcastAdvertisement):
101    async def broadcast(self, adv: BroadcastAdvertisement):
102        """
103        Start broadcasting the given advertisement.
104
105        :param adv: The reference to the advertisement object.
106        :raises ValueError: If a D-Bus object already exists on the given path.
107        :raises DBusError: If the given advertisement is invalid, or is already
108            registered with BlueZ.
109        """
110        # TODO construct advertisement in here to ensure local_name
111        assert adv._local_name == self.name, f"{adv.name} != {self.name}"
112
113        # cleanup on release of advertisement
114        on_release = adv.on_release
115
116        def release_advertisement(path):
117            try:
118                self.bus.unexport(path)
119                if path in self.advertisements:
120                    del self.advertisements[path]
121            finally:
122                on_release(path)
123
124        adv.on_release = release_advertisement
125
126        log.info("Broadcasting: %s", adv)
127
128        try:
129            self.bus.export(adv.path, adv)
130        except ValueError:
131            # Already exported
132            raise
133
134        try:
135            await self.adv_manager.register_advertisement(adv)
136        except DBusError:
137            # org.bluez.Error.InvalidArguments
138            # org.bluez.Error.AlreadyExists
139            # org.bluez.Error.InvalidLength
140            # org.bluez.Error.NotPermitted
141            raise
142
143        self.advertisements[adv.path] = adv

Start broadcasting the given advertisement.

Parameters
  • adv: The reference to the advertisement object.
Raises
  • ValueError: If a D-Bus object already exists on the given path.
  • DBusError: If the given advertisement is invalid, or is already registered with BlueZ.
def is_broadcasting( self, adv: BroadcastAdvertisement | None = None) -> bool:
145    def is_broadcasting(self, adv: BroadcastAdvertisement | None = None) -> bool:
146        """
147        Checks whether this broadcaster is active.
148
149        :param adv: The reference to the advertisement object to check,
150            defaults to None (check if any broadcast is active).
151        :return: True if the given (or any) broadcast is active.
152        """
153        if adv is not None:
154            return adv.path in self.advertisements
155        else:
156            return len(self.advertisements) > 0

Checks whether this broadcaster is active.

Parameters
  • adv: The reference to the advertisement object to check, defaults to None (check if any broadcast is active).
Returns

True if the given (or any) broadcast is active.

class BlueZPybricksObserver(contextlib.AbstractAsyncContextManager):
 38class BlueZPybricksObserver(AbstractAsyncContextManager):
 39    """
 40    A BLE observer backed by BlueZ.
 41    Keeps a cache of observed Pybricks messages.
 42
 43    The recommended use is as a context manager, which ensures that the
 44    underlying BLE scanner is stopped when exiting the context:
 45
 46    ```python
 47    async with BlueZPybricksObserver(channels=[1, 2, 3]) as observer:
 48        # Observe for 10 seconds
 49        await asyncio.sleep(10)
 50        # Check results for channel 2
 51        message = observer.observe(2)
 52        print(message)
 53    ```
 54    """
 55
 56    def __init__(
 57        self,
 58        adapter_name: str | None = None,
 59        scanning_mode: ScanningMode = "passive",
 60        channels: Sequence[int] | None = None,
 61        rssi_threshold: int | None = None,
 62        device_pattern: str | None = None,
 63        message_ttl: int = 60,
 64    ):
 65        """
 66        Creates a new observer.
 67
 68        :param adapter_name: The Bluetooth adapter to use for scanning,
 69            defaults to `None` (auto-discover default device).
 70        :param scanning_mode: The scanning mode to use, defaults to "passive".
 71        :param channels: Channels to observe, defaults to None (all channels).
 72        :param rssi_threshold: Minimum required signal strength of observed
 73            broadcasts in dBm, defaults to None (no RSSI filtering).
 74        :param device_pattern: Pattern that the device name of the sender must
 75            start with, defaults to `None` (no name filtering).
 76        :param message_ttl: Time in seconds to cache observed broadcasts for,
 77            defaults to 60.
 78        """
 79        self.channels = channels or []
 80        """List of channels that this observer is monitoring."""
 81
 82        for channel in self.channels:
 83            if (
 84                not isinstance(channel, int)
 85                or PYBRICKS_MIN_CHANNEL < channel > PYBRICKS_MAX_CHANNEL
 86            ):
 87                raise ValueError(
 88                    f"Observe channel must be list of integers from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
 89                )
 90
 91        self.rssi_threshold = rssi_threshold
 92        """The configured RSSI threshold for broadcasts."""
 93        self.device_pattern = device_pattern
 94        """The configured device name pattern match for broadcasts."""
 95        self.advertisements: TTLCache = TTLCache(
 96            maxsize=len(self.channels) or PYBRICKS_MAX_CHANNEL, ttl=message_ttl
 97        )
 98        """Cache of observed broadcasts."""
 99
100        # Filters used for active scanning
101        filters: BlueZDiscoveryFilters = BlueZDiscoveryFilters()
102
103        if device_pattern:
104            filters["Pattern"] = device_pattern
105        if rssi_threshold:
106            filters["RSSI"] = rssi_threshold
107
108        # Patterns used for passive scanning
109        or_patterns: list[OrPattern | tuple[int, AdvertisementDataType, bytes]]
110
111        if self.channels:
112            or_patterns = [
113                OrPattern(
114                    0,
115                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
116                    pack("<HB", LEGO_CID, channel),
117                )
118                for channel in self.channels
119            ]
120        else:
121            or_patterns = [
122                OrPattern(
123                    0,
124                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
125                    pack("<H", LEGO_CID),
126                )
127            ]
128
129        log.debug(
130            "Observer init: scanning_mode=%s, device_pattern=%s, rssi_threshold=%s",
131            scanning_mode,
132            device_pattern,
133            rssi_threshold,
134        )
135
136        self._scanner = BleakScanner(
137            detection_callback=self._callback,
138            scanning_mode=scanning_mode,
139            bluez=BlueZScannerArgs(
140                filters=filters,
141                or_patterns=or_patterns,
142            ),
143            adapter=adapter_name,
144        )
145
146    def _callback(self, device: BLEDevice, ad: AdvertisementData):
147        """
148        @public
149        The callback function for detected BLE advertisements from the
150        scanner.
151
152        Performs filtering of advertisements, decodes the Pybricks
153        broadcast message contained in the advertisement, and stores
154        it in the broadcast cache.
155
156        Depending on the selected scanning mode, certain filters must
157        be applied here:
158
159        1. Filter advertisements based on the configured `rssi_threshold`
160            and `device_pattern` (required in "passive" mode).
161        2. Filter advertisements that contain invalid manufacturer data,
162            such as non-Pybricks advertisements (required in "active" mode).
163        4. Filter messages on the incorrect Pybricks channel (required in
164            "active" mode).
165
166        :param device: The device sending the advertisement.
167        :param ad: The advertisement data.
168        """
169        if self.rssi_threshold is not None and ad.rssi < self.rssi_threshold:
170            log.debug("Filtered AD due to RSSI below threshold: %i", ad.rssi)
171            return
172
173        if (ad.local_name and self.device_pattern) and not ad.local_name.startswith(
174            self.device_pattern
175        ):
176            log.debug("Filtered AD due to invalid device name: %s", ad.local_name)
177            return
178
179        if LEGO_CID not in ad.manufacturer_data:
180            log.debug(
181                "Filtered AD due to invalid manufacturer data: %s",
182                ad.manufacturer_data.keys(),
183            )
184            return
185
186        message = ad.manufacturer_data[LEGO_CID]
187        channel, data = decode_message(message)
188
189        if self.channels and channel not in self.channels:
190            log.debug("Filtered broadcast due to wrong channel: %i", channel)
191            return
192
193        log.info(
194            "Pybricks broadcast on channel %i: %s (rssi %s)", channel, data, ad.rssi
195        )
196        self.advertisements[channel] = ObservedAdvertisement(data, ad.rssi)
197
198    def observe(self, channel: int) -> ObservedAdvertisement | None:
199        """
200        Retrieves the last observed data for a given channel.
201
202        :param channel: The channel to observe (0 to 255).
203        :return: The received data in the same format as it was sent, or `None`
204            if no recent data is available.
205        """
206        if self.channels and channel not in self.channels:
207            raise ValueError(f"Channel {channel} not allocated.")
208
209        return self.advertisements.get(channel, None)
210
211    async def __aenter__(self):
212        log.info("Observing on channels %s...", self.channels or "ALL")
213        await self._scanner.start()
214        return self
215
216    async def __aexit__(self, exc_type, exc, tb):
217        await self._scanner.stop()

A BLE observer backed by BlueZ. Keeps a cache of observed Pybricks messages.

The recommended use is as a context manager, which ensures that the underlying BLE scanner is stopped when exiting the context:

async with BlueZPybricksObserver(channels=[1, 2, 3]) as observer:
    # Observe for 10 seconds
    await asyncio.sleep(10)
    # Check results for channel 2
    message = observer.observe(2)
    print(message)
BlueZPybricksObserver( adapter_name: str | None = None, scanning_mode: Literal['active', 'passive'] = 'passive', channels: Optional[Sequence[int]] = None, rssi_threshold: int | None = None, device_pattern: str | None = None, message_ttl: int = 60)
 56    def __init__(
 57        self,
 58        adapter_name: str | None = None,
 59        scanning_mode: ScanningMode = "passive",
 60        channels: Sequence[int] | None = None,
 61        rssi_threshold: int | None = None,
 62        device_pattern: str | None = None,
 63        message_ttl: int = 60,
 64    ):
 65        """
 66        Creates a new observer.
 67
 68        :param adapter_name: The Bluetooth adapter to use for scanning,
 69            defaults to `None` (auto-discover default device).
 70        :param scanning_mode: The scanning mode to use, defaults to "passive".
 71        :param channels: Channels to observe, defaults to None (all channels).
 72        :param rssi_threshold: Minimum required signal strength of observed
 73            broadcasts in dBm, defaults to None (no RSSI filtering).
 74        :param device_pattern: Pattern that the device name of the sender must
 75            start with, defaults to `None` (no name filtering).
 76        :param message_ttl: Time in seconds to cache observed broadcasts for,
 77            defaults to 60.
 78        """
 79        self.channels = channels or []
 80        """List of channels that this observer is monitoring."""
 81
 82        for channel in self.channels:
 83            if (
 84                not isinstance(channel, int)
 85                or PYBRICKS_MIN_CHANNEL < channel > PYBRICKS_MAX_CHANNEL
 86            ):
 87                raise ValueError(
 88                    f"Observe channel must be list of integers from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
 89                )
 90
 91        self.rssi_threshold = rssi_threshold
 92        """The configured RSSI threshold for broadcasts."""
 93        self.device_pattern = device_pattern
 94        """The configured device name pattern match for broadcasts."""
 95        self.advertisements: TTLCache = TTLCache(
 96            maxsize=len(self.channels) or PYBRICKS_MAX_CHANNEL, ttl=message_ttl
 97        )
 98        """Cache of observed broadcasts."""
 99
100        # Filters used for active scanning
101        filters: BlueZDiscoveryFilters = BlueZDiscoveryFilters()
102
103        if device_pattern:
104            filters["Pattern"] = device_pattern
105        if rssi_threshold:
106            filters["RSSI"] = rssi_threshold
107
108        # Patterns used for passive scanning
109        or_patterns: list[OrPattern | tuple[int, AdvertisementDataType, bytes]]
110
111        if self.channels:
112            or_patterns = [
113                OrPattern(
114                    0,
115                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
116                    pack("<HB", LEGO_CID, channel),
117                )
118                for channel in self.channels
119            ]
120        else:
121            or_patterns = [
122                OrPattern(
123                    0,
124                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
125                    pack("<H", LEGO_CID),
126                )
127            ]
128
129        log.debug(
130            "Observer init: scanning_mode=%s, device_pattern=%s, rssi_threshold=%s",
131            scanning_mode,
132            device_pattern,
133            rssi_threshold,
134        )
135
136        self._scanner = BleakScanner(
137            detection_callback=self._callback,
138            scanning_mode=scanning_mode,
139            bluez=BlueZScannerArgs(
140                filters=filters,
141                or_patterns=or_patterns,
142            ),
143            adapter=adapter_name,
144        )

Creates a new observer.

Parameters
  • adapter_name: The Bluetooth adapter to use for scanning, defaults to None (auto-discover default device).
  • scanning_mode: The scanning mode to use, defaults to "passive".
  • channels: Channels to observe, defaults to None (all channels).
  • rssi_threshold: Minimum required signal strength of observed broadcasts in dBm, defaults to None (no RSSI filtering).
  • device_pattern: Pattern that the device name of the sender must start with, defaults to None (no name filtering).
  • message_ttl: Time in seconds to cache observed broadcasts for, defaults to 60.
channels

List of channels that this observer is monitoring.

rssi_threshold

The configured RSSI threshold for broadcasts.

device_pattern

The configured device name pattern match for broadcasts.

advertisements: cachetools.TTLCache

Cache of observed broadcasts.

def _callback( self, device: bleak.backends.device.BLEDevice, ad: bleak.backends.scanner.AdvertisementData):
146    def _callback(self, device: BLEDevice, ad: AdvertisementData):
147        """
148        @public
149        The callback function for detected BLE advertisements from the
150        scanner.
151
152        Performs filtering of advertisements, decodes the Pybricks
153        broadcast message contained in the advertisement, and stores
154        it in the broadcast cache.
155
156        Depending on the selected scanning mode, certain filters must
157        be applied here:
158
159        1. Filter advertisements based on the configured `rssi_threshold`
160            and `device_pattern` (required in "passive" mode).
161        2. Filter advertisements that contain invalid manufacturer data,
162            such as non-Pybricks advertisements (required in "active" mode).
163        4. Filter messages on the incorrect Pybricks channel (required in
164            "active" mode).
165
166        :param device: The device sending the advertisement.
167        :param ad: The advertisement data.
168        """
169        if self.rssi_threshold is not None and ad.rssi < self.rssi_threshold:
170            log.debug("Filtered AD due to RSSI below threshold: %i", ad.rssi)
171            return
172
173        if (ad.local_name and self.device_pattern) and not ad.local_name.startswith(
174            self.device_pattern
175        ):
176            log.debug("Filtered AD due to invalid device name: %s", ad.local_name)
177            return
178
179        if LEGO_CID not in ad.manufacturer_data:
180            log.debug(
181                "Filtered AD due to invalid manufacturer data: %s",
182                ad.manufacturer_data.keys(),
183            )
184            return
185
186        message = ad.manufacturer_data[LEGO_CID]
187        channel, data = decode_message(message)
188
189        if self.channels and channel not in self.channels:
190            log.debug("Filtered broadcast due to wrong channel: %i", channel)
191            return
192
193        log.info(
194            "Pybricks broadcast on channel %i: %s (rssi %s)", channel, data, ad.rssi
195        )
196        self.advertisements[channel] = ObservedAdvertisement(data, ad.rssi)

The callback function for detected BLE advertisements from the scanner.

Performs filtering of advertisements, decodes the Pybricks broadcast message contained in the advertisement, and stores it in the broadcast cache.

Depending on the selected scanning mode, certain filters must be applied here:

  1. Filter advertisements based on the configured rssi_threshold and device_pattern (required in "passive" mode).
  2. Filter advertisements that contain invalid manufacturer data, such as non-Pybricks advertisements (required in "active" mode).
  3. Filter messages on the incorrect Pybricks channel (required in "active" mode).
Parameters
  • device: The device sending the advertisement.
  • ad: The advertisement data.
def observe( self, channel: int) -> ObservedAdvertisement | None:
198    def observe(self, channel: int) -> ObservedAdvertisement | None:
199        """
200        Retrieves the last observed data for a given channel.
201
202        :param channel: The channel to observe (0 to 255).
203        :return: The received data in the same format as it was sent, or `None`
204            if no recent data is available.
205        """
206        if self.channels and channel not in self.channels:
207            raise ValueError(f"Channel {channel} not allocated.")
208
209        return self.advertisements.get(channel, None)

Retrieves the last observed data for a given channel.

Parameters
  • channel: The channel to observe (0 to 255).
Returns

The received data in the same format as it was sent, or None if no recent data is available.

class ObservedAdvertisement(typing.NamedTuple):
29class ObservedAdvertisement(NamedTuple):
30    """
31    Data structure for an observed broadcast.
32    """
33
34    data: PybricksBroadcastData
35    rssi: int

Data structure for an observed broadcast.

ObservedAdvertisement( data: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...], rssi: int)

Create new instance of ObservedAdvertisement(data, rssi)

data: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...]

Alias for field number 0

rssi: int

Alias for field number 1

class LEAdvertisement(dbus_fast.service.ServiceInterface):
 83class LEAdvertisement(ServiceInterface):
 84    """
 85    Implementation of the `org.bluez.LEAdvertisement1` D-Bus interface.
 86    """
 87
 88    INTERFACE_NAME: str = "org.bluez.LEAdvertisement1"
 89
 90    def __init__(
 91        self,
 92        advertising_type: Type,
 93        local_name: str,
 94        index: int = 0,
 95        includes: set[Include] = set(),
 96    ):
 97        if index < 0:
 98            raise ValueError("index must be positive")
 99
100        self.index = index
101        self.path = f"/org/bluez/{local_name}/advertisement{index:03}"
102
103        self._type: str = advertising_type.value
104        self._service_uuids: list[str] = []
105        self._manufacturer_data: dict[int, bytes] = {}  # uint16 -> bytes
106        self._solicit_uuids: list[str] = []
107        self._service_data: dict[str | int, bytes] = {}  # uint16 | str -> bytes
108        self._data: dict[int, bytes] = {}  # EXPERIMENTAL # uint8 -> bytes
109        self._discoverable: bool = False  # EXPERIMENTAL
110        self._discoverable_timeout: int = 0  # EXPERIMENTAL # uint16
111        self._includes: list[str] = [i.value for i in includes]
112        self._local_name: str = local_name
113        self._appearance: int = 0x00  # uint16
114        self._duration: int = 2  # uint16
115        self._timeout: int = 0  # uint16
116        self._secondary_channel: str = SecondaryChannel.ONE.value  # EXPERIMENTAL
117        self._min_interval: int = 100  # EXPERIMENTAL # uint32
118        self._max_interval: int = 1000  # EXPERIMENTAL # uint32
119        self._tx_power: int = 7  # EXPERIMENTAL # int16
120
121        super().__init__(self.INTERFACE_NAME)
122
123    def _enable_props(self, *prop_names: str):
124        """
125        Enables the given properties.
126
127        This should be used by subclasses to opt-into experimental properties.
128
129        :param prop_names: List of D-Bus property names to enable.
130        :raises ValueError: If an unknown property was passed.
131        """
132        for prop_name in prop_names:
133            prop: _Property | None = next(
134                (
135                    p
136                    for p in ServiceInterface._get_properties(self)
137                    if p.name == prop_name
138                ),
139                None,
140            )
141            if prop is None:
142                raise ValueError(f"Unknown property: {prop_name}")
143            else:
144                prop.disabled = False
145
146    def _disable_props(self, *prop_names: str):
147        """
148        Disables the given properties.
149
150        This can be used by subclasses to opt-out of exposing certain properties.
151
152        :param prop_names: List of D-Bus property names to disable.
153        :raises ValueError: If an unknown property was passed.
154        """
155        for prop_name in prop_names:
156            prop: _Property | None = next(
157                (
158                    p
159                    for p in ServiceInterface._get_properties(self)
160                    if p.name == prop_name
161                ),
162                None,
163            )
164            if prop is None:
165                raise ValueError(f"Unknown property: {prop_name}")
166            else:
167                prop.disabled = True
168
169    @method()
170    def Release(self):
171        logger.debug("Released advertisement: %s", self)
172
173    @dbus_property(access=PropertyAccess.READ)
174    @no_type_check
175    def Type(self) -> "s":  # type: ignore # noqa: F821
176        """
177        Determines the type of advertising packet requested.
178        """
179        return self._type
180
181    @Type.setter  # type: ignore
182    @no_type_check
183    def Type(self, type: "s"):  # type: ignore  # noqa: F821
184        self._type = type
185
186    @dbus_property()
187    @no_type_check
188    def ServiceUUIDs(self) -> "as":  # type: ignore # noqa: F821 F722
189        """
190        List of UUIDs to include in the "Service UUID" field of the Advertising Data.
191        """
192        return self._service_uuids
193
194    @ServiceUUIDs.setter  # type: ignore
195    @no_type_check
196    def ServiceUUIDs(self, service_uuids: "as"):  # type: ignore # noqa: F821 F722
197        self._service_uuids = service_uuids
198
199    @dbus_property()
200    @no_type_check
201    def ManufacturerData(self) -> "a{qv}":  # type: ignore # noqa: F821 F722
202        """
203        Manufacturer Data fields to include in the Advertising Data.
204        Keys are the Manufacturer ID to associate with the data.
205        """
206        return self._manufacturer_data
207
208    @ManufacturerData.setter  # type: ignore
209    @no_type_check
210    def ManufacturerData(self, data: "a{qv}"):  # type: ignore # noqa: F821 F722
211        self._manufacturer_data = data
212
213    @dbus_property()
214    @no_type_check
215    def SolicitUUIDs(self) -> "as":  # type: ignore # noqa: F821 F722
216        """
217        Array of UUIDs to include in "Service Solicitation" Advertisement Data.
218        """
219        return self._solicit_uuids
220
221    @SolicitUUIDs.setter  # type: ignore
222    @no_type_check
223    def SolicitUUIDs(self, uuids: "as"):  # type: ignore # noqa: F821 F722
224        self._solicit_uuids = uuids
225
226    @dbus_property()
227    @no_type_check
228    def ServiceData(self) -> "a{sv}":  # type: ignore # noqa: F821 F722
229        """
230        Service Data elements to include. The keys are the UUID to associate with the data.
231        """
232        return self._service_data
233
234    @ServiceData.setter  # type: ignore
235    @no_type_check
236    def ServiceData(self, data: "a{sv}"):  # type: ignore # noqa: F821 F722
237        self._service_data = data
238
239    @dbus_property(disabled=True)
240    @no_type_check
241    def Data(self) -> "a{yv}":  # type: ignore # noqa: F821 F722
242        """
243        Advertising Data to include.
244        Key is the advertising type and value is the data as byte array.
245        """
246        return self._data
247
248    @Data.setter  # type: ignore
249    @no_type_check
250    def Data(self, data: "a{yv}"):  # type: ignore # noqa: F821 F722
251        self._data = data
252
253    @dbus_property(disabled=True)
254    @no_type_check
255    def Discoverable(self) -> "b":  # type: ignore # noqa: F821 F722
256        """
257        Advertise as general discoverable.
258        When present this will override adapter Discoverable property.
259        """
260        return self._discoverable
261
262    @Discoverable.setter  # type: ignore
263    @no_type_check
264    def Discoverable(self, discoverable: "b"):  # type: ignore # noqa: F821 F722
265        self._discoverable = discoverable
266
267    @dbus_property(disabled=True)
268    @no_type_check
269    def DiscoverableTimeout(self) -> "q":  # type: ignore # noqa: F821 F722
270        """
271        The discoverable timeout in seconds.
272        A value of zero means that the timeout is disabled and it will stay in discoverable/limited mode forever.
273        """
274        return self._discoverable_timeout
275
276    @DiscoverableTimeout.setter  # type: ignore
277    @no_type_check
278    def DiscoverableTimeout(self, timeout: "q"):  # type: ignore # noqa: F821 F722
279        self._discoverable_timeout = timeout
280
281    @dbus_property()
282    @no_type_check
283    def Includes(self) -> "as":  # type: ignore # noqa: F821 F722
284        """
285        List of features to be included in the advertising packet.
286        """
287        return self._includes
288
289    @Includes.setter  # type: ignore
290    @no_type_check
291    def Includes(self, includes: "as"):  # type: ignore # noqa: F821 F722
292        self._includes = includes
293
294    @dbus_property()
295    @no_type_check
296    def LocalName(self) -> "s":  # type: ignore # noqa: F821 N802
297        """
298        Local name to be used in the advertising report.
299        If the string is too big to fit into the packet it will be truncated.
300        """
301        return self._local_name
302
303    @LocalName.setter  # type: ignore
304    @no_type_check
305    def LocalName(self, name: "s"):  # type: ignore # noqa: F821 N802
306        self._local_name = name
307
308    @dbus_property()
309    @no_type_check
310    def Appearance(self) -> "q":  # type: ignore # noqa: F821 N802
311        """
312        Appearance to be used in the advertising report.
313        """
314        return self._appearance
315
316    @Appearance.setter  # type: ignore
317    @no_type_check
318    def Appearance(self, appearance: "q"):  # type: ignore # noqa: F821 N802
319        self._appearance = appearance
320
321    @dbus_property()
322    @no_type_check
323    def Duration(self) -> "q":  # type: ignore # noqa: F821 N802
324        """
325        Rotation duration of the advertisement in seconds.
326        If there are other applications advertising no duration is set the default is 2 seconds.
327        """
328        return self._duration
329
330    @Duration.setter  # type: ignore
331    @no_type_check
332    def Duration(self, seconds: "q"):  # type: ignore # noqa: F821 N802
333        self._duration = seconds
334
335    @dbus_property()
336    @no_type_check
337    def Timeout(self) -> "q":  # type: ignore # noqa: F821 N802
338        """
339        Timeout of the advertisement in seconds.
340        This defines the lifetime of the advertisement.
341        """
342        return self._timeout
343
344    @Timeout.setter  # type: ignore
345    @no_type_check
346    def Timeout(self, seconds: "q"):  # type: ignore # noqa: F821 N802
347        self._timeout = seconds
348
349    @dbus_property(disabled=True)
350    @no_type_check
351    def SecondaryChannel(self) -> "s":  # type: ignore # noqa: F821 N802
352        """
353        Secondary channel to be used.
354        Primary channel is always set to "1M" except when "Coded" is set.
355        """
356        return self._secondary_channel
357
358    @SecondaryChannel.setter  # type: ignore
359    @no_type_check
360    def SecondaryChannel(self, channel: "q"):  # type: ignore # noqa: F821 N802
361        self._secondary_channel = channel
362
363    @dbus_property(disabled=True)
364    @no_type_check
365    def MinInterval(self) -> "u":  # type: ignore # noqa: F821 N802
366        """
367        Minimum advertising interval to be used by the advertising set, in milliseconds.
368        Acceptable values are in the range [20ms, 10,485s].
369        If the provided MinInterval is larger than the provided MaxInterval,
370        the registration will return failure.
371        """
372        return self._min_interval
373
374    @MinInterval.setter  # type: ignore
375    @no_type_check
376    def MinInterval(self, milliseconds: "u"):  # type: ignore # noqa: F821 N802
377        self._min_interval = milliseconds
378
379    @dbus_property(disabled=True)
380    @no_type_check
381    def MaxInterval(self) -> "u":  # type: ignore # noqa: F821 N802
382        """
383        Maximum advertising interval to be used by the advertising set, in milliseconds.
384        Acceptable values are in the range [20ms, 10,485s].
385        If the provided MinInterval is larger than the provided MaxInterval,
386        the registration will return failure.
387        """
388        return self._max_interval
389
390    @MaxInterval.setter  # type: ignore
391    @no_type_check
392    def MaxInterval(self, milliseconds: "u"):  # type: ignore # noqa: F821 N802
393        self._max_interval = milliseconds
394
395    @dbus_property(disabled=True)
396    @no_type_check
397    def TxPower(self) -> "n":  # type: ignore # noqa: F821 N802
398        """
399        Requested transmission power of this advertising set.
400        The provided value is used only if the "CanSetTxPower" feature is enabled on the org.bluez.LEAdvertisingManager(5).
401        The provided value must be in range [-127 to +20], where units are in dBm.
402        """
403        return self._tx_power
404
405    @TxPower.setter  # type: ignore
406    @no_type_check
407    def TxPower(self, dbm: "n"):  # type: ignore # noqa: F821 N802
408        self._tx_power = dbm

Implementation of the org.bluez.LEAdvertisement1 D-Bus interface.

LEAdvertisement( advertising_type: Type, local_name: str, index: int = 0, includes: set[Include] = set())
 90    def __init__(
 91        self,
 92        advertising_type: Type,
 93        local_name: str,
 94        index: int = 0,
 95        includes: set[Include] = set(),
 96    ):
 97        if index < 0:
 98            raise ValueError("index must be positive")
 99
100        self.index = index
101        self.path = f"/org/bluez/{local_name}/advertisement{index:03}"
102
103        self._type: str = advertising_type.value
104        self._service_uuids: list[str] = []
105        self._manufacturer_data: dict[int, bytes] = {}  # uint16 -> bytes
106        self._solicit_uuids: list[str] = []
107        self._service_data: dict[str | int, bytes] = {}  # uint16 | str -> bytes
108        self._data: dict[int, bytes] = {}  # EXPERIMENTAL # uint8 -> bytes
109        self._discoverable: bool = False  # EXPERIMENTAL
110        self._discoverable_timeout: int = 0  # EXPERIMENTAL # uint16
111        self._includes: list[str] = [i.value for i in includes]
112        self._local_name: str = local_name
113        self._appearance: int = 0x00  # uint16
114        self._duration: int = 2  # uint16
115        self._timeout: int = 0  # uint16
116        self._secondary_channel: str = SecondaryChannel.ONE.value  # EXPERIMENTAL
117        self._min_interval: int = 100  # EXPERIMENTAL # uint32
118        self._max_interval: int = 1000  # EXPERIMENTAL # uint32
119        self._tx_power: int = 7  # EXPERIMENTAL # int16
120
121        super().__init__(self.INTERFACE_NAME)
INTERFACE_NAME: str = 'org.bluez.LEAdvertisement1'
index
path
@method()
def Release(self):
169    @method()
170    def Release(self):
171        logger.debug("Released advertisement: %s", self)
Type: 's'
173    @dbus_property(access=PropertyAccess.READ)
174    @no_type_check
175    def Type(self) -> "s":  # type: ignore # noqa: F821
176        """
177        Determines the type of advertising packet requested.
178        """
179        return self._type

Determines the type of advertising packet requested.

ServiceUUIDs: 'as'
186    @dbus_property()
187    @no_type_check
188    def ServiceUUIDs(self) -> "as":  # type: ignore # noqa: F821 F722
189        """
190        List of UUIDs to include in the "Service UUID" field of the Advertising Data.
191        """
192        return self._service_uuids

List of UUIDs to include in the "Service UUID" field of the Advertising Data.

ManufacturerData: 'a{qv}'
199    @dbus_property()
200    @no_type_check
201    def ManufacturerData(self) -> "a{qv}":  # type: ignore # noqa: F821 F722
202        """
203        Manufacturer Data fields to include in the Advertising Data.
204        Keys are the Manufacturer ID to associate with the data.
205        """
206        return self._manufacturer_data

Manufacturer Data fields to include in the Advertising Data. Keys are the Manufacturer ID to associate with the data.

SolicitUUIDs: 'as'
213    @dbus_property()
214    @no_type_check
215    def SolicitUUIDs(self) -> "as":  # type: ignore # noqa: F821 F722
216        """
217        Array of UUIDs to include in "Service Solicitation" Advertisement Data.
218        """
219        return self._solicit_uuids

Array of UUIDs to include in "Service Solicitation" Advertisement Data.

ServiceData: 'a{sv}'
226    @dbus_property()
227    @no_type_check
228    def ServiceData(self) -> "a{sv}":  # type: ignore # noqa: F821 F722
229        """
230        Service Data elements to include. The keys are the UUID to associate with the data.
231        """
232        return self._service_data

Service Data elements to include. The keys are the UUID to associate with the data.

Data: 'a{yv}'
239    @dbus_property(disabled=True)
240    @no_type_check
241    def Data(self) -> "a{yv}":  # type: ignore # noqa: F821 F722
242        """
243        Advertising Data to include.
244        Key is the advertising type and value is the data as byte array.
245        """
246        return self._data

Advertising Data to include. Key is the advertising type and value is the data as byte array.

Discoverable: 'b'
253    @dbus_property(disabled=True)
254    @no_type_check
255    def Discoverable(self) -> "b":  # type: ignore # noqa: F821 F722
256        """
257        Advertise as general discoverable.
258        When present this will override adapter Discoverable property.
259        """
260        return self._discoverable

Advertise as general discoverable. When present this will override adapter Discoverable property.

DiscoverableTimeout: 'q'
267    @dbus_property(disabled=True)
268    @no_type_check
269    def DiscoverableTimeout(self) -> "q":  # type: ignore # noqa: F821 F722
270        """
271        The discoverable timeout in seconds.
272        A value of zero means that the timeout is disabled and it will stay in discoverable/limited mode forever.
273        """
274        return self._discoverable_timeout

The discoverable timeout in seconds. A value of zero means that the timeout is disabled and it will stay in discoverable/limited mode forever.

Includes: 'as'
281    @dbus_property()
282    @no_type_check
283    def Includes(self) -> "as":  # type: ignore # noqa: F821 F722
284        """
285        List of features to be included in the advertising packet.
286        """
287        return self._includes

List of features to be included in the advertising packet.

LocalName: 's'
294    @dbus_property()
295    @no_type_check
296    def LocalName(self) -> "s":  # type: ignore # noqa: F821 N802
297        """
298        Local name to be used in the advertising report.
299        If the string is too big to fit into the packet it will be truncated.
300        """
301        return self._local_name

Local name to be used in the advertising report. If the string is too big to fit into the packet it will be truncated.

Appearance: 'q'
308    @dbus_property()
309    @no_type_check
310    def Appearance(self) -> "q":  # type: ignore # noqa: F821 N802
311        """
312        Appearance to be used in the advertising report.
313        """
314        return self._appearance

Appearance to be used in the advertising report.

Duration: 'q'
321    @dbus_property()
322    @no_type_check
323    def Duration(self) -> "q":  # type: ignore # noqa: F821 N802
324        """
325        Rotation duration of the advertisement in seconds.
326        If there are other applications advertising no duration is set the default is 2 seconds.
327        """
328        return self._duration

Rotation duration of the advertisement in seconds. If there are other applications advertising no duration is set the default is 2 seconds.

Timeout: 'q'
335    @dbus_property()
336    @no_type_check
337    def Timeout(self) -> "q":  # type: ignore # noqa: F821 N802
338        """
339        Timeout of the advertisement in seconds.
340        This defines the lifetime of the advertisement.
341        """
342        return self._timeout

Timeout of the advertisement in seconds. This defines the lifetime of the advertisement.

SecondaryChannel: 's'
349    @dbus_property(disabled=True)
350    @no_type_check
351    def SecondaryChannel(self) -> "s":  # type: ignore # noqa: F821 N802
352        """
353        Secondary channel to be used.
354        Primary channel is always set to "1M" except when "Coded" is set.
355        """
356        return self._secondary_channel

Secondary channel to be used. Primary channel is always set to "1M" except when "Coded" is set.

MinInterval: 'u'
363    @dbus_property(disabled=True)
364    @no_type_check
365    def MinInterval(self) -> "u":  # type: ignore # noqa: F821 N802
366        """
367        Minimum advertising interval to be used by the advertising set, in milliseconds.
368        Acceptable values are in the range [20ms, 10,485s].
369        If the provided MinInterval is larger than the provided MaxInterval,
370        the registration will return failure.
371        """
372        return self._min_interval

Minimum advertising interval to be used by the advertising set, in milliseconds. Acceptable values are in the range [20ms, 10,485s]. If the provided MinInterval is larger than the provided MaxInterval, the registration will return failure.

MaxInterval: 'u'
379    @dbus_property(disabled=True)
380    @no_type_check
381    def MaxInterval(self) -> "u":  # type: ignore # noqa: F821 N802
382        """
383        Maximum advertising interval to be used by the advertising set, in milliseconds.
384        Acceptable values are in the range [20ms, 10,485s].
385        If the provided MinInterval is larger than the provided MaxInterval,
386        the registration will return failure.
387        """
388        return self._max_interval

Maximum advertising interval to be used by the advertising set, in milliseconds. Acceptable values are in the range [20ms, 10,485s]. If the provided MinInterval is larger than the provided MaxInterval, the registration will return failure.

TxPower: 'n'
395    @dbus_property(disabled=True)
396    @no_type_check
397    def TxPower(self) -> "n":  # type: ignore # noqa: F821 N802
398        """
399        Requested transmission power of this advertising set.
400        The provided value is used only if the "CanSetTxPower" feature is enabled on the org.bluez.LEAdvertisingManager(5).
401        The provided value must be in range [-127 to +20], where units are in dBm.
402        """
403        return self._tx_power

Requested transmission power of this advertising set. The provided value is used only if the "CanSetTxPower" feature is enabled on the org.bluez.LEAdvertisingManager(5). The provided value must be in range [-127 to +20], where units are in dBm.

class BroadcastAdvertisement(pb_ble.bluezdbus.LEAdvertisement):
411class BroadcastAdvertisement(LEAdvertisement):
412    """
413    Implementation of a broadcast advertisement.
414
415    This sets the advertising type to "broadcast" and toggles
416    available properties appropriately.
417    """
418
419    def __init__(
420        self,
421        local_name: str,
422        index: int = 0,
423        on_release: Callable[[str], None] = lambda path: None,
424    ):
425        super().__init__(
426            Type.BROADCAST,
427            local_name,
428            index,
429            # set([Include.LOCAL_NAME]),
430        )
431
432        self.on_release: Callable[[str], None] = on_release
433        """Callback function that is called when this advertisement is released by BlueZ."""
434
435        # Disable properties that aren't needed for broadcasting
436        self._disable_props(
437            "ServiceUUIDs",
438            "SolicitUUIDs",
439            "LocalName",
440            "Appearance",
441            "Duration",
442        )
443        # Enable experimental properties useful for broadcasting
444        self._enable_props("MinInterval", "MaxInterval", "TxPower")
445
446        # for prop in ServiceInterface._get_properties(self):
447        #    logger.debug("Property %s (%s)", prop.name, "DISABLED" if prop.disabled else "ENABLED")
448
449    @method()
450    def Release(self):
451        super().Release()
452        self.on_release(self.path)

Implementation of a broadcast advertisement.

This sets the advertising type to "broadcast" and toggles available properties appropriately.

BroadcastAdvertisement( local_name: str, index: int = 0, on_release: Callable[[str], NoneType] = <function BroadcastAdvertisement.<lambda>>)
419    def __init__(
420        self,
421        local_name: str,
422        index: int = 0,
423        on_release: Callable[[str], None] = lambda path: None,
424    ):
425        super().__init__(
426            Type.BROADCAST,
427            local_name,
428            index,
429            # set([Include.LOCAL_NAME]),
430        )
431
432        self.on_release: Callable[[str], None] = on_release
433        """Callback function that is called when this advertisement is released by BlueZ."""
434
435        # Disable properties that aren't needed for broadcasting
436        self._disable_props(
437            "ServiceUUIDs",
438            "SolicitUUIDs",
439            "LocalName",
440            "Appearance",
441            "Duration",
442        )
443        # Enable experimental properties useful for broadcasting
444        self._enable_props("MinInterval", "MaxInterval", "TxPower")
445
446        # for prop in ServiceInterface._get_properties(self):
447        #    logger.debug("Property %s (%s)", prop.name, "DISABLED" if prop.disabled else "ENABLED")
on_release: Callable[[str], NoneType]

Callback function that is called when this advertisement is released by BlueZ.

@method()
def Release(self):
449    @method()
450    def Release(self):
451        super().Release()
452        self.on_release(self.path)
class PybricksBroadcastAdvertisement(pb_ble.bluezdbus.BroadcastAdvertisement):
455class PybricksBroadcastAdvertisement(BroadcastAdvertisement):
456    """
457    Implementation of a Pybricks broadcast advertisement.
458
459    The data to broadcast is set via the message property.
460    """
461
462    LEGO_CID = LEGO_CID
463    """LEGO System A/S company identifier."""
464
465    def __init__(
466        self,
467        local_name: str,
468        channel: int = 0,
469        data: PybricksBroadcastData | None = None,
470        on_release: Callable[[str], None] = lambda path: None,
471    ):
472        super().__init__(local_name, channel, on_release)
473        if data:
474            self.message = data
475
476    @property
477    def channel(self) -> int:
478        """The channel of this broadcast message."""
479        return self.index
480
481    @property
482    def message(self) -> PybricksBroadcastData | None:
483        """The data contained in this broadcast message."""
484        if self.LEGO_CID in self._manufacturer_data:
485            channel, value = decode_message(
486                self._manufacturer_data[self.LEGO_CID].value  # type: ignore
487            )
488            return value
489        else:
490            return None
491
492    @message.setter
493    def message(self, value: PybricksBroadcastData):
494        value = value if isinstance(value, tuple) else (value,)
495        message = encode_message(self.channel, *value)
496        self._manufacturer_data[self.LEGO_CID] = Variant("ay", message)  # type: ignore
497        # Notify BlueZ of the changed manufacturer data so the advertisement is updated
498        self.emit_properties_changed(
499            changed_properties={"ManufacturerData": self._manufacturer_data}
500        )
501
502    def __str__(self):
503        return f"PybricksBroadcastAdvertisement(channel={self.channel}, data={self.message!r}, timeout={self._timeout})"

Implementation of a Pybricks broadcast advertisement.

The data to broadcast is set via the message property.

PybricksBroadcastAdvertisement( local_name: str, channel: int = 0, data: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...] | None = None, on_release: Callable[[str], NoneType] = <function PybricksBroadcastAdvertisement.<lambda>>)
465    def __init__(
466        self,
467        local_name: str,
468        channel: int = 0,
469        data: PybricksBroadcastData | None = None,
470        on_release: Callable[[str], None] = lambda path: None,
471    ):
472        super().__init__(local_name, channel, on_release)
473        if data:
474            self.message = data
LEGO_CID = 919

LEGO System A/S company identifier.

channel: int
476    @property
477    def channel(self) -> int:
478        """The channel of this broadcast message."""
479        return self.index

The channel of this broadcast message.

message: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...] | None
481    @property
482    def message(self) -> PybricksBroadcastData | None:
483        """The data contained in this broadcast message."""
484        if self.LEGO_CID in self._manufacturer_data:
485            channel, value = decode_message(
486                self._manufacturer_data[self.LEGO_CID].value  # type: ignore
487            )
488            return value
489        else:
490            return None

The data contained in this broadcast message.

class LEAdvertisingManager:
506class LEAdvertisingManager:
507    """
508    Client implementation of the `org.bluez.LEAdvertisementManager1` D-Bus interface.
509    """
510
511    INTERFACE_NAME: str = "org.bluez.LEAdvertisingManager1"
512
513    def __init__(
514        self,
515        adapter: ProxyObject | None = None,
516        adv_manager: ProxyInterface | None = None,
517    ):
518        if adapter is None and adv_manager is None:
519            raise ValueError("adapter or adv_manager required")
520
521        self._adv_manager = adv_manager or adapter.get_interface(self.INTERFACE_NAME)  # type: ignore
522
523    async def register_advertisement(
524        self, adv: LEAdvertisement, options: dict | None = None
525    ):
526        """
527        Registers an advertisement object to be sent over the LE Advertising channel.
528        The service must implement `org.bluez.LEAdvertisement1` interface.
529
530        :param adv: The advertisement service object.
531        :param options: Advertisement options, defaults to None.
532        :return: `None`
533        """
534        options = options or {}
535        return await self._adv_manager.call_register_advertisement(adv.path, options)  # type: ignore
536
537    @overload
538    async def unregister_advertisement(self, adv: LEAdvertisement): ...
539    @overload
540    async def unregister_advertisement(self, adv: str): ...
541    async def unregister_advertisement(self, adv):
542        """
543        Unregisters an advertisement that has been previously registered using `register_advertisement()`.
544        The object path parameter must match the same value that has been used on registration.
545
546        :param adv: The advertisement service object, or path.
547        :return: `None`
548        """
549        if isinstance(adv, str):
550            return await self._adv_manager.call_unregister_advertisement(adv)  # type: ignore
551        else:
552            return await self._adv_manager.call_unregister_advertisement(adv.path)  # type: ignore
553
554    async def active_instances(self) -> int:
555        """Number of active advertising instances."""
556        return await self._adv_manager.get_active_instances()  # type: ignore
557
558    async def supported_instances(self) -> int:
559        """Number of available advertising instances."""
560        return await self._adv_manager.get_supported_instances()  # type: ignore
561
562    async def supported_includes(self) -> list[Include]:
563        """List of supported system includes."""
564        return await self._adv_manager.get_supported_includes()  # type: ignore
565
566    async def supported_secondary_channels(self) -> list[SecondaryChannel]:
567        """List of supported Secondary channels.
568        Secondary channels can be used to advertise  with the corresponding PHY.
569        """
570        return await self._adv_manager.get_supported_secondary_channels()  # type: ignore
571
572    async def supported_capabilities(self) -> dict[Capability, Any]:
573        """Enumerates Advertising-related controller capabilities useful to the client."""
574        return await self._adv_manager.get_supported_capabilities()  # type: ignore
575
576    async def supported_features(self) -> list[Feature]:
577        """List  of supported platform features.
578        If no features are available on the platform, the SupportedFeatures array will be empty.
579        """
580        return await self._adv_manager.get_supported_features()  # type: ignore

Client implementation of the org.bluez.LEAdvertisementManager1 D-Bus interface.

LEAdvertisingManager( adapter: dbus_fast.aio.proxy_object.ProxyObject | None = None, adv_manager: dbus_fast.aio.proxy_object.ProxyInterface | None = None)
513    def __init__(
514        self,
515        adapter: ProxyObject | None = None,
516        adv_manager: ProxyInterface | None = None,
517    ):
518        if adapter is None and adv_manager is None:
519            raise ValueError("adapter or adv_manager required")
520
521        self._adv_manager = adv_manager or adapter.get_interface(self.INTERFACE_NAME)  # type: ignore
INTERFACE_NAME: str = 'org.bluez.LEAdvertisingManager1'
async def register_advertisement( self, adv: LEAdvertisement, options: dict | None = None):
523    async def register_advertisement(
524        self, adv: LEAdvertisement, options: dict | None = None
525    ):
526        """
527        Registers an advertisement object to be sent over the LE Advertising channel.
528        The service must implement `org.bluez.LEAdvertisement1` interface.
529
530        :param adv: The advertisement service object.
531        :param options: Advertisement options, defaults to None.
532        :return: `None`
533        """
534        options = options or {}
535        return await self._adv_manager.call_register_advertisement(adv.path, options)  # type: ignore

Registers an advertisement object to be sent over the LE Advertising channel. The service must implement org.bluez.LEAdvertisement1 interface.

Parameters
  • adv: The advertisement service object.
  • options: Advertisement options, defaults to None.
Returns

None

async def unregister_advertisement(self, adv):
541    async def unregister_advertisement(self, adv):
542        """
543        Unregisters an advertisement that has been previously registered using `register_advertisement()`.
544        The object path parameter must match the same value that has been used on registration.
545
546        :param adv: The advertisement service object, or path.
547        :return: `None`
548        """
549        if isinstance(adv, str):
550            return await self._adv_manager.call_unregister_advertisement(adv)  # type: ignore
551        else:
552            return await self._adv_manager.call_unregister_advertisement(adv.path)  # type: ignore

Unregisters an advertisement that has been previously registered using register_advertisement(). The object path parameter must match the same value that has been used on registration.

Parameters
  • adv: The advertisement service object, or path.
Returns

None

async def active_instances(self) -> int:
554    async def active_instances(self) -> int:
555        """Number of active advertising instances."""
556        return await self._adv_manager.get_active_instances()  # type: ignore

Number of active advertising instances.

async def supported_instances(self) -> int:
558    async def supported_instances(self) -> int:
559        """Number of available advertising instances."""
560        return await self._adv_manager.get_supported_instances()  # type: ignore

Number of available advertising instances.

async def supported_includes(self) -> list[Include]:
562    async def supported_includes(self) -> list[Include]:
563        """List of supported system includes."""
564        return await self._adv_manager.get_supported_includes()  # type: ignore

List of supported system includes.

async def supported_secondary_channels(self) -> list[SecondaryChannel]:
566    async def supported_secondary_channels(self) -> list[SecondaryChannel]:
567        """List of supported Secondary channels.
568        Secondary channels can be used to advertise  with the corresponding PHY.
569        """
570        return await self._adv_manager.get_supported_secondary_channels()  # type: ignore

List of supported Secondary channels. Secondary channels can be used to advertise with the corresponding PHY.

async def supported_capabilities(self) -> dict[Capability, typing.Any]:
572    async def supported_capabilities(self) -> dict[Capability, Any]:
573        """Enumerates Advertising-related controller capabilities useful to the client."""
574        return await self._adv_manager.get_supported_capabilities()  # type: ignore

Enumerates Advertising-related controller capabilities useful to the client.

async def supported_features(self) -> list[Feature]:
576    async def supported_features(self) -> list[Feature]:
577        """List  of supported platform features.
578        If no features are available on the platform, the SupportedFeatures array will be empty.
579        """
580        return await self._adv_manager.get_supported_features()  # type: ignore

List of supported platform features. If no features are available on the platform, the SupportedFeatures array will be empty.

class Type(enum.Enum):
35class Type(Enum):
36    """LEAdvertisement: Type"""
37
38    BROADCAST = "broadcast"
39    PERIPHERAL = "peripheral"

LEAdvertisement: Type

BROADCAST = <Type.BROADCAST: 'broadcast'>
PERIPHERAL = <Type.PERIPHERAL: 'peripheral'>
class Include(enum.Enum):
42class Include(Enum):
43    """LEAdvertisingManager: SupportedIncludes"""
44
45    TX_POWER = "tx-power"
46    APPEARANCE = "appearance"
47    LOCAL_NAME = "local-name"
48    RSI = "rsi"

LEAdvertisingManager: SupportedIncludes

TX_POWER = <Include.TX_POWER: 'tx-power'>
APPEARANCE = <Include.APPEARANCE: 'appearance'>
LOCAL_NAME = <Include.LOCAL_NAME: 'local-name'>
RSI = <Include.RSI: 'rsi'>
class Capability(enum.Enum):
59class Capability(Enum):
60    MAX_ADV_LEN = "MaxAdvLen"
61    """Max advertising data length [byte]"""
62
63    MAX_SCN_RSP_LEN = "MaxScnRspLen"
64    """Max advertising scan response length [byte]"""
65
66    MIN_TX_POWER = "MinTxPower"
67    """Min advertising tx power (dBm) [int16]"""
68
69    MAX_TX_POWER = "MaxTxPower"
70    """Max advertising tx power (dBm) [int16]"""
MAX_ADV_LEN = <Capability.MAX_ADV_LEN: 'MaxAdvLen'>

Max advertising data length [byte]

MAX_SCN_RSP_LEN = <Capability.MAX_SCN_RSP_LEN: 'MaxScnRspLen'>

Max advertising scan response length [byte]

MIN_TX_POWER = <Capability.MIN_TX_POWER: 'MinTxPower'>

Min advertising tx power (dBm) [int16]

MAX_TX_POWER = <Capability.MAX_TX_POWER: 'MaxTxPower'>

Max advertising tx power (dBm) [int16]

class Feature(enum.Enum):
73class Feature(Enum):
74    """LEAdvertisingManager: SupportedFeatures"""
75
76    CAN_SET_TX_POWER = "CanSetTxPower"
77    """Indicates whether platform can specify tx power on each advertising instance."""
78
79    HARDWARE_OFFLOAD = "HardwareOffload"
80    """Indicates whether multiple advertising will be offloaded to the controller."""

LEAdvertisingManager: SupportedFeatures

CAN_SET_TX_POWER = <Feature.CAN_SET_TX_POWER: 'CanSetTxPower'>

Indicates whether platform can specify tx power on each advertising instance.

HARDWARE_OFFLOAD = <Feature.HARDWARE_OFFLOAD: 'HardwareOffload'>

Indicates whether multiple advertising will be offloaded to the controller.

class SecondaryChannel(enum.Enum):
51class SecondaryChannel(Enum):
52    """LEAdvertisingManager: SupportedSecondaryChannels"""
53
54    ONE = "1M"
55    TWO = "2M"
56    CODED = "Coded"

LEAdvertisingManager: SupportedSecondaryChannels

ONE = <SecondaryChannel.ONE: '1M'>
TWO = <SecondaryChannel.TWO: '2M'>
CODED = <SecondaryChannel.CODED: 'Coded'>