pb_ble

A Python implementation of the Pybricks connectionless Bluetooth messaging protocol.

This package includes the following modules:

  • Python implementation of the Pybricks BLE hub interface (pb_ble.vhub).
  • Observer and Broadcaster for Pybricks BLE messages backed by BlueZ (pb_ble.bluezdbus).
  • Decoder and Encoder for the Pybricks BLE message format (pb_ble.messages).

Requirements

Pybricks is using Bluetooth Low Energy (BLE) broadcasting advertising packets to exchange connectionless messages (also known as "BLE Broadcast/Observe").

To use the BLE radio features of this library, you need:

  • a BLE-capable Bluetooth adapter.
  • a device running Linux with BlueZ and D-Bus (e.g. Ubuntu 20.04 or newer).

To use the BLE observer in passive scanning mode (recommended):

Usage from the command line

This package includes CLI tools to communicate with Pybricks devices via connectionless Bluetooth messaging, see pb_ble.cli for details and usage.

This is a great way to test your Pybricks programs without having a second Pybricks Hub available.

It also allows you to check that the connectivity between your device and Pybricks is going to work as expected before endeavouring further :)

Usage from Python

  • High level: The get_virtual_ble() method can be used to configure and obtain a high level client object. This object is similar to the Pybricks Hub BLE interface, so should feel familiar if you've written a Pybricks program already.

  • Low level: D-Bus client and service objects defined in pb_ble.bluezdbus can be used to integrate Pybricks connectionless Bluetooth messaging into applications independent from the Pybricks Hub API.

 1"""
 2A Python implementation of the [Pybricks connectionless Bluetooth messaging][pybricks-message-spec]
 3protocol.
 4
 5This package includes the following modules:
 6
 7- Python implementation of the Pybricks BLE hub interface (`pb_ble.vhub`).
 8- Observer and Broadcaster for Pybricks BLE messages backed by BlueZ (`pb_ble.bluezdbus`).
 9- Decoder and Encoder for the Pybricks BLE message format (`pb_ble.messages`).
10
11## Requirements
12
13Pybricks is using Bluetooth Low Energy (BLE) broadcasting advertising packets
14to exchange connectionless messages (also known as "BLE Broadcast/Observe").
15
16To use the BLE radio features of this library, you need:
17
18- a BLE-capable Bluetooth adapter.
19- a device running Linux with BlueZ and D-Bus (e.g. Ubuntu 20.04 or newer).
20
21To use the BLE observer in passive scanning mode (recommended):
22
23- BlueZ version 5.56 or newer, with [experimental features enabled][bluez-experimental]
24    (e.g. Ubuntu 22.04 configured with the bluetoothd `--experimental` flag).
25
26## Usage from the command line
27
28This package includes CLI tools to communicate with Pybricks devices via
29connectionless Bluetooth messaging, see `pb_ble.cli` for details and usage.
30
31This is a great way to test your Pybricks programs without having a second
32Pybricks Hub available.
33
34It also allows you to check that the connectivity between your device and Pybricks
35is going to work as expected before endeavouring further :)
36
37## Usage from Python
38
39- High level: The `get_virtual_ble()` method can be used to configure and
40    obtain a high level client object. This object is similar to the Pybricks
41    Hub BLE interface, so should feel familiar if you've written a Pybricks
42    program already.
43
44- Low level: D-Bus client and service objects defined in `pb_ble.bluezdbus` can
45    be used to integrate Pybricks connectionless Bluetooth messaging into
46    applications independent from the Pybricks Hub API.
47
48[pybricks-message-spec]:https://github.com/pybricks/technical-info/blob/master/pybricks-ble-broadcast-observe.md
49[bluez-experimental]:https://wiki.archlinux.org/title/Bluetooth#Enabling_experimental_features
50"""
51
52from .constants import (
53    LEGO_CID,
54    PybricksBroadcast,
55    PybricksBroadcastData,
56    PybricksBroadcastValue,
57)
58from .vhub import VirtualBLE, get_virtual_ble
59
60__all__ = (
61    # Core API
62    "get_virtual_ble",
63    "VirtualBLE",
64    "PybricksBroadcast",
65    # Submodules
66    "vhub",
67    "bluezdbus",
68    "messages",
69    "cli",
70)
async def get_virtual_ble( adapter_name: str | None = None, device_name: str = 'pb_vhub', broadcast_channel: int = 0, observe_channels: Optional[Sequence[int]] = None, scanning_mode: Literal['active', 'passive'] = 'passive', device_filter: str | None = None) -> VirtualBLE:
100async def get_virtual_ble(
101    adapter_name: str | None = None,
102    device_name: str = VirtualBLE.DEFAULT_DEVICE_NAME,
103    broadcast_channel: int = 0,
104    observe_channels: Sequence[int] | None = None,
105    scanning_mode: ScanningMode = "passive",
106    device_filter: str | None = None,
107) -> VirtualBLE:
108    """
109    Creates a "virtual" Pybricks BLE radio that can be used to exchange
110    messages with other Pybricks Hubs via connectionless Bluetooth messaging.
111
112    The `VirtualBLE` object implements an interface very similar to the [common Pybricks BLE interface](https://github.com/pybricks/pybricks-api/blob/v3.5.0/src/pybricks/_common.py#L1331)
113    available on Pybricks Hubs. The crucial difference is that some methods
114    must be called asynchronously.
115
116    `get_virtual_ble()` should be used as an asynchronous context manager:
117
118    ```python
119    async with await get_virtual_ble(
120        broadcast_channel=2
121    ) as vble:
122        # Broadcast a random number on channel 2
123        val = random.randint(0, 3)
124        await vble.broadcast(val)
125        # Stop after 10 seconds
126        await asyncio.sleep(10)
127    ```
128
129    :param adapter_name: The Bluetooth adapter to use, defaults to `None`
130        (auto-discover default device).
131    :param device_name: The name of the hub. This may be used as local name
132        in the BLE advertisement data, defaults to `VirtualBLE.DEFAULT_DEVICE_NAME`.
133    :param broadcast_channel: A value from 0 to 255 indicating which channel
134        `VirtualBLE.broadcast()` will use, defaults to 0.
135    :param observe_channels: A list of channels to listen to for use
136        with `VirtualBLE.observe()`, defaults to `None` (all channels).
137    :param scanning_mode: The scanning mode to use for observing broadcasts,
138        defaults to `passive`.
139        - Passive scanning is the default and recommended mode.
140        However it is not supported by all devices.
141        - Active scanning is provided as a well-supported fallback.
142        It may negatively impact the power consumption of nearby BLE devices.
143    :param device_filter: Provides a mechanism to filter observed broadcasts
144        based on the custom name of the sending Pybricks Hub, defaults to `None` (no filter).
145        For example, set this to `Pybricks` to receive only broadcasts from Hubs
146        that have a name starting with "Pybricks".
147    :return: A `VirtualBLE` object which is loosely adhering to the Pybricks Hub
148        BLE interface.
149    """
150    bus: MessageBus = await MessageBus(bus_type=BusType.SYSTEM).connect()
151
152    # Find given adapter or default adapter
153    adapter_name, details = (
154        await get_adapter_details()
155        if adapter_name is None
156        else await get_adapter_details(adapter_name)
157    )
158    adapter: ProxyObject = await get_adapter(bus, adapter_name)
159
160    broadcaster = BlueZBroadcaster(bus=bus, adapter=adapter, name=device_name)
161    observer = BlueZPybricksObserver(
162        adapter_name=adapter_name,
163        scanning_mode=scanning_mode,
164        channels=observe_channels,
165        device_pattern=device_filter,
166    )
167
168    return VirtualBLE(
169        broadcaster=broadcaster,
170        observer=observer,
171        broadcast_channel=broadcast_channel,
172        device_version=str(details["hw_version"]),
173    )

