Skip to content

Commit adaa635

Browse files
author
Jim Bennett
committed
Moving from HTTPS to MQTT for device registration
1 parent 6f71151 commit adaa635

File tree

1 file changed

+100
-160
lines changed

1 file changed

+100
-160
lines changed

adafruit_azureiot/device_registration.py

Lines changed: 100 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,16 @@
1313
* Author(s): Jim Bennett, Elena Horton
1414
"""
1515

16-
import gc
1716
import json
1817
import time
19-
import adafruit_requests as requests
2018
import adafruit_logging as logging
2119
from adafruit_logging import Logger
20+
import adafruit_minimqtt.adafruit_minimqtt as minimqtt
21+
from adafruit_minimqtt.adafruit_minimqtt import MQTT
2222
from . import constants
2323
from .quote import quote
2424
from .keys import compute_derived_symmetric_key
2525

26-
# Azure HTTP error status codes
27-
AZURE_HTTP_ERROR_CODES = [400, 401, 404, 403, 412, 429, 500]
28-
2926

3027
class DeviceRegistrationError(Exception):
3128
"""
@@ -43,21 +40,6 @@ class DeviceRegistration:
4340
to IoT Central over MQTT
4441
"""
4542

46-
_loop_interval = 2
47-
48-
@staticmethod
49-
def _parse_http_status(status_code: int, status_reason: str) -> None:
50-
"""Parses status code, throws error based on Azure IoT Common Error Codes.
51-
:param int status_code: HTTP status code.
52-
:param str status_reason: Description of HTTP status.
53-
:raises DeviceRegistrationError: if the status code is an error code
54-
"""
55-
for error in AZURE_HTTP_ERROR_CODES:
56-
if error == status_code:
57-
raise DeviceRegistrationError(
58-
"Error {0}: {1}".format(status_code, status_reason)
59-
)
60-
6143
# pylint: disable=R0913
6244
def __init__(
6345
self,
@@ -70,7 +52,6 @@ def __init__(
7052
):
7153
"""Creates an instance of the device registration service
7254
:param socket: The network socket
73-
:param iface: The network interface
7455
:param str id_scope: The ID scope of the device to register
7556
:param str device_id: The device ID of the device to register
7657
:param str key: The primary or secondary key of the device to register
@@ -81,106 +62,95 @@ def __init__(
8162
self._key = key
8263
self._logger = logger if logger is not None else logging.getLogger("log")
8364

84-
socket.set_interface(iface)
85-
requests.set_socket(socket, iface)
65+
self._mqtt_connected = False
66+
self._mqtt = None
67+
self._auth_response_received = False
68+
self._operation_id = None
69+
self._hostname = None
8670

87-
def _loop_assign(self, operation_id, headers) -> str:
88-
uri = "https://%s/%s/registrations/%s/operations/%s?api-version=%s" % (
89-
constants.DPS_END_POINT,
90-
self._id_scope,
91-
self._device_id,
92-
operation_id,
93-
constants.DPS_API_VERSION,
71+
self._socket = socket
72+
self._iface = iface
73+
74+
# pylint: disable=W0613
75+
# pylint: disable=C0103
76+
def _on_connect(self, client, userdata, _, rc) -> None:
77+
self._logger.info(
78+
f"- device_registration :: _on_connect :: rc = {str(rc)}, userdata = {str(userdata)}"
9479
)
95-
self._logger.info("- iotc :: _loop_assign :: " + uri)
80+
if rc == 0:
81+
self._mqtt_connected = True
82+
self._auth_response_received = True
83+
84+
# pylint: disable=W0613
85+
def _handle_dps_update(self, client, topic: str, msg: str) -> None:
86+
self._logger.info(f"Received registration results on topic {topic} - {msg}")
87+
message = json.loads(msg)
88+
89+
if topic.startswith("$dps/registrations/res/202"):
90+
self._operation_id = message["operationId"]
91+
elif topic.startswith("$dps/registrations/res/200"):
92+
self._hostname = message["registrationState"]["assignedHub"]
9693

97-
response = self._run_get_request_with_retry(uri, headers)
94+
def _connect_to_mqtt(self) -> None:
95+
self._mqtt.on_connect = self._on_connect
9896

99-
try:
100-
data = response.json()
101-
except ValueError as error:
102-
err = "ERROR: " + str(error) + " => " + str(response)
103-
self._logger.error(err)
104-
raise DeviceRegistrationError(err) from error
97+
self._mqtt.connect()
10598

106-
loop_try = 0
99+
self._logger.info(
100+
" - device_registration :: connect :: created mqtt client. connecting.."
101+
)
102+
while not self._auth_response_received:
103+
self._mqtt.loop()
107104

108-
if data is not None and "status" in data:
109-
if data["status"] == "assigning":
110-
time.sleep(self._loop_interval)
111-
if loop_try < 20:
112-
loop_try = loop_try + 1
113-
return self._loop_assign(operation_id, headers)
105+
self._logger.info(
106+
f" - device_registration :: connect :: on_connect must be fired. Connected ? {str(self._mqtt_connected)}"
107+
)
114108

115-
err = "ERROR: Unable to provision the device."
116-
self._logger.error(err)
117-
raise DeviceRegistrationError(err)
109+
if not self._mqtt_connected:
110+
raise DeviceRegistrationError("Cannot connect to MQTT")
118111

119-
if data["status"] == "assigned":
120-
state = data["registrationState"]
121-
return state["assignedHub"]
122-
else:
123-
data = str(data)
112+
def _start_registration(self) -> None:
113+
self._mqtt.add_topic_callback(
114+
"$dps/registrations/res/#", self._handle_dps_update
115+
)
116+
self._mqtt.subscribe("$dps/registrations/res/#")
124117

125-
err = "DPS L => " + str(data)
126-
self._logger.error(err)
127-
raise DeviceRegistrationError(err)
118+
message = json.dumps({"registrationId": self._device_id})
119+
120+
self._mqtt.publish(
121+
f"$dps/registrations/PUT/iotdps-register/?$rid={self._device_id}", message
122+
)
128123

129-
def _run_put_request_with_retry(self, url, body, headers):
130124
retry = 0
131-
response = None
132-
133-
while True:
134-
gc.collect()
135-
try:
136-
self._logger.debug("Trying to send...")
137-
response = requests.put(url, json=body, headers=headers)
138-
self._logger.debug("Sent!")
139-
break
140-
except RuntimeError as runtime_error:
141-
self._logger.info(
142-
"Could not send data, retrying after 0.5 seconds: "
143-
+ str(runtime_error)
144-
)
145-
retry = retry + 1
146-
147-
if retry >= 10:
148-
self._logger.error("Failed to send data")
149-
raise
150-
151-
time.sleep(0.5)
152-
continue
153-
154-
gc.collect()
155-
return response
156-
157-
def _run_get_request_with_retry(self, url, headers):
125+
126+
while self._operation_id is None and retry < 10:
127+
time.sleep(1)
128+
retry = retry + 1
129+
self._mqtt.loop()
130+
131+
if self._operation_id is None:
132+
raise DeviceRegistrationError(
133+
"Cannot register device - no response from broker for registration result"
134+
)
135+
136+
def _wait_for_operation(self) -> None:
137+
message = json.dumps({"operationId": self._operation_id})
138+
self._mqtt.publish(
139+
f"$dps/registrations/GET/iotdps-get-operationstatus/?$rid={self._device_id}&operationId={self._operation_id}",
140+
message,
141+
)
142+
158143
retry = 0
159-
response = None
160-
161-
while True:
162-
gc.collect()
163-
try:
164-
self._logger.debug("Trying to send...")
165-
response = requests.get(url, headers=headers)
166-
self._logger.debug("Sent!")
167-
break
168-
except RuntimeError as runtime_error:
169-
self._logger.info(
170-
"Could not send data, retrying after 0.5 seconds: "
171-
+ str(runtime_error)
172-
)
173-
retry = retry + 1
174-
175-
if retry >= 10:
176-
self._logger.error("Failed to send data")
177-
raise
178-
179-
time.sleep(0.5)
180-
continue
181-
182-
gc.collect()
183-
return response
144+
145+
while self._hostname is None and retry < 10:
146+
time.sleep(1)
147+
retry = retry + 1
148+
self._mqtt.loop()
149+
150+
if self._hostname is None:
151+
raise DeviceRegistrationError(
152+
"Cannot register device - no response from broker for operation status"
153+
)
184154

185155
def register_device(self, expiry: int) -> str:
186156
"""
@@ -192,65 +162,35 @@ def register_device(self, expiry: int) -> str:
192162
:raises DeviceRegistrationError: if the device cannot be registered successfully
193163
:raises RuntimeError: if the internet connection is not responding or is unable to connect
194164
"""
165+
166+
username = f"{self._id_scope}/registrations/{self._device_id}/api-version={constants.DPS_API_VERSION}"
167+
195168
# pylint: disable=C0103
196169
sr = self._id_scope + "%2Fregistrations%2F" + self._device_id
197170
sig_no_encode = compute_derived_symmetric_key(
198171
self._key, sr + "\n" + str(expiry)
199172
)
200173
sig_encoded = quote(sig_no_encode, "~()*!.'")
201-
auth_string = (
202-
"SharedAccessSignature sr="
203-
+ sr
204-
+ "&sig="
205-
+ sig_encoded
206-
+ "&se="
207-
+ str(expiry)
208-
+ "&skn=registration"
174+
auth_string = f"SharedAccessSignature sr={sr}&sig={sig_encoded}&se={str(expiry)}&skn=registration"
175+
176+
minimqtt.set_socket(self._socket, self._iface)
177+
178+
self._mqtt = MQTT(
179+
broker=constants.DPS_END_POINT,
180+
username=username,
181+
password=auth_string,
182+
port=8883,
183+
keep_alive=120,
184+
is_ssl=True,
185+
client_id=self._device_id,
209186
)
210187

211-
headers = {
212-
"content-type": "application/json; charset=utf-8",
213-
"user-agent": "iot-central-client/1.0",
214-
"Accept": "*/*",
215-
}
216-
217-
if auth_string is not None:
218-
headers["authorization"] = auth_string
219-
220-
body = {"registrationId": self._device_id}
188+
self._mqtt.enable_logger(logging, self._logger.getEffectiveLevel())
221189

222-
uri = "https://%s/%s/registrations/%s/register?api-version=%s" % (
223-
constants.DPS_END_POINT,
224-
self._id_scope,
225-
self._device_id,
226-
constants.DPS_API_VERSION,
227-
)
228-
229-
self._logger.info("Connecting...")
230-
self._logger.info("URL: " + uri)
231-
self._logger.info("body: " + json.dumps(body))
232-
233-
response = self._run_put_request_with_retry(uri, body, headers)
234-
235-
data = None
236-
try:
237-
data = response.json()
238-
except ValueError as error:
239-
err = (
240-
"ERROR: non JSON is received from "
241-
+ constants.DPS_END_POINT
242-
+ " => "
243-
+ str(response)
244-
+ " .. message : "
245-
+ str(error)
246-
)
247-
self._logger.error(err)
248-
raise DeviceRegistrationError(err) from error
190+
self._connect_to_mqtt()
191+
self._start_registration()
192+
self._wait_for_operation()
249193

250-
if "errorCode" in data:
251-
err = "DPS => " + str(data)
252-
self._logger.error(err)
253-
raise DeviceRegistrationError(err)
194+
self._mqtt.disconnect()
254195

255-
time.sleep(1)
256-
return self._loop_assign(data["operationId"], headers)
196+
return str(self._hostname)

0 commit comments

Comments
 (0)