Skip to content

second draft, with echo example #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 120 additions & 86 deletions adafruit_bluefruitspi.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,92 +42,63 @@
* Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice
"""

# imports

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BluefruitSPI.git"

import time
import struct
from digitalio import Direction, Pull
from adafruit_bus_device.spi_device import SPIDevice
from micropython import const
import struct


class MsgType: #pylint: disable=too-few-public-methods,bad-whitespace
"""An enum-like class representing the possible message types.
Possible values are:
- ``MsgType.COMMAND``
- ``MsgType.RESPONSE``
- ``MsgType.ALERT``
- ``MsgType.ERROR``
"""
COMMAND = const(0x10) # Command message
RESPONSE = const(0x20) # Response message
ALERT = const(0x40) # Alert message
ERROR = const(0x80) # Error message


class SDEPCommand: #pylint: disable=too-few-public-methods,bad-whitespace
"""An enum-like class representing the possible SDEP commands.
Possible values are:
- ``SDEPCommand.INITIALIZE``
- ``SDEPCommand.ATCOMMAND``
- ``SDEPCommand.BLEUART_TX``
- ``SDEPCommand.BLEUART_RX``
"""
INITIALIZE = const(0xBEEF) # Resets the Bluefruit device
ATCOMMAND = const(0x0A00) # AT command wrapper
BLEUART_TX = const(0x0A01) # BLE UART transmit data
BLEUART_RX = const(0x0A02) # BLE UART read data


class ArgType: #pylint: disable=too-few-public-methods,bad-whitespace
"""An enum-like class representing the possible argument types.
Possible values are
- ``ArgType.STRING``
- ``ArgType.BYTEARRAY``
- ``ArgType.INT32``
- ``ArgType.UINT32``
- ``ArgType.INT16``
- ``ArgType.UINT16``
- ``ArgType.INT8``
- ``ArgType.UINT8``
"""
STRING = const(0x0100) # String data type
BYTEARRAY = const(0x0200) # Byte array data type
INT32 = const(0x0300) # Signed 32-bit integer data type
UINT32 = const(0x0400) # Unsigned 32-bit integer data type
INT16 = const(0x0500) # Signed 16-bit integer data type
UINT16 = const(0x0600) # Unsigned 16-bit integer data type
INT8 = const(0x0700) # Signed 8-bit integer data type
UINT8 = const(0x0800) # Unsigned 8-bit integer data type


class ErrorCode: #pylint: disable=too-few-public-methods,bad-whitespace
"""An enum-like class representing possible error codes.
Possible values are
- ``ErrorCode.``
"""
INVALIDMSGTYPE = const(0x8021) # SDEP: Unexpected SDEP MsgType
INVALIDCMDID = const(0x8022) # SDEP: Unknown command ID
INVALIDPAYLOAD = const(0x8023) # SDEP: Payload problem
INVALIDLEN = const(0x8024) # SDEP: Indicated len too large
INVALIDINPUT = const(0x8060) # AT: Invalid data
UNKNOWNCMD = const(0x8061) # AT: Unknown command name
INVALIDPARAM = const(0x8062) # AT: Invalid param value
UNSUPPORTED = const(0x8063) # AT: Unsupported command
# pylint: disable=bad-whitespace
_MSG_COMMAND = const(0x10) # Command message
_MSG_RESPONSE = const(0x20) # Response message
_MSG_ALERT = const(0x40) # Alert message
_MSG_ERROR = const(0x80) # Error message

_SDEP_INITIALIZE = const(0xBEEF) # Resets the Bluefruit device
_SDEP_ATCOMMAND = const(0x0A00) # AT command wrapper
_SDEP_BLEUART_TX = const(0x0A01) # BLE UART transmit data
_SDEP_BLEUART_RX = const(0x0A02) # BLE UART read data

_ARG_STRING = const(0x0100) # String data type
_ARG_BYTEARRAY = const(0x0200) # Byte array data type
_ARG_INT32 = const(0x0300) # Signed 32-bit integer data type
_ARG_UINT32 = const(0x0400) # Unsigned 32-bit integer data type
_ARG_INT16 = const(0x0500) # Signed 16-bit integer data type
_ARG_UINT16 = const(0x0600) # Unsigned 16-bit integer data type
_ARG_INT8 = const(0x0700) # Signed 8-bit integer data type
_ARG_UINT8 = const(0x0800) # Unsigned 8-bit integer data type

_ERROR_INVALIDMSGTYPE = const(0x8021) # SDEP: Unexpected SDEP MsgType
_ERROR_INVALIDCMDID = const(0x8022) # SDEP: Unknown command ID
_ERROR_INVALIDPAYLOAD = const(0x8023) # SDEP: Payload problem
_ERROR_INVALIDLEN = const(0x8024) # SDEP: Indicated len too large
_ERROR_INVALIDINPUT = const(0x8060) # AT: Invalid data
_ERROR_UNKNOWNCMD = const(0x8061) # AT: Unknown command name
_ERROR_INVALIDPARAM = const(0x8062) # AT: Invalid param value
_ERROR_UNSUPPORTED = const(0x8063) # AT: Unsupported command

# For the Bluefruit Connect packets
_PACKET_BUTTON_LEN = const(5)
_PACKET_COLOR_LEN = const(6)

# pylint: enable=bad-whitespace


class BluefruitSPI:
"""Helper for the Bluefruit LE SPI Friend"""

def __init__(self, spi, cs, irq, reset, debug=False):
def __init__(self, spi, cs, irq, reset, debug=False): # pylint: disable=too-many-arguments
self._irq = irq
self._buf_tx = bytearray(20)
self._buf_rx = bytearray(20)
self._debug = debug

# a cache of data, used for packet parsing
self._buffer = []

# Reset
reset.direction = Direction.OUTPUT
reset.value = False
Expand All @@ -146,7 +117,7 @@ def __init__(self, spi, cs, irq, reset, debug=False):
self._spi_device = SPIDevice(spi, cs,
baudrate=4000000, phase=0, polarity=0)

def _cmd(self, cmd):
def _cmd(self, cmd): # pylint: disable=too-many-branches
"""
Executes the supplied AT command, which must be terminated with
a new-line character.
Expand All @@ -172,16 +143,19 @@ def _cmd(self, cmd):
plen = 16
# Note the 'more' value in bit 8 of the packet len
struct.pack_into("<BHB16s", self._buf_tx, 0,
MsgType.COMMAND, SDEPCommand.ATCOMMAND,
_MSG_COMMAND, _SDEP_ATCOMMAND,
plen | more, cmd[pos:pos+plen])
if self._debug:
print("Writing: ", [hex(b) for b in self._buf_tx])
else:
time.sleep(0.05)

