pb_ble.messages

A pure Python implementation of the Pybricks BLE broadcast/observe message protocol.

Supports the message format first introduced in Pybricks v3.3.0b5 and updated in v3.3.0b9.

Based on:

  1"""
  2A pure Python implementation of the Pybricks BLE broadcast/observe message protocol.
  3
  4Supports the message format first introduced in Pybricks `v3.3.0b5` and updated in `v3.3.0b9`.
  5
  6Based on:
  7
  8* [Technical specification](https://github.com/pybricks/technical-info/blob/master/pybricks-ble-broadcast-observe.md) of the message format.
  9* [Reference implementation](https://github.com/pybricks/pybricks-micropython/blob/v3.3.0/pybricks/common/pb_type_ble.c) in Pybricks.
 10
 11"""
 12
 13import logging
 14from enum import IntEnum
 15from struct import pack, unpack, unpack_from
 16from typing import Literal, Tuple, cast
 17
 18from .constants import PybricksBroadcast, PybricksBroadcastData, PybricksBroadcastValue
 19
 20logger = logging.getLogger(__name__)
 21
 22
 23def decode_message(
 24    data: bytes,
 25) -> PybricksBroadcast:
 26    """
 27    Parses a Pybricks broadcast message, typically sourced from
 28    the BLE advertisement manufacturer data.
 29
 30    :param data: The encoded data.
 31    :return: Tuple containing the Pybricks message channel and the
 32        original value. The original value is either a single object
 33        or a tuple.
 34    """
 35
 36    logger.debug(f"decoding[{len(data)}]: {data!r}")
 37
 38    # idx 0 is the channel
 39    channel: int = unpack_from("<B", data)[0]  # uint8
 40    logger.debug(f"channel: {channel}")
 41    # idx 1 is the message start
 42    idx = 1
 43    decoded_data: list[PybricksBroadcastValue] = []
 44    single_object = False
 45
 46    while idx < len(data):
 47        idx, val = _decode_next_value(idx, data)
 48        if val is None:
 49            logger.debug(f"data[{len(decoded_data)}]: SINGLE_OBJECT marker")
 50            single_object = True
 51        else:
 52            logger.debug(f"data[{len(decoded_data)}] of {type(val)!s:<15}: {val!r}")
 53            decoded_data.append(val)
 54
 55    if single_object:
 56        decoded_value: PybricksBroadcastValue = decoded_data[0]
 57        return PybricksBroadcast(channel, decoded_value)
 58    else:
 59        return PybricksBroadcast(
 60            channel, cast(PybricksBroadcastData, tuple(decoded_data))
 61        )
 62
 63
 64def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes:
 65    """
 66    Encodes the given values as a Pybricks broadcast message.
 67
 68    :param channel: The Pybricks broadcast channel (0 to 255), defaults to 0.
 69    :param values: The values to encode in the message.
 70    :raises ValueError: If the total size of the message exceeds the maximum
 71        message size of 27 bytes.
 72    :return: The encoded message.
 73    """
 74
 75    # idx 0 is the channel
 76    encoded_channel = pack("<B", channel)
 77    logger.debug(f"channel: {channel} -> {encoded_channel!r}")
 78
 79    # idx 1 is the message start
 80    encoded_data = bytearray(encoded_channel)
 81
 82    if len(values) == 1:
 83        # set SINGLE_OBJECT marker
 84        header = PybricksBleBroadcastDataType.SINGLE_OBJECT << 5
 85        logger.debug(f"data[{len(encoded_data)}]: SINGLE_OBJECT marker")
 86        encoded_data.append(header)
 87
 88    for val in values:
 89        header, encoded_val = _encode_value(val)
 90        logger.debug(
 91            f"data[{len(encoded_data)}] of {type(val)!s}: {val!r} -> ({header!r}, {encoded_val!r})"
 92        )
 93        encoded_data.append(header)
 94
 95        if encoded_val is not None:
 96            encoded_data += encoded_val
 97
 98        # max size is 27 bytes: 26 byte payload + 1 byte channel
 99        if len(encoded_data) > OBSERVED_DATA_MAX_SIZE + 1:
