pb_ble.bluezdbus

BlueZ-backed implementations of BLE observer and BLE broadcaster roles.

High level API:

Generic D-Bus bindings:

The D-Bus bindings are implemented with the dbus-fast library.

 1"""
 2BlueZ-backed implementations of BLE observer and BLE broadcaster roles.
 3
 4High level API:
 5
 6- `BlueZBroadcaster`: A generic BLE broadcaster.
 7- `BlueZPybricksObserver`: A specialised BLE observer for Pybricks messages.
 8
 9Generic D-Bus bindings:
10
11- `LEAdvertisement`: Service implementation of the [org.bluez.LEAdvertisement1][]
12    D-Bus interface and the following specialisations:
13  - `BroadcastAdvertisement`: A BLE broadcast advertisement.
14  - `PybricksBroadcastAdvertisement`: A Pybricks broadcast advertisement.
15- `LEAdvertisingManager`: Client implementation of the [org.bluez.LEAdvertisingManager1][] D-Bus interface.
16
17The D-Bus bindings are implemented with the [dbus-fast][] library.
18
19[org.bluez.LEAdvertisement1]:https://github.com/bluez/bluez/blob/5.75/doc/org.bluez.LEAdvertisement.rst
20[org.bluez.LEAdvertisingManager1]:https://github.com/bluez/bluez/blob/5.75/doc/org.bluez.LEAdvertisingManager.rst
21[dbus-fast]:https://github.com/Bluetooth-Devices/dbus-fast
22"""
23
24from .adapters import get_adapter, get_adapter_details
25from .advertisement import (
26    BroadcastAdvertisement,
27    Capability,
28    Feature,
29    Include,
30    LEAdvertisement,
31    LEAdvertisingManager,
32    PybricksBroadcastAdvertisement,
33    SecondaryChannel,
34    Type,
35)
36from .broadcaster import BlueZBroadcaster
37from .observer import BlueZPybricksObserver, ObservedAdvertisement
38
39__all__ = (
40    # High level objects
41    "BlueZBroadcaster",
42    "BlueZPybricksObserver",
43    "ObservedAdvertisement",
44    # D-Bus bindings
45    "LEAdvertisement",
46    "BroadcastAdvertisement",
47    "PybricksBroadcastAdvertisement",
48    "LEAdvertisingManager",
49    # D-Bus constants
50    "Type",
51    "Include",
52    "Capability",
53    "Feature",
54    "SecondaryChannel",
55)
class BlueZBroadcaster(contextlib.AbstractAsyncContextManager):
 24class BlueZBroadcaster(AbstractAsyncContextManager):
 25    """
 26    A BLE broadcaster backed by BlueZ.
 27    Supports multiple advertising sets in parallel.
 28
 29    The recommended use is as a context manager, which ensures that all
 30    registered broadcasts are stopped when exiting the context:
 31
 32    ```python
 33    bus = ...
 34    adapter = ...
 35    device_name = "my-computer"
 36
 37    async with BlueZBroadcaster(bus, adapter, device_name) as broadcaster:
 38        # Start broadcasting
 39        adv = BroadcastAdvertisement(device_name)
 40        await broadcaster.broadcast(adv)
 41        # Stop after 10 seconds
 42        await asyncio.sleep(10)
 43    ```
 44    """
 45
 46    def __init__(self, bus: MessageBus, adapter: ProxyObject, name: str):
 47        """
 48        Creates a new broadcaster.
 49
 50        :param bus: The message bus.
 51        :param adapter: The Bluetooth adapter.
 52        :param name: The name of this broadcaster.
 53        """
 54        self.bus: MessageBus = bus
 55        """The message bus used to connect to DBus."""
 56        self.adapter: ProxyObject = adapter
 57        """A DBus proxy object for the Bluetooth adapter to use for advertising."""
 58        self.name: str = name
 59        """The name of this broadcaster. Will be used as `local_name` in advertisements."""
 60        self.adv_manager: LEAdvertisingManager = LEAdvertisingManager(adapter)
 61        """The BlueZ advertising manager client."""
 62        self.path_namespace: str = f"/org/bluez/{self.name}"
 63        """Path prefix to use for DBus objects created by this broadcaster."""
 64        self.advertisements: dict[str, BroadcastAdvertisement] = {}
 65        """Active advertisements of this broadcaster."""
 66
 67    @overload
 68    async def stop_broadcast(self, adv: str): ...
 69    @overload
 70    async def stop_broadcast(self, adv: BroadcastAdvertisement): ...
 71    async def stop_broadcast(self, adv: str | BroadcastAdvertisement):
 72        """
 73        Stop broadcasting the given advertisement.
 74
 75        :param adv: The broadcast to stop. Takes either the D-Bus path of the
 76            advertisement, or a reference to the object.
 77        """
 78        path = adv.path if isinstance(adv, BroadcastAdvertisement) else adv
 79
 80        try:
 81            await self.adv_manager.unregister_advertisement(path)
 82        except DBusError:
 83            # Advertisement does not exist
 84            pass
 85        finally:
 86            self.bus.unexport(path)
 87            if path in self.advertisements:
 88                del self.advertisements[path]
 89
 90    async def stop(self):
 91        """
 92        Stops this broadcaster. Cleans up any active broadcasts.
 93        """
 94        await asyncio.gather(
 95            *[self.stop_broadcast(path) for path in self.advertisements.keys()]
 96        )
 97
 98    async def __aexit__(self, exc_type, exc, tb):
 99        await self.stop()
100
101    async def broadcast(self, adv: BroadcastAdvertisement):
102        """
103        Start broadcasting the given advertisement.
104
105        :param adv: The reference to the advertisement object.
106        :raises ValueError: If a D-Bus object already exists on the given path.
107        :raises DBusError: If the given advertisement is invalid, or is already
108            registered with BlueZ.
109        """
110        # TODO construct advertisement in here to ensure local_name
111        assert adv._local_name == self.name, f"{adv.name} != {self.name}"
112
113        # cleanup on release of advertisement
114        on_release = adv.on_release
115
116        def release_advertisement(path):
117            try:
118                self.bus.unexport(path)
119                if path in self.advertisements:
120                    del self.advertisements[path]
121            finally:
122                on_release(path)
123
124        adv.on_release = release_advertisement
125
126        log.info("Broadcasting: %s", adv)
127
128        try:
129            self.bus.export(adv.path, adv)
130        except ValueError:
131            # Already exported
132            raise
133
134        try:
135            await self.adv_manager.register_advertisement(adv)
136        except DBusError:
137            # org.bluez.Error.InvalidArguments
138            # org.bluez.Error.AlreadyExists
139            # org.bluez.Error.InvalidLength
140            # org.bluez.Error.NotPermitted
141            raise
142
143        self.advertisements[adv.path] = adv
144
145    def is_broadcasting(self, adv: BroadcastAdvertisement | None = None) -> bool:
146        """
147        Checks whether this broadcaster is active.
148
149        :param adv: The reference to the advertisement object to check,
150            defaults to None (check if any broadcast is active).
151        :return: True if the given (or any) broadcast is active.
152        """
153        if adv is not None:
154            return adv.path in self.advertisements
155        else:
156            return len(self.advertisements) > 0

A BLE broadcaster backed by BlueZ. Supports multiple advertising sets in parallel.

The recommended use is as a context manager, which ensures that all registered broadcasts are stopped when exiting the context:

bus = ...
adapter = ...
device_name = "my-computer"

async with BlueZBroadcaster(bus, adapter, device_name) as broadcaster:
    # Start broadcasting
    adv = BroadcastAdvertisement(device_name)
    await broadcaster.broadcast(adv)
    # Stop after 10 seconds
    await asyncio.sleep(10)
