pb_ble.vhub
import sys
from contextlib import AsyncExitStack
from typing import ClassVar, Sequence, cast

from dbus_fast.aio import MessageBus, ProxyObject
from dbus_fast.constants import BusType
from pybricks.hubs import _common

from .bluezdbus import (
    BlueZBroadcaster,
    BlueZPybricksObserver,
    PybricksBroadcastAdvertisement,
    get_adapter,
    get_adapter_details,
)
from .constants import (
    PYBRICKS_MAX_CHANNEL,
    PYBRICKS_MIN_CHANNEL,
    PybricksBroadcastData,
    PybricksBroadcastValue,
    ScanningMode,
)


class VirtualBLE(_common.BLE, AsyncExitStack):
    """
    A "virtual" Pybricks BLE radio backed by a BlueZ broadcaster and observer.

    The object is an asynchronous context manager: entering it enters the
    broadcaster and the observer on its own exit stack.
    """

    DEFAULT_DEVICE_NAME: ClassVar[str] = "pb_vhub"
    """The default device name to use in data broadcasts."""
    DEFAULT_DEVICE_VERSION: ClassVar[str] = "1.0"
    """The default device version to return from `version()`."""

    _adv: PybricksBroadcastAdvertisement
    """The current data broadcast."""
    _device_version: str
    """The version string configured for this VirtualBLE device."""

    _broadcaster: BlueZBroadcaster
    """The broadcaster to use."""
    _observer: BlueZPybricksObserver
    """The observer to use."""

    def __init__(
        self,
        broadcaster: BlueZBroadcaster,
        observer: BlueZPybricksObserver,
        broadcast_channel: int,
        device_version: str = DEFAULT_DEVICE_VERSION,
    ):
        """
        Set up the radio around an existing broadcaster and observer.

        :param broadcaster: The broadcaster used to send data; its `name` is
            used for the broadcast advertisement.
        :param observer: The observer used to look up received broadcasts.
        :param broadcast_channel: The channel to broadcast on, an integer from
            `PYBRICKS_MIN_CHANNEL` to `PYBRICKS_MAX_CHANNEL` (inclusive).
        :param device_version: The string returned by `version()`.
        :raises ValueError: If `broadcast_channel` is not an integer or lies
            outside the valid channel range.
        """
        # Both bounds must be checked: a chained `MIN < ch > MAX` only rejects
        # values above the maximum and lets values below the minimum through.
        if not isinstance(broadcast_channel, int) or not (
            PYBRICKS_MIN_CHANNEL <= broadcast_channel <= PYBRICKS_MAX_CHANNEL
        ):
            raise ValueError(
                f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
            )

        # Starts the initializer lookup after AsyncExitStack in the MRO,
        # presumably to bypass `_common.BLE.__init__` (confirm against pybricks).
        super(AsyncExitStack, self).__init__()

        self._device_version = device_version
        self._broadcaster = broadcaster
        self._observer = observer

        self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)

    async def __aenter__(self):
        """
        Enter the broadcaster and then the observer on this exit stack.

        If entering either one fails, whatever was already entered is unwound
        through `__aexit__`, and the exception is re-raised unless `__aexit__`
        reports it as suppressed.
        """
        try:
            await self.enter_async_context(self._broadcaster)
            await self.enter_async_context(self._observer)
        except Exception:
            if not await self.__aexit__(*sys.exc_info()):
                raise
        return self

    async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
        """
        Broadcast the given values, or stop broadcasting if any value is `None`.

        :param data: One or more values to broadcast.
        :raises ValueError: If no value is given at all.
        """
        if len(data) == 0:
            raise ValueError("Broadcast must be a value or tuple.")
        if None in data:
            await self._broadcaster.stop_broadcast(self._adv)
        else:
            if not self._broadcaster.is_broadcasting(self._adv):
                await self._broadcaster.broadcast(self._adv)
            self._adv.message = cast(PybricksBroadcastData, tuple(data))

    def observe(self, channel: int) -> PybricksBroadcastData | None:
        """
        Return the data of the observer's advertisement for `channel`.

        :param channel: The channel to look up.
        :return: The advertisement data, or `None` if the observer has no
            advertisement for that channel.
        """
        advertisement = self._observer.observe(channel)

        if advertisement is not None:
            return advertisement.data
        else:
            return None

    def signal_strength(self, channel: int) -> int:
        """
        Return the `rssi` of the observer's advertisement for `channel`.

        :param channel: The channel to look up.
        :return: The advertisement's `rssi`, or -128 if the observer has no
            advertisement for that channel.
        """
        advertisement = self._observer.observe(channel)
        return advertisement.rssi if advertisement is not None else -128

    def version(self) -> str:
        """Return the version string configured for this device."""
        return self._device_version


