diff --git a/adafruit_ble/advertising.py b/adafruit_ble/advertising.py index 363de06..e507061 100644 --- a/adafruit_ble/advertising.py +++ b/adafruit_ble/advertising.py @@ -31,8 +31,8 @@ import struct -class AdvertisingData: - """Build up a BLE advertising data packet.""" +class AdvertisingPacket: + """Build up a BLE advertising data or scan response packet.""" # BR/EDR flags not included here, since we don't support BR/EDR. FLAG_LIMITED_DISCOVERY = 0x01 """Discoverable only for a limited time period.""" @@ -53,7 +53,7 @@ class AdvertisingData: """Complete list of 128 bit service UUIDs.""" SHORT_LOCAL_NAME = 0x08 """Short local device name (shortened to fit).""" - COMPLETE_LOCALNAME = 0x09 + COMPLETE_LOCAL_NAME = 0x09 """Complete local device name.""" TX_POWER = 0x0A """Transmit power level""" @@ -81,32 +81,151 @@ class AdvertisingData: MAX_DATA_SIZE = 31 """Data size in a regular BLE packet.""" - def __init__(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY), max_length=MAX_DATA_SIZE): - """Initalize an advertising packet, with the given flags, no larger than max_length.""" - self.data = bytearray((2, self.FLAGS, flags)) + def __init__(self, data=None, *, max_length=MAX_DATA_SIZE): + """Create an advertising packet, no larger than max_length. + + :param buf data: if not supplied (None), create an empty packet + if supplied, create a packet with supplied data. This is usually used + to parse an existing packet. + :param int max_length: maximum length of packet + """ + self._packet_bytes = bytearray(data) if data else bytearray() self._max_length = max_length self._check_length() + @property + def packet_bytes(self): + """The raw packet bytes.""" + return self._packet_bytes + + @packet_bytes.setter + def packet_bytes(self, value): + self._packet_bytes = value + + def __getitem__(self, element_type): + """Return the bytes stored in the advertising packet for the given element type. + + :param int element_type: An integer designating an advertising element type. + A number of types are defined in `AdvertisingPacket`, + such as `AdvertisingPacket.TX_POWER`. + :returns: bytes that are the value for the given element type. + If the element type is not present in the packet, raise KeyError. + """ + i = 0 + adv_bytes = self.packet_bytes + while i < len(adv_bytes): + item_length = adv_bytes[i] + if element_type != adv_bytes[i+1]: + # Type doesn't match: skip to next item. + i += item_length + 1 + else: + return adv_bytes[i + 2:i + 1 + item_length] + raise KeyError + + def get(self, element_type, default=None): + """Return the bytes stored in the advertising packet for the given element type, + returning the default value if not found. + """ + try: + return self.__getitem__(element_type) + except KeyError: + return default + + @property + def bytes_remaining(self): + """Number of bytes still available for use in the packet.""" + return self._max_length - len(self._packet_bytes) + def _check_length(self): - if len(self.data) > self._max_length: - raise IndexError("Advertising data exceeds max_length") + if len(self._packet_bytes) > self._max_length: + raise IndexError("Advertising data too long") def add_field(self, field_type, field_data): """Append an advertising data field to the current packet, of the given type. The length field is calculated from the length of field_data.""" - self.data.append(1 + len(field_data)) - self.data.append(field_type) - self.data.extend(field_data) + self._packet_bytes.append(1 + len(field_data)) + self._packet_bytes.append(field_type) + self._packet_bytes.extend(field_data) self._check_length() + def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)): + """Add default or custom advertising flags.""" + self.add_field(self.FLAGS, struct.pack("= len(name_bytes): + packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes) + else: + packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available]) + self._scan_response_packet = AdvertisingPacket() + try: + self._scan_response_packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, + name_bytes) + except IndexError: + raise IndexError("Name too long") + + self._advertising_data_packet = packet + + @property + def advertising_data_bytes(self): + """The raw bytes for the initial advertising data packet.""" + return self._advertising_data_packet.packet_bytes + + @property + def scan_response_bytes(self): + """The raw bytes for the scan response packet. None if there is no response packet.""" + if self._scan_response_packet: + return self._scan_response_packet.packet_bytes + return None diff --git a/adafruit_ble/beacon.py b/adafruit_ble/beacon.py index c5a9e3b..c7891cd 100644 --- a/adafruit_ble/beacon.py +++ b/adafruit_ble/beacon.py @@ -32,26 +32,29 @@ import struct import bleio -from .advertising import AdvertisingData +from .advertising import AdvertisingPacket class Beacon: """Base class for Beacon advertisers.""" - def __init__(self, advertising_data, interval=1.0): - """Set up a beacon with the given AdvertisingData. + def __init__(self, advertising_packet): + """Set up a beacon with the given AdvertisingPacket. - :param AdvertisingData advertising_data: The advertising packet - :param float interval: Advertising interval in seconds + :param AdvertisingPacket advertising_packet """ - self.broadcaster = bleio.Broadcaster(interval) - self.advertising_data = advertising_data + self._broadcaster = bleio.Peripheral(name=None) + self._advertising_packet = advertising_packet + + def start(self, interval=1.0): + """Turn on beacon. - def start(self): - """Turn on beacon.""" - self.broadcaster.start_advertising(self.advertising_data.data) + :param float interval: Advertising interval in seconds + """ + self._broadcaster.start_advertising(self._advertising_packet.packet_bytes, + interval=interval) def stop(self): """Turn off beacon.""" - self.broadcaster.stop_advertising() + self._broadcaster.stop_advertising() @@ -60,7 +63,7 @@ class LocationBeacon(Beacon): Used for Apple iBeacon, Nordic nRF Beacon, etc. """ # pylint: disable=too-many-arguments - def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0): + def __init__(self, company_id, uuid, major, minor, rssi): """Create a beacon with the given values. :param int company_id: 16-bit company id designating beacon specification owner @@ -69,7 +72,6 @@ def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0): :param int major: 16-bit major number, such as a store number :param int minor: 16-bit minor number, such as a location within a store :param int rssi: Signal strength in dBm at 1m (signed 8-bit value) - :param float interval: Advertising interval in seconds Example:: @@ -81,8 +83,9 @@ def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0): b.start() """ - adv_data = AdvertisingData() - adv_data.add_mfr_specific_data( + adv = AdvertisingPacket() + adv.add_flags() + adv.add_mfr_specific_data( company_id, b''.join(( # 0x02 means a beacon. 0x15 (=21) is length (16 + 2 + 2 + 1) @@ -91,8 +94,8 @@ def __init__(self, company_id, uuid, major, minor, rssi, interval=1.0): # iBeacon and similar expect big-endian UUIDS. Usually they are little-endian. bytes(reversed(uuid.uuid128)), # major and minor are big-endian. - struct.pack(">HHB", major, minor, rssi)))) - super().__init__(adv_data, interval=interval) + struct.pack(">HHb", major, minor, rssi)))) + super().__init__(adv) class EddystoneURLBeacon(Beacon): @@ -126,16 +129,16 @@ class EddystoneURLBeacon(Beacon): '.gov', ) - def __init__(self, url, tx_power=0, interval=1.0): + def __init__(self, url, tx_power=0): """Create a URL beacon with an encoded version of the url and a transmit power. :param url URL to encode. Must be short enough to fit after encoding. :param int tx_power: transmit power in dBm at 0 meters (8 bit signed value) - :param float interval: Advertising interval in seconds """ - adv_data = AdvertisingData() - adv_data.add_field(AdvertisingData.ALL_16_BIT_SERVICE_UUIDS, self._EDDYSTONE_ID) + adv = AdvertisingPacket() + adv.add_flags() + adv.add_field(AdvertisingPacket.ALL_16_BIT_SERVICE_UUIDS, self._EDDYSTONE_ID) short_url = None for idx, prefix in enumerate(self._URL_SCHEMES): if url.startswith(prefix): @@ -148,9 +151,9 @@ def __init__(self, url, tx_power=0, interval=1.0): short_url = short_url.replace(subst + '/', chr(code)) for code, subst in enumerate(self._SUBSTITUTIONS, 7): short_url = short_url.replace(subst, chr(code)) - adv_data.add_field(AdvertisingData.SERVICE_DATA_16_BIT_UUID, - b''.join((self._EDDYSTONE_ID, - b'\x10', - struct.pack("