Skip to content

Fix CONNECT for large MQTT payloads #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 12, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 44 additions & 23 deletions adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@
MQTT_SUB = b'\x82'
MQTT_UNSUB = b'\xA2'
MQTT_PUB = bytearray(b'\x30\0')
MQTT_CON = bytearray(b'\x10\0\0')
# Variable CONNECT header [MQTT 3.1.2]
MQTT_CON_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0")
MQTT_VAR_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0")
MQTT_DISCONNECT = b'\xe0\0'

CONNACK_ERRORS = {const(0x01) : 'Connection Refused - Incorrect Protocol Version',
Expand Down Expand Up @@ -138,7 +137,7 @@ def __init__(self, socket, broker, port=None, username=None,
self._is_connected = False
self._msg_size_lim = MQTT_MSG_SZ_LIM
self.packet_id = 0
self._keep_alive = 0
self._keep_alive = 60
self._pid = 0
self._user_data = None
self._subscribed_topics = []
Expand Down Expand Up @@ -238,31 +237,53 @@ def connect(self, clean_session=True):
self._sock.connect(addr, TCP_MODE)
except RuntimeError as e:
raise MMQTTException("Invalid broker address defined.", e)
premsg = MQTT_CON
msg = MQTT_CON_HEADER
msg[6] = clean_session << 1
sz = 12 + len(self._client_id)

# Fixed Header
fixed_header = bytearray()
fixed_header.append(0x10)

# Variable Header
var_header = MQTT_VAR_HEADER
var_header[6] = clean_session << 1

# Set up variable header and remaining_length
remaining_length = 12 + len(self._client_id)
if self._user is not None:
sz += 2 + len(self._user) + 2 + len(self._pass)
msg[6] |= 0xC0
remaining_length += 2 + len(self._user) + 2 + len(self._pass)
var_header[6] |= 0xC0
if self._keep_alive:
assert self._keep_alive < MQTT_TOPIC_LENGTH_LIMIT
msg[7] |= self._keep_alive >> 8
msg[8] |= self._keep_alive & 0x00FF
var_header[7] |= self._keep_alive >> 8
var_header[8] |= self._keep_alive & 0x00FF
if self._lw_topic:
sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
msg[6] |= self._lw_retain << 5
i = 1
while sz > 0x7f:
premsg[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
premsg[i] = sz
remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
var_header[6] |= self._lw_retain << 5

# Remaining length calculation
large_rel_length = False
if remaining_length > 0x7f:
large_rel_length = True
# Calculate Remaining Length [2.2.3]
while remaining_length > 0:
encoded_byte = remaining_length % 0x80
remaining_length = remaining_length // 0x80
# if there is more data to encode, set the top bit of the byte
if remaining_length > 0:
encoded_byte |= 0x80
fixed_header.append(encoded_byte)
if large_rel_length:
fixed_header.append(0x00)
else:
fixed_header.append(remaining_length)
fixed_header.append(0x00)

if self._logger is not None:
self._logger.debug('Sending CONNECT packet to broker')
self._sock.write(premsg)
self._sock.write(msg)
self._logger.debug('Sending CONNECT to broker')
self._logger.debug('Fixed Header: {}\nVariable Header: {}'.format(fixed_header,
var_header))
self._sock.write(fixed_header)
self._sock.write(var_header)
# [MQTT-3.1.3-4]
self._send_str(self._client_id)
if self._lw_topic:
Expand Down