Skip to content

Add type annotations, updates to documentation #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 73 additions & 39 deletions adafruit_aws_iot.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
import json
from adafruit_minimqtt.adafruit_minimqtt import MMQTTException

try:
from typing import Optional, Type, Union
from types import TracebackType
from adafruit_minimqtt.adafruit_minimqtt import MQTT
except ImportError:
pass

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_AWS_IOT.git"

Expand All @@ -40,13 +47,13 @@ class AWS_IOT_ERROR(Exception):
class MQTT_CLIENT:
"""Client for interacting with Amazon AWS IoT MQTT API.

:param MiniMQTT mmqttclient: Pre-configured MiniMQTT Client object.
:param ~MQTT.MQTT mmqttclient: Pre-configured MiniMQTT Client object.
:param int keep_alive: Optional Keep-alive timer interval, in seconds.
Provided interval must be 30 <= keep_alive <= 1200.

"""

def __init__(self, mmqttclient, keep_alive=30):
def __init__(self, mmqttclient: MQTT, keep_alive: int = 30) -> None:
if "MQTT" in str(type(mmqttclient)):
self.client = mmqttclient
else:
Expand Down Expand Up @@ -88,18 +95,23 @@ def __init__(self, mmqttclient, keep_alive=30):
self.client.on_unsubscribe = self._on_unsubscribe_mqtt
self.connected_to_aws = False

def __enter__(self):
def __enter__(self) -> "MQTT_CLIENT":
return self

