Skip to content

Commit 4050fa5

Browse files
author
brentru
committed
update for MMQTT
1 parent f3ff110 commit 4050fa5

13 files changed

+284
-73
lines changed

adafruit_azureiot/device_registration.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,13 @@ def _parse_http_status(status_code: int, status_reason: str) -> None:
7272
"""
7373
for error in AZURE_HTTP_ERROR_CODES:
7474
if error == status_code:
75-
raise DeviceRegistrationError("Error {0}: {1}".format(status_code, status_reason))
75+
raise DeviceRegistrationError(
76+
"Error {0}: {1}".format(status_code, status_reason)
77+
)
7678

77-
def __init__(self, socket, id_scope: str, device_id: str, key: str, logger: Logger = None):
79+
def __init__(
80+
self, socket, id_scope: str, device_id: str, key: str, logger: Logger = None
81+
):
7882
"""Creates an instance of the device registration service
7983
:param socket: The network socket
8084
:param str id_scope: The ID scope of the device to register
@@ -98,7 +102,9 @@ def compute_derived_symmetric_key(secret: str, msg: str) -> bytes:
98102
:rtype: bytes
99103
"""
100104
secret = base64.b64decode(secret)
101-
return base64.b64encode(hmac.new(secret, msg=msg.encode("utf8"), digestmod=hashlib.sha256).digest())
105+
return base64.b64encode(
106+
hmac.new(secret, msg=msg.encode("utf8"), digestmod=hashlib.sha256).digest()
107+
)
102108

103109
def _loop_assign(self, operation_id, headers) -> str:
104110
uri = "https://%s/%s/registrations/%s/operations/%s?api-version=%s" % (
@@ -155,7 +161,10 @@ def _run_put_request_with_retry(self, url, body, headers):
155161
self._logger.debug("Sent!")
156162
break
157163
except RuntimeError as runtime_error:
158-
self._logger.info("Could not send data, retrying after 0.5 seconds: " + str(runtime_error))
164+
self._logger.info(
165+
"Could not send data, retrying after 0.5 seconds: "
166+
+ str(runtime_error)
167+
)
159168
retry = retry + 1
160169

161170
if retry >= 10:
@@ -180,7 +189,10 @@ def _run_get_request_with_retry(self, url, headers):
180189
self._logger.debug("Sent!")
181190
break
182191
except RuntimeError as runtime_error:
183-
self._logger.info("Could not send data, retrying after 0.5 seconds: " + str(runtime_error))
192+
self._logger.info(
193+
"Could not send data, retrying after 0.5 seconds: "
194+
+ str(runtime_error)
195+
)
184196
retry = retry + 1
185197

186198
if retry >= 10:
@@ -205,9 +217,19 @@ def register_device(self, expiry: int) -> str:
205217
"""
206218
# pylint: disable=C0103
207219
sr = self._id_scope + "%2Fregistrations%2F" + self._device_id
208-
sig_no_encode = DeviceRegistration.compute_derived_symmetric_key(self._key, sr + "\n" + str(expiry))
220+
sig_no_encode = DeviceRegistration.compute_derived_symmetric_key(
221+
self._key, sr + "\n" + str(expiry)
222+
)
209223
sig_encoded = parse.quote(sig_no_encode, "~()*!.'")
210-
auth_string = "SharedAccessSignature sr=" + sr + "&sig=" + sig_encoded + "&se=" + str(expiry) + "&skn=registration"
224+
auth_string = (
225+
"SharedAccessSignature sr="
226+
+ sr
227+
+ "&sig="
228+
+ sig_encoded
229+
+ "&se="
230+
+ str(expiry)
231+
+ "&skn=registration"
232+
)
211233

