pb_ble.messages
A pure Python implementation of the Pybricks BLE broadcast/observe message protocol.
Supports the message format first introduced in Pybricks v3.3.0b5
and updated in v3.3.0b9
.
Based on:
- Technical specification of the message format.
- Reference implementation in Pybricks.
1""" 2A pure Python implementation of the Pybricks BLE broadcast/observe message protocol. 3 4Supports the message format first introduced in Pybricks `v3.3.0b5` and updated in `v3.3.0b9`. 5 6Based on: 7 8* [Technical specification](https://github.com/pybricks/technical-info/blob/master/pybricks-ble-broadcast-observe.md) of the message format. 9* [Reference implementation](https://github.com/pybricks/pybricks-micropython/blob/v3.3.0/pybricks/common/pb_type_ble.c) in Pybricks. 10 11""" 12 13import logging 14from enum import IntEnum 15from struct import pack, unpack, unpack_from 16from typing import Literal, Tuple, cast 17 18from .constants import PybricksBroadcast, PybricksBroadcastData, PybricksBroadcastValue 19 20logger = logging.getLogger(__name__) 21 22 23def decode_message( 24 data: bytes, 25) -> PybricksBroadcast: 26 """ 27 Parses a Pybricks broadcast message, typically sourced from 28 the BLE advertisement manufacturer data. 29 30 :param data: The encoded data. 31 :return: Tuple containing the Pybricks message channel and the 32 original value. The original value is either a single object 33 or a tuple. 34 """ 35 36 logger.debug(f"decoding[{len(data)}]: {data!r}") 37 38 # idx 0 is the channel 39 channel: int = unpack_from("<B", data)[0] # uint8 40 logger.debug(f"channel: {channel}") 41 # idx 1 is the message start 42 idx = 1 43 decoded_data: list[PybricksBroadcastValue] = [] 44 single_object = False 45 46 while idx < len(data): 47 idx, val = _decode_next_value(idx, data) 48 if val is None: 49 logger.debug(f"data[{len(decoded_data)}]: SINGLE_OBJECT marker") 50 single_object = True 51 else: 52 logger.debug(f"data[{len(decoded_data)}] of {type(val)!s:<15}: {val!r}") 53 decoded_data.append(val) 54 55 if single_object: 56 decoded_value: PybricksBroadcastValue = decoded_data[0] 57 return PybricksBroadcast(channel, decoded_value) 58 else: 59 return PybricksBroadcast( 60 channel, cast(PybricksBroadcastData, tuple(decoded_data)) 61 ) 62 63 64def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes: 65 """ 66 Encodes the given values as a Pybricks broadcast message. 67 68 :param channel: The Pybricks broadcast channel (0 to 255), defaults to 0. 69 :param values: The values to encode in the message. 70 :raises ValueError: If the total size of the message exceeds the maximum 71 message size of 27 bytes. 72 :return: The encoded message. 73 """ 74 75 # idx 0 is the channel 76 encoded_channel = pack("<B", channel) 77 logger.debug(f"channel: {channel} -> {encoded_channel!r}") 78 79 # idx 1 is the message start 80 encoded_data = bytearray(encoded_channel) 81 82 if len(values) == 1: 83 # set SINGLE_OBJECT marker 84 header = PybricksBleBroadcastDataType.SINGLE_OBJECT << 5 85 logger.debug(f"data[{len(encoded_data)}]: SINGLE_OBJECT marker") 86 encoded_data.append(header) 87 88 for val in values: 89 header, encoded_val = _encode_value(val) 90 logger.debug( 91 f"data[{len(encoded_data)}] of {type(val)!s}: {val!r} -> ({header!r}, {encoded_val!r})" 92 ) 93 encoded_data.append(header) 94 95 if encoded_val is not None: 96 encoded_data += encoded_val 97 98 # max size is 27 bytes: 26 byte payload + 1 byte channel 99 if len(encoded_data) > OBSERVED_DATA_MAX_SIZE + 1: 100 raise ValueError( 101 f"Payload too large: {len(encoded_data)} bytes (maximum is {OBSERVED_DATA_MAX_SIZE} bytes)" 102 ) 103 104 message = bytes(encoded_data) 105 logger.debug(f"encoded[{len(message)}]: {message!r}") 106 return message 107 108 109def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]: 110 """ 111 Unpacks raw data from the PnP ID characteristic. 112 113 Implementation sourced from [pybricksdev](https://github.com/pybricks/pybricksdev/blob/v1.0.0-alpha.50/pybricksdev/ble/pybricks.py#L354) 114 under the MIT License: Copyright (c) 2018-2023 The Pybricks Authors. 115 116 :param data: The raw data. 117 :return: Tuple containing the vendor ID type (`BT` or `USB`), the vendor 118 ID, the product ID and the product revision. 119 """ 120 vid_type: int 121 vid: int 122 pid: int 123 rev: int 124 125 vid_type, vid, pid, rev = unpack("<BHHH", data) 126 127 return "BT" if vid_type else "USB", vid, pid, rev 128 129 130def pack_pnp_id( 131 product_id: int, 132 product_rev: int, 133 vendor_id_type: Literal["BT", "USB"] = "BT", 134 vendor_id: int = 0x0397, 135) -> bytes: 136 """ 137 Encodes the PnP ID GATT characteristics. 138 139 :param product_id: The Product ID. Should be a LEGO HubKind. 140 :param product_rev: The Product Version. 141 :param vendor_id_type: The Vendor ID Source, defaults to `BT`. 142 :param vendor_id: The Vendor ID, defaults to `0x0397` (LEGO). 143 :return: The encoded PnP ID. 144 """ 145 146 pnp_id = pack( 147 "<BHHH", 1 if vendor_id_type == "BT" else 0, vendor_id, product_id, product_rev 148 ) 149 return pnp_id 150 151 152SIZEOF_INT8_T = 1 153"""Size of an int8 value in bytes.""" 154SIZEOF_INT16_T = 2 155"""Size of an int16 value in bytes.""" 156SIZEOF_INT32_T = 4 157"""Size of an int32 value in bytes.""" 158 159INT_FORMAT = { 160 SIZEOF_INT8_T: "<b", 161 SIZEOF_INT16_T: "<h", 162 SIZEOF_INT32_T: "<i", 163} 164"""Mapping of integer types to struct format.""" 165 166OBSERVED_DATA_MAX_SIZE = 31 - 5 167"""Maximum size of the observed data included in the BLE advertising packet: 16831 (max adv data size) - 5 (overhead). 169""" 170 171 172class PybricksBleBroadcastDataType(IntEnum): 173 """Type codes used for encoding/decoding data.""" 174 175 # NB: These values are sent over the air so the numeric values must not be changed. 176 # There can be at most 8 types since the values have to fit in 3 bits. 177 178 SINGLE_OBJECT = 0 179 """Indicator that the next value is the one and only value (instead of a tuple).""" 180 181 TRUE = 1 182 """The Python @c True value.""" 183 184 FALSE = 2 185 """The Python @c False value.""" 186 187 INT = 3 188 """The Python @c int type.""" 189 190 FLOAT = 4 191 """The Python @c float type.""" 192 193 STR = 5 194 """The Python @c str type.""" 195 196 BYTES = 6 197 """The Python @c bytes type.""" 198 199 200def _decode_next_value( 201 idx: int, data: bytes 202) -> tuple[int, None | PybricksBroadcastValue]: 203 """ 204 Decodes the next value in data, starting at ``idx``. 205 206 :param idx: The starting index of the next value in data. 207 :param data: The raw data. 208 :raises ValueError: If the data type is unsupported. 209 :return: Tuple of the next index, and the parsed data. 210 The parsed data is ``None`` if this is the 211 :py:attr:`PybricksBleBroadcastDataType.SINGLE_OBJECT` marker. 212 """ 213 214 # data type and size 215 type_id = data[idx] >> 5 216 size = data[idx] & 0x1F 217 data_type = PybricksBleBroadcastDataType(type_id) 218 # move cursor to value 219 idx += 1 220 221 # data value 222 if data_type == PybricksBleBroadcastDataType.SINGLE_OBJECT: 223 # Does not contain data by itself, is only used as indicator 224 # that the next data is the one and only object 225 assert size == 0 226 return idx, None 227 elif data_type == PybricksBleBroadcastDataType.TRUE: 228 assert size == 0 229 return idx, True 230 elif data_type == PybricksBleBroadcastDataType.FALSE: 231 assert size == 0 232 return idx, False 233 elif data_type == PybricksBleBroadcastDataType.INT: 234 # int8 / 1 byte 235 # int16 / 2 bytes 236 # int32 / 4 bytes 237 format = INT_FORMAT[size] 238 return idx + size, unpack_from(format, data, idx)[0] 239 elif data_type == PybricksBleBroadcastDataType.FLOAT: 240 # float / uint32 / 4 bytes 241 return idx + size, unpack_from("<f", data, idx)[0] 242 elif data_type == PybricksBleBroadcastDataType.STR: 243 val = data[idx : idx + size] 244 return idx + size, bytes.decode(val) 245 elif data_type == PybricksBleBroadcastDataType.BYTES: 246 val = data[idx : idx + size] 247 return idx + size, val 248 else: 249 # unsupported data type 250 raise ValueError(f"Unsupported data type: {data_type}") 251 252 253def _encode_value( 254 val: PybricksBroadcastValue, 255) -> tuple[int, None | bytes]: 256 """ 257 Encodes the given value for a Pybricks broadcast message. 258 259 :param val: The value to encode. 260 :raises ValueError: If the data type is unsupported. 261 :return: Tuple containing the header byte and the encoded value, 262 which may be None if no value is required in addition to the 263 header byte. 264 """ 265 266 size = 0 267 encoded_val = None 268 269 if isinstance(val, bool): 270 data_type = ( 271 PybricksBleBroadcastDataType.TRUE 272 if val 273 else PybricksBleBroadcastDataType.FALSE 274 ) 275 elif isinstance(val, int): 276 data_type = PybricksBleBroadcastDataType.INT 277 if -128 <= val <= 127: 278 # int8 279 size = 1 280 elif -32768 <= val <= 32767: 281 # int16 282 size = 2 283 else: 284 # int32 285 size = 4 286 format = INT_FORMAT[size] 287 encoded_val = pack(format, val) 288 elif isinstance(val, float): 289 data_type = PybricksBleBroadcastDataType.FLOAT 290 size = 4 291 encoded_val = pack("<f", val) 292 elif isinstance(val, str): 293 data_type = PybricksBleBroadcastDataType.STR 294 size = len(val) 295 encoded_val = val.encode() 296 elif isinstance(val, bytes): 297 data_type = PybricksBleBroadcastDataType.BYTES 298 size = len(val) 299 encoded_val = val 300 else: 301 # unsupported data type 302 raise ValueError(f"Unsupported data type: {type(val)}") 303 304 header = (data_type << 5) | (size & 0x1F) 305 return header, encoded_val
24def decode_message( 25 data: bytes, 26) -> PybricksBroadcast: 27 """ 28 Parses a Pybricks broadcast message, typically sourced from 29 the BLE advertisement manufacturer data. 30 31 :param data: The encoded data. 32 :return: Tuple containing the Pybricks message channel and the 33 original value. The original value is either a single object 34 or a tuple. 35 """ 36 37 logger.debug(f"decoding[{len(data)}]: {data!r}") 38 39 # idx 0 is the channel 40 channel: int = unpack_from("<B", data)[0] # uint8 41 logger.debug(f"channel: {channel}") 42 # idx 1 is the message start 43 idx = 1 44 decoded_data: list[PybricksBroadcastValue] = [] 45 single_object = False 46 47 while idx < len(data): 48 idx, val = _decode_next_value(idx, data) 49 if val is None: 50 logger.debug(f"data[{len(decoded_data)}]: SINGLE_OBJECT marker") 51 single_object = True 52 else: 53 logger.debug(f"data[{len(decoded_data)}] of {type(val)!s:<15}: {val!r}") 54 decoded_data.append(val) 55 56 if single_object: 57 decoded_value: PybricksBroadcastValue = decoded_data[0] 58 return PybricksBroadcast(channel, decoded_value) 59 else: 60 return PybricksBroadcast( 61 channel, cast(PybricksBroadcastData, tuple(decoded_data)) 62 )
Parses a Pybricks broadcast message, typically sourced from the BLE advertisement manufacturer data.
Parameters
- data: The encoded data.
Returns
Tuple containing the Pybricks message channel and the original value. The original value is either a single object or a tuple.
65def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes: 66 """ 67 Encodes the given values as a Pybricks broadcast message. 68 69 :param channel: The Pybricks broadcast channel (0 to 255), defaults to 0. 70 :param values: The values to encode in the message. 71 :raises ValueError: If the total size of the message exceeds the maximum 72 message size of 27 bytes. 73 :return: The encoded message. 74 """ 75 76 # idx 0 is the channel 77 encoded_channel = pack("<B", channel) 78 logger.debug(f"channel: {channel} -> {encoded_channel!r}") 79 80 # idx 1 is the message start 81 encoded_data = bytearray(encoded_channel) 82 83 if len(values) == 1: 84 # set SINGLE_OBJECT marker 85 header = PybricksBleBroadcastDataType.SINGLE_OBJECT << 5 86 logger.debug(f"data[{len(encoded_data)}]: SINGLE_OBJECT marker") 87 encoded_data.append(header) 88 89 for val in values: 90 header, encoded_val = _encode_value(val) 91 logger.debug( 92 f"data[{len(encoded_data)}] of {type(val)!s}: {val!r} -> ({header!r}, {encoded_val!r})" 93 ) 94 encoded_data.append(header) 95 96 if encoded_val is not None: 97 encoded_data += encoded_val 98 99 # max size is 27 bytes: 26 byte payload + 1 byte channel 100 if len(encoded_data) > OBSERVED_DATA_MAX_SIZE + 1: 101 raise ValueError( 102 f"Payload too large: {len(encoded_data)} bytes (maximum is {OBSERVED_DATA_MAX_SIZE} bytes)" 103 ) 104 105 message = bytes(encoded_data) 106 logger.debug(f"encoded[{len(message)}]: {message!r}") 107 return message
Encodes the given values as a Pybricks broadcast message.
Parameters
- channel: The Pybricks broadcast channel (0 to 255), defaults to 0.
- values: The values to encode in the message.
Raises
- ValueError: If the total size of the message exceeds the maximum message size of 27 bytes.
Returns
The encoded message.
110def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]: 111 """ 112 Unpacks raw data from the PnP ID characteristic. 113 114 Implementation sourced from [pybricksdev](https://github.com/pybricks/pybricksdev/blob/v1.0.0-alpha.50/pybricksdev/ble/pybricks.py#L354) 115 under the MIT License: Copyright (c) 2018-2023 The Pybricks Authors. 116 117 :param data: The raw data. 118 :return: Tuple containing the vendor ID type (`BT` or `USB`), the vendor 119 ID, the product ID and the product revision. 120 """ 121 vid_type: int 122 vid: int 123 pid: int 124 rev: int 125 126 vid_type, vid, pid, rev = unpack("<BHHH", data) 127 128 return "BT" if vid_type else "USB", vid, pid, rev
Unpacks raw data from the PnP ID characteristic.
Implementation sourced from pybricksdev under the MIT License: Copyright (c) 2018-2023 The Pybricks Authors.
Parameters
- data: The raw data.
Returns
Tuple containing the vendor ID type (
BT
orUSB
), the vendor ID, the product ID and the product revision.
131def pack_pnp_id( 132 product_id: int, 133 product_rev: int, 134 vendor_id_type: Literal["BT", "USB"] = "BT", 135 vendor_id: int = 0x0397, 136) -> bytes: 137 """ 138 Encodes the PnP ID GATT characteristics. 139 140 :param product_id: The Product ID. Should be a LEGO HubKind. 141 :param product_rev: The Product Version. 142 :param vendor_id_type: The Vendor ID Source, defaults to `BT`. 143 :param vendor_id: The Vendor ID, defaults to `0x0397` (LEGO). 144 :return: The encoded PnP ID. 145 """ 146 147 pnp_id = pack( 148 "<BHHH", 1 if vendor_id_type == "BT" else 0, vendor_id, product_id, product_rev 149 ) 150 return pnp_id
Encodes the PnP ID GATT characteristics.
Parameters
- product_id: The Product ID. Should be a LEGO HubKind.
- product_rev: The Product Version.
- vendor_id_type: The Vendor ID Source, defaults to
BT
. - vendor_id: The Vendor ID, defaults to
0x0397
(LEGO).
Returns
The encoded PnP ID.
Size of an int8 value in bytes.
Size of an int16 value in bytes.
Size of an int32 value in bytes.
Mapping of integer types to struct format.
Maximum size of the observed data included in the BLE advertising packet: 31 (max adv data size) - 5 (overhead).
173class PybricksBleBroadcastDataType(IntEnum): 174 """Type codes used for encoding/decoding data.""" 175 176 # NB: These values are sent over the air so the numeric values must not be changed. 177 # There can be at most 8 types since the values have to fit in 3 bits. 178 179 SINGLE_OBJECT = 0 180 """Indicator that the next value is the one and only value (instead of a tuple).""" 181 182 TRUE = 1 183 """The Python @c True value.""" 184 185 FALSE = 2 186 """The Python @c False value.""" 187 188 INT = 3 189 """The Python @c int type.""" 190 191 FLOAT = 4 192 """The Python @c float type.""" 193 194 STR = 5 195 """The Python @c str type.""" 196 197 BYTES = 6 198 """The Python @c bytes type."""
Type codes used for encoding/decoding data.
Indicator that the next value is the one and only value (instead of a tuple).