BlueZBroadcaster( bus: dbus_fast.aio.message_bus.MessageBus, adapter: dbus_fast.aio.proxy_object.ProxyObject, name: str)
46    def __init__(self, bus: MessageBus, adapter: ProxyObject, name: str):
47        """
48        Creates a new broadcaster.
49
50        :param bus: The message bus.
51        :param adapter: The Bluetooth adapter.
52        :param name: The name of this broadcaster.
53        """
54        self.bus: MessageBus = bus
55        """The message bus used to connect to DBus."""
56        self.adapter: ProxyObject = adapter
57        """A DBus proxy object for the Bluetooth adapter to use for advertising."""
58        self.name: str = name
59        """The name of this broadcaster. Will be used as `local_name` in advertisements."""
60        self.adv_manager: LEAdvertisingManager = LEAdvertisingManager(adapter)
61        """The BlueZ advertising manager client."""
62        self.path_namespace: str = f"/org/bluez/{self.name}"
63        """Path prefix to use for DBus objects created by this broadcaster."""
64        self.advertisements: dict[str, BroadcastAdvertisement] = {}
65        """Active advertisements of this broadcaster."""

Creates a new broadcaster.

Parameters
  • bus: The message bus.
  • adapter: The Bluetooth adapter.
  • name: The name of this broadcaster.
bus: dbus_fast.aio.message_bus.MessageBus

The message bus used to connect to DBus.

adapter: dbus_fast.aio.proxy_object.ProxyObject

A DBus proxy object for the Bluetooth adapter to use for advertising.

name: str

The name of this broadcaster. Will be used as local_name in advertisements.

adv_manager: LEAdvertisingManager

The BlueZ advertising manager client.

path_namespace: str

Path prefix to use for DBus objects created by this broadcaster.

advertisements: dict[str, BroadcastAdvertisement]

Active advertisements of this broadcaster.

async def stop_broadcast( self, adv: str | BroadcastAdvertisement):
71    async def stop_broadcast(self, adv: str | BroadcastAdvertisement):
72        """
73        Stop broadcasting the given advertisement.
74
75        :param adv: The broadcast to stop. Takes either the D-Bus path of the
76            advertisement, or a reference to the object.
77        """
78        path = adv.path if isinstance(adv, BroadcastAdvertisement) else adv
79
80        try:
81            await self.adv_manager.unregister_advertisement(path)
82        except DBusError:
83            # Advertisement does not exist
84            pass
85        finally:
86            self.bus.unexport(path)
87            if path in self.advertisements:
88                del self.advertisements[path]

Stop broadcasting the given advertisement.

Parameters
  • adv: The broadcast to stop. Takes either the D-Bus path of the advertisement, or a reference to the object.
async def stop(self):
90    async def stop(self):
91        """
92        Stops this broadcaster. Cleans up any active broadcasts.
93        """
94        await asyncio.gather(
95            *[self.stop_broadcast(path) for path in self.advertisements.keys()]
96        )

Stops this broadcaster. Cleans up any active broadcasts.

async def broadcast(self, adv: BroadcastAdvertisement):
101    async def broadcast(self, adv: BroadcastAdvertisement):
102        """
103        Start broadcasting the given advertisement.
104
105        :param adv: The reference to the advertisement object.
106        :raises ValueError: If a D-Bus object already exists on the given path.
107        :raises DBusError: If the given advertisement is invalid, or is already
108            registered with BlueZ.
109        """
110        # TODO construct advertisement in here to ensure local_name
111        assert adv._local_name == self.name, f"{adv.name} != {self.name}"
112
113        # cleanup on release of advertisement
114        on_release = adv.on_release
115
116        def release_advertisement(path):
117            try:
118                self.bus.unexport(path)
119                if path in self.advertisements:
120                    del self.advertisements[path]
121            finally:
122                on_release(path)
123
124        adv.on_release = release_advertisement
125
126        log.info("Broadcasting: %s", adv)
127
128        try:
129            self.bus.export(adv.path, adv)
130        except ValueError:
131            # Already exported
132            raise
133
134        try:
135            await self.adv_manager.register_advertisement(adv)
136        except DBusError:
137            # org.bluez.Error.InvalidArguments
138            # org.bluez.Error.AlreadyExists
139            # org.bluez.Error.InvalidLength
140            # org.bluez.Error.NotPermitted
141            raise
142
143        self.advertisements[adv.path] = adv

Start broadcasting the given advertisement.

Parameters
  • adv: The reference to the advertisement object.
Raises
  • ValueError: If a D-Bus object already exists on the given path.
  • DBusError: If the given advertisement is invalid, or is already registered with BlueZ.
def is_broadcasting( self, adv: BroadcastAdvertisement | None = None) -> bool:
145    def is_broadcasting(self, adv: BroadcastAdvertisement | None = None) -> bool:
146        """
147        Checks whether this broadcaster is active.
148
149        :param adv: The reference to the advertisement object to check,
150            defaults to None (check if any broadcast is active).
151        :return: True if the given (or any) broadcast is active.
152        """
153        if adv is not None:
154            return adv.path in self.advertisements
155        else:
156            return len(self.advertisements) > 0

Checks whether this broadcaster is active.

Parameters
  • adv: The reference to the advertisement object to check, defaults to None (check if any broadcast is active).
Returns

True if the given (or any) broadcast is active.

class BlueZPybricksObserver(contextlib.AbstractAsyncContextManager):
 37class BlueZPybricksObserver(AbstractAsyncContextManager):
 38    """
 39    A BLE observer backed by BlueZ.
 40    Keeps a cache of observed Pybricks messages.
 41
 42    The recommended use is as a context manager, which ensures that the
 43    underlying BLE scanner is stopped when exiting the context:
 44
 45    ```python
 46    async with BlueZPybricksObserver(channels=[1, 2, 3]) as observer:
 47        # Observe for 10 seconds
 48        await asyncio.sleep(10)
 49        # Check results for channel 2
 50        message = observer.observe(2)
 51        print(message)
 52    ```
 53    """
 54
 55    def __init__(
 56        self,
 57        adapter_name: str | None = None,
 58        scanning_mode: ScanningMode = "passive",
 59        channels: Sequence[int] | None = None,
 60        rssi_threshold: int | None = None,
 61        device_pattern: str | None = None,
 62        message_ttl: int = 60,
 63    ):
 64        """
 65        Creates a new observer.
 66
 67        :param adapter_name: The Bluetooth adapter to use for scanning,
 68            defaults to `None` (auto-discover default device).
 69        :param scanning_mode: The scanning mode to use, defaults to "passive".
 70        :param channels: Channels to observe, defaults to None (all channels).
 71        :param rssi_threshold: Minimum required signal strength of observed
 72            broadcasts in dBm, defaults to None (no RSSI filtering).
 73        :param device_pattern: Pattern that the device name of the sender must
 74            start with, defaults to `None` (no name filtering).
 75        :param message_ttl: Time in seconds to cache observed broadcasts for,
 76            defaults to 60.
 77        """
 78        self.channels = channels or []
 79        """List of channels that this observer is monitoring."""
 80
 81        for channel in self.channels:
 82            if (
 83                not isinstance(channel, int)
 84                or PYBRICKS_MIN_CHANNEL < channel > PYBRICKS_MAX_CHANNEL
 85            ):
 86                raise ValueError(
 87                    f"Observe channel must be list of integers from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
 88                )
 89
 90        self.rssi_threshold = rssi_threshold
 91        """The configured RSSI threshold for broadcasts."""
 92        self.device_pattern = device_pattern
 93        """The configured device name pattern match for broadcasts."""
 94        self.advertisements: TTLCache = TTLCache(
 95            maxsize=len(self.channels) or PYBRICKS_MAX_CHANNEL, ttl=message_ttl
 96        )
 97        """Cache of observed broadcasts."""
 98
 99        # Filters used for active scanning
