pb_ble.vhub

  1import sys
  2from contextlib import AsyncExitStack
  3from typing import ClassVar, Sequence, cast
  4
  5from dbus_fast.aio import MessageBus, ProxyObject
  6from dbus_fast.constants import BusType
  7from pybricks.hubs import _common
  8
  9from .bluezdbus import (
 10    BlueZBroadcaster,
 11    BlueZPybricksObserver,
 12    PybricksBroadcastAdvertisement,
 13    get_adapter,
 14    get_adapter_details,
 15)
 16from .constants import (
 17    PYBRICKS_MAX_CHANNEL,
 18    PYBRICKS_MIN_CHANNEL,
 19    PybricksBroadcastData,
 20    PybricksBroadcastValue,
 21    ScanningMode,
 22)
 23
 24
 25class VirtualBLE(_common.BLE, AsyncExitStack):
 26    DEFAULT_DEVICE_NAME: ClassVar[str] = "pb_vhub"
 27    """The default device name to use in data broadcasts."""
 28    DEFAULT_DEVICE_VERSION: ClassVar[str] = "1.0"
 29    """The default device version to return from `version()`."""
 30
 31    _adv: PybricksBroadcastAdvertisement
 32    """The current data broadcast."""
 33    _device_version: str
 34    """The version string configured for this VirtualBLE device."""
 35
 36    _broadcaster: BlueZBroadcaster
 37    """The broadcaster to use."""
 38    _observer: BlueZPybricksObserver
 39    """The observer to use."""
 40
 41    def __init__(
 42        self,
 43        broadcaster: BlueZBroadcaster,
 44        observer: BlueZPybricksObserver,
 45        broadcast_channel: int,
 46        device_version: str = DEFAULT_DEVICE_VERSION,
 47    ):
 48        if (
 49            not isinstance(broadcast_channel, int)
 50            or PYBRICKS_MIN_CHANNEL < broadcast_channel > PYBRICKS_MAX_CHANNEL
 51        ):
 52            raise ValueError(
 53                f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
 54            )
 55
 56        super(AsyncExitStack, self).__init__()
 57
 58        self._device_version = device_version
 59        self._broadcaster = broadcaster
 60        self._observer = observer
 61
 62        self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)
 63
 64    async def __aenter__(self):
 65        try:
 66            await self.enter_async_context(self._broadcaster)
 67            await self.enter_async_context(self._observer)
 68        except Exception:
 69            if not await self.__aexit__(*sys.exc_info()):
 70                raise
 71        return self
 72
 73    async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
 74        if len(data) == 0:
 75            raise ValueError("Broadcast must be a value or tuple.")
 76        if None in data:
 77            await self._broadcaster.stop_broadcast(self._adv)
 78        else:
 79            if not self._broadcaster.is_broadcasting(self._adv):
 80                await self._broadcaster.broadcast(self._adv)
 81            self._adv.message = cast(PybricksBroadcastData, tuple(data))
 82
 83    def observe(self, channel: int) -> PybricksBroadcastData | None:
 84        advertisement = self._observer.observe(channel)
 85
 86        if advertisement is not None:
 87            return advertisement.data
 88        else:
 89            return None
 90
 91    def signal_strength(self, channel: int) -> int:
 92        advertisement = self._observer.observe(channel)
 93        return advertisement.rssi if advertisement is not None else -128
 94
 95    def version(self) -> str:
 96        return self._device_version
 97
 98
 99async def get_virtual_ble(
100    adapter_name: str | None = None,
101    device_name: str = VirtualBLE.DEFAULT_DEVICE_NAME,
102    broadcast_channel: int = 0,
103    observe_channels: Sequence[int] | None = None,
104    scanning_mode: ScanningMode = "passive",
105    device_filter: str | None = None,
106) -> VirtualBLE:
107    """
108    Creates a "virtual" Pybricks BLE radio that can be used to exchange
109    messages with other Pybricks Hubs via connectionless Bluetooth messaging.
110
111    The `VirtualBLE` object implements an interface very similar to the [common Pybricks BLE interface](https://github.com/pybricks/pybricks-api/blob/v3.5.0/src/pybricks/_common.py#L1331)
112    available on Pybricks Hubs. The crucial difference is that some methods
113    must be called asynchronously.
114
115    `get_virtual_ble()` should be used as an asynchronous context manager:
116
117    ```python
118    async with await get_virtual_ble(
119        broadcast_channel=2
120    ) as vble:
121        # Broadcast a random number on channel 2
122        val = random.randint(0, 3)
123        await vble.broadcast(val)
124        # Stop after 10 seconds
125        await asyncio.sleep(10)
126    ```
127
128    :param adapter_name: The Bluetooth adapter to use, defaults to `None`
129        (auto-discover default device).
130    :param device_name: The name of the hub. This may be used as local name
131        in the BLE advertisement data, defaults to `VirtualBLE.DEFAULT_DEVICE_NAME`.
132    :param broadcast_channel: A value from 0 to 255 indicating which channel
133        `VirtualBLE.broadcast()` will use, defaults to 0.
134    :param observe_channels: A list of channels to listen to for use
135        with `VirtualBLE.observe()`, defaults to `None` (all channels).
136    :param scanning_mode: The scanning mode to use for observing broadcasts,
137        defaults to `passive`.
138        - Passive scanning is the default and recommended mode.
139        However it is not supported by all devices.
140        - Active scanning is provided as a well-supported fallback.
141        It may negatively impact the power consumption of nearby BLE devices.
142    :param device_filter: Provides a mechanism to filter observed broadcasts
143        based on the custom name of the sending Pybricks Hub, defaults to `None` (no filter).
144        For example, set this to `Pybricks` to receive only broadcasts from Hubs
145        that have a name starting with "Pybricks".
146    :return: A `VirtualBLE` object which is loosely adhering to the Pybricks Hub
147        BLE interface.
148    """
149    bus: MessageBus = await MessageBus(bus_type=BusType.SYSTEM).connect()
150
151    # Find given adapter or default adapter
152    adapter_name, details = (
153        await get_adapter_details()
154        if adapter_name is None
155        else await get_adapter_details(adapter_name)
156    )
157    adapter: ProxyObject = await get_adapter(bus, adapter_name)
158
159    broadcaster = BlueZBroadcaster(bus=bus, adapter=adapter, name=device_name)
160    observer = BlueZPybricksObserver(
161        adapter_name=adapter_name,
162        scanning_mode=scanning_mode,
163        channels=observe_channels,
164        device_pattern=device_filter,
165    )
166
167    return VirtualBLE(
168        broadcaster=broadcaster,
169        observer=observer,
170        broadcast_channel=broadcast_channel,
171        device_version=str(details["hw_version"]),
172    )
class VirtualBLE(pybricks._common.BLE, contextlib.AsyncExitStack):
26class VirtualBLE(_common.BLE, AsyncExitStack):
27    DEFAULT_DEVICE_NAME: ClassVar[str] = "pb_vhub"
28    """The default device name to use in data broadcasts."""
29    DEFAULT_DEVICE_VERSION: ClassVar[str] = "1.0"
30    """The default device version to return from `version()`."""
31
32    _adv: PybricksBroadcastAdvertisement
33    """The current data broadcast."""
34    _device_version: str
35    """The version string configured for this VirtualBLE device."""
36
37    _broadcaster: BlueZBroadcaster
38    """The broadcaster to use."""
39    _observer: BlueZPybricksObserver
40    """The observer to use."""
41
42    def __init__(
43        self,
44        broadcaster: BlueZBroadcaster,
45        observer: BlueZPybricksObserver,
46        broadcast_channel: int,
47        device_version: str = DEFAULT_DEVICE_VERSION,
48    ):
49        if (
50            not isinstance(broadcast_channel, int)
51            or PYBRICKS_MIN_CHANNEL < broadcast_channel > PYBRICKS_MAX_CHANNEL
52        ):
53            raise ValueError(
54                f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
55            )
56
57        super(AsyncExitStack, self).__init__()
58
59        self._device_version = device_version
60        self._broadcaster = broadcaster
61        self._observer = observer
62
63        self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)
64
65    async def __aenter__(self):
66        try:
67            await self.enter_async_context(self._broadcaster)
68            await self.enter_async_context(self._observer)
69        except Exception:
70            if not await self.__aexit__(*sys.exc_info()):
71                raise
72        return self
73
74    async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
75        if len(data) == 0:
76            raise ValueError("Broadcast must be a value or tuple.")
77        if None in data:
78            await self._broadcaster.stop_broadcast(self._adv)
79        else:
80            if not self._broadcaster.is_broadcasting(self._adv):
81                await self._broadcaster.broadcast(self._adv)
82            self._adv.message = cast(PybricksBroadcastData, tuple(data))
83
84    def observe(self, channel: int) -> PybricksBroadcastData | None:
85        advertisement = self._observer.observe(channel)
86
87        if advertisement is not None:
88            return advertisement.data
89        else:
90            return None
91
92    def signal_strength(self, channel: int) -> int:
93        advertisement = self._observer.observe(channel)
94        return advertisement.rssi if advertisement is not None else -128
95
96    def version(self) -> str:
97        return self._device_version