100            raise ValueError(
101                f"Payload too large: {len(encoded_data)} bytes (maximum is {OBSERVED_DATA_MAX_SIZE} bytes)"
102            )
103
104    message = bytes(encoded_data)
105    logger.debug(f"encoded[{len(message)}]: {message!r}")
106    return message
107
108
109def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]:
110    """
111    Unpacks raw data from the PnP ID characteristic.
112
113    Implementation sourced from [pybricksdev](https://github.com/pybricks/pybricksdev/blob/v1.0.0-alpha.50/pybricksdev/ble/pybricks.py#L354)
114    under the MIT License: Copyright (c) 2018-2023 The Pybricks Authors.
115
116    :param data: The raw data.
117    :return: Tuple containing the vendor ID type (`BT` or `USB`), the vendor
118        ID, the product ID and the product revision.
119    """
120    vid_type: int
121    vid: int
122    pid: int
123    rev: int
124
125    vid_type, vid, pid, rev = unpack("<BHHH", data)
126
127    return "BT" if vid_type else "USB", vid, pid, rev
128
129
130def pack_pnp_id(
131    product_id: int,
132    product_rev: int,
133    vendor_id_type: Literal["BT", "USB"] = "BT",
134    vendor_id: int = 0x0397,
135) -> bytes:
136    """
137    Encodes the PnP ID GATT characteristics.
138
139    :param product_id: The Product ID. Should be a LEGO HubKind.
140    :param product_rev: The Product Version.
141    :param vendor_id_type: The Vendor ID Source, defaults to `BT`.
142    :param vendor_id:  The Vendor ID, defaults to `0x0397` (LEGO).
143    :return: The encoded PnP ID.
144    """
145
146    pnp_id = pack(
147        "<BHHH", 1 if vendor_id_type == "BT" else 0, vendor_id, product_id, product_rev
148    )
149    return pnp_id
150
151
152SIZEOF_INT8_T = 1
153"""Size of an int8 value in bytes."""
154SIZEOF_INT16_T = 2
155"""Size of an int16 value in bytes."""
156SIZEOF_INT32_T = 4
157"""Size of an int32 value in bytes."""
158
159INT_FORMAT = {
160    SIZEOF_INT8_T: "<b",
161    SIZEOF_INT16_T: "<h",
162    SIZEOF_INT32_T: "<i",
163}
164"""Mapping of integer types to struct format."""
165
166OBSERVED_DATA_MAX_SIZE = 31 - 5
167"""Maximum size of the observed data included in the BLE advertising packet:
16831 (max adv data size) - 5 (overhead).
169"""
170
171
172class PybricksBleBroadcastDataType(IntEnum):
173    """Type codes used for encoding/decoding data."""
174
175    # NB: These values are sent over the air so the numeric values must not be changed.
176    # There can be at most 8 types since the values have to fit in 3 bits.
177
178    SINGLE_OBJECT = 0
179    """Indicator that the next value is the one and only value (instead of a tuple)."""
180
181    TRUE = 1
182    """The Python @c True value."""
183
184    FALSE = 2
185    """The Python @c False value."""
186
187    INT = 3
188    """The Python @c int type."""
189
190    FLOAT = 4
191    """The Python @c float type."""
192
193    STR = 5
194    """The Python @c str type."""
195
196    BYTES = 6
197    """The Python @c bytes type."""
198
199
200def _decode_next_value(
201    idx: int, data: bytes
202) -> tuple[int, None | PybricksBroadcastValue]:
203    """
204    Decodes the next value in data, starting at ``idx``.
205
206    :param idx: The starting index of the next value in data.
207    :param data: The raw data.
208    :raises ValueError: If the data type is unsupported.
209    :return: Tuple of the next index, and the parsed data.
210        The parsed data is ``None`` if this is the
211        :py:attr:`PybricksBleBroadcastDataType.SINGLE_OBJECT` marker.
212    """
213
214    # data type and size
215    type_id = data[idx] >> 5
216    size = data[idx] & 0x1F
217    data_type = PybricksBleBroadcastDataType(type_id)
218    # move cursor to value
219    idx += 1
220
221    # data value
222    if data_type == PybricksBleBroadcastDataType.SINGLE_OBJECT:
223        # Does not contain data by itself, is only used as indicator
224        # that the next data is the one and only object
225        assert size == 0
226        return idx, None
227    elif data_type == PybricksBleBroadcastDataType.TRUE:
228        assert size == 0
229        return idx, True
230    elif data_type == PybricksBleBroadcastDataType.FALSE:
231        assert size == 0
232        return idx, False
233    elif data_type == PybricksBleBroadcastDataType.INT:
234        # int8 / 1 byte
235        # int16 / 2 bytes
236        # int32 / 4 bytes
237        format = INT_FORMAT[size]
238        return idx + size, unpack_from(format, data, idx)[0]
239    elif data_type == PybricksBleBroadcastDataType.FLOAT:
240        # float / uint32 / 4 bytes
241        return idx + size, unpack_from("<f", data, idx)[0]
242    elif data_type == PybricksBleBroadcastDataType.STR:
243        val = data[idx : idx + size]
244        return idx + size, bytes.decode(val)
245    elif data_type == PybricksBleBroadcastDataType.BYTES:
246        val = data[idx : idx + size]
247        return idx + size, val
248    else:
249        # unsupported data type
250        raise ValueError(f"Unsupported data type: {data_type}")
251
252
253def _encode_value(
254    val: PybricksBroadcastValue,
255) -> tuple[int, None | bytes]:
256    """
257    Encodes the given value for a Pybricks broadcast message.
258
259    :param val: The value to encode.
260    :raises ValueError: If the data type is unsupported.
261    :return: Tuple containing the header byte and the encoded value,
262        which may be None if no value is required in addition to the
263        header byte.
264    """
265
266    size = 0
267    encoded_val = None
268
269    if isinstance(val, bool):
270        data_type = (
271            PybricksBleBroadcastDataType.TRUE
272            if val
273            else PybricksBleBroadcastDataType.FALSE
274        )
275    elif isinstance(val, int):
276        data_type = PybricksBleBroadcastDataType.INT
277        if -128 <= val <= 127:
278            # int8
279            size = 1
280        elif -32768 <= val <= 32767:
281            # int16
282            size = 2
283        else:
284            # int32
285            size = 4
286        format = INT_FORMAT[size]
287        encoded_val = pack(format, val)
288    elif isinstance(val, float):
289        data_type = PybricksBleBroadcastDataType.FLOAT
290        size = 4
291        encoded_val = pack("<f", val)
292    elif isinstance(val, str):
293        data_type = PybricksBleBroadcastDataType.STR
294        size = len(val)
295        encoded_val = val.encode()
296    elif isinstance(val, bytes):
297        data_type = PybricksBleBroadcastDataType.BYTES
298        size = len(val)
299        encoded_val = val
300    else:
301        # unsupported data type
302        raise ValueError(f"Unsupported data type: {type(val)}")
303
304    header = (data_type << 5) | (size & 0x1F)
305    return header, encoded_val
logger = <Logger pb_ble.messages (WARNING)>
def decode_message(data: bytes) -> pb_ble.PybricksBroadcast:
24def decode_message(
25    data: bytes,
26) -> PybricksBroadcast:
27    """
28    Parses a Pybricks broadcast message, typically sourced from
29    the BLE advertisement manufacturer data.
30
31    :param data: The encoded data.
32    :return: Tuple containing the Pybricks message channel and the
33        original value. The original value is either a single object
34        or a tuple.
35    """
36
37    logger.debug(f"decoding[{len(data)}]: {data!r}")
38
39    # idx 0 is the channel
40    channel: int = unpack_from("<B", data)[0]  # uint8
41    logger.debug(f"channel: {channel}")
42    # idx 1 is the message start
43    idx = 1
44    decoded_data: list[PybricksBroadcastValue] = []
45    single_object = False
46
47    while idx < len(data):
48        idx, val = _decode_next_value(idx, data)
49        if val is None:
50            logger.debug(f"data[{len(decoded_data)}]: SINGLE_OBJECT marker")
51            single_object = True
52        else:
53            logger.debug(f"data[{len(decoded_data)}] of {type(val)!s:<15}: {val!r}")
54            decoded_data.append(val)
55
56    if single_object:
57        decoded_value: PybricksBroadcastValue = decoded_data[0]
58        return PybricksBroadcast(channel, decoded_value)
59    else:
60        return PybricksBroadcast(
61            channel, cast(PybricksBroadcastData, tuple(decoded_data))
62        )

