Skip to content

Add simple Packet Buffer test examples #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions examples/ble_packet_buffer_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# SPDX-FileCopyrightText: 2022 Scott Shawcroft for Adafruit Industries
# SPDX-License-Identifier: MIT

"""
Used with ble_packet_buffer_test.py. Transmits "echo" to
PacketBufferService and receives it back.
"""

import time

from ble_packet_buffer_service import PacketBufferService

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement

ble = BLERadio()
buf = bytearray(512)
while True:
while ble.connected and any(
PacketBufferService in connection for connection in ble.connections
):
for connection in ble.connections:
if PacketBufferService not in connection:
continue
print("echo")
pb = connection[PacketBufferService]
pb.write(b"echo")
# Returns 0 if nothing was read.
packet_len = pb.readinto(buf)
if packet_len > 0:
print(buf[:packet_len])
print()
time.sleep(1)

print("disconnected, scanning")
for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=1):
if PacketBufferService not in advertisement.services:
continue
ble.connect(advertisement)
print("connected")
break
ble.stop_scan()
73 changes: 73 additions & 0 deletions examples/ble_packet_buffer_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# SPDX-FileCopyrightText: 2022 Scott Shawcroft for Adafruit Industries
# SPDX-License-Identifier: MIT

"""
Used with ble_packet_buffer_*.py. Establishes a simple server to test
PacketBuffer with.
"""

import _bleio

from adafruit_ble.services import Service
from adafruit_ble.characteristics import (
Attribute,
Characteristic,
ComplexCharacteristic,
)
from adafruit_ble.uuid import VendorUUID


class PacketBufferUUID(VendorUUID):
"""UUIDs with the PacketBuffer base UUID."""

# pylint: disable=too-few-public-methods

def __init__(self, uuid16):
uuid128 = bytearray("reffuBtekcaP".encode("utf-8") + b"\x00\x00\xaf\xad")
uuid128[-3] = uuid16 >> 8
uuid128[-4] = uuid16 & 0xFF
super().__init__(uuid128)


class PacketBufferCharacteristic(ComplexCharacteristic):
def __init__(
self,
*,
uuid=None,
buffer_size=4,
properties=Characteristic.WRITE_NO_RESPONSE
| Characteristic.NOTIFY
| Characteristic.READ,
read_perm=Attribute.OPEN,
write_perm=Attribute.OPEN
):
self.buffer_size = buffer_size
super().__init__(
uuid=uuid,
properties=properties,
read_perm=read_perm,
write_perm=write_perm,
max_length=512,
fixed_length=False,
)

def bind(self, service):
"""Binds the characteristic to the given Service."""
bound_characteristic = super().bind(service)
return _bleio.PacketBuffer(
bound_characteristic, buffer_size=self.buffer_size, max_packet_size=512
)


class PacketBufferService(Service):
"""Test service that has one packet buffer"""

uuid = PacketBufferUUID(0x0001)

packets = PacketBufferCharacteristic(uuid=PacketBufferUUID(0x0101))

def readinto(self, buf):
return self.packets.readinto(buf)

def write(self, buf, *, header=None):
return self.packets.write(buf, header=header)
31 changes: 31 additions & 0 deletions examples/ble_packet_buffer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# SPDX-FileCopyrightText: 2020 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

"""
Can be used with ble_packet_buffer_client.py. Receives packets from the
PacketBufferService and transmits them back.
"""

from packet_buffer_service import PacketBufferService

from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement


ble = BLERadio()
pbs = PacketBufferService()
advertisement = ProvideServicesAdvertisement(pbs)

buf = bytearray(512)

while True:
ble.start_advertising(advertisement)
while not ble.connected:
pass
while ble.connected:
# Returns b'' if nothing was read.
packet_len = pbs.readinto(buf)
if packet_len > 0:
packet = buf[:packet_len]
print(packet)
pbs.write(packet)