From 2fedb120166d30fa4481a6fa03eeabcb17b141aa Mon Sep 17 00:00:00 2001 From: brentru Date: Mon, 12 Aug 2019 12:02:52 -0400 Subject: [PATCH 1/5] fixing connack payload issues caused by 0-value KeepAlive and incorrect length fixed header --- adafruit_minimqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adafruit_minimqtt.py b/adafruit_minimqtt.py index 5959dab6..4a5c12e8 100644 --- a/adafruit_minimqtt.py +++ b/adafruit_minimqtt.py @@ -64,7 +64,7 @@ MQTT_SUB = b'\x82' MQTT_UNSUB = b'\xA2' MQTT_PUB = bytearray(b'\x30\0') -MQTT_CON = bytearray(b'\x10\0\0') +MQTT_CON = bytearray(b"\x10\x00\x00\x00") # Variable CONNECT header [MQTT 3.1.2] MQTT_CON_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0") MQTT_DISCONNECT = b'\xe0\0' @@ -138,7 +138,7 @@ def __init__(self, socket, broker, port=None, username=None, self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM self.packet_id = 0 - self._keep_alive = 0 + self._keep_alive = 60 self._pid = 0 self._user_data = None self._subscribed_topics = [] From bc99b3e773ac5b7dc4ebf542c1430968cfe08400 Mon Sep 17 00:00:00 2001 From: brentru Date: Mon, 12 Aug 2019 16:31:50 -0400 Subject: [PATCH 2/5] incorporate new relength encoding scheme --- adafruit_minimqtt.py | 88 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 70 insertions(+), 18 deletions(-) diff --git a/adafruit_minimqtt.py b/adafruit_minimqtt.py index 4a5c12e8..68794544 100644 --- a/adafruit_minimqtt.py +++ b/adafruit_minimqtt.py @@ -238,31 +238,83 @@ def connect(self, clean_session=True): self._sock.connect(addr, TCP_MODE) except RuntimeError as e: raise MMQTTException("Invalid broker address defined.", e) - premsg = MQTT_CON - msg = MQTT_CON_HEADER - msg[6] = clean_session << 1 - sz = 12 + len(self._client_id) + + # Google core IOT Premsg + #fixed_header = bytearray(b"\x10\x00\x00\x00") + # Adafruit IO + #fixed_header = bytearray(b'\x10\x00\x00') + + # Fixed Header + fixed_header = bytearray() + fixed_header.append(0x10) + + # Variable Header + var_header = MQTT_CON_HEADER + var_header[6] = clean_session << 1 + + + # 12 + (protocol information, we're not supporting MQTTv311..) + remaining_length = 12 + len(self._client_id) if self._user is not None: - sz += 2 + len(self._user) + 2 + len(self._pass) - msg[6] |= 0xC0 + remaining_length += 2 + len(self._user) + 2 + len(self._pass) + var_header[6] |= 0xC0 if self._keep_alive: assert self._keep_alive < MQTT_TOPIC_LENGTH_LIMIT - msg[7] |= self._keep_alive >> 8 - msg[8] |= self._keep_alive & 0x00FF + var_header[7] |= self._keep_alive >> 8 + var_header[8] |= self._keep_alive & 0x00FF if self._lw_topic: - sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) - msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 - msg[6] |= self._lw_retain << 5 + remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) + var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 + var_header[6] |= self._lw_retain << 5 + + # Remaining length i = 1 - while sz > 0x7f: - premsg[i] = (sz & 0x7f) | 0x80 - sz >>= 7 + if remaining_length > 0x7f: + # Calculate Remaining Length [2.2.3] + remaining_bytes = bytearray() + while remaining_length > 0: + encoded_byte = remaining_length % 0x80 + remaining_length = remaining_length // 0x80 + # if there is more data to encode, set the top bit of the byte + if remaining_length > 0: + encoded_byte |= 0x80 + print('enc byte: ', encoded_byte) + remaining_bytes.append(encoded_byte) + fixed_header.append(encoded_byte) + print('_prl: packet', fixed_header) + print('prl, rel.length: ', remaining_length) + print('prl, byte: ', encoded_byte) + i+=1 + #fixed_header[i] = 0x00 + fixed_header.append(0x00) + + """ + # Old, non-working MMQT/UMQTT IMPL + i = 1 + while remaining_length > 0x7f: + fixed_header[i] = (remaining_length & 0x7f) | 0x80 + remaining_length >>= 7 i += 1 - premsg[i] = sz + fixed_header[i] = remaining_length + print("i: ", i) + print(fixed_header) + print(remaining_length) + """ + + if self._logger is not None: self._logger.debug('Sending CONNECT packet to broker') - self._sock.write(premsg) - self._sock.write(msg) + print("---fixed_header----") + print("rel len: ", remaining_length) + print(fixed_header) + self._sock.write(fixed_header) + #print("remaining len: ", fixed_header[i]) + print("---var_header----") + print(var_header) + self._sock.write(var_header) + print("Keepalive Bytes:") + print(hex(var_header[7])) + print(hex(var_header[8])) # [MQTT-3.1.3-4] self._send_str(self._client_id) if self._lw_topic: @@ -703,4 +755,4 @@ def set_logger_level(self, log_level): elif log_level == 'ERROR': self._logger.setLevel(logging.CRITICIAL) else: - raise MMQTTException('Incorrect logging level provided!') + raise MMQTTException('Incorrect logging level provided!') \ No newline at end of file From 5c55b01370ca546ad1be2f37caa7635e723352b9 Mon Sep 17 00:00:00 2001 From: brentru Date: Mon, 12 Aug 2019 16:49:23 -0400 Subject: [PATCH 3/5] adding smaller, normative handling --- adafruit_minimqtt.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/adafruit_minimqtt.py b/adafruit_minimqtt.py index 68794544..fbc35146 100644 --- a/adafruit_minimqtt.py +++ b/adafruit_minimqtt.py @@ -266,13 +266,16 @@ def connect(self, clean_session=True): remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 var_header[6] |= self._lw_retain << 5 + # Remaining length i = 1 + test_byte = 0 if remaining_length > 0x7f: # Calculate Remaining Length [2.2.3] remaining_bytes = bytearray() while remaining_length > 0: + test_byte = 1 encoded_byte = remaining_length % 0x80 remaining_length = remaining_length // 0x80 # if there is more data to encode, set the top bit of the byte @@ -285,22 +288,29 @@ def connect(self, clean_session=True): print('prl, rel.length: ', remaining_length) print('prl, byte: ', encoded_byte) i+=1 - #fixed_header[i] = 0x00 - fixed_header.append(0x00) + if test_byte: + fixed_header.append(0x00) + else: + print(fixed_header) + print(remaining_length) + fixed_header.append(remaining_length) + fixed_header.append(0x00) + """ # Old, non-working MMQT/UMQTT IMPL i = 1 + print(remaining_length) while remaining_length > 0x7f: + print("with variable header...") fixed_header[i] = (remaining_length & 0x7f) | 0x80 remaining_length >>= 7 i += 1 - fixed_header[i] = remaining_length print("i: ", i) + fixed_header[i] = remaining_length print(fixed_header) print(remaining_length) """ - if self._logger is not None: self._logger.debug('Sending CONNECT packet to broker') From e67aa82bda2a08a4ab4e0eb8114cc9333930aa60 Mon Sep 17 00:00:00 2001 From: brentru Date: Mon, 12 Aug 2019 17:09:51 -0400 Subject: [PATCH 4/5] cleanup and lint! --- adafruit_minimqtt.py | 68 +++++++++----------------------------------- 1 file changed, 13 insertions(+), 55 deletions(-) diff --git a/adafruit_minimqtt.py b/adafruit_minimqtt.py index fbc35146..03aae2d4 100644 --- a/adafruit_minimqtt.py +++ b/adafruit_minimqtt.py @@ -64,9 +64,8 @@ MQTT_SUB = b'\x82' MQTT_UNSUB = b'\xA2' MQTT_PUB = bytearray(b'\x30\0') -MQTT_CON = bytearray(b"\x10\x00\x00\x00") # Variable CONNECT header [MQTT 3.1.2] -MQTT_CON_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0") +MQTT_VAR_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0") MQTT_DISCONNECT = b'\xe0\0' CONNACK_ERRORS = {const(0x01) : 'Connection Refused - Incorrect Protocol Version', @@ -238,22 +237,16 @@ def connect(self, clean_session=True): self._sock.connect(addr, TCP_MODE) except RuntimeError as e: raise MMQTTException("Invalid broker address defined.", e) - - # Google core IOT Premsg - #fixed_header = bytearray(b"\x10\x00\x00\x00") - # Adafruit IO - #fixed_header = bytearray(b'\x10\x00\x00') # Fixed Header fixed_header = bytearray() fixed_header.append(0x10) # Variable Header - var_header = MQTT_CON_HEADER + var_header = MQTT_VAR_HEADER var_header[6] = clean_session << 1 - - - # 12 + (protocol information, we're not supporting MQTTv311..) + + # Set up variable header and remaining_length remaining_length = 12 + len(self._client_id) if self._user is not None: remaining_length += 2 + len(self._user) + 2 + len(self._pass) @@ -266,65 +259,30 @@ def connect(self, clean_session=True): remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 var_header[6] |= self._lw_retain << 5 - - # Remaining length - i = 1 - test_byte = 0 + # Remaining length calculation + large_rel_length = False if remaining_length > 0x7f: + large_rel_length = True # Calculate Remaining Length [2.2.3] - remaining_bytes = bytearray() while remaining_length > 0: - test_byte = 1 encoded_byte = remaining_length % 0x80 remaining_length = remaining_length // 0x80 # if there is more data to encode, set the top bit of the byte if remaining_length > 0: encoded_byte |= 0x80 - print('enc byte: ', encoded_byte) - remaining_bytes.append(encoded_byte) fixed_header.append(encoded_byte) - print('_prl: packet', fixed_header) - print('prl, rel.length: ', remaining_length) - print('prl, byte: ', encoded_byte) - i+=1 - if test_byte: + if large_rel_length: fixed_header.append(0x00) else: - print(fixed_header) - print(remaining_length) - fixed_header.append(remaining_length) - fixed_header.append(0x00) - - - """ - # Old, non-working MMQT/UMQTT IMPL - i = 1 - print(remaining_length) - while remaining_length > 0x7f: - print("with variable header...") - fixed_header[i] = (remaining_length & 0x7f) | 0x80 - remaining_length >>= 7 - i += 1 - print("i: ", i) - fixed_header[i] = remaining_length - print(fixed_header) - print(remaining_length) - """ + fixed_header.append(remaining_length + 0x00) if self._logger is not None: - self._logger.debug('Sending CONNECT packet to broker') - print("---fixed_header----") - print("rel len: ", remaining_length) - print(fixed_header) + self._logger.debug('Sending CONNECT to broker') + self._logger.debug('Fixed Header: {}\nVariable Header: {}'.format(fixed_header, + var_header)) self._sock.write(fixed_header) - #print("remaining len: ", fixed_header[i]) - print("---var_header----") - print(var_header) self._sock.write(var_header) - print("Keepalive Bytes:") - print(hex(var_header[7])) - print(hex(var_header[8])) # [MQTT-3.1.3-4] self._send_str(self._client_id) if self._lw_topic: @@ -765,4 +723,4 @@ def set_logger_level(self, log_level): elif log_level == 'ERROR': self._logger.setLevel(logging.CRITICIAL) else: - raise MMQTTException('Incorrect logging level provided!') \ No newline at end of file + raise MMQTTException('Incorrect logging level provided!') From bfbace803348d926d8abe2104a1afd7d09f78936 Mon Sep 17 00:00:00 2001 From: brentru Date: Mon, 12 Aug 2019 17:14:23 -0400 Subject: [PATCH 5/5] fix small append --- adafruit_minimqtt.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/adafruit_minimqtt.py b/adafruit_minimqtt.py index 03aae2d4..ef7f3f6c 100644 --- a/adafruit_minimqtt.py +++ b/adafruit_minimqtt.py @@ -275,7 +275,8 @@ def connect(self, clean_session=True): if large_rel_length: fixed_header.append(0x00) else: - fixed_header.append(remaining_length + 0x00) + fixed_header.append(remaining_length) + fixed_header.append(0x00) if self._logger is not None: self._logger.debug('Sending CONNECT to broker')