Skip to content

[WIP] Speed up AT+BLEKEYBOARDCODE #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 110 additions & 13 deletions adafruit_bluefruitspi.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@

import time
import struct

try:
import binascii as ba
except ImportError:
import adafruit_binascii as ba
from digitalio import Direction, Pull
from adafruit_bus_device.spi_device import SPIDevice
from micropython import const
Expand Down Expand Up @@ -66,16 +71,59 @@
_PACKET_BUTTON_LEN = const(5)
_PACKET_COLOR_LEN = const(6)

_KEY_CODE_CMD = "AT+BLEKEYBOARDCODE=00-00-00-00-00-00-00-00\n"

# TODO: replace with collections.deque in CircuitPython 7
class FIFOBuffer:
"""FIFO buffer vaguely based on collections.deque.

Uses a tuple internally to allow for O(1) enqueue and dequeue
"""

def __init__(self, maxlen=20):
self.maxlen = maxlen
self._buf = (None,) * self.maxlen
self._end_idx = 0
self._front_idx = 0

def enqueue(self, data):
"""Put an item at the end of the FIFO queue"""
if self._buf[self._end_idx] is not None:
raise IndexError("FIFOBuffer full")
self._buf[self._end_idx] = data
self._end_idx += 1
if self._end_idx >= self.maxlen:
self._end_idx = 0

def dequeue(self):
"""Pop an item from the front of the FIFO queue"""
data = self._buf[self._front_idx]
if data is None:
return None
self._buf[self._front_idx] = None
self._front_idx += 1
if self._front_idx >= self.maxlen:
self._front_idx = 0
return data


class BluefruitSPI:
"""Helper for the Bluefruit LE SPI Friend"""

def __init__(
self, spi, cs, irq, reset, debug=False
self, spi, cs, irq, reset, debug=False, fifo_len=20
): # pylint: disable=too-many-arguments
self._irq = irq
self._buf_tx = bytearray(20)
self._buf_rx = bytearray(20)
self._keycode_template = [
bytearray(20),
bytearray(20),
bytearray(20),
bytearray(20),
]
self._fifo_buffer = FIFOBuffer(maxlen=fifo_len)
self._init_keycode_template()
self._debug = debug

# a cache of data, used for packet parsing
Expand All @@ -98,6 +146,64 @@ def __init__(

self._spi_device = SPIDevice(spi, cs, baudrate=4000000, phase=0, polarity=0)

def _init_keycode_template(self):
"""Prebuild SDEP packets for AT+BLEKEYBOARDCODE command"""
self._create_sdep_raw(
self._keycode_template[0], _KEY_CODE_CMD[:16], True # AT+BLEKEYBOARDCO
)
self._create_sdep_raw(
self._keycode_template[1], _KEY_CODE_CMD[16:32], True # DE=00-00-00-00-0
)
self._create_sdep_raw(
self._keycode_template[2], _KEY_CODE_CMD[32:48], False # 0-00-00-00\n
)

def send_keyboard_code(self, evt):
"""
Put an AT+BLEKEYBOARDCODE command into the FIFO buffer.
Call pop_keyboard_code() to send a single packet to the Bluefruit.
:param evt: bytearray(8) representing keyboard code to send
"""
evt = ba.hexlify(evt)
self._keycode_template[1][7:9] = evt[0:2]
# self._keycode_template[1][10:12] = evt[2:4] # Should always be 0
self._keycode_template[1][13:15] = evt[4:6]
self._keycode_template[1][16:18] = evt[6:8]
self._keycode_template[1][19] = evt[8]
self._keycode_template[2][4] = evt[9]
self._keycode_template[2][6:8] = evt[10:12]
self._keycode_template[2][9:11] = evt[12:14]
self._keycode_template[2][12:14] = evt[14:16]
for k in self._keycode_template:
self._fifo_buffer.enqueue(k)

def pop_keyboard_code_queue(self):
"""Send an SDEP packet from the FIFO buffer to the Bluefruit"""
data = self._fifo_buffer.dequeue()
if data is not None:
with self._spi_device as spi:
spi.write(data, end=24)

@staticmethod
def _create_sdep_raw(dest, payload, more):
"""
Create an SDEP packet
:param dest: bytearray(20) to place SDEP packet in
:param payload: iterable with length <= 16 containing the payload data
:param more: True to set the more bit, False otherwise
"""
_more = 0x80 if more else 0
plen = len(payload)
struct.pack_into(
"<BHB16s",
dest,
0,
_MSG_COMMAND,
_SDEP_ATCOMMAND,
plen | _more,
payload,
)

def _cmd(self, cmd): # pylint: disable=too-many-branches
"""
Executes the supplied AT command, which must be terminated with
Expand All @@ -112,25 +218,16 @@ def _cmd(self, cmd): # pylint: disable=too-many-branches
print("ERROR: Command too long.")
raise ValueError("Command too long.")

more = 0x80 # More bit is in pos 8, 1 = more data available
more = True
pos = 0
while len(cmd) - pos:
# Construct the SDEP packet
if len(cmd) - pos <= 16:
# Last or sole packet
more = 0
more = False
plen = len(cmd) - pos
plen = min(plen, 16)
# Note the 'more' value in bit 8 of the packet len
struct.pack_into(
"<BHB16s",
self._buf_tx,
0,
_MSG_COMMAND,
_SDEP_ATCOMMAND,
plen | more,
cmd[pos : pos + plen],
)
self._create_sdep_raw(self._buf_tx, cmd[pos : pos + plen], more=more)
if self._debug:
print("Writing: ", [hex(b) for b in self._buf_tx])
else:
Expand Down