async def get_virtual_ble(
    adapter_name: str | None = None,
    device_name: str = VirtualBLE.DEFAULT_DEVICE_NAME,
    broadcast_channel: int = 0,
    observe_channels: Sequence[int] | None = None,
    scanning_mode: ScanningMode = "passive",
    device_filter: str | None = None,
) -> VirtualBLE:
    """
    Creates a "virtual" Pybricks BLE radio that can be used to exchange
    messages with other Pybricks Hubs via connectionless Bluetooth messaging.

    The `VirtualBLE` object implements an interface very similar to the [common Pybricks BLE interface](https://github.com/pybricks/pybricks-api/blob/v3.5.0/src/pybricks/_common.py#L1331)
    available on Pybricks Hubs. The crucial difference is that some methods
    must be called asynchronously.

    `get_virtual_ble()` should be used as an asynchronous context manager:

    ```python
    async with await get_virtual_ble(
        broadcast_channel=2
    ) as vble:
        # Broadcast a random number on channel 2
        val = random.randint(0, 3)
        await vble.broadcast(val)
        # Stop after 10 seconds
        await asyncio.sleep(10)
    ```

    :param adapter_name: The Bluetooth adapter to use, defaults to `None`
        (auto-discover default device).
    :param device_name: The name of the hub. This may be used as local name
        in the BLE advertisement data, defaults to `VirtualBLE.DEFAULT_DEVICE_NAME`.
    :param broadcast_channel: A value from 0 to 255 indicating which channel
        `VirtualBLE.broadcast()` will use, defaults to 0.
    :param observe_channels: A list of channels to listen to for use
        with `VirtualBLE.observe()`, defaults to `None` (all channels).
    :param scanning_mode: The scanning mode to use for observing broadcasts,
        defaults to `passive`.
        - Passive scanning is the default and recommended mode.
        However, it is not supported by all devices.
        - Active scanning is provided as a well-supported fallback.
        It may negatively impact the power consumption of nearby BLE devices.
    :param device_filter: Provides a mechanism to filter observed broadcasts
        based on the custom name of the sending Pybricks Hub, defaults to `None` (no filter).
        For example, set this to `Pybricks` to receive only broadcasts from Hubs
        that have a name starting with "Pybricks".
    :return: A `VirtualBLE` object which is loosely adhering to the Pybricks Hub
        BLE interface.
    :raises ValueError: If `broadcast_channel` is not a valid channel number.
    """
    bus: MessageBus = await MessageBus(bus_type=BusType.SYSTEM).connect()

    try:
        # Find given adapter or default adapter
        adapter_name, details = (
            await get_adapter_details()
            if adapter_name is None
            else await get_adapter_details(adapter_name)
        )
        adapter: ProxyObject = await get_adapter(bus, adapter_name)

        broadcaster = BlueZBroadcaster(bus=bus, adapter=adapter, name=device_name)
        observer = BlueZPybricksObserver(
            adapter_name=adapter_name,
            scanning_mode=scanning_mode,
            channels=observe_channels,
            device_pattern=device_filter,
        )

        return VirtualBLE(
            broadcaster=broadcaster,
            observer=observer,
            broadcast_channel=broadcast_channel,
            device_version=str(details["hw_version"]),
        )
    except BaseException:
        # The caller never receives an object holding the bus on this path,
        # so close the connection here instead of leaking it.
        bus.disconnect()
        raise