Creates a "virtual" Pybricks BLE radio that can be used to exchange messages with other Pybricks Hubs via connectionless Bluetooth messaging.

The VirtualBLE object implements an interface very similar to the common Pybricks BLE interface available on Pybricks Hubs. The crucial difference is that some methods must be called asynchronously.

get_virtual_ble() should be used as an asynchronous context manager:

async with await get_virtual_ble(
    broadcast_channel=2
) as vble:
    # Broadcast a random number on channel 2
    val = random.randint(0, 3)
    await vble.broadcast(val)
    # Stop after 10 seconds
    await asyncio.sleep(10)
Parameters
  • adapter_name: The Bluetooth adapter to use, defaults to None (auto-discover default device).
  • device_name: The name of the hub. This may be used as local name in the BLE advertisement data, defaults to VirtualBLE.DEFAULT_DEVICE_NAME.
  • broadcast_channel: A value from 0 to 255 indicating which channel VirtualBLE.broadcast() will use, defaults to 0.
  • observe_channels: A list of channels to listen to for use with VirtualBLE.observe(), defaults to None (all channels).
  • scanning_mode: The scanning mode to use for observing broadcasts, defaults to passive.
    • Passive scanning is the default and recommended mode. However it is not supported by all devices.
    • Active scanning is provided as a well-supported fallback. It may negatively impact the power consumption of nearby BLE devices.
  • device_filter: Provides a mechanism to filter observed broadcasts based on the custom name of the sending Pybricks Hub, defaults to None (no filter). For example, set this to Pybricks to receive only broadcasts from Hubs that have a name starting with "Pybricks".