100        filters: BlueZDiscoveryFilters = BlueZDiscoveryFilters()
101
102        if device_pattern:
103            filters["Pattern"] = device_pattern
104        if rssi_threshold:
105            filters["RSSI"] = rssi_threshold
106
107        # Patterns used for passive scanning
108        or_patterns: list[OrPattern | tuple[int, AdvertisementDataType, bytes]]
109
110        if self.channels:
111            or_patterns = [
112                OrPattern(
113                    0,
114                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
115                    pack("<HB", LEGO_CID, channel),
116                )
117                for channel in self.channels
118            ]
119        else:
120            or_patterns = [
121                OrPattern(
122                    0,
123                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
124                    pack("<H", LEGO_CID),
125                )
126            ]
127
128        log.debug(
129            "Observer init: scanning_mode=%s, device_pattern=%s, rssi_threshold=%s",
130            scanning_mode,
131            device_pattern,
132            rssi_threshold,
133        )
134
135        self._scanner = BleakScanner(
136            detection_callback=self._callback,
137            scanning_mode=scanning_mode,
138            bluez=BlueZScannerArgs(
139                filters=filters,
140                or_patterns=or_patterns,
141            ),
142            adapter=adapter_name,
143        )
144
145    def _callback(self, device: BLEDevice, ad: AdvertisementData):
146        """
147        @public
148        The callback function for detected BLE advertisements from the
149        scanner.
150
151        Performs filtering of advertisements, decodes the Pybricks
152        broadcast message contained in the advertisement, and stores
153        it in the broadcast cache.
154
155        Depending on the selected scanning mode, certain filters must
156        be applied here:
157
158        1. Filter advertisements based on the configured `rssi_threshold`
159            and `device_pattern` (required in "passive" mode).
160        2. Filter advertisements that contain invalid manufacturer data,
161            such as non-Pybricks advertisements (required in "active" mode).
162        4. Filter messages on the incorrect Pybricks channel (required in
163            "active" mode).
164
165        :param device: The device sending the advertisement.
166        :param ad: The advertisement data.
167        """
168        if self.rssi_threshold is not None and ad.rssi < self.rssi_threshold:
169            log.debug("Filtered AD due to RSSI below threshold: %i", ad.rssi)
170            return
171
172        if (ad.local_name and self.device_pattern) and not ad.local_name.startswith(
173            self.device_pattern
174        ):
175            log.debug("Filtered AD due to invalid device name: %s", ad.local_name)
176            return
177
178        if LEGO_CID not in ad.manufacturer_data:
179            log.debug(
180                "Filtered AD due to invalid manufacturer data: %s",
181                ad.manufacturer_data.keys(),
182            )
183            return
184
185        message = ad.manufacturer_data[LEGO_CID]
186        channel, data = decode_message(message)
187
188        if self.channels and channel not in self.channels:
189            log.debug("Filtered broadcast due to wrong channel: %i", channel)
190            return
191
192        log.info(
193            "Pybricks broadcast on channel %i: %s (rssi %s)", channel, data, ad.rssi
194        )
195        self.advertisements[channel] = ObservedAdvertisement(data, ad.rssi)
196
197    def observe(self, channel: int) -> ObservedAdvertisement | None:
198        """
199        Retrieves the last observed data for a given channel.
200
201        :param channel: The channel to observe (0 to 255).
202        :return: The received data in the same format as it was sent, or `None`
203            if no recent data is available.
204        """
205        if self.channels and channel not in self.channels:
206            raise ValueError(f"Channel {channel} not allocated.")
207
208        return self.advertisements.get(channel, None)
209
210    async def __aenter__(self):
211        log.info("Observing on channels %s...", self.channels or "ALL")
212        await self._scanner.start()
213        return self
214
215    async def __aexit__(self, exc_type, exc, tb):
216        await self._scanner.stop()

A BLE observer backed by BlueZ. Keeps a cache of observed Pybricks messages.

The recommended use is as a context manager, which ensures that the underlying BLE scanner is stopped when exiting the context:

async with BlueZPybricksObserver(channels=[1, 2, 3]) as observer:
    # Observe for 10 seconds
    await asyncio.sleep(10)
    # Check results for channel 2
    message = observer.observe(2)
    print(message)
BlueZPybricksObserver( adapter_name: str | None = None, scanning_mode: Literal['active', 'passive'] = 'passive', channels: Optional[Sequence[int]] = None, rssi_threshold: int | None = None, device_pattern: str | None = None, message_ttl: int = 60)
 55    def __init__(
 56        self,
 57        adapter_name: str | None = None,
 58        scanning_mode: ScanningMode = "passive",
 59        channels: Sequence[int] | None = None,
 60        rssi_threshold: int | None = None,
 61        device_pattern: str | None = None,
 62        message_ttl: int = 60,
 63    ):
 64        """
 65        Creates a new observer.
 66
 67        :param adapter_name: The Bluetooth adapter to use for scanning,
 68            defaults to `None` (auto-discover default device).
 69        :param scanning_mode: The scanning mode to use, defaults to "passive".
 70        :param channels: Channels to observe, defaults to None (all channels).
 71        :param rssi_threshold: Minimum required signal strength of observed
 72            broadcasts in dBm, defaults to None (no RSSI filtering).
 73        :param device_pattern: Pattern that the device name of the sender must
 74            start with, defaults to `None` (no name filtering).
 75        :param message_ttl: Time in seconds to cache observed broadcasts for,
 76            defaults to 60.
 77        """
 78        self.channels = channels or []
 79        """List of channels that this observer is monitoring."""
 80
 81        for channel in self.channels:
 82            if (
 83                not isinstance(channel, int)
 84                or PYBRICKS_MIN_CHANNEL < channel > PYBRICKS_MAX_CHANNEL
 85            ):
 86                raise ValueError(
 87                    f"Observe channel must be list of integers from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
 88                )
 89
 90        self.rssi_threshold = rssi_threshold
 91        """The configured RSSI threshold for broadcasts."""
 92        self.device_pattern = device_pattern
 93        """The configured device name pattern match for broadcasts."""
 94        self.advertisements: TTLCache = TTLCache(
 95            maxsize=len(self.channels) or PYBRICKS_MAX_CHANNEL, ttl=message_ttl
 96        )
 97        """Cache of observed broadcasts."""
 98
 99        # Filters used for active scanning
100        filters: BlueZDiscoveryFilters = BlueZDiscoveryFilters()
101
102        if device_pattern:
103            filters["Pattern"] = device_pattern
104        if rssi_threshold:
105            filters["RSSI"] = rssi_threshold
106
107        # Patterns used for passive scanning
108        or_patterns: list[OrPattern | tuple[int, AdvertisementDataType, bytes]]
109
110        if self.channels:
111            or_patterns = [
112                OrPattern(
113                    0,
114                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
115                    pack("<HB", LEGO_CID, channel),
116                )
117                for channel in self.channels
118            ]
119        else:
120            or_patterns = [
121                OrPattern(
122                    0,
123                    AdvertisementDataType.MANUFACTURER_SPECIFIC_DATA,
124                    pack("<H", LEGO_CID),
125                )
126            ]
127
128        log.debug(
129            "Observer init: scanning_mode=%s, device_pattern=%s, rssi_threshold=%s",
130            scanning_mode,
131            device_pattern,
132            rssi_threshold,
133        )
134
135        self._scanner = BleakScanner(
136            detection_callback=self._callback,
137            scanning_mode=scanning_mode,
138            bluez=BlueZScannerArgs(
139                filters=filters,
140                or_patterns=or_patterns,
141            ),
142            adapter=adapter_name,
143        )

Creates a new observer.

Parameters
  • adapter_name: The Bluetooth adapter to use for scanning, defaults to None (auto-discover default device).
  • scanning_mode: The scanning mode to use, defaults to "passive".
  • channels: Channels to observe, defaults to None (all channels).
  • rssi_threshold: Minimum required signal strength of observed broadcasts in dBm, defaults to None (no RSSI filtering).
  • device_pattern: Pattern that the device name of the sender must start with, defaults to None (no name filtering).
  • message_ttl: Time in seconds to cache observed broadcasts for, defaults to 60.