def __exit__(self, exception_type, exception_value, traceback):
def __exit__(
self,
exception_type: Optional[Type[type]],
exception_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.disconnect()

@property
def is_connected(self):
def is_connected(self) -> bool:
"""Returns if MQTT_CLIENT is connected to AWS IoT MQTT Broker"""
return self.connected_to_aws

def disconnect(self):
def disconnect(self) -> None:
"""Disconnects from Amazon AWS IoT MQTT Broker and de-initializes the MiniMQTT Client."""
try:
self.client.disconnect()
Expand All @@ -114,15 +126,16 @@ def disconnect(self):
self.on_unsubscribe = None
self.client.deinit()

def reconnect(self):
def reconnect(self) -> None:
"""Reconnects to the AWS IoT MQTT Broker"""
try:
self.client.reconnect()
except MMQTTException as error:
raise AWS_IOT_ERROR("Error re-connecting to AWS IoT:", error) from error

def connect(self, clean_session=True):
def connect(self, clean_session: bool = True) -> None:
"""Connects to Amazon AWS IoT MQTT Broker with Client ID.

:param bool clean_session: Establishes a clean session with AWS broker.

"""
Expand All @@ -134,9 +147,12 @@ def connect(self, clean_session=True):

# MiniMQTT Callback Handlers
# pylint: disable=not-callable, unused-argument
def _on_connect_mqtt(self, client, userdata, flag, ret_code):
def _on_connect_mqtt(
self, client: MQTT, userdata: str, flag: int, ret_code: int
) -> None:
"""Runs when code calls on_connect.
:param MiniMQTT client: MiniMQTT client object.

:param ~MQTT.MQTT client: MiniMQTT client object.
:param str user_data: User data from broker
:param int flag: QoS flag from broker.
:param int ret_code: Return code from broker.
Expand All @@ -148,9 +164,12 @@ def _on_connect_mqtt(self, client, userdata, flag, ret_code):
self.on_connect(self, userdata, flag, ret_code)

# pylint: disable=not-callable, unused-argument
def _on_disconnect_mqtt(self, client, userdata, flag, ret_code):
def _on_disconnect_mqtt(
self, client: MQTT, userdata: str, flag: int, ret_code: int
) -> None:
"""Runs when code calls on_disconnect.
:param MiniMQTT client: MiniMQTT client object.

:param ~MQTT.MQTT client: MiniMQTT client object.
:param str user_data: User data from broker
:param int flag: QoS flag from broker.
:param int ret_code: Return code from broker.
Expand All @@ -162,9 +181,10 @@ def _on_disconnect_mqtt(self, client, userdata, flag, ret_code):
self.on_connect(self, userdata, flag, ret_code)

# pylint: disable=not-callable
def _on_message_mqtt(self, client, topic, payload):
def _on_message_mqtt(self, client: MQTT, topic: str, payload: str) -> None:
"""Runs when the client calls on_message.
:param MiniMQTT client: MiniMQTT client object.

:param ~MQTT.MQTT client: MiniMQTT client object.
:param str topic: MQTT broker topic.
:param str payload: Payload returned by MQTT broker topic

Expand All @@ -173,26 +193,36 @@ def _on_message_mqtt(self, client, topic, payload):
self.on_message(self, topic, payload)

# pylint: disable=not-callable
def _on_subscribe_mqtt(self, client, user_data, topic, qos):
def _on_subscribe_mqtt(
self, client: MQTT, user_data: str, topic: int, qos: int
) -> None:
"""Runs when the client calls on_subscribe.

:param MiniMQTT client: MiniMQTT client object.
:param ~MQTT.MQTT client: MiniMQTT client object.
:param str user_data: User data from broker
:param str topic: Desired MQTT topic.
param int qos: Quality of service level for topic, from broker.
:param int qos: Quality of service level for topic, from broker.

"""
if self.on_subscribe is not None:
self.on_subscribe(self, user_data, topic, qos)

# pylint: disable=not-callable
def _on_unsubscribe_mqtt(self, client, user_data, topic, pid):
"""Runs when the client calls on_unsubscribe."""
def _on_unsubscribe_mqtt(
self, client: MQTT, user_data: str, topic: str, pid: int
) -> None:
"""Runs when the client calls on_unsubscribe.

:param ~MQTT.MQTT client: MiniMQTT client object.
:param str user_data: User data from broker
:param str topic: Desired MQTT topic.
:param int pid: Process ID.
"""
if self.on_unsubscribe is not None:
self.on_unsubscribe(self, user_data, topic, pid)

# MiniMQTT Network Control Flow
def loop(self):
def loop(self) -> None:
"""Starts a synchronous message loop which maintains connection with AWS IoT.
Must be called within the keep_alive timeout specified to init.
This method does not handle network connection/disconnection.
Expand All @@ -207,28 +237,28 @@ def loop(self):
if self.connected_to_aws:
self.client.loop()

def loop_forever(self):
def loop_forever(self) -> None:
"""Begins a blocking, asynchronous message loop.
This method handles network connection/disconnection.

"""
if self.connected_to_aws:
self.client.loop_forever()

@staticmethod
def validate_topic(topic):
def validate_topic(topic: str) -> None:
"""Validates if user-provided pub/sub topics adhere to AWS Service Limits.
https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html
:param str topic: Desired topic to validate

:param str topic: Desired topic to validate
"""
assert hasattr(topic, "split"), "Topic must be a string"
assert len(topic) < 256, "Topic must be less than 256 bytes!"
assert len(topic.split("/")) <= 9, "Topics are limited to 7 forward slashes."

# MiniMQTT Pub/Sub Methods, for usage with AWS IoT
def subscribe(self, topic, qos=1):
def subscribe(self, topic: str, qos: int = 1) -> None:
"""Subscribes to an AWS IoT Topic.

:param str topic: MQTT topic to subscribe to.
:param int qos: Desired topic subscription's quality-of-service.

Expand All @@ -237,14 +267,16 @@ def subscribe(self, topic, qos=1):
self.validate_topic(topic)
self.client.subscribe(topic, qos)

def publish(self, topic, payload, qos=1):
def publish(
self, topic: str, payload: Union[str, float, bytes], qos: int = 1
) -> None:
"""Publishes to a AWS IoT Topic.

:param str topic: MQTT topic to publish to.
:param str payload: Data to publish to topic.
:param int payload: Data to publish to topic.
:param float payload: Data to publish to topic.
:param json payload: JSON-formatted data to publish to topic.
:param int qos: Quality of service level for publishing.
:param payload: Data to publish to topic. Must be able to be converted
to a string using ``str()``
:type payload: str|float|bytes
:param int qos: Quality of service level for publishing

"""
assert qos < 2, "AWS IoT does not support publishing with QoS 2."
Expand All @@ -255,35 +287,37 @@ def publish(self, topic, payload, qos=1):

# AWS IoT Device Shadow Service

def shadow_get_subscribe(self, qos=1):
def shadow_get_subscribe(self, qos: int = 1) -> None:
"""Subscribes to device's shadow get response.
:param int qos: Optional quality of service level.

:param int qos: Optional quality of service level.
"""
self.client.subscribe(self.shadow_topic + "/get/#", qos)

def shadow_subscribe(self, qos=1):
def shadow_subscribe(self, qos: int = 1) -> None:
"""Subscribes to all notifications on the device's shadow update topic.
:param int qos: Optional quality of service level.

:param int qos: Optional quality of service level.
"""
self.client.subscribe(self.shadow_topic + "/update/#", qos)

def shadow_update(self, document):
def shadow_update(self, document: str):
"""Publishes a request state document to update the device's shadow.
:param json state_document: JSON-formatted state document.

:param str state_document: JSON-formatted state document string.
"""
self.client.publish(self.shadow_topic + "/update", document)

def shadow_get(self):
def shadow_get(self) -> None:
"""Publishes an empty message to shadow get topic to get the device's shadow."""

self.client.publish(
self.shadow_topic + "/get", json.dumps({"message": "ignore"})
)

def shadow_delete(self):
def shadow_delete(self) -> None:
"""Publishes an empty message to the shadow delete topic to delete a device's shadow"""

self.client.publish(
self.shadow_topic + "/delete", json.dumps({"message": "delete"})
)