Returns

A VirtualBLE object which is loosely adhering to the Pybricks Hub BLE interface.

class VirtualBLE(pybricks._common.BLE, contextlib.AsyncExitStack):
26class VirtualBLE(_common.BLE, AsyncExitStack):
27    DEFAULT_DEVICE_NAME: ClassVar[str] = "pb_vhub"
28    """The default device name to use in data broadcasts."""
29    DEFAULT_DEVICE_VERSION: ClassVar[str] = "1.0"
30    """The default device version to return from `version()`."""
31
32    _adv: PybricksBroadcastAdvertisement
33    """The current data broadcast."""
34    _device_version: str
35    """The version string configured for this VirtualBLE device."""
36
37    _broadcaster: BlueZBroadcaster
38    """The broadcaster to use."""
39    _observer: BlueZPybricksObserver
40    """The observer to use."""
41
42    def __init__(
43        self,
44        broadcaster: BlueZBroadcaster,
45        observer: BlueZPybricksObserver,
46        broadcast_channel: int,
47        device_version: str = DEFAULT_DEVICE_VERSION,
48    ):
49        if (
50            not isinstance(broadcast_channel, int)
51            or PYBRICKS_MIN_CHANNEL < broadcast_channel > PYBRICKS_MAX_CHANNEL
52        ):
53            raise ValueError(
54                f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
55            )
56
57        super(AsyncExitStack, self).__init__()
58
59        self._device_version = device_version
60        self._broadcaster = broadcaster
61        self._observer = observer
62
63        self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)
64
65    async def __aenter__(self):
66        try:
67            await self.enter_async_context(self._broadcaster)
68            await self.enter_async_context(self._observer)
69        except Exception:
70            if not await self.__aexit__(*sys.exc_info()):
71                raise
72        return self
73
74    async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
75        if len(data) == 0:
76            raise ValueError("Broadcast must be a value or tuple.")
77        if None in data:
78            await self._broadcaster.stop_broadcast(self._adv)
79        else:
80            if not self._broadcaster.is_broadcasting(self._adv):
81                await self._broadcaster.broadcast(self._adv)
82            self._adv.message = cast(PybricksBroadcastData, tuple(data))
83
84    def observe(self, channel: int) -> PybricksBroadcastData | None:
85        advertisement = self._observer.observe(channel)
86
87        if advertisement is not None:
88            return advertisement.data
89        else:
90            return None
91
92    def signal_strength(self, channel: int) -> int:
93        advertisement = self._observer.observe(channel)
94        return advertisement.rssi if advertisement is not None else -128
95
96    def version(self) -> str:
97        return self._device_version

Bluetooth Low Energy.

New in version 3.3.