channels

List of channels that this observer is monitoring.

rssi_threshold

The configured RSSI threshold for broadcasts.

device_pattern

The configured device name pattern match for broadcasts.

advertisements: cachetools.TTLCache

Cache of observed broadcasts.

def _callback( self, device: bleak.backends.device.BLEDevice, ad: bleak.backends.scanner.AdvertisementData):
145    def _callback(self, device: BLEDevice, ad: AdvertisementData):
146        """
147        @public
148        The callback function for detected BLE advertisements from the
149        scanner.
150
151        Performs filtering of advertisements, decodes the Pybricks
152        broadcast message contained in the advertisement, and stores
153        it in the broadcast cache.
154
155        Depending on the selected scanning mode, certain filters must
156        be applied here:
157
158        1. Filter advertisements based on the configured `rssi_threshold`
159            and `device_pattern` (required in "passive" mode).
160        2. Filter advertisements that contain invalid manufacturer data,
161            such as non-Pybricks advertisements (required in "active" mode).
162        4. Filter messages on the incorrect Pybricks channel (required in
163            "active" mode).
164
165        :param device: The device sending the advertisement.
166        :param ad: The advertisement data.
167        """
168        if self.rssi_threshold is not None and ad.rssi < self.rssi_threshold:
169            log.debug("Filtered AD due to RSSI below threshold: %i", ad.rssi)
170            return
171
172        if (ad.local_name and self.device_pattern) and not ad.local_name.startswith(
173            self.device_pattern
174        ):
175            log.debug("Filtered AD due to invalid device name: %s", ad.local_name)
176            return
177
178        if LEGO_CID not in ad.manufacturer_data:
179            log.debug(
180                "Filtered AD due to invalid manufacturer data: %s",
181                ad.manufacturer_data.keys(),
182            )
183            return
184
185        message = ad.manufacturer_data[LEGO_CID]
186        channel, data = decode_message(message)
187
188        if self.channels and channel not in self.channels:
189            log.debug("Filtered broadcast due to wrong channel: %i", channel)
190            return
191
192        log.info(
193            "Pybricks broadcast on channel %i: %s (rssi %s)", channel, data, ad.rssi
194        )
195        self.advertisements[channel] = ObservedAdvertisement(data, ad.rssi)

The callback function for detected BLE advertisements from the scanner.

Performs filtering of advertisements, decodes the Pybricks broadcast message contained in the advertisement, and stores it in the broadcast cache.

Depending on the selected scanning mode, certain filters must be applied here:

  1. Filter advertisements based on the configured rssi_threshold and device_pattern (required in "passive" mode).
  2. Filter advertisements that contain invalid manufacturer data, such as non-Pybricks advertisements (required in "active" mode).
  3. Filter messages on the incorrect Pybricks channel (required in "active" mode).
Parameters
  • device: The device sending the advertisement.
  • ad: The advertisement data.
def observe( self, channel: int) -> ObservedAdvertisement | None:
197    def observe(self, channel: int) -> ObservedAdvertisement | None:
198        """
199        Retrieves the last observed data for a given channel.
200
201        :param channel: The channel to observe (0 to 255).
202        :return: The received data in the same format as it was sent, or `None`
203            if no recent data is available.
204        """
205        if self.channels and channel not in self.channels:
206            raise ValueError(f"Channel {channel} not allocated.")
207
208        return self.advertisements.get(channel, None)

Retrieves the last observed data for a given channel.

Parameters
  • channel: The channel to observe (0 to 255).
Returns

The received data in the same format as it was sent, or None if no recent data is available.

class ObservedAdvertisement(typing.NamedTuple):
28class ObservedAdvertisement(NamedTuple):
29    """
30    Data structure for an observed broadcast.
31    """
32
33    data: PybricksBroadcastData
34    rssi: int

Data structure for an observed broadcast.

ObservedAdvertisement( data: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...], rssi: int)

Create new instance of ObservedAdvertisement(data, rssi)

data: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...]

Alias for field number 0

rssi: int

Alias for field number 1

class LEAdvertisement(dbus_fast.service.ServiceInterface):
 99class LEAdvertisement(ServiceInterface):
