Skip to content

Pairing #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 94 additions & 49 deletions adafruit_ble/advertising.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class AdvertisingPacket:
"""Incomplete list of 128 bit service UUIDs."""
ALL_128_BIT_SERVICE_UUIDS = 0x07
"""Complete list of 128 bit service UUIDs."""
SOLICITED_16_BIT_SERVICE_UUIDS = 0x14
"""List of 16 bit service UUIDs solicited by a peripheral."""
SOLICITED_128_BIT_SERVICE_UUIDS = 0x15
"""List of 128 bit service UUIDs solicited by a peripheral."""
SHORT_LOCAL_NAME = 0x08
"""Short local device name (shortened to fit)."""
COMPLETE_LOCAL_NAME = 0x09
Expand Down Expand Up @@ -131,101 +135,142 @@ def get(self, element_type, default=None):
except KeyError:
return default

@property
def length(self):
"""Current number of bytes in packet."""
return len(self._packet_bytes)

@property
def bytes_remaining(self):
"""Number of bytes still available for use in the packet."""
return self._max_length - len(self._packet_bytes)
return self._max_length - self.length

def _check_length(self):
if len(self._packet_bytes) > self._max_length:
if self.length > self._max_length:
raise IndexError("Advertising data too long")

def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)):
"""Add advertising flags."""
self.add_field(self.FLAGS, struct.pack("<B", flags))

def add_field(self, field_type, field_data):
"""Append an advertising data field to the current packet, of the given type.
"""Append byte data to the current packet, of the given type.
The length field is calculated from the length of field_data."""
self._packet_bytes.append(1 + len(field_data))
self._packet_bytes.append(field_type)
self._packet_bytes.extend(field_data)
self._check_length()

def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)):
"""Add default or custom advertising flags."""
self.add_field(self.FLAGS, struct.pack("<B", flags))

def add_16_bit_uuids(self, uuids):
"""Add a complete list of 16 bit service UUIDs."""
for uuid in uuids:
self.add_field(self.ALL_16_BIT_SERVICE_UUIDS, struct.pack("<H", uuid.uuid16))

def add_128_bit_uuids(self, uuids):
"""Add a complete list of 128 bit service UUIDs."""
for uuid in uuids:
self.add_field(self.ALL_128_BIT_SERVICE_UUIDS, uuid.uuid128)

def add_mfr_specific_data(self, mfr_id, data):
"""Add manufacturer-specific data bytes."""
self.add_field(self.MANUFACTURER_SPECIFIC_DATA, struct.pack('<H', mfr_id) + data)

def add_tx_power(self, tx_power):
"""Add transmit power value."""
self.add_field(AdvertisingPacket.TX_POWER, struct.pack("<b", tx_power))

class ServerAdvertisement:
"""
Data to advertise a peripheral's services.
def add_appearance(self, appearance):
"""Add BLE Appearance value."""
self.add_field(AdvertisingPacket.APPEARANCE, struct.pack("<H", appearance))

The advertisement consists of an advertising data packet and an optional scan response packet,
The scan response packet is created only if there is not room in the
advertising data packet for the complete peripheral name.

:param peripheral Peripheral the Peripheral to advertise. Use its services and name
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
"""

def __init__(self, peripheral, *, tx_power=0):
self._peripheral = peripheral
class Advertisement:
"""Superclass for common code to construct a BLE advertisement,
consisting of an advertising data packet and an optional scan response packet.

packet = AdvertisingPacket()
packet.add_flags()
:param int flags: advertising flags. Default is general discovery, and BLE only (not classic)
"""
def __init__(self, flags=None, tx_power=None):
self._packet = AdvertisingPacket()
self._scan_response_packet = None
if flags:
self._packet.add_flags(flags)
else:
self._packet.add_flags()

# Need to check service.secondary
uuids_16_bits = [service.uuid for service in peripheral.services
if service.uuid.size == 16 and not service.secondary]
if uuids_16_bits:
packet.add_16_bit_uuids(uuids_16_bits)

uuids_128_bits = [service.uuid for service in peripheral.services
if service.uuid.size == 128 and not service.secondary]
if uuids_128_bits:
packet.add_128_bit_uuids(uuids_128_bits)

packet.add_field(AdvertisingPacket.TX_POWER, struct.pack("<b", tx_power))
if tx_power is not None:
self._packet.add_tx_power(tx_power)

def add_name(self, name):
"""Add name to advertisement. If it doesn't fit, add truncated name to packet,
and add complete name to scan response packet.
"""
# 2 bytes needed for field length and type.
bytes_available = packet.bytes_remaining - 2
bytes_available = self._packet.bytes_remaining - 2
if bytes_available <= 0:
raise IndexError("No room for name")

name_bytes = bytes(peripheral.name, 'utf-8')
name_bytes = bytes(name, 'utf-8')
if bytes_available >= len(name_bytes):
packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes)
self._packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes)
else:
packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available])
self._packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available])
self._scan_response_packet = AdvertisingPacket()
try:
self._scan_response_packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME,
name_bytes)
except IndexError:
raise IndexError("Name too long")

self._advertising_data_packet = packet
def add_uuids(self, uuids, field_type_16_bit_uuids, field_type_128_bit_uuids):
"""Add 16-bit and 128-bit uuids to the packet, using the given field types."""
concatenated_16_bit_uuids = b''.join(
struct.pack("<H", uuid.uuid16) for uuid in uuids if uuid.size == 16)
if concatenated_16_bit_uuids:
self._packet.add_field(field_type_16_bit_uuids, concatenated_16_bit_uuids)

uuids_128_bits = [uuid for uuid in uuids if uuid.size == 128]
if len(uuids_128_bits) > 1:
raise ValueError("Only one 128 bit UUID will fit")
if uuids_128_bits:
self._packet.add_field(field_type_128_bit_uuids, uuids_128_bits[0].uuid128)

@property
def advertising_data_bytes(self):
"""The raw bytes for the initial advertising data packet."""
return self._advertising_data_packet.packet_bytes
return self._packet.packet_bytes

@property
def scan_response_bytes(self):
"""The raw bytes for the scan response packet. None if there is no response packet."""
if self._scan_response_packet:
return self._scan_response_packet.packet_bytes
return None


class ServerAdvertisement(Advertisement):
"""Build an advertisement for a peripheral's services.