class VirtualBLE(_common.BLE, AsyncExitStack):
    """
    A "virtual" Pybricks BLE radio backed by a BlueZ broadcaster and observer.

    The object is an asynchronous context manager: entering it enters the
    broadcaster and the observer on its own exit stack.
    """

    DEFAULT_DEVICE_NAME: ClassVar[str] = "pb_vhub"
    """The default device name to use in data broadcasts."""
    DEFAULT_DEVICE_VERSION: ClassVar[str] = "1.0"
    """The default device version to return from `version()`."""

    _adv: PybricksBroadcastAdvertisement
    """The current data broadcast."""
    _device_version: str
    """The version string configured for this VirtualBLE device."""

    _broadcaster: BlueZBroadcaster
    """The broadcaster to use."""
    _observer: BlueZPybricksObserver
    """The observer to use."""

    def __init__(
        self,
        broadcaster: BlueZBroadcaster,
        observer: BlueZPybricksObserver,
        broadcast_channel: int,
        device_version: str = DEFAULT_DEVICE_VERSION,
    ):
        """
        Set up the radio around an existing broadcaster and observer.

        :param broadcaster: The broadcaster used to send data; its `name` is
            used for the broadcast advertisement.
        :param observer: The observer used to look up received broadcasts.
        :param broadcast_channel: The channel to broadcast on, an integer from
            `PYBRICKS_MIN_CHANNEL` to `PYBRICKS_MAX_CHANNEL` (inclusive).
        :param device_version: The string returned by `version()`.
        :raises ValueError: If `broadcast_channel` is not an integer or lies
            outside the valid channel range.
        """
        # Both bounds must be checked: a chained `MIN < ch > MAX` only rejects
        # values above the maximum and lets values below the minimum through.
        if not isinstance(broadcast_channel, int) or not (
            PYBRICKS_MIN_CHANNEL <= broadcast_channel <= PYBRICKS_MAX_CHANNEL
        ):
            raise ValueError(
                f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
            )

        # Starts the initializer lookup after AsyncExitStack in the MRO,
        # presumably to bypass `_common.BLE.__init__` (confirm against pybricks).
        super(AsyncExitStack, self).__init__()

        self._device_version = device_version
        self._broadcaster = broadcaster
        self._observer = observer

        self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)

    async def __aenter__(self):
        """
        Enter the broadcaster and then the observer on this exit stack.

        If entering either one fails, whatever was already entered is unwound
        through `__aexit__`, and the exception is re-raised unless `__aexit__`
        reports it as suppressed.
        """
        try:
            await self.enter_async_context(self._broadcaster)
            await self.enter_async_context(self._observer)
        except Exception:
            if not await self.__aexit__(*sys.exc_info()):
                raise
        return self

    async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
        """
        Broadcast the given values, or stop broadcasting if any value is `None`.

        :param data: One or more values to broadcast.
        :raises ValueError: If no value is given at all.
        """
        if len(data) == 0:
            raise ValueError("Broadcast must be a value or tuple.")
        if None in data:
            await self._broadcaster.stop_broadcast(self._adv)
        else:
            if not self._broadcaster.is_broadcasting(self._adv):
                await self._broadcaster.broadcast(self._adv)
            self._adv.message = cast(PybricksBroadcastData, tuple(data))

    def observe(self, channel: int) -> PybricksBroadcastData | None:
        """
        Return the data of the observer's advertisement for `channel`.

        :param channel: The channel to look up.
        :return: The advertisement data, or `None` if the observer has no
            advertisement for that channel.
        """
        advertisement = self._observer.observe(channel)

        if advertisement is not None:
            return advertisement.data
        else:
            return None

    def signal_strength(self, channel: int) -> int:
        """
        Return the `rssi` of the observer's advertisement for `channel`.

        :param channel: The channel to look up.
        :return: The advertisement's `rssi`, or -128 if the observer has no
            advertisement for that channel.
        """
        advertisement = self._observer.observe(channel)
        return advertisement.rssi if advertisement is not None else -128

    def version(self) -> str:
        """Return the version string configured for this device."""
        return self._device_version
Bluetooth Low Energy.
New in version 3.3.
def __init__(
    self,
    broadcaster: BlueZBroadcaster,
    observer: BlueZPybricksObserver,
    broadcast_channel: int,
    device_version: str = DEFAULT_DEVICE_VERSION,
):
    """
    Set up the radio around an existing broadcaster and observer.

    :param broadcaster: The broadcaster used to send data; its `name` is
        used for the broadcast advertisement.
    :param observer: The observer used to look up received broadcasts.
    :param broadcast_channel: The channel to broadcast on, an integer from
        `PYBRICKS_MIN_CHANNEL` to `PYBRICKS_MAX_CHANNEL` (inclusive).
    :param device_version: The string returned by `version()`.
    :raises ValueError: If `broadcast_channel` is not an integer or lies
        outside the valid channel range.
    """
    # Both bounds must be checked: a chained `MIN < ch > MAX` only rejects
    # values above the maximum and lets values below the minimum through.
    if not isinstance(broadcast_channel, int) or not (
        PYBRICKS_MIN_CHANNEL <= broadcast_channel <= PYBRICKS_MAX_CHANNEL
    ):
        raise ValueError(
            f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
        )

    # Starts the initializer lookup after AsyncExitStack in the MRO,
    # presumably to bypass `_common.BLE.__init__` (confirm against pybricks).
    super(AsyncExitStack, self).__init__()

    self._device_version = device_version
    self._broadcaster = broadcaster
    self._observer = observer

    self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)
async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
    """
    Broadcast the given values, or stop broadcasting if any value is `None`.

    :param data: One or more values to broadcast.
    :raises ValueError: If no value is given at all.
    """
    if not data:
        raise ValueError("Broadcast must be a value or tuple.")

    # A `None` anywhere among the values is the signal to stop broadcasting.
    if None in data:
        await self._broadcaster.stop_broadcast(self._adv)
        return

    already_running = self._broadcaster.is_broadcasting(self._adv)
    if not already_running:
        await self._broadcaster.broadcast(self._adv)
    self._adv.message = cast(PybricksBroadcastData, tuple(data))
broadcast(data)
Starts broadcasting the given data on
the broadcast_channel
you selected when initializing the hub.
Data may be of type int
, float
, str
, bytes
,
True
, or False
. It can also be a list or tuple of these.
Choose None
to stop broadcasting. This helps improve performance
when you don't need the broadcast feature, especially when observing
at the same time.
The total data size is quite limited (26 bytes). True
and
False
take 1 byte each. float
takes 5 bytes. int
takes 2 to
5 bytes depending on how big the number is. str
and bytes
take
the number of bytes in the object plus one extra byte.
When multitasking, only one task can broadcast at a time. To broadcast information from multiple tasks (or block stacks), you could use a dedicated separate task that broadcasts new values when one or more variables change.
Args: data: The value or values to be broadcast.
New in version 3.3.
def observe(self, channel: int) -> PybricksBroadcastData | None:
    """
    Return the data of the observer's advertisement for `channel`.

    :param channel: The channel to look up.
    :return: The advertisement data, or `None` if the observer has no
        advertisement for that channel.
    """
    seen = self._observer.observe(channel)
    return None if seen is None else seen.data
observe(channel) -> bool | int | float | str | bytes | tuple | None
Retrieves the last observed data for a given channel.
Receiving data is more reliable when the hub is not connected to a computer or other devices at the same time.
Args: channel (int): The channel to observe (0 to 255).
Returns:
The received data in the same format as it was sent, or None
if no recent data is available.
New in version 3.3.
def signal_strength(self, channel: int) -> int:
    """
    Return the `rssi` of the observer's advertisement for `channel`.

    :param channel: The channel to look up.
    :return: The advertisement's `rssi`, or -128 if the observer has no
        advertisement for that channel.
    """
    seen = self._observer.observe(channel)
    if seen is None:
        # Fallback value reported when nothing is available for the channel.
        return -128
    return seen.rssi
signal_strength(channel) -> int: dBm
Gets the average signal strength in dBm for the given channel.
This indicates how near the broadcasting device is. Nearby devices may have a signal strength around -40 dBm, while far away devices might have a signal strength around -70 dBm.
Args: channel (int): The channel number (0 to 255).
Returns:
The signal strength or -128
if there is no recent observed data.
New in version 3.3.
async def get_virtual_ble(
    adapter_name: str | None = None,
    device_name: str = VirtualBLE.DEFAULT_DEVICE_NAME,
    broadcast_channel: int = 0,
    observe_channels: Sequence[int] | None = None,
    scanning_mode: ScanningMode = "passive",
    device_filter: str | None = None,
) -> VirtualBLE:
    """
    Creates a "virtual" Pybricks BLE radio that can be used to exchange
    messages with other Pybricks Hubs via connectionless Bluetooth messaging.

    The `VirtualBLE` object implements an interface very similar to the [common Pybricks BLE interface](https://github.com/pybricks/pybricks-api/blob/v3.5.0/src/pybricks/_common.py#L1331)
    available on Pybricks Hubs. The crucial difference is that some methods
    must be called asynchronously.

    `get_virtual_ble()` should be used as an asynchronous context manager:

    ```python
    async with await get_virtual_ble(
        broadcast_channel=2
    ) as vble:
        # Broadcast a random number on channel 2
        val = random.randint(0, 3)
        await vble.broadcast(val)
        # Stop after 10 seconds
        await asyncio.sleep(10)
    ```

    :param adapter_name: The Bluetooth adapter to use, defaults to `None`
        (auto-discover default device).
    :param device_name: The name of the hub. This may be used as local name
        in the BLE advertisement data, defaults to `VirtualBLE.DEFAULT_DEVICE_NAME`.
    :param broadcast_channel: A value from 0 to 255 indicating which channel
        `VirtualBLE.broadcast()` will use, defaults to 0.
    :param observe_channels: A list of channels to listen to for use
        with `VirtualBLE.observe()`, defaults to `None` (all channels).
    :param scanning_mode: The scanning mode to use for observing broadcasts,
        defaults to `passive`.
        - Passive scanning is the default and recommended mode.
        However, it is not supported by all devices.
        - Active scanning is provided as a well-supported fallback.
        It may negatively impact the power consumption of nearby BLE devices.
    :param device_filter: Provides a mechanism to filter observed broadcasts
        based on the custom name of the sending Pybricks Hub, defaults to `None` (no filter).
        For example, set this to `Pybricks` to receive only broadcasts from Hubs
        that have a name starting with "Pybricks".
    :return: A `VirtualBLE` object which is loosely adhering to the Pybricks Hub
        BLE interface.
    :raises ValueError: If `broadcast_channel` is not a valid channel number.
    """
    bus: MessageBus = await MessageBus(bus_type=BusType.SYSTEM).connect()

    try:
        # Find given adapter or default adapter
        adapter_name, details = (
            await get_adapter_details()
            if adapter_name is None
            else await get_adapter_details(adapter_name)
        )
        adapter: ProxyObject = await get_adapter(bus, adapter_name)

        broadcaster = BlueZBroadcaster(bus=bus, adapter=adapter, name=device_name)
        observer = BlueZPybricksObserver(
            adapter_name=adapter_name,
            scanning_mode=scanning_mode,
            channels=observe_channels,
            device_pattern=device_filter,
        )

        return VirtualBLE(
            broadcaster=broadcaster,
            observer=observer,
            broadcast_channel=broadcast_channel,
            device_version=str(details["hw_version"]),
        )
    except BaseException:
        # The caller never receives an object holding the bus on this path,
        # so close the connection here instead of leaking it.
        bus.disconnect()
        raise
Creates a "virtual" Pybricks BLE radio that can be used to exchange messages with other Pybricks Hubs via connectionless Bluetooth messaging.
The VirtualBLE
object implements an interface very similar to the common Pybricks BLE interface
available on Pybricks Hubs. The crucial difference is that some methods
must be called asynchronously.
get_virtual_ble()
should be used as an asynchronous context manager:
async with await get_virtual_ble(
broadcast_channel=2
) as vble:
# Broadcast a random number on channel 2
val = random.randint(0, 3)
await vble.broadcast(val)
# Stop after 10 seconds
await asyncio.sleep(10)
Parameters
- adapter_name: The Bluetooth adapter to use, defaults to
None
(auto-discover default device). - device_name: The name of the hub. This may be used as local name
in the BLE advertisement data, defaults to
VirtualBLE.DEFAULT_DEVICE_NAME
. - broadcast_channel: A value from 0 to 255 indicating which channel
VirtualBLE.broadcast()
will use, defaults to 0. - observe_channels: A list of channels to listen to for use
with
VirtualBLE.observe()
, defaults to None
(all channels). - scanning_mode: The scanning mode to use for observing broadcasts,
defaults to
passive
. - Passive scanning is the default and recommended mode. However, it is not supported by all devices.
- Active scanning is provided as a well-supported fallback. It may negatively impact the power consumption of nearby BLE devices.
- device_filter: Provides a mechanism to filter observed broadcasts
based on the custom name of the sending Pybricks Hub, defaults to
None
(no filter). For example, set this to Pybricks
to receive only broadcasts from Hubs that have a name starting with "Pybricks".
Returns
A
VirtualBLE
object which is loosely adhering to the Pybricks Hub BLE interface.