100    """
101    Implementation of the `org.bluez.LEAdvertisement1` D-Bus interface.
102    """
103
104    INTERFACE_NAME: str = "org.bluez.LEAdvertisement1"
105
106    def __init__(
107        self,
108        advertising_type: Type,
109        local_name: str,
110        index: int = 0,
111        includes: set[Include] = set(),
112    ):
113        if index < 0:
114            raise ValueError("index must be positive")
115
116        self.index = index
117        self.path = f"/org/bluez/{local_name}/advertisement{index:03}"
118
119        self._type: str = advertising_type.value
120        self._service_uuids: list[str] = []
121        self._manufacturer_data: dict[int, Variant] = {}  # uint16 -> bytes
122        self._solicit_uuids: list[str] = []
123        self._service_data: dict[str, Variant] = {}  # uint16 | str -> bytes
124        self._data: dict[int, Variant] = {}  # EXPERIMENTAL # uint8 -> bytes
125        self._discoverable: bool = False  # EXPERIMENTAL
126        self._discoverable_timeout: int = 0  # EXPERIMENTAL # uint16
127        self._includes: list[str] = [i.value for i in includes]
128        self._local_name: str = local_name
129        self._appearance: int = 0x00  # uint16
130        self._duration: int = 2  # uint16
131        self._timeout: int = 0  # uint16
132        self._secondary_channel: str = SecondaryChannel.ONE.value  # EXPERIMENTAL
133        self._min_interval: int = 100  # EXPERIMENTAL # uint32
134        self._max_interval: int = 1000  # EXPERIMENTAL # uint32
135        self._tx_power: int = 7  # EXPERIMENTAL # int16
136
137        super().__init__(self.INTERFACE_NAME)
138
139    def _enable_props(self, *prop_names: str):
140        """
141        Enables the given properties.
142
143        This should be used by subclasses to opt-into experimental properties.
144
145        :param prop_names: List of D-Bus property names to enable.
146        :raises ValueError: If an unknown property was passed.
147        """
148        for prop_name in prop_names:
149            prop: _Property | None = next(
150                (
151                    p
152                    for p in ServiceInterface._get_properties(self)
153                    if p.name == prop_name
154                ),
155                None,
156            )
157            if prop is None:
158                raise ValueError(f"Unknown property: {prop_name}")
159            else:
160                prop.disabled = False
161
162    def _disable_props(self, *prop_names: str):
163        """
164        Disables the given properties.
165
166        This can be used by subclasses to opt-out of exposing certain properties.
167
168        :param prop_names: List of D-Bus property names to disable.
169        :raises ValueError: If an unknown property was passed.
170        """
171        for prop_name in prop_names:
172            prop: _Property | None = next(
173                (
174                    p
175                    for p in ServiceInterface._get_properties(self)
176                    if p.name == prop_name
177                ),
178                None,
179            )
180            if prop is None:
181                raise ValueError(f"Unknown property: {prop_name}")
182            else:
183                prop.disabled = True
184
185    @dbus_method()
186    def Release(self) -> None:
187        logger.debug("Released advertisement: %s", self)
188
189    @dbus_property(access=PropertyAccess.READ)
190    def Type(self) -> DBusStr:
191        """
192        Determines the type of advertising packet requested.
193        """
194        return self._type
195
196    @Type.setter  # type: ignore[no-redef]
197    def Type(self, type: DBusStr) -> None:
198        self._type = type
199
200    @dbus_property()
201    def ServiceUUIDs(self) -> DBusArrayString:
202        """
203        List of UUIDs to include in the "Service UUID" field of the Advertising Data.
204        """
205        return self._service_uuids
206
207    @ServiceUUIDs.setter  # type: ignore[no-redef]
208    def ServiceUUIDs(self, service_uuids: DBusArrayString) -> None:
209        self._service_uuids = service_uuids
210
211    @dbus_property()
212    def ManufacturerData(self) -> DBusUInt16Dict:
213        """
214        Manufacturer Data fields to include in the Advertising Data.
215        Keys are the Manufacturer ID to associate with the data.
216        """
217        return self._manufacturer_data
218
219    @ManufacturerData.setter  # type: ignore[no-redef]
220    def ManufacturerData(self, data: DBusUInt16Dict) -> None:
221        self._manufacturer_data = data
222
223    @dbus_property()
224    def SolicitUUIDs(self) -> DBusArrayString:
225        """
226        Array of UUIDs to include in "Service Solicitation" Advertisement Data.
227        """
228        return self._solicit_uuids
229
230    @SolicitUUIDs.setter  # type: ignore[no-redef]
231    def SolicitUUIDs(self, uuids: DBusArrayString) -> None:
232        self._solicit_uuids = uuids
233
234    @dbus_property()
235    def ServiceData(self) -> DBusDict:
236        """
237        Service Data elements to include. The keys are the UUID to associate with the data.
238        """
239        return self._service_data
240
241    @ServiceData.setter  # type: ignore[no-redef]
242    def ServiceData(self, data: DBusDict) -> None:
243        self._service_data = data
244
245    @dbus_property(disabled=True)
246    def Data(self) -> DBusByteDict:
247        """
248        Advertising Data to include.
249        Key is the advertising type and value is the data as byte array.
250        """
251        return self._data
252
253    @Data.setter  # type: ignore[no-redef]
254    def Data(self, data: DBusByteDict) -> None:
255        self._data = data
256
257    @dbus_property(disabled=True)
258    def Discoverable(self) -> DBusBool:
259        """
260        Advertise as general discoverable.
261        When present this will override adapter Discoverable property.
262        """
263        return self._discoverable
264
265    @Discoverable.setter  # type: ignore[no-redef]
266    def Discoverable(self, discoverable: DBusBool) -> None:
267        self._discoverable = discoverable
268
269    @dbus_property(disabled=True)
270    def DiscoverableTimeout(self) -> DBusUInt16:
271        """
272        The discoverable timeout in seconds.
273        A value of zero means that the timeout is disabled and it will stay in discoverable/limited mode forever.
274        """
275        return self._discoverable_timeout
276
277    @DiscoverableTimeout.setter  # type: ignore[no-redef]
278    def DiscoverableTimeout(self, timeout: DBusUInt16) -> None:
279        self._discoverable_timeout = timeout
280
281    @dbus_property()
282    def Includes(self) -> DBusArrayString:
283        """
284        List of features to be included in the advertising packet.
285        """
286        return self._includes
287
288    @Includes.setter  # type: ignore[no-redef]
289    def Includes(self, includes: DBusArrayString) -> None:
290        self._includes = includes
291
292    @dbus_property()
293    def LocalName(self) -> DBusStr:
294        """
295        Local name to be used in the advertising report.
296        If the string is too big to fit into the packet it will be truncated.
297        """
298        return self._local_name
299
300    @LocalName.setter  # type: ignore[no-redef]
301    def LocalName(self, name: DBusStr) -> None:
302        self._local_name = name
303
304    @dbus_property()
305    def Appearance(self) -> DBusUInt16:
306        """
307        Appearance to be used in the advertising report.
308        """
309        return self._appearance
310
311    @Appearance.setter  # type: ignore[no-redef]
312    def Appearance(self, appearance: DBusUInt16) -> None:
313        self._appearance = appearance
314
315    @dbus_property()
316    def Duration(self) -> DBusUInt16:
317        """
318        Rotation duration of the advertisement in seconds.
319        If there are other applications advertising no duration is set the default is 2 seconds.
320        """
321        return self._duration
322
323    @Duration.setter  # type: ignore[no-redef]
324    def Duration(self, seconds: DBusUInt16) -> None:
325        self._duration = seconds
326
327    @dbus_property()
328    def Timeout(self) -> DBusUInt16:
329        """
330        Timeout of the advertisement in seconds.
331        This defines the lifetime of the advertisement.
332        """
333        return self._timeout
334
335    @Timeout.setter  # type: ignore[no-redef]
336    def Timeout(self, seconds: DBusUInt16) -> None:
337        self._timeout = seconds
338
339    @dbus_property(disabled=True)
340    def SecondaryChannel(self) -> DBusStr:
341        """
342        Secondary channel to be used.
343        Primary channel is always set to "1M" except when "Coded" is set.
344        """
345        return self._secondary_channel
346
347    @SecondaryChannel.setter  # type: ignore[no-redef]
348    def SecondaryChannel(self, channel: DBusStr) -> None:
349        self._secondary_channel = channel
350
351    @dbus_property(disabled=True)
352    def MinInterval(self) -> DBusUInt32:
353        """
354        Minimum advertising interval to be used by the advertising set, in milliseconds.
355        Acceptable values are in the range [20ms, 10,485s].
356        If the provided MinInterval is larger than the provided MaxInterval,
357        the registration will return failure.
358        """
359        return self._min_interval
360
361    @MinInterval.setter  # type: ignore[no-redef]
362    def MinInterval(self, milliseconds: DBusUInt32) -> None:
363        self._min_interval = milliseconds
364
365    @dbus_property(disabled=True)
366    def MaxInterval(self) -> DBusUInt32:
367        """
368        Maximum advertising interval to be used by the advertising set, in milliseconds.
369        Acceptable values are in the range [20ms, 10,485s].
370        If the provided MinInterval is larger than the provided MaxInterval,
371        the registration will return failure.
372        """
373        return self._max_interval
374
375    @MaxInterval.setter  # type: ignore[no-redef]
376    def MaxInterval(self, milliseconds: DBusUInt32) -> None:
377        self._max_interval = milliseconds
378
379    @dbus_property(disabled=True)
380    def TxPower(self) -> DBusInt16:
381        """
382        Requested transmission power of this advertising set.
383        The provided value is used only if the "CanSetTxPower" feature is enabled on the org.bluez.LEAdvertisingManager(5).
384        The provided value must be in range [-127 to +20], where units are in dBm.
385        """
386        return self._tx_power
387
388    @TxPower.setter  # type: ignore[no-redef]
389    def TxPower(self, dbm: DBusInt16) -> None:
390        self._tx_power = dbm

Implementation of the org.bluez.LEAdvertisement1 D-Bus interface.

LEAdvertisement( advertising_type: Type, local_name: str, index: int = 0, includes: set[Include] = set())
106    def __init__(
107        self,
108        advertising_type: Type,
109        local_name: str,
110        index: int = 0,
111        includes: set[Include] = set(),
112    ):
113        if index < 0:
114            raise ValueError("index must be positive")
115
116        self.index = index
117        self.path = f"/org/bluez/{local_name}/advertisement{index:03}"
118
119        self._type: str = advertising_type.value
120        self._service_uuids: list[str] = []
121        self._manufacturer_data: dict[int, Variant] = {}  # uint16 -> bytes
122        self._solicit_uuids: list[str] = []
123        self._service_data: dict[str, Variant] = {}  # uint16 | str -> bytes
124        self._data: dict[int, Variant] = {}  # EXPERIMENTAL # uint8 -> bytes
125        self._discoverable: bool = False  # EXPERIMENTAL
126        self._discoverable_timeout: int = 0  # EXPERIMENTAL # uint16
127        self._includes: list[str] = [i.value for i in includes]
128        self._local_name: str = local_name
129        self._appearance: int = 0x00  # uint16
130        self._duration: int = 2  # uint16
131        self._timeout: int = 0  # uint16
132        self._secondary_channel: str = SecondaryChannel.ONE.value  # EXPERIMENTAL
133        self._min_interval: int = 100  # EXPERIMENTAL # uint32
134        self._max_interval: int = 1000  # EXPERIMENTAL # uint32
135        self._tx_power: int = 7  # EXPERIMENTAL # int16
136
137        super().__init__(self.INTERFACE_NAME)
INTERFACE_NAME: str = 'org.bluez.LEAdvertisement1'
index
path
@dbus_method()
def Release(self) -> None:
185    @dbus_method()
186    def Release(self) -> None:
187        logger.debug("Released advertisement: %s", self)
Type: Annotated[str, DBusSignature(signature='s')]
189    @dbus_property(access=PropertyAccess.READ)
190    def Type(self) -> DBusStr:
191        """
192        Determines the type of advertising packet requested.
193        """
194        return self._type