Parses a Pybricks broadcast message, typically sourced from the BLE advertisement manufacturer data.

Parameters
  • data: The encoded data.
Returns

Tuple containing the Pybricks message channel and the original value. The original value is either a single object or a tuple.

def encode_message(channel: int = 0, *values: bool | int | float | str | bytes) -> bytes:
 65def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes:
 66    """
 67    Encodes the given values as a Pybricks broadcast message.
 68
 69    :param channel: The Pybricks broadcast channel (0 to 255), defaults to 0.
 70    :param values: The values to encode in the message.
 71    :raises ValueError: If the total size of the message exceeds the maximum
 72        message size of 27 bytes.
 73    :return: The encoded message.
 74    """
 75
 76    # idx 0 is the channel
 77    encoded_channel = pack("<B", channel)
 78    logger.debug(f"channel: {channel} -> {encoded_channel!r}")
 79
 80    # idx 1 is the message start
 81    encoded_data = bytearray(encoded_channel)
 82
 83    if len(values) == 1:
 84        # set SINGLE_OBJECT marker
 85        header = PybricksBleBroadcastDataType.SINGLE_OBJECT << 5
 86        logger.debug(f"data[{len(encoded_data)}]: SINGLE_OBJECT marker")
 87        encoded_data.append(header)
 88
 89    for val in values:
 90        header, encoded_val = _encode_value(val)
 91        logger.debug(
 92            f"data[{len(encoded_data)}] of {type(val)!s}: {val!r} -> ({header!r}, {encoded_val!r})"
 93        )
 94        encoded_data.append(header)
 95
 96        if encoded_val is not None:
 97            encoded_data += encoded_val
 98
 99        # max size is 27 bytes: 26 byte payload + 1 byte channel