Bluetooth Low Energy.

New in version 3.3.

VirtualBLE( broadcaster: pb_ble.bluezdbus.BlueZBroadcaster, observer: pb_ble.bluezdbus.BlueZPybricksObserver, broadcast_channel: int, device_version: str = '1.0')
42    def __init__(
43        self,
44        broadcaster: BlueZBroadcaster,
45        observer: BlueZPybricksObserver,
46        broadcast_channel: int,
47        device_version: str = DEFAULT_DEVICE_VERSION,
48    ):
49        if (
50            not isinstance(broadcast_channel, int)
51            or PYBRICKS_MIN_CHANNEL < broadcast_channel > PYBRICKS_MAX_CHANNEL
52        ):
53            raise ValueError(
54                f"Broadcast channel must be integer from {PYBRICKS_MIN_CHANNEL} to {PYBRICKS_MAX_CHANNEL}."
55            )
56
57        super(AsyncExitStack, self).__init__()
58
59        self._device_version = device_version
60        self._broadcaster = broadcaster
61        self._observer = observer
62
63        self._adv = PybricksBroadcastAdvertisement(broadcaster.name, broadcast_channel)
DEFAULT_DEVICE_NAME: ClassVar[str] = 'pb_vhub'

The default device name to use in data broadcasts.