Determines the type of advertising packet requested.

ServiceUUIDs: Annotated[list[str], DBusSignature(signature='as')]
200    @dbus_property()
201    def ServiceUUIDs(self) -> DBusArrayString:
202        """
203        List of UUIDs to include in the "Service UUID" field of the Advertising Data.
204        """
205        return self._service_uuids

List of UUIDs to include in the "Service UUID" field of the Advertising Data.

ManufacturerData: Annotated[dict[int, dbus_fast.signature.Variant], DBusSignature(signature='a{qv}')]
211    @dbus_property()
212    def ManufacturerData(self) -> DBusUInt16Dict:
213        """
214        Manufacturer Data fields to include in the Advertising Data.
215        Keys are the Manufacturer ID to associate with the data.
216        """
217        return self._manufacturer_data

Manufacturer Data fields to include in the Advertising Data. Keys are the Manufacturer ID to associate with the data.

SolicitUUIDs: Annotated[list[str], DBusSignature(signature='as')]
223    @dbus_property()
224    def SolicitUUIDs(self) -> DBusArrayString:
225        """
226        Array of UUIDs to include in "Service Solicitation" Advertisement Data.
227        """
228        return self._solicit_uuids

Array of UUIDs to include in "Service Solicitation" Advertisement Data.

ServiceData: Annotated[dict[str, dbus_fast.signature.Variant], DBusSignature(signature='a{sv}')]
234    @dbus_property()
235    def ServiceData(self) -> DBusDict:
236        """
237        Service Data elements to include. The keys are the UUID to associate with the data.
238        """
239        return self._service_data

Service Data elements to include. The keys are the UUID to associate with the data.

Data: Annotated[dict[int, dbus_fast.signature.Variant], DBusSignature(signature='a{yv}')]
245    @dbus_property(disabled=True)
246    def Data(self) -> DBusByteDict:
247        """
248        Advertising Data to include.
249        Key is the advertising type and value is the data as byte array.
250        """
251        return self._data

Advertising Data to include. Key is the advertising type and value is the data as byte array.

Discoverable: Annotated[bool, DBusSignature(signature='b')]
257    @dbus_property(disabled=True)
258    def Discoverable(self) -> DBusBool:
259        """
260        Advertise as general discoverable.
261        When present this will override adapter Discoverable property.
262        """
263        return self._discoverable

Advertise as general discoverable. When present this will override adapter Discoverable property.

DiscoverableTimeout: Annotated[int, DBusSignature(signature='q')]
269    @dbus_property(disabled=True)
270    def DiscoverableTimeout(self) -> DBusUInt16:
271        """
272        The discoverable timeout in seconds.
273        A value of zero means that the timeout is disabled and it will stay in discoverable/limited mode forever.
274        """
275        return self._discoverable_timeout

The discoverable timeout in seconds. A value of zero means that the timeout is disabled and it will stay in discoverable/limited mode forever.

Includes: Annotated[list[str], DBusSignature(signature='as')]
281    @dbus_property()
282    def Includes(self) -> DBusArrayString:
283        """
284        List of features to be included in the advertising packet.
285        """
286        return self._includes

List of features to be included in the advertising packet.

LocalName: Annotated[str, DBusSignature(signature='s')]
292    @dbus_property()
293    def LocalName(self) -> DBusStr:
294        """
295        Local name to be used in the advertising report.
296        If the string is too big to fit into the packet it will be truncated.
297        """
298        return self._local_name

Local name to be used in the advertising report. If the string is too big to fit into the packet it will be truncated.

Appearance: Annotated[int, DBusSignature(signature='q')]
304    @dbus_property()
305    def Appearance(self) -> DBusUInt16:
306        """
307        Appearance to be used in the advertising report.
308        """
309        return self._appearance

Appearance to be used in the advertising report.

Duration: Annotated[int, DBusSignature(signature='q')]
315    @dbus_property()
316    def Duration(self) -> DBusUInt16:
317        """
318        Rotation duration of the advertisement in seconds.
319        If there are other applications advertising no duration is set the default is 2 seconds.
320        """
321        return self._duration

Rotation duration of the advertisement in seconds. If there are other applications advertising no duration is set the default is 2 seconds.

Timeout: Annotated[int, DBusSignature(signature='q')]
327    @dbus_property()
328    def Timeout(self) -> DBusUInt16:
329        """
330        Timeout of the advertisement in seconds.
331        This defines the lifetime of the advertisement.
332        """
333        return self._timeout

Timeout of the advertisement in seconds. This defines the lifetime of the advertisement.

SecondaryChannel: Annotated[str, DBusSignature(signature='s')]
339    @dbus_property(disabled=True)
340    def SecondaryChannel(self) -> DBusStr:
341        """
342        Secondary channel to be used.
343        Primary channel is always set to "1M" except when "Coded" is set.
344        """
345        return self._secondary_channel

Secondary channel to be used. Primary channel is always set to "1M" except when "Coded" is set.

MinInterval: Annotated[int, DBusSignature(signature='u')]
351    @dbus_property(disabled=True)
352    def MinInterval(self) -> DBusUInt32:
353        """
354        Minimum advertising interval to be used by the advertising set, in milliseconds.
355        Acceptable values are in the range [20ms, 10,485s].
356        If the provided MinInterval is larger than the provided MaxInterval,
357        the registration will return failure.
358        """
359        return self._min_interval

Minimum advertising interval to be used by the advertising set, in milliseconds. Acceptable values are in the range [20ms, 10,485s]. If the provided MinInterval is larger than the provided MaxInterval, the registration will return failure.

MaxInterval: Annotated[int, DBusSignature(signature='u')]
365    @dbus_property(disabled=True)
366    def MaxInterval(self) -> DBusUInt32:
367        """
368        Maximum advertising interval to be used by the advertising set, in milliseconds.
369        Acceptable values are in the range [20ms, 10,485s].
370        If the provided MinInterval is larger than the provided MaxInterval,
371        the registration will return failure.
372        """
373        return self._max_interval

Maximum advertising interval to be used by the advertising set, in milliseconds. Acceptable values are in the range [20ms, 10,485s]. If the provided MinInterval is larger than the provided MaxInterval, the registration will return failure.

TxPower: Annotated[int, DBusSignature(signature='n')]
379    @dbus_property(disabled=True)
380    def TxPower(self) -> DBusInt16:
381        """
382        Requested transmission power of this advertising set.
383        The provided value is used only if the "CanSetTxPower" feature is enabled on the org.bluez.LEAdvertisingManager(5).
384        The provided value must be in range [-127 to +20], where units are in dBm.
385        """
386        return self._tx_power

Requested transmission power of this advertising set. The provided value is used only if the "CanSetTxPower" feature is enabled on the org.bluez.LEAdvertisingManager(5). The provided value must be in range [-127 to +20], where units are in dBm.