100        if len(encoded_data) > OBSERVED_DATA_MAX_SIZE + 1:
101            raise ValueError(
102                f"Payload too large: {len(encoded_data)} bytes (maximum is {OBSERVED_DATA_MAX_SIZE} bytes)"
103            )
104
105    message = bytes(encoded_data)
106    logger.debug(f"encoded[{len(message)}]: {message!r}")
107    return message

Encodes the given values as a Pybricks broadcast message.

Parameters
  • channel: The Pybricks broadcast channel (0 to 255), defaults to 0.
  • values: The values to encode in the message.
Raises
  • ValueError: If the total size of the message exceeds the maximum message size of 27 bytes.
Returns

The encoded message.

def unpack_pnp_id(data: bytes) -> Tuple[Literal['BT', 'USB'], int, int, int]:
110def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]:
111    """
112    Unpacks raw data from the PnP ID characteristic.
113
114    Implementation sourced from [pybricksdev](https://github.com/pybricks/pybricksdev/blob/v1.0.0-alpha.50/pybricksdev/ble/pybricks.py#L354)
115    under the MIT License: Copyright (c) 2018-2023 The Pybricks Authors.
116
117    :param data: The raw data.
118    :return: Tuple containing the vendor ID type (`BT` or `USB`), the vendor
119        ID, the product ID and the product revision.
120    """
121    vid_type: int
122    vid: int
123    pid: int
124    rev: int
125
126    vid_type, vid, pid, rev = unpack("<BHHH", data)
127
128    return "BT" if vid_type else "USB", vid, pid, rev

Unpacks raw data from the PnP ID characteristic.

Implementation sourced from pybricksdev under the MIT License: Copyright (c) 2018-2023 The Pybricks Authors.

Parameters
  • data: The raw data.
Returns

Tuple containing the vendor ID type (BT or USB), the vendor ID, the product ID and the product revision.

def pack_pnp_id( product_id: int, product_rev: int, vendor_id_type: Literal['BT', 'USB'] = 'BT', vendor_id: int = 919) -> bytes:
131def pack_pnp_id(
132    product_id: int,
133    product_rev: int,
134    vendor_id_type: Literal["BT", "USB"] = "BT",
135    vendor_id: int = 0x0397,
136) -> bytes:
137    """
138    Encodes the PnP ID GATT characteristics.
139
140    :param product_id: The Product ID. Should be a LEGO HubKind.
141    :param product_rev: The Product Version.
142    :param vendor_id_type: The Vendor ID Source, defaults to `BT`.
143    :param vendor_id:  The Vendor ID, defaults to `0x0397` (LEGO).
144    :return: The encoded PnP ID.
145    """
146
147    pnp_id = pack(
148        "<BHHH", 1 if vendor_id_type == "BT" else 0, vendor_id, product_id, product_rev
149    )
150    return pnp_id

Encodes the PnP ID GATT characteristics.

Parameters
  • product_id: The Product ID. Should be a LEGO HubKind.
  • product_rev: The Product Version.
  • vendor_id_type: The Vendor ID Source, defaults to BT.
  • vendor_id: The Vendor ID, defaults to 0x0397 (LEGO).
Returns

The encoded PnP ID.

SIZEOF_INT8_T = 1

Size of an int8 value in bytes.

SIZEOF_INT16_T = 2

Size of an int16 value in bytes.

SIZEOF_INT32_T = 4

Size of an int32 value in bytes.

INT_FORMAT = {1: '<b', 2: '<h', 4: '<i'}

Mapping of integer types to struct format.

OBSERVED_DATA_MAX_SIZE = 26

Maximum size of the observed data included in the BLE advertising packet: 31 (max adv data size) - 5 (overhead).

class PybricksBleBroadcastDataType(enum.IntEnum):
173class PybricksBleBroadcastDataType(IntEnum):
174    """Type codes used for encoding/decoding data."""
175
176    # NB: These values are sent over the air so the numeric values must not be changed.
177    # There can be at most 8 types since the values have to fit in 3 bits.
178
179    SINGLE_OBJECT = 0
180    """Indicator that the next value is the one and only value (instead of a tuple)."""
181
182    TRUE = 1
183    """The Python @c True value."""
184
185    FALSE = 2
186    """The Python @c False value."""
187
188    INT = 3
189    """The Python @c int type."""
190
191    FLOAT = 4
192    """The Python @c float type."""
193
194    STR = 5
195    """The Python @c str type."""
196
197    BYTES = 6
198    """The Python @c bytes type."""

Type codes used for encoding/decoding data.

Indicator that the next value is the one and only value (instead of a tuple).

The Python @c True value.

The Python @c False value.

The Python @c int type.

The Python @c float type.

The Python @c str type.

The Python @c bytes type.