# Update the position if there is data remaining
pos += plen

# Send out the SPI bus
with self._spi_device as spi:
spi.write(self._buf_tx, end=len(cmd) + 4)
spi.write(self._buf_tx, end=len(cmd) + 4) # pylint: disable=no-member

# Wait up to 200ms for a response
timeout = 0.2
Expand Down Expand Up @@ -212,7 +186,8 @@ def _cmd(self, cmd):
rsp += self._buf_rx[4:rsplen+4]
if self._debug:
print("Reading: ", [hex(b) for b in self._buf_rx])

else:
time.sleep(0.05)
# Clean up the response buffer
if self._debug:
print(rsp)
Expand All @@ -226,46 +201,105 @@ def init(self):
"""
# Construct the SDEP packet
struct.pack_into("<BHB", self._buf_tx, 0,
MsgType.COMMAND, SDEPCommand.INITIALIZE, 0)
_MSG_COMMAND, _SDEP_INITIALIZE, 0)
if self._debug:
print("Writing: ", [hex(b) for b in self._buf_tx])

# Send out the SPI bus
with self._spi_device as spi:
spi.write(self._buf_tx, end=4)
spi.write(self._buf_tx, end=4) # pylint: disable=no-member

# Wait 1 second for the command to complete.
time.sleep(1)

def uarttx(self, txt):
@property
def connected(self):
"""Whether the Bluefruit module is connected to the central"""
return int(self.command_check_OK(b'AT+GAPGETCONN')) == 1

def uart_tx(self, data):
"""
Sends the specific string out over BLE UART.
:param txt: The new-line terminated string to send.
Sends the specific bytestring out over BLE UART.
:param data: The bytestring to send.
"""
return self._cmd("AT+BLEUARTTX="+txt+"\n")
return self._cmd(b'AT+BLEUARTTX='+data+b'\r\n')

def uartrx(self):
def uart_rx(self):
"""
Reads data from the BLE UART FIFO.
Reads byte data from the BLE UART FIFO.
"""
return self._cmd("AT+BLEUARTRX\n")
data = self.command_check_OK(b'AT+BLEUARTRX')
if data:
# remove \r\n from ending
return data[:-2]
return None

def command(self, string):
"""Send a command and check response code"""
try:
msgtype, msgid, rsp = self._cmd(string+"\n")
if msgtype == MsgType.ERROR:
if msgtype == _MSG_ERROR:
raise RuntimeError("Error (id:{0})".format(hex(msgid)))
if msgtype == MsgType.RESPONSE:
if msgtype == _MSG_RESPONSE:
return rsp
else:
raise RuntimeError("Unknown response (id:{0})".format(hex(msgid)))
except RuntimeError as error:
raise RuntimeError("AT command failure: " + repr(error))

def command_check_OK(self, string, delay=0.0):
ret = self.command(string)
def command_check_OK(self, command, delay=0.0): # pylint: disable=invalid-name
"""Send a fully formed bytestring AT command, and check
whether we got an 'OK' back. Returns payload bytes if there is any"""
ret = self.command(command)
time.sleep(delay)
if not ret or not ret[-4:]:
raise RuntimeError("Not OK")
if ret[-4:] != b'OK\r\n':
raise RuntimeError("Not OK")
if ret[:-4]:
return str(ret[:-4], 'utf-8')
return ret[:-4]
return None

def read_packet(self): # pylint: disable=too-many-return-statements
"""
Will read a Bluefruit Connect packet and return it in a parsed format.
Currently supports Button and Color packets only
"""
data = self.uart_rx()
if not data:
return None
# convert to an array of character bytes
self._buffer += [chr(b) for b in data]
# Find beginning of new packet, starts with a '!'
while self._buffer and self._buffer[0] != '!':
self._buffer.pop(0)
# we need at least 2 bytes in the buffer
if len(self._buffer) < 2:
return None

# Packet beginning found
if self._buffer[1] == 'B':
plen = _PACKET_BUTTON_LEN
elif self._buffer[1] == 'C':
plen = _PACKET_COLOR_LEN
else:
# unknown packet type
self._buffer.pop(0)
return None

# split packet off of buffer cache
packet = self._buffer[0:plen]

self._buffer = self._buffer[plen:] # remove packet from buffer
if sum([ord(x) for x in packet]) % 256 != 255: # check sum
return None

# OK packet's good!
if packet[1] == 'B': # buttons have 2 int args to parse
# button number & True/False press
return ('B', int(packet[2]), packet[3] == '1')
if packet[1] == 'C': # colorpick has 3 int args to parse
# red, green and blue
return ('C', ord(packet[2]), ord(packet[3]), ord(packet[4]))
# We don't nicely parse this yet
return packet[1:-1]
84 changes: 84 additions & 0 deletions examples/bluefruitspi_neocolorpicker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# NeoPixel Color Picker demo - wire up some NeoPixels and set their color
# using Adafruit Bluefruit Connect App on your phone

import time
import busio
import board
from digitalio import DigitalInOut
from adafruit_bluefruitspi import BluefruitSPI
import neopixel

ADVERT_NAME = b'BlinkaNeoLamp'

# 16 neopixels on a digital pin, adjust as necessary!
pixels = neopixel.NeoPixel(board.D5, 16)
pixels.fill(0)

spi_bus = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO)
cs = DigitalInOut(board.D8)
irq = DigitalInOut(board.D7)
rst = DigitalInOut(board.D4)
bluefruit = BluefruitSPI(spi_bus, cs, irq, rst, debug=False)

def init_bluefruit():
# Initialize the device and perform a factory reset
print("Initializing the Bluefruit LE SPI Friend module")
bluefruit.init()
bluefruit.command_check_OK(b'AT+FACTORYRESET', delay=1)
# Print the response to 'ATI' (info request) as a string
print(str(bluefruit.command_check_OK(b'ATI'), 'utf-8'))
# Change advertised name
bluefruit.command_check_OK(b'AT+GAPDEVNAME='+ADVERT_NAME)

def wait_for_connection():
print("Waiting for a connection to Bluefruit LE Connect ...")
# Wait for a connection ...
dotcount = 0
while not bluefruit.connected:
print(".", end="")
dotcount = (dotcount + 1) % 80
if dotcount == 79:
print("")
time.sleep(0.5)

# This code will check the connection but only query the module if it has been
# at least 'n_sec' seconds. Otherwise it 'caches' the response, to keep from
# hogging the Bluefruit connection with constant queries
connection_timestamp = None
is_connected = None
def check_connection(n_sec):
# pylint: disable=global-statement
global connection_timestamp, is_connected
if (not connection_timestamp) or (time.monotonic() - connection_timestamp > n_sec):
connection_timestamp = time.monotonic()
is_connected = bluefruit.connected
return is_connected

# Unlike most circuitpython code, this runs in two loops
# one outer loop manages reconnecting bluetooth if we lose connection
# then one inner loop for doing what we want when connected!
while True:
# Initialize the module
try: # Wireless connections can have corrupt data or other runtime failures
# This try block will reset the module if that happens
init_bluefruit()
wait_for_connection()
print("\n *Connected!*")

# Once connected, check for incoming BLE UART data
while check_connection(3): # Check our connection status every 3 seconds
# OK we're still connected, see if we have any data waiting
resp = bluefruit.read_packet()
if not resp:
continue # nothin'
print("Read packet", resp)
# Look for a 'C'olor packet
if resp[0] != 'C':
continue
# Set the neopixels to the three bytes in the packet
pixels.fill(resp[1:4])
print("Connection lost.")

except RuntimeError as e:
print(e) # Print what happened
continue # retry!
Loading