class BroadcastAdvertisement(pb_ble.bluezdbus.LEAdvertisement):
393class BroadcastAdvertisement(LEAdvertisement):
394    """
395    Implementation of a broadcast advertisement.
396
397    This sets the advertising type to "broadcast" and toggles
398    available properties appropriately.
399    """
400
401    def __init__(
402        self,
403        local_name: str,
404        index: int = 0,
405        on_release: Callable[[str], None] = lambda path: None,
406    ):
407        super().__init__(
408            Type.BROADCAST,
409            local_name,
410            index,
411            # set([Include.LOCAL_NAME]),
412        )
413
414        self.on_release: Callable[[str], None] = on_release
415        """Callback function that is called when this advertisement is released by BlueZ."""
416
417        # Disable properties that aren't needed for broadcasting
418        self._disable_props(
419            "ServiceUUIDs",
420            "SolicitUUIDs",
421            "LocalName",
422            "Appearance",
423            "Duration",
424        )
425        # Enable experimental properties useful for broadcasting
426        self._enable_props("MinInterval", "MaxInterval", "TxPower")
427
428        # for prop in ServiceInterface._get_properties(self):
429        #    logger.debug("Property %s (%s)", prop.name, "DISABLED" if prop.disabled else "ENABLED")
430
431    @dbus_method()
432    def Release(self) -> None:
433        super().Release()
434        self.on_release(self.path)

Implementation of a broadcast advertisement.

This sets the advertising type to "broadcast" and toggles available properties appropriately.

BroadcastAdvertisement( local_name: str, index: int = 0, on_release: Callable[[str], NoneType] = <function BroadcastAdvertisement.<lambda>>)
401    def __init__(
402        self,
403        local_name: str,
404        index: int = 0,
405        on_release: Callable[[str], None] = lambda path: None,
406    ):
407        super().__init__(
408            Type.BROADCAST,
409            local_name,
410            index,
411            # set([Include.LOCAL_NAME]),
412        )
413
414        self.on_release: Callable[[str], None] = on_release
415        """Callback function that is called when this advertisement is released by BlueZ."""
416
417        # Disable properties that aren't needed for broadcasting
418        self._disable_props(
419            "ServiceUUIDs",
420            "SolicitUUIDs",
421            "LocalName",
422            "Appearance",
423            "Duration",
424        )
425        # Enable experimental properties useful for broadcasting
426        self._enable_props("MinInterval", "MaxInterval", "TxPower")
427
428        # for prop in ServiceInterface._get_properties(self):
429        #    logger.debug("Property %s (%s)", prop.name, "DISABLED" if prop.disabled else "ENABLED")
on_release: Callable[[str], NoneType]

Callback function that is called when this advertisement is released by BlueZ.

@dbus_method()
def Release(self) -> None:
431    @dbus_method()
432    def Release(self) -> None:
433        super().Release()
434        self.on_release(self.path)
class PybricksBroadcastAdvertisement(pb_ble.bluezdbus.BroadcastAdvertisement):
437class PybricksBroadcastAdvertisement(BroadcastAdvertisement):
438    """
439    Implementation of a Pybricks broadcast advertisement.
440
441    The data to broadcast is set via the message property.
442    """
443
444    LEGO_CID = LEGO_CID
445    """LEGO System A/S company identifier."""
446
447    def __init__(
448        self,
449        local_name: str,
450        channel: int = 0,
451        data: PybricksBroadcastData | None = None,
452        on_release: Callable[[str], None] = lambda path: None,
453    ):
454        super().__init__(local_name, channel, on_release)
455        if data:
456            self.message = data
457
458    @property
459    def channel(self) -> int:
460        """The channel of this broadcast message."""
461        return self.index
462
463    @property
464    def message(self) -> PybricksBroadcastData | None:
465        """The data contained in this broadcast message."""
466        if self.LEGO_CID in self._manufacturer_data:
467            channel, value = decode_message(
468                self._manufacturer_data[self.LEGO_CID].value
469            )
470            return value
471        else:
472            return None
473
474    @message.setter
475    def message(self, value: PybricksBroadcastData) -> None:
476        value = value if isinstance(value, tuple) else (value,)
477        message = encode_message(self.channel, *value)
478        self._manufacturer_data[self.LEGO_CID] = Variant("ay", message)
479        # Notify BlueZ of the changed manufacturer data so the advertisement is updated
480        self.emit_properties_changed(
481            changed_properties={"ManufacturerData": self._manufacturer_data}
482        )
483
484    def __str__(self):
485        return f"PybricksBroadcastAdvertisement(channel={self.channel}, data={self.message!r}, timeout={self._timeout})"

Implementation of a Pybricks broadcast advertisement.

The data to broadcast is set via the message property.

PybricksBroadcastAdvertisement( local_name: str, channel: int = 0, data: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...] | None = None, on_release: Callable[[str], NoneType] = <function PybricksBroadcastAdvertisement.<lambda>>)
447    def __init__(
448        self,
449        local_name: str,
450        channel: int = 0,
451        data: PybricksBroadcastData | None = None,
452        on_release: Callable[[str], None] = lambda path: None,
453    ):
454        super().__init__(local_name, channel, on_release)
455        if data:
456            self.message = data
LEGO_CID = 919

LEGO System A/S company identifier.

channel: int
458    @property
459    def channel(self) -> int:
460        """The channel of this broadcast message."""
461        return self.index

The channel of this broadcast message.

message: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...] | None
463    @property
464    def message(self) -> PybricksBroadcastData | None:
465        """The data contained in this broadcast message."""
466        if self.LEGO_CID in self._manufacturer_data:
467            channel, value = decode_message(
468                self._manufacturer_data[self.LEGO_CID].value
469            )
470            return value
471        else:
472            return None

The data contained in this broadcast message.

class LEAdvertisingManager:
551class LEAdvertisingManager:
552    """
553    Client implementation of the `org.bluez.LEAdvertisementManager1` D-Bus interface.
554    """
555
556    INTERFACE_NAME: str = "org.bluez.LEAdvertisingManager1"
557
558    _adv_manager: LEAdvertisingManager1
559
560    def __init__(
561        self,
562        adapter: ProxyObject | None = None,
563        adv_manager: ProxyInterface | None = None,
564    ):
565        if adapter is None and adv_manager is None:
566            raise ValueError("adapter or adv_manager required")
567
568        if adv_manager:
569            self._adv_manager = cast(LEAdvertisingManager1, adv_manager)
570        elif adapter:
571            self._adv_manager = cast(
572                LEAdvertisingManager1, adapter.get_interface(self.INTERFACE_NAME)
573            )
574
575    async def register_advertisement(
576        self, adv: LEAdvertisement, options: dict | None = None
577    ):
578        """
579        Registers an advertisement object to be sent over the LE Advertising channel.
580        The service must implement `org.bluez.LEAdvertisement1` interface.
581
582        :param adv: The advertisement service object.
583        :param options: Advertisement options, defaults to None.
584        :return: `None`
585        """
586        options = options or {}
587        return await self._adv_manager.call_register_advertisement(adv.path, options)
588
589    @overload
590    async def unregister_advertisement(self, adv: LEAdvertisement): ...
591    @overload
592    async def unregister_advertisement(self, adv: str): ...
593    async def unregister_advertisement(self, adv):
594        """
595        Unregisters an advertisement that has been previously registered using `register_advertisement()`.
596        The object path parameter must match the same value that has been used on registration.
597
598        :param adv: The advertisement service object, or path.
599        :return: `None`
600        """
601        if isinstance(adv, str):
602            return await self._adv_manager.call_unregister_advertisement(adv)
603        else:
604            return await self._adv_manager.call_unregister_advertisement(adv.path)
605
606    async def active_instances(self) -> int:
607        """Number of active advertising instances."""
608        return await self._adv_manager.get_active_instances()
609
610    async def supported_instances(self) -> int:
611        """Number of available advertising instances."""
612        return await self._adv_manager.get_supported_instances()
613
614    async def supported_includes(self) -> list[Include]:
615        """List of supported system includes."""
616        return await self._adv_manager.get_supported_includes()
617
618    async def supported_secondary_channels(self) -> list[SecondaryChannel]:
619        """List of supported Secondary channels.
620        Secondary channels can be used to advertise  with the corresponding PHY.
621        """
622        return await self._adv_manager.get_supported_secondary_channels()
623
624    async def supported_capabilities(self) -> dict[Capability, Any]:
625        """Enumerates Advertising-related controller capabilities useful to the client."""
626        return await self._adv_manager.get_supported_capabilities()
627
628    async def supported_features(self) -> list[Feature]:
629        """List  of supported platform features.
630        If no features are available on the platform, the SupportedFeatures array will be empty.
631        """
632        return await self._adv_manager.get_supported_features()