There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response
is not yet implemented.

:param Peripheral peripheral: the Peripheral to advertise. Use its services and name.
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
"""

def __init__(self, peripheral, *, tx_power=0):
super().__init__()
uuids = [service.uuid for service in peripheral.services if not service.secondary]
self.add_uuids(uuids,
AdvertisingPacket.ALL_16_BIT_SERVICE_UUIDS,
AdvertisingPacket.ALL_128_BIT_SERVICE_UUIDS)
self.add_name(peripheral.name)


class SolicitationAdvertisement(Advertisement):
"""Build an advertisement for a peripheral to solicit one or more services from a central.

There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response
is not yet implemented.

:param string name: Name to use in advertisement.
:param iterable service_uuids: One or more services requested from a central
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm.
"""

def __init__(self, name, service_uuids, *, tx_power=0):
super().__init__()
self.add_uuids(service_uuids,
AdvertisingPacket.SOLICITED_16_BIT_SERVICE_UUIDS,
AdvertisingPacket.SOLICITED_128_BIT_SERVICE_UUIDS)
self.add_name(name)
148 changes: 148 additions & 0 deletions adafruit_ble/current_time_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# The MIT License (MIT)
#
# Copyright (c) 2019 Dan Halbert for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_ble.current_time_client`
====================================================

Connect to a Current Time Service, as a peripheral.

* Author(s): Dan Halbert for Adafruit Industries

"""
import struct
import time

from bleio import Peripheral, UUID
from .advertising import SolicitationAdvertisement

class CurrentTimeClient:
"""
Set up a peripheral that solicits centrals for Current Time Service.

:param str name: Name to advertise for server. If None, use default Advertisement name.

Example::

from adafruit_ble.current_time_client import CurrentTimeClient
import time

cts_client = CurrentTimeClient()
cts_client.start_advertising()
while not cts_client.connected:
pass
# The first time a property is read, the client
# will do discovery and pairing.
while True:
print(cts_client.current_time)
time.sleep(5)

To try the example above, open Settings->Bluetooth on your iOS device.
After the program starts advertising, ``CIRCUITPYxxxx` will show up as a Bluetooth
device for possible connection. Tap it, and then accept the pairing request.
Then the time should print.
"""

CTS_UUID = UUID(0x1805)
CURRENT_TIME_UUID = UUID(0x2A2B)
LOCAL_TIME_INFORMATION_UUID = UUID(0x2A0F)

def __init__(self, name=None, tx_power=0):
self._periph = Peripheral(name=name)
self._advertisement = SolicitationAdvertisement(self._periph.name,
(self.CTS_UUID,), tx_power=tx_power)
self._current_time_char = self._local_time_char = None


def start_advertising(self):
"""Start advertising to solicit a central that supports Current Time Service."""
self._periph.start_advertising(self._advertisement.advertising_data_bytes,
scan_response=self._advertisement.scan_response_bytes)

def stop_advertising(self):
"""Stop advertising the service."""
self._periph.stop_advertising()

@property
def connected(self):
"""True if a central connected to this peripheral."""
return self._periph.connected

def disconnect(self):
"""Disconnect from central."""
self._periph.disconnect()

def _check_connected(self):
if not self.connected:
raise OSError("Not connected")
# Do discovery and pairing if not already done.
if not self._current_time_char:
self._discover()
self._periph.pair()

def _discover(self):
"""Discover service information."""
services = self._periph.discover_remote_services((self.CTS_UUID,))
if not services:
raise OSError("Unable to discover service")
for characteristic in services[0].characteristics:
if characteristic.uuid == self.CURRENT_TIME_UUID:
self._current_time_char = characteristic
elif characteristic.uuid == self.LOCAL_TIME_INFORMATION_UUID:
self._local_time_char = characteristic
if not self._current_time_char or not self._local_time_char:
raise OSError("Remote service missing needed characteristic")

@property
def current_time(self):
"""Get a tuple describing the current time from the server:
(year, month, day, hour, minute, second, weekday, subsecond, adjust_reason)
"""
self._check_connected()
if self._current_time_char:
# year, month, day, hour, minute, second, weekday, subsecond, adjust_reason
values = struct.unpack('<HBBBBBBBB', self._current_time_char.value)
return values
else:
raise OSError("Characteristic not discovered")


@property
def local_time_information(self):
"""Get a tuple of location information from the server:
(timezone, dst_offset)
"""
self._check_connected()
if self._local_time_char:
# timezone, dst_offset
values = struct.unpack('<bB', self._local_time_char.value)
return values
else:
raise OSError("Characteristic not discovered")

@property
def struct_time(self):
"""Return the current time as a `time.struct_time` Day of year and whether DST is in effect
is not available from Current Time Service, so these are set to -1.
"""
_, month, day, hour, minute, second, weekday, _, _ = self.current_time
# Bluetooth weekdays count from 1. struct_time counts from 0.
return time.struct_time((month, day, hour, minute, second, weekday - 1, -1))
Loading