VirtualBLE( broadcaster: pb_ble.bluezdbus.BlueZBroadcaster, observer: pb_ble.bluezdbus.BlueZPybricksObserver, broadcast_channel: int, device_version: str = '1.0')
42    def __init__(
43        self,
44        broadcaster: BlueZBroadcaster,
45        observer: BlueZPybricksObserver,
46        broadcast_channel: int,
47        device_version: str = DEFAULT_DEVICE_VERSION,
48    ):
49        if (
50            not isinstance(broadcast_channel, int)
51            or PYBRICKS_MIN_CHANNEL < broadcast_channel > PYBRICKS_MAX_CHANNEL
52        ):
53            raise ValueError(
54                f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
55            )
56
57        super(AsyncExitStack, self).__init__()
58
59        self._device_version = device_version
60        self._broadcaster = broadcaster
61        self._observer = observer
62
63        self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)
DEFAULT_DEVICE_NAME: ClassVar[str] = 'pb_vhub'

The default device name to use in data broadcasts.

DEFAULT_DEVICE_VERSION: ClassVar[str] = '1.0'

The default device version to return from version().

async def broadcast(self, *data: bool | int | float | str | bytes | None) -> None:
74    async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
75        if len(data) == 0:
76            raise ValueError("Broadcast must be a value or tuple.")
77        if None in data:
78            await self._broadcaster.stop_broadcast(self._adv)
79        else:
80            if not self._broadcaster.is_broadcasting(self._adv):
81                await self._broadcaster.broadcast(self._adv)
82            self._adv.message = cast(PybricksBroadcastData, tuple(data))

broadcast(data)

Starts broadcasting the given data on the broadcast_channel you selected when initializing the hub.

Data may be of type int, float, str, bytes, True, or False. It can also be a list or tuple of these.

Choose None to stop broadcasting. This helps improve performance when you don't need the broadcast feature, especially when observing at the same time.

The total data size is quite limited (26 bytes). True and False take 1 byte each. float takes 5 bytes. int takes 2 to 5 bytes depending on how big the number is. str and bytes take the number of bytes in the object plus one extra byte.

When multitasking, only one task can broadcast at a time. To broadcast information from multiple tasks (or block stacks), you could use a dedicated separate task that broadcast new values when one or more variables change.

Args: data: The value or values to be broadcast.

New in version 3.3.

def observe( self, channel: int) -> bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...] | None:
84    def observe(self, channel: int) -> PybricksBroadcastData | None:
85        advertisement = self._observer.observe(channel)
86
87        if advertisement is not None:
88            return advertisement.data
89        else:
90            return None

observe(channel) -> bool | int | float | str | bytes | tuple | None

Retrieves the last observed data for a given channel.

Receiving data is more reliable when the hub is not connected to a computer or other devices at the same time.

Args: channel (int): The channel to observe (0 to 255).

Returns: The received data in the same format as it was sent, or None if no recent data is available.

New in version 3.3.

def signal_strength(self, channel: int) -> int:
92    def signal_strength(self, channel: int) -> int:
93        advertisement = self._observer.observe(channel)
94        return advertisement.rssi if advertisement is not None else -128

signal_strength(channel) -> int: dBm

Gets the average signal strength in dBm for the given channel.

This indicates how near the broadcasting device is. Nearby devices may have a signal strength around -40 dBm, while far away devices might have a signal strength around -70 dBm.

Args: channel (int): The channel number (0 to 255).

Returns: The signal strength or -128 if there is no recent observed data.

New in version 3.3.

def version(self) -> str:
96    def version(self) -> str:
97        return self._device_version

version() -> str

Gets the firmware version from the Bluetooth chip.

New in version 3.3.

class PybricksBroadcast(typing.NamedTuple):
27class PybricksBroadcast(NamedTuple):
28    """
29    Data structure for a Pybricks broadcast.
30    """
31
32    channel: int
33    """The broadcast channel for the data (0 to 255)."""
34
35    data: PybricksBroadcastData
36    """The value or values to be broadcast."""

Data structure for a Pybricks broadcast.

PybricksBroadcast(channel: int, data: ForwardRef('PybricksBroadcastData'))

Create new instance of PybricksBroadcast(channel, data)

channel: int

The broadcast channel for the data (0 to 255).

data: bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...]

The value or values to be broadcast.