Client implementation of the org.bluez.LEAdvertisementManager1 D-Bus interface.

LEAdvertisingManager( adapter: dbus_fast.aio.proxy_object.ProxyObject | None = None, adv_manager: dbus_fast.aio.proxy_object.ProxyInterface | None = None)
560    def __init__(
561        self,
562        adapter: ProxyObject | None = None,
563        adv_manager: ProxyInterface | None = None,
564    ):
565        if adapter is None and adv_manager is None:
566            raise ValueError("adapter or adv_manager required")
567
568        if adv_manager:
569            self._adv_manager = cast(LEAdvertisingManager1, adv_manager)
570        elif adapter:
571            self._adv_manager = cast(
572                LEAdvertisingManager1, adapter.get_interface(self.INTERFACE_NAME)
573            )
INTERFACE_NAME: str = 'org.bluez.LEAdvertisingManager1'
async def register_advertisement( self, adv: LEAdvertisement, options: dict | None = None):
575    async def register_advertisement(
576        self, adv: LEAdvertisement, options: dict | None = None
577    ):
578        """
579        Registers an advertisement object to be sent over the LE Advertising channel.
580        The service must implement `org.bluez.LEAdvertisement1` interface.
581
582        :param adv: The advertisement service object.
583        :param options: Advertisement options, defaults to None.
584        :return: `None`
585        """
586        options = options or {}
587        return await self._adv_manager.call_register_advertisement(adv.path, options)

Registers an advertisement object to be sent over the LE Advertising channel. The service must implement org.bluez.LEAdvertisement1 interface.

Parameters
  • adv: The advertisement service object.
  • options: Advertisement options, defaults to None.
Returns

None

async def unregister_advertisement(self, adv):
593    async def unregister_advertisement(self, adv):
594        """
595        Unregisters an advertisement that has been previously registered using `register_advertisement()`.
596        The object path parameter must match the same value that has been used on registration.
597
598        :param adv: The advertisement service object, or path.
599        :return: `None`
600        """
601        if isinstance(adv, str):
602            return await self._adv_manager.call_unregister_advertisement(adv)
603        else:
604            return await self._adv_manager.call_unregister_advertisement(adv.path)

Unregisters an advertisement that has been previously registered using register_advertisement(). The object path parameter must match the same value that has been used on registration.

Parameters
  • adv: The advertisement service object, or path.
Returns

None

async def active_instances(self) -> int:
606    async def active_instances(self) -> int:
607        """Number of active advertising instances."""
608        return await self._adv_manager.get_active_instances()

Number of active advertising instances.

async def supported_instances(self) -> int:
610    async def supported_instances(self) -> int:
611        """Number of available advertising instances."""
612        return await self._adv_manager.get_supported_instances()

Number of available advertising instances.

async def supported_includes(self) -> list[Include]:
614    async def supported_includes(self) -> list[Include]:
615        """List of supported system includes."""
616        return await self._adv_manager.get_supported_includes()

List of supported system includes.

async def supported_secondary_channels(self) -> list[SecondaryChannel]:
618    async def supported_secondary_channels(self) -> list[SecondaryChannel]:
619        """List of supported Secondary channels.
620        Secondary channels can be used to advertise  with the corresponding PHY.
621        """
622        return await self._adv_manager.get_supported_secondary_channels()

List of supported Secondary channels. Secondary channels can be used to advertise with the corresponding PHY.

async def supported_capabilities(self) -> dict[Capability, typing.Any]:
624    async def supported_capabilities(self) -> dict[Capability, Any]:
625        """Enumerates Advertising-related controller capabilities useful to the client."""
626        return await self._adv_manager.get_supported_capabilities()

Enumerates Advertising-related controller capabilities useful to the client.

async def supported_features(self) -> list[Feature]:
628    async def supported_features(self) -> list[Feature]:
629        """List  of supported platform features.
630        If no features are available on the platform, the SupportedFeatures array will be empty.
631        """
632        return await self._adv_manager.get_supported_features()

List of supported platform features. If no features are available on the platform, the SupportedFeatures array will be empty.

class Type(enum.Enum):
46class Type(Enum):
47    """LEAdvertisement: Type"""
48
49    BROADCAST = "broadcast"
50    PERIPHERAL = "peripheral"

LEAdvertisement: Type

BROADCAST = <Type.BROADCAST: 'broadcast'>
PERIPHERAL = <Type.PERIPHERAL: 'peripheral'>
class Include(enum.Enum):
53class Include(Enum):
54    """LEAdvertisingManager: SupportedIncludes"""
55
56    TX_POWER = "tx-power"
57    APPEARANCE = "appearance"
58    LOCAL_NAME = "local-name"
59    RSI = "rsi"

LEAdvertisingManager: SupportedIncludes

TX_POWER = <Include.TX_POWER: 'tx-power'>
APPEARANCE = <Include.APPEARANCE: 'appearance'>
LOCAL_NAME = <Include.LOCAL_NAME: 'local-name'>
RSI = <Include.RSI: 'rsi'>
class Capability(enum.Enum):
70class Capability(Enum):
71    MAX_ADV_LEN = "MaxAdvLen"
72    """Max advertising data length [byte]"""
73
74    MAX_SCN_RSP_LEN = "MaxScnRspLen"
75    """Max advertising scan response length [byte]"""
76
77    MIN_TX_POWER = "MinTxPower"
78    """Min advertising tx power (dBm) [int16]"""
79
80    MAX_TX_POWER = "MaxTxPower"
81    """Max advertising tx power (dBm) [int16]"""
MAX_ADV_LEN = <Capability.MAX_ADV_LEN: 'MaxAdvLen'>

Max advertising data length [byte]

MAX_SCN_RSP_LEN = <Capability.MAX_SCN_RSP_LEN: 'MaxScnRspLen'>

Max advertising scan response length [byte]

MIN_TX_POWER = <Capability.MIN_TX_POWER: 'MinTxPower'>

Min advertising tx power (dBm) [int16]

MAX_TX_POWER = <Capability.MAX_TX_POWER: 'MaxTxPower'>

Max advertising tx power (dBm) [int16]

class Feature(enum.Enum):
84class Feature(Enum):
85    """LEAdvertisingManager: SupportedFeatures"""
86
87    CAN_SET_TX_POWER = "CanSetTxPower"
88    """Indicates whether platform can specify tx power on each advertising instance."""
89
90    HARDWARE_OFFLOAD = "HardwareOffload"
91    """Indicates whether multiple advertising will be offloaded to the controller."""

LEAdvertisingManager: SupportedFeatures

CAN_SET_TX_POWER = <Feature.CAN_SET_TX_POWER: 'CanSetTxPower'>

Indicates whether platform can specify tx power on each advertising instance.

HARDWARE_OFFLOAD = <Feature.HARDWARE_OFFLOAD: 'HardwareOffload'>

Indicates whether multiple advertising will be offloaded to the controller.

class SecondaryChannel(enum.Enum):
62class SecondaryChannel(Enum):
63    """LEAdvertisingManager: SupportedSecondaryChannels"""
64
65    ONE = "1M"
66    TWO = "2M"
67    CODED = "Coded"

LEAdvertisingManager: SupportedSecondaryChannels

ONE = <SecondaryChannel.ONE: '1M'>
TWO = <SecondaryChannel.TWO: '2M'>
CODED = <SecondaryChannel.CODED: 'Coded'>