212234
headers = {
213235
"content-type": "application/json; charset=utf-8",
@@ -238,7 +260,14 @@ def register_device(self, expiry: int) -> str:
238260
try:
239261
data = response.json()
240262
except ValueError as error:
241-
err = "ERROR: non JSON is received from " + constants.DPS_END_POINT + " => " + str(response) + " .. message : " + str(error)
263+
err = (
264+
"ERROR: non JSON is received from "
265+
+ constants.DPS_END_POINT
266+
+ " => "
267+
+ str(response)
268+
+ " .. message : "
269+
+ str(error)
270+
)
242271
self._logger.error(err)
243272
raise DeviceRegistrationError(err)
244273

adafruit_azureiot/iot_mqtt.py

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,21 @@ def cloud_to_device_message_received(self, body: str, properties: dict) -> None:
8484
:param dict properties: The propreties sent with the mesage
8585
"""
8686

87-
def device_twin_desired_updated(self, desired_property_name: str, desired_property_value, desired_version: int) -> None:
87+
def device_twin_desired_updated(
88+
self, desired_property_name: str, desired_property_value, desired_version: int
89+
) -> None:
8890
"""Called when the device twin desired properties are updated
8991
:param str desired_property_name: The name of the desired property that was updated
9092
:param desired_property_value: The value of the desired property that was updated
9193
:param int desired_version: The version of the desired property that was updated
9294
"""
9395

94-
def device_twin_reported_updated(self, reported_property_name: str, reported_property_value, reported_version: int) -> None:
96+
def device_twin_reported_updated(
97+
self,
98+
reported_property_name: str,
99+
reported_property_value,
100+
reported_version: int,
101+
) -> None:
95102
"""Called when the device twin reported values are updated
96103
:param str reported_property_name: The name of the reported property that was updated
97104
:param reported_property_value: The value of the reported property that was updated
@@ -107,11 +114,17 @@ class IoTMQTT:
107114
def _gen_sas_token(self) -> str:
108115
token_expiry = int(time.time() + self._token_expires)
109116
uri = self._hostname + "%2Fdevices%2F" + self._device_id
110-
signed_hmac_sha256 = DeviceRegistration.compute_derived_symmetric_key(self._key, uri + "\n" + str(token_expiry))
117+
signed_hmac_sha256 = DeviceRegistration.compute_derived_symmetric_key(
118+
self._key, uri + "\n" + str(token_expiry)
119+
)
111120
signature = parse.quote(signed_hmac_sha256, "~()*!.'")
112-
if signature.endswith("\n"): # somewhere along the crypto chain a newline is inserted
121+
if signature.endswith(
122+
"\n"
123+
): # somewhere along the crypto chain a newline is inserted
113124
signature = signature[:-1]
114-
token = "SharedAccessSignature sr={}&sig={}&se={}".format(uri, signature, token_expiry)
125+
token = "SharedAccessSignature sr={}&sig={}&se={}".format(
126+
uri, signature, token_expiry
127+
)
115128
return token
116129

117130
# Workaround for https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/issues/25
@@ -143,15 +156,16 @@ def _try_create_mqtt_client(self, hostname: str) -> None:
143156
self._mqtts.connect()
144157

145158
def _create_mqtt_client(self) -> None:
146-
try:
147-
self._try_create_mqtt_client(self._hostname)
148-
except ValueError:
149-
# Workaround for https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT/issues/25
150-
self._try_create_mqtt_client("https://" + self._hostname)
159+
self._try_create_mqtt_client(self._hostname)
151160

152161
# pylint: disable=C0103, W0613
153162
def _on_connect(self, client, userdata, _, rc) -> None:
154-
self._logger.info("- iot_mqtt :: _on_connect :: rc = " + str(rc) + ", userdata = " + str(userdata))
163+
self._logger.info(
164+
"- iot_mqtt :: _on_connect :: rc = "
165+
+ str(rc)
166+
+ ", userdata = "
167+
+ str(userdata)
168+
)
155169
if rc == 0:
156170
self._mqtt_connected = True
157171
self._auth_response_received = True
@@ -178,7 +192,9 @@ def _on_disconnect(self, client, userdata, rc) -> None:
178192
self._callback.connection_status_change(False)
179193

180194
def _on_publish(self, client, data, topic, msg_id) -> None:
181-
self._logger.info("- iot_mqtt :: _on_publish :: " + str(data) + " on topic " + str(topic))
195+
self._logger.info(
196+
"- iot_mqtt :: _on_publish :: " + str(data) + " on topic " + str(topic)
197+
)
182198

183199
# pylint: disable=W0703
184200
def _handle_device_twin_update(self, msg: str, topic: str) -> None:
@@ -189,7 +205,12 @@ def _handle_device_twin_update(self, msg: str, topic: str) -> None:
189205
try:
190206
twin = json.loads(msg)
191207
except json.JSONDecodeError as e:
192-
self._logger.error("ERROR: JSON parse for Device Twin message object has failed. => " + msg + " => " + str(e))
208+
self._logger.error(
209+
"ERROR: JSON parse for Device Twin message object has failed. => "
210+
+ msg
211+
+ " => "
212+
+ str(e)
213+
)
193214
return
194215

195216
if "reported" in twin:
@@ -199,11 +220,15 @@ def _handle_device_twin_update(self, msg: str, topic: str) -> None:
199220
reported_version = reported["$version"]
200221
reported.pop("$version")
201222
else:
202-
self._logger.error("ERROR: Unexpected payload for reported twin update => " + msg)
223+
self._logger.error(
224+
"ERROR: Unexpected payload for reported twin update => " + msg
225+
)
203226
return
204227

205228
for property_name, value in reported.items():
206-
self._callback.device_twin_reported_updated(property_name, value, reported_version)
229+
self._callback.device_twin_reported_updated(
230+
property_name, value, reported_version
231+
)
207232

208233
is_patch = "desired" not in twin
209234

@@ -216,11 +241,15 @@ def _handle_device_twin_update(self, msg: str, topic: str) -> None:
216241
desired_version = desired["$version"]
217242
desired.pop("$version")
218243
else:
219-
self._logger.error("ERROR: Unexpected payload for desired twin update => " + msg)
244+
self._logger.error(
245+
"ERROR: Unexpected payload for desired twin update => " + msg
246+
)
220247
return
221248

222249
for property_name, value in desired.items():
223-
self._callback.device_twin_desired_updated(property_name, value, desired_version)
250+
self._callback.device_twin_desired_updated(
251+
property_name, value, desired_version
252+
)
224253

225254
def _handle_direct_method(self, msg: str, topic: str) -> None:
226255
index = topic.find("$rid=")
@@ -249,7 +278,14 @@ def _handle_direct_method(self, msg: str, topic: str) -> None:
249278
ret_message = json.dumps(ret_json)
250279

251280
next_topic = "$iothub/methods/res/{}/?$rid={}".format(ret_code, method_id)
252-
self._logger.info("C2D: => " + next_topic + " with data " + ret_message + " and name => " + method_name)
281+
self._logger.info(
282+
"C2D: => "
283+
+ next_topic
284+
+ " with data "
285+
+ ret_message
286+
+ " and name => "
287+
+ method_name
288+
)
253289
self._send_common(next_topic, ret_message)
254290

255291
def _handle_cloud_to_device_message(self, msg: str, topic: str) -> None:
@@ -282,14 +318,18 @@ def _on_message(self, client, msg_topic, payload) -> None:
282318
topic = str(msg_topic)
283319

284320
if topic.startswith("$iothub/"):
285-
if topic.startswith("$iothub/twin/PATCH/properties/desired/") or topic.startswith("$iothub/twin/res/200/?$rid="):
321+
if topic.startswith(
322+
"$iothub/twin/PATCH/properties/desired/"
323+
) or topic.startswith("$iothub/twin/res/200/?$rid="):
286324
self._handle_device_twin_update(str(msg), topic)
287325
elif topic.startswith("$iothub/methods"):
288326
self._handle_direct_method(str(msg), topic)
289327
else:
290328
if not topic.startswith("$iothub/twin/res/"): # not twin response
291329
self._logger.error("ERROR: unknown twin! - {}".format(msg))
292-
elif topic.startswith("devices/{}/messages/devicebound".format(self._device_id)):
330+
elif topic.startswith(
331+
"devices/{}/messages/devicebound".format(self._device_id)
332+
):
293333
self._handle_cloud_to_device_message(str(msg), topic)
294334
else:
295335
self._logger.error("ERROR: (unknown message) - {}".format(msg))
@@ -315,7 +355,10 @@ def _send_common(self, topic: str, data) -> None:
315355
self._logger.debug("Data sent")
316356
break
317357
except RuntimeError as runtime_error:
318-
self._logger.info("Could not send data, retrying after 0.5 seconds: " + str(runtime_error))
358+
self._logger.info(
359+
"Could not send data, retrying after 0.5 seconds: "
360+
+ str(runtime_error)
361+
)
319362
retry = retry + 1
320363

321364
if retry >= 10:
@@ -364,18 +407,24 @@ def __init__(
364407
self._hostname = hostname
365408
self._key = key
366409
self._token_expires = token_expires
367-
self._username = "{}/{}/api-version={}".format(self._hostname, device_id, constants.IOTC_API_VERSION)
410+
self._username = "{}/{}/api-version={}".format(
411+
self._hostname, device_id, constants.IOTC_API_VERSION
412+
)
368413
self._passwd = self._gen_sas_token()
369414
self._logger = logger if logger is not None else logging.getLogger("log")
370415
self._is_subscribed_to_twins = False
371416

372417
def _subscribe_to_core_topics(self):
373418
self._mqtts.subscribe("devices/{}/messages/events/#".format(self._device_id))
374-
self._mqtts.subscribe("devices/{}/messages/devicebound/#".format(self._device_id))
419+
self._mqtts.subscribe(
420+
"devices/{}/messages/devicebound/#".format(self._device_id)
421+
)
375422
self._mqtts.subscribe("$iothub/methods/#")
376423

377424
def _subscribe_to_twin_topics(self):
378-
self._mqtts.subscribe("$iothub/twin/PATCH/properties/desired/#") # twin desired property changes
425+
self._mqtts.subscribe(
426+
"$iothub/twin/PATCH/properties/desired/#"
427+
) # twin desired property changes
379428
self._mqtts.subscribe("$iothub/twin/res/#") # twin properties response
380429

381430
def connect(self) -> bool:
@@ -391,7 +440,10 @@ def connect(self) -> bool:
391440
while self._auth_response_received is None:
392441
self.loop()
393442

394-
self._logger.info(" - iot_mqtt :: connect :: on_connect must be fired. Connected ? " + str(self.is_connected()))
443+
self._logger.info(
444+
" - iot_mqtt :: connect :: on_connect must be fired. Connected ? "
445+
+ str(self.is_connected())
446+
)
395447
if not self.is_connected():
396448
return False
397449

@@ -448,7 +500,9 @@ def loop(self) -> None:
448500

449501
self._mqtts.loop()
450502

451-
def send_device_to_cloud_message(self, message, system_properties: dict = None) -> None:
503+
def send_device_to_cloud_message(
504+
self, message, system_properties: dict = None
505+
) -> None:
452506
"""Send a device to cloud message from this device to Azure IoT Hub
453507
:param message: The message data as a JSON string or a dictionary
454508
:param system_properties: System properties to send with the message
@@ -484,5 +538,7 @@ def send_twin_patch(self, patch) -> None:
484538
:raises RuntimeError: if the internet connection is not responding or is unable to connect
485539
"""
486540
self._logger.info("- iot_mqtt :: sendProperty :: " + str(patch))
487-
topic = "$iothub/twin/PATCH/properties/reported/?$rid={}".format(int(time.time()))
541+
topic = "$iothub/twin/PATCH/properties/reported/?$rid={}".format(
542+
int(time.time())
543+
)
488544
self._send_common(topic, patch)

0 commit comments

Comments
 (0)