pb_ble
A Python implementation of the Pybricks connectionless Bluetooth messaging protocol.
This package includes the following modules:
- Python implementation of the Pybricks BLE hub interface (
pb_ble.vhub
). - Observer and Broadcaster for Pybricks BLE messages backed by BlueZ (
pb_ble.bluezdbus
). - Decoder and Encoder for the Pybricks BLE message format (
pb_ble.messages
).
Requirements
Pybricks is using Bluetooth Low Energy (BLE) broadcasting advertising packets to exchange connectionless messages (also known as "BLE Broadcast/Observe").
To use the BLE radio features of this library, you need:
- a BLE-capable Bluetooth adapter.
- a device running Linux with BlueZ and D-Bus (e.g. Ubuntu 20.04 or newer).
To use the BLE observer in passive scanning mode (recommended):
- BlueZ version 5.56 or newer, with experimental features enabled
(e.g. Ubuntu 22.04 configured with the bluetoothd
--experimental
flag).
Usage from the command line
This package includes CLI tools to communicate with Pybricks devices via
connectionless Bluetooth messaging, see pb_ble.cli
for details and usage.
This is a great way to test your Pybricks programs without having a second Pybricks Hub available.
It also allows you to check that the connectivity between your device and Pybricks is going to work as expected before endeavouring further :)
Usage from Python
High level: The
get_virtual_ble()
method can be used to configure and obtain a high level client object. This object is similar to the Pybricks Hub BLE interface, so should feel familiar if you've written a Pybricks program already.Low level: D-Bus client and service objects defined in
pb_ble.bluezdbus
can be used to integrate Pybricks connectionless Bluetooth messaging into applications independent from the Pybricks Hub API.
1""" 2A Python implementation of the [Pybricks connectionless Bluetooth messaging][pybricks-message-spec] 3protocol. 4 5This package includes the following modules: 6 7- Python implementation of the Pybricks BLE hub interface (`pb_ble.vhub`). 8- Observer and Broadcaster for Pybricks BLE messages backed by BlueZ (`pb_ble.bluezdbus`). 9- Decoder and Encoder for the Pybricks BLE message format (`pb_ble.messages`). 10 11## Requirements 12 13Pybricks is using Bluetooth Low Energy (BLE) broadcasting advertising packets 14to exchange connectionless messages (also known as "BLE Broadcast/Observe"). 15 16To use the BLE radio features of this library, you need: 17 18- a BLE-capable Bluetooth adapter. 19- a device running Linux with BlueZ and D-Bus (e.g. Ubuntu 20.04 or newer). 20 21To use the BLE observer in passive scanning mode (recommended): 22 23- BlueZ version 5.56 or newer, with [experimental features enabled][bluez-experimental] 24 (e.g. Ubuntu 22.04 configured with the bluetoothd `--experimental` flag). 25 26## Usage from the command line 27 28This package includes CLI tools to communicate with Pybricks devices via 29connectionless Bluetooth messaging, see `pb_ble.cli` for details and usage. 30 31This is a great way to test your Pybricks programs without having a second 32Pybricks Hub available. 33 34It also allows you to check that the connectivity between your device and Pybricks 35is going to work as expected before endeavouring further :) 36 37## Usage from Python 38 39- High level: The `get_virtual_ble()` method can be used to configure and 40 obtain a high level client object. This object is similar to the Pybricks 41 Hub BLE interface, so should feel familiar if you've written a Pybricks 42 program already. 43 44- Low level: D-Bus client and service objects defined in `pb_ble.bluezdbus` can 45 be used to integrate Pybricks connectionless Bluetooth messaging into 46 applications independent from the Pybricks Hub API. 47 48[pybricks-message-spec]:https://github.com/pybricks/technical-info/blob/master/pybricks-ble-broadcast-observe.md 49[bluez-experimental]:https://wiki.archlinux.org/title/Bluetooth#Enabling_experimental_features 50""" 51 52from .constants import ( 53 LEGO_CID, 54 PybricksBroadcast, 55 PybricksBroadcastData, 56 PybricksBroadcastValue, 57) 58from .vhub import VirtualBLE, get_virtual_ble 59 60__all__ = ( 61 # Core API 62 "get_virtual_ble", 63 "VirtualBLE", 64 "PybricksBroadcast", 65 # Submodules 66 "vhub", 67 "bluezdbus", 68 "messages", 69 "cli", 70)
100async def get_virtual_ble( 101 adapter_name: str | None = None, 102 device_name: str = VirtualBLE.DEFAULT_DEVICE_NAME, 103 broadcast_channel: int = 0, 104 observe_channels: Sequence[int] | None = None, 105 scanning_mode: ScanningMode = "passive", 106 device_filter: str | None = None, 107) -> VirtualBLE: 108 """ 109 Creates a "virtual" Pybricks BLE radio that can be used to exchange 110 messages with other Pybricks Hubs via connectionless Bluetooth messaging. 111 112 The `VirtualBLE` object implements an interface very similar to the [common Pybricks BLE interface](https://github.com/pybricks/pybricks-api/blob/v3.5.0/src/pybricks/_common.py#L1331) 113 available on Pybricks Hubs. The crucial difference is that some methods 114 must be called asynchronously. 115 116 `get_virtual_ble()` should be used as an asynchronous context manager: 117 118 ```python 119 async with await get_virtual_ble( 120 broadcast_channel=2 121 ) as vble: 122 # Broadcast a random number on channel 2 123 val = random.randint(0, 3) 124 await vble.broadcast(val) 125 # Stop after 10 seconds 126 await asyncio.sleep(10) 127 ``` 128 129 :param adapter_name: The Bluetooth adapter to use, defaults to `None` 130 (auto-discover default device). 131 :param device_name: The name of the hub. This may be used as local name 132 in the BLE advertisement data, defaults to `VirtualBLE.DEFAULT_DEVICE_NAME`. 133 :param broadcast_channel: A value from 0 to 255 indicating which channel 134 `VirtualBLE.broadcast()` will use, defaults to 0. 135 :param observe_channels: A list of channels to listen to for use 136 with `VirtualBLE.observe()`, defaults to `None` (all channels). 137 :param scanning_mode: The scanning mode to use for observing broadcasts, 138 defaults to `passive`. 139 - Passive scanning is the default and recommended mode. 140 However it is not supported by all devices. 141 - Active scanning is provided as a well-supported fallback. 142 It may negatively impact the power consumption of nearby BLE devices. 143 :param device_filter: Provides a mechanism to filter observed broadcasts 144 based on the custom name of the sending Pybricks Hub, defaults to `None` (no filter). 145 For example, set this to `Pybricks` to receive only broadcasts from Hubs 146 that have a name starting with "Pybricks". 147 :return: A `VirtualBLE` object which is loosely adhering to the Pybricks Hub 148 BLE interface. 149 """ 150 bus: MessageBus = await MessageBus(bus_type=BusType.SYSTEM).connect() 151 152 # Find given adapter or default adapter 153 adapter_name, details = ( 154 await get_adapter_details() 155 if adapter_name is None 156 else await get_adapter_details(adapter_name) 157 ) 158 adapter: ProxyObject = await get_adapter(bus, adapter_name) 159 160 broadcaster = BlueZBroadcaster(bus=bus, adapter=adapter, name=device_name) 161 observer = BlueZPybricksObserver( 162 adapter_name=adapter_name, 163 scanning_mode=scanning_mode, 164 channels=observe_channels, 165 device_pattern=device_filter, 166 ) 167 168 return VirtualBLE( 169 broadcaster=broadcaster, 170 observer=observer, 171 broadcast_channel=broadcast_channel, 172 device_version=str(details["hw_version"]), 173 )
Creates a "virtual" Pybricks BLE radio that can be used to exchange messages with other Pybricks Hubs via connectionless Bluetooth messaging.
The VirtualBLE
object implements an interface very similar to the common Pybricks BLE interface
available on Pybricks Hubs. The crucial difference is that some methods
must be called asynchronously.
get_virtual_ble()
should be used as an asynchronous context manager:
async with await get_virtual_ble(
broadcast_channel=2
) as vble:
# Broadcast a random number on channel 2
val = random.randint(0, 3)
await vble.broadcast(val)
# Stop after 10 seconds
await asyncio.sleep(10)
Parameters
- adapter_name: The Bluetooth adapter to use, defaults to
None
(auto-discover default device). - device_name: The name of the hub. This may be used as local name
in the BLE advertisement data, defaults to
VirtualBLE.DEFAULT_DEVICE_NAME
. - broadcast_channel: A value from 0 to 255 indicating which channel
VirtualBLE.broadcast()
will use, defaults to 0. - observe_channels: A list of channels to listen to for use
with
VirtualBLE.observe()
, defaults toNone
(all channels). - scanning_mode: The scanning mode to use for observing broadcasts,
defaults to
passive
.- Passive scanning is the default and recommended mode. However it is not supported by all devices.
- Active scanning is provided as a well-supported fallback. It may negatively impact the power consumption of nearby BLE devices.
- device_filter: Provides a mechanism to filter observed broadcasts
based on the custom name of the sending Pybricks Hub, defaults to
None
(no filter). For example, set this toPybricks
to receive only broadcasts from Hubs that have a name starting with "Pybricks".
Returns
A
VirtualBLE
object which is loosely adhering to the Pybricks Hub BLE interface.
26class VirtualBLE(_common.BLE, AsyncExitStack): 27 DEFAULT_DEVICE_NAME: ClassVar[str] = "pb_vhub" 28 """The default device name to use in data broadcasts.""" 29 DEFAULT_DEVICE_VERSION: ClassVar[str] = "1.0" 30 """The default device version to return from `version()`.""" 31 32 _adv: PybricksBroadcastAdvertisement 33 """The current data broadcast.""" 34 _device_version: str 35 """The version string configured for this VirtualBLE device.""" 36 37 _broadcaster: BlueZBroadcaster 38 """The broadcaster to use.""" 39 _observer: BlueZPybricksObserver 40 """The observer to use.""" 41 42 def __init__( 43 self, 44 broadcaster: BlueZBroadcaster, 45 observer: BlueZPybricksObserver, 46 broadcast_channel: int, 47 device_version: str = DEFAULT_DEVICE_VERSION, 48 ): 49 if ( 50 not isinstance(broadcast_channel, int) 51 or PYBRICKS_MIN_CHANNEL < broadcast_channel > PYBRICKS_MAX_CHANNEL 52 ): 53 raise ValueError( 54 f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}." 55 ) 56 57 super(AsyncExitStack, self).__init__() 58 59 self._device_version = device_version 60 self._broadcaster = broadcaster 61 self._observer = observer 62 63 self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel) 64 65 async def __aenter__(self): 66 try: 67 await self.enter_async_context(self._broadcaster) 68 await self.enter_async_context(self._observer) 69 except Exception: 70 if not await self.__aexit__(*sys.exc_info()): 71 raise 72 return self 73 74 async def broadcast(self, *data: PybricksBroadcastValue | None) -> None: # type: ignore [override] 75 if len(data) == 0: 76 raise ValueError("Broadcast must be a value or tuple.") 77 if None in data: 78 await self._broadcaster.stop_broadcast(self._adv) 79 else: 80 if not self._broadcaster.is_broadcasting(self._adv): 81 await self._broadcaster.broadcast(self._adv) 82 self._adv.message = cast(PybricksBroadcastData, tuple(data)) 83 84 def observe(self, channel: int) -> PybricksBroadcastData | None: 85 advertisement = self._observer.observe(channel) 86 87 if advertisement is not None: 88 return advertisement.data 89 else: 90 return None 91 92 def signal_strength(self, channel: int) -> int: 93 advertisement = self._observer.observe(channel) 94 return advertisement.rssi if advertisement is not None else -128 95 96 def version(self) -> str: 97 return self._device_version
Bluetooth Low Energy.
New in version 3.3.
42 def __init__( 43 self, 44 broadcaster: BlueZBroadcaster, 45 observer: BlueZPybricksObserver, 46 broadcast_channel: int, 47 device_version: str = DEFAULT_DEVICE_VERSION, 48 ): 49 if ( 50 not isinstance(broadcast_channel, int) 51 or PYBRICKS_MIN_CHANNEL < broadcast_channel > PYBRICKS_MAX_CHANNEL 52 ): 53 raise ValueError( 54 f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}." 55 ) 56 57 super(AsyncExitStack, self).__init__() 58 59 self._device_version = device_version 60 self._broadcaster = broadcaster 61 self._observer = observer 62 63 self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)
74 async def broadcast(self, *data: PybricksBroadcastValue | None) -> None: # type: ignore [override] 75 if len(data) == 0: 76 raise ValueError("Broadcast must be a value or tuple.") 77 if None in data: 78 await self._broadcaster.stop_broadcast(self._adv) 79 else: 80 if not self._broadcaster.is_broadcasting(self._adv): 81 await self._broadcaster.broadcast(self._adv) 82 self._adv.message = cast(PybricksBroadcastData, tuple(data))
broadcast(data)
Starts broadcasting the given data on
the broadcast_channel
you selected when initializing the hub.
Data may be of type int
, float
, str
, bytes
,
True
, or False
. It can also be a list or tuple of these.
Choose None
to stop broadcasting. This helps improve performance
when you don't need the broadcast feature, especially when observing
at the same time.
The total data size is quite limited (26 bytes). True
and
False
take 1 byte each. float
takes 5 bytes. int
takes 2 to
5 bytes depending on how big the number is. str
and bytes
take
the number of bytes in the object plus one extra byte.
When multitasking, only one task can broadcast at a time. To broadcast information from multiple tasks (or block stacks), you could use a dedicated separate task that broadcast new values when one or more variables change.
Args: data: The value or values to be broadcast.
New in version 3.3.
84 def observe(self, channel: int) -> PybricksBroadcastData | None: 85 advertisement = self._observer.observe(channel) 86 87 if advertisement is not None: 88 return advertisement.data 89 else: 90 return None
observe(channel) -> bool | int | float | str | bytes | tuple | None
Retrieves the last observed data for a given channel.
Receiving data is more reliable when the hub is not connected to a computer or other devices at the same time.
Args: channel (int): The channel to observe (0 to 255).
Returns:
The received data in the same format as it was sent, or None
if no recent data is available.
New in version 3.3.
92 def signal_strength(self, channel: int) -> int: 93 advertisement = self._observer.observe(channel) 94 return advertisement.rssi if advertisement is not None else -128
signal_strength(channel) -> int: dBm
Gets the average signal strength in dBm for the given channel.
This indicates how near the broadcasting device is. Nearby devices may have a signal strength around -40 dBm, while far away devices might have a signal strength around -70 dBm.
Args: channel (int): The channel number (0 to 255).
Returns:
The signal strength or -128
if there is no recent observed data.
New in version 3.3.
27class PybricksBroadcast(NamedTuple): 28 """ 29 Data structure for a Pybricks broadcast. 30 """ 31 32 channel: int 33 """The broadcast channel for the data (0 to 255).""" 34 35 data: PybricksBroadcastData 36 """The value or values to be broadcast."""
Data structure for a Pybricks broadcast.