DEFAULT_DEVICE_VERSION: ClassVar[str] = '1.0'

The default device version to return from version().

async def broadcast(self, *data: bool | int | float | str | bytes | None) -> None:
74    async def broadcast(self, *data: PybricksBroadcastValue | None) -> None:  # type: ignore [override]
75        if len(data) == 0:
76            raise ValueError("Broadcast must be a value or tuple.")
77        if None in data:
78            await self._broadcaster.stop_broadcast(self._adv)
79        else:
80            if not self._broadcaster.is_broadcasting(self._adv):
81                await self._broadcaster.broadcast(self._adv)
82            self._adv.message = cast(PybricksBroadcastData, tuple(data))

broadcast(data)

Starts broadcasting the given data on the broadcast_channel you selected when initializing the hub.

Data may be of type int, float, str, bytes, True, or False. It can also be a list or tuple of these.

Choose None to stop broadcasting. This helps improve performance when you don't need the broadcast feature, especially when observing at the same time.

The total data size is quite limited (26 bytes). True and False take 1 byte each. float takes 5 bytes. int takes 2 to 5 bytes depending on how big the number is. str and bytes take the number of bytes in the object plus one extra byte.

When multitasking, only one task can broadcast at a time. To broadcast information from multiple tasks (or block stacks), you could use a dedicated separate task that broadcast new values when one or more variables change.

Args: data: The value or values to be broadcast.

New in version 3.3.

def observe( self, channel: int) -> bool | int | float | str | bytes | tuple[bool | int | float | str | bytes, ...] | None:
84    def observe(self, channel: int) -> PybricksBroadcastData | None:
85        advertisement = self._observer.observe(channel)
86
87        if advertisement is not None:
88            return advertisement.data
89        else:
90            return None

observe(channel) -> bool | int | float | str | bytes | tuple | None

Retrieves the last observed data for a given channel.

Receiving data is more reliable when the hub is not connected to a computer or other devices at the same time.

Args: channel (int): The channel to observe (0 to 255).

Returns: The received data in the same format as it was sent, or None if no recent data is available.

New in version 3.3.

def signal_strength(self, channel: int) -> int:
92    def signal_strength(self, channel: int) -> int:
93        advertisement = self._observer.observe(channel)
94        return advertisement.rssi if advertisement is not None else -128

signal_strength(channel) -> int: dBm

Gets the average signal strength in dBm for the given channel.

This indicates how near the broadcasting device is. Nearby devices may have a signal strength around -40 dBm, while far away devices might have a signal strength around -70 dBm.

Args: channel (int): The channel number (0 to 255).

Returns: The signal strength or -128 if there is no recent observed data.

New in version 3.3.

def version(self) -> str:
96    def version(self) -> str:
97        return self._device_version

version() -> str

Gets the firmware version from the Bluetooth chip.

New in version 3.3.

