diff --git a/examples/ble_packet_buffer_client.py b/examples/ble_packet_buffer_client.py new file mode 100644 index 0000000..158ac61 --- /dev/null +++ b/examples/ble_packet_buffer_client.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: 2022 Scott Shawcroft for Adafruit Industries +# SPDX-License-Identifier: MIT + +""" +Used with ble_packet_buffer_test.py. Transmits "echo" to +PacketBufferService and receives it back. +""" + +import time + +from ble_packet_buffer_service import PacketBufferService + +from adafruit_ble import BLERadio +from adafruit_ble.advertising.standard import ProvideServicesAdvertisement + +ble = BLERadio() +buf = bytearray(512) +while True: + while ble.connected and any( + PacketBufferService in connection for connection in ble.connections + ): + for connection in ble.connections: + if PacketBufferService not in connection: + continue + print("echo") + pb = connection[PacketBufferService] + pb.write(b"echo") + # Returns 0 if nothing was read. + packet_len = pb.readinto(buf) + if packet_len > 0: + print(buf[:packet_len]) + print() + time.sleep(1) + + print("disconnected, scanning") + for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=1): + if PacketBufferService not in advertisement.services: + continue + ble.connect(advertisement) + print("connected") + break + ble.stop_scan() diff --git a/examples/ble_packet_buffer_service.py b/examples/ble_packet_buffer_service.py new file mode 100644 index 0000000..510e4f0 --- /dev/null +++ b/examples/ble_packet_buffer_service.py @@ -0,0 +1,73 @@ +# SPDX-FileCopyrightText: 2022 Scott Shawcroft for Adafruit Industries +# SPDX-License-Identifier: MIT + +""" +Used with ble_packet_buffer_*.py. Establishes a simple server to test +PacketBuffer with. +""" + +import _bleio + +from adafruit_ble.services import Service +from adafruit_ble.characteristics import ( + Attribute, + Characteristic, + ComplexCharacteristic, +) +from adafruit_ble.uuid import VendorUUID + + +class PacketBufferUUID(VendorUUID): + """UUIDs with the PacketBuffer base UUID.""" + + # pylint: disable=too-few-public-methods + + def __init__(self, uuid16): + uuid128 = bytearray("reffuBtekcaP".encode("utf-8") + b"\x00\x00\xaf\xad") + uuid128[-3] = uuid16 >> 8 + uuid128[-4] = uuid16 & 0xFF + super().__init__(uuid128) + + +class PacketBufferCharacteristic(ComplexCharacteristic): + def __init__( + self, + *, + uuid=None, + buffer_size=4, + properties=Characteristic.WRITE_NO_RESPONSE + | Characteristic.NOTIFY + | Characteristic.READ, + read_perm=Attribute.OPEN, + write_perm=Attribute.OPEN + ): + self.buffer_size = buffer_size + super().__init__( + uuid=uuid, + properties=properties, + read_perm=read_perm, + write_perm=write_perm, + max_length=512, + fixed_length=False, + ) + + def bind(self, service): + """Binds the characteristic to the given Service.""" + bound_characteristic = super().bind(service) + return _bleio.PacketBuffer( + bound_characteristic, buffer_size=self.buffer_size, max_packet_size=512 + ) + + +class PacketBufferService(Service): + """Test service that has one packet buffer""" + + uuid = PacketBufferUUID(0x0001) + + packets = PacketBufferCharacteristic(uuid=PacketBufferUUID(0x0101)) + + def readinto(self, buf): + return self.packets.readinto(buf) + + def write(self, buf, *, header=None): + return self.packets.write(buf, header=header) diff --git a/examples/ble_packet_buffer_test.py b/examples/ble_packet_buffer_test.py new file mode 100644 index 0000000..66c986f --- /dev/null +++ b/examples/ble_packet_buffer_test.py @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: 2020 ladyada for Adafruit Industries +# SPDX-License-Identifier: MIT + +""" +Can be used with ble_packet_buffer_client.py. Receives packets from the +PacketBufferService and transmits them back. +""" + +from packet_buffer_service import PacketBufferService + +from adafruit_ble import BLERadio +from adafruit_ble.advertising.standard import ProvideServicesAdvertisement + + +ble = BLERadio() +pbs = PacketBufferService() +advertisement = ProvideServicesAdvertisement(pbs) + +buf = bytearray(512) + +while True: + ble.start_advertising(advertisement) + while not ble.connected: + pass + while ble.connected: + # Returns b'' if nothing was read. + packet_len = pbs.readinto(buf) + if packet_len > 0: + packet = buf[:packet_len] + print(packet) + pbs.write(packet)