async def get_virtual_ble( adapter_name: str | None = None, device_name: str = 'pb_vhub', broadcast_channel: int = 0, observe_channels: Optional[Sequence[int]] = None, scanning_mode: Literal['active', 'passive'] = 'passive', device_filter: str | None = None) -> VirtualBLE:
100async def get_virtual_ble(
101    adapter_name: str | None = None,
102    device_name: str = VirtualBLE.DEFAULT_DEVICE_NAME,
103    broadcast_channel: int = 0,
104    observe_channels: Sequence[int] | None = None,
105    scanning_mode: ScanningMode = "passive",
106    device_filter: str | None = None,
107) -> VirtualBLE:
108    """
109    Creates a "virtual" Pybricks BLE radio that can be used to exchange
110    messages with other Pybricks Hubs via connectionless Bluetooth messaging.
111
112    The `VirtualBLE` object implements an interface very similar to the [common Pybricks BLE interface](https://github.com/pybricks/pybricks-api/blob/v3.5.0/src/pybricks/_common.py#L1331)
113    available on Pybricks Hubs. The crucial difference is that some methods
114    must be called asynchronously.
115
116    `get_virtual_ble()` should be used as an asynchronous context manager:
117
118    ```python
119    async with await get_virtual_ble(
120        broadcast_channel=2
121    ) as vble:
122        # Broadcast a random number on channel 2
123        val = random.randint(0, 3)
124        await vble.broadcast(val)
125        # Stop after 10 seconds
126        await asyncio.sleep(10)
127    ```
128
129    :param adapter_name: The Bluetooth adapter to use, defaults to `None`
130        (auto-discover default device).
131    :param device_name: The name of the hub. This may be used as local name
132        in the BLE advertisement data, defaults to `VirtualBLE.DEFAULT_DEVICE_NAME`.
133    :param broadcast_channel: A value from 0 to 255 indicating which channel
134        `VirtualBLE.broadcast()` will use, defaults to 0.
135    :param observe_channels: A list of channels to listen to for use
136        with `VirtualBLE.observe()`, defaults to `None` (all channels).
137    :param scanning_mode: The scanning mode to use for observing broadcasts,
138        defaults to `passive`.
139        - Passive scanning is the default and recommended mode.
140        However it is not supported by all devices.
141        - Active scanning is provided as a well-supported fallback.
142        It may negatively impact the power consumption of nearby BLE devices.
143    :param device_filter: Provides a mechanism to filter observed broadcasts
144        based on the custom name of the sending Pybricks Hub, defaults to `None` (no filter).
145        For example, set this to `Pybricks` to receive only broadcasts from Hubs
146        that have a name starting with "Pybricks".
147    :return: A `VirtualBLE` object which is loosely adhering to the Pybricks Hub
148        BLE interface.
149    """
150    bus: MessageBus = await MessageBus(bus_type=BusType.SYSTEM).connect()
151
152    # Find given adapter or default adapter
153    adapter_name, details = (
154        await get_adapter_details()
155        if adapter_name is None
156        else await get_adapter_details(adapter_name)
157    )
158    adapter: ProxyObject = await get_adapter(bus, adapter_name)
159
160    broadcaster = BlueZBroadcaster(bus=bus, adapter=adapter, name=device_name)
161    observer = BlueZPybricksObserver(
162        adapter_name=adapter_name,
163        scanning_mode=scanning_mode,
164        channels=observe_channels,
165        device_pattern=device_filter,
166    )
167
168    return VirtualBLE(
169        broadcaster=broadcaster,
170        observer=observer,
171        broadcast_channel=broadcast_channel,
172        device_version=str(details["hw_version"]),
173    )

Creates a "virtual" Pybricks BLE radio that can be used to exchange messages with other Pybricks Hubs via connectionless Bluetooth messaging.

The VirtualBLE object implements an interface very similar to the common Pybricks BLE interface available on Pybricks Hubs. The crucial difference is that some methods must be called asynchronously.

get_virtual_ble() should be used as an asynchronous context manager:

async with await get_virtual_ble(
    broadcast_channel=2
) as vble:
    # Broadcast a random number on channel 2
    val = random.randint(0, 3)
    await vble.broadcast(val)
    # Stop after 10 seconds
    await asyncio.sleep(10)
Parameters
  • adapter_name: The Bluetooth adapter to use, defaults to None (auto-discover default device).
  • device_name: The name of the hub. This may be used as local name in the BLE advertisement data, defaults to VirtualBLE.DEFAULT_DEVICE_NAME.
  • broadcast_channel: A value from 0 to 255 indicating which channel VirtualBLE.broadcast() will use, defaults to 0.
  • observe_channels: A list of channels to listen to for use with VirtualBLE.observe(), defaults to None (all channels).
  • scanning_mode: The scanning mode to use for observing broadcasts, defaults to passive.
    • Passive scanning is the default and recommended mode. However it is not supported by all devices.
    • Active scanning is provided as a well-supported fallback. It may negatively impact the power consumption of nearby BLE devices.
  • device_filter: Provides a mechanism to filter observed broadcasts based on the custom name of the sending Pybricks Hub, defaults to None (no filter). For example, set this to Pybricks to receive only broadcasts from Hubs that have a name starting with "Pybricks".
Returns

A VirtualBLE object which is loosely adhering to the Pybricks Hub BLE interface.