Skip to content

Fix publishing large payloads (>127 bytes) #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 3, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 46 additions & 36 deletions adafruit_minimqtt/adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@
MQTT_PINGRESP = const(0xD0)
MQTT_SUB = b"\x82"
MQTT_UNSUB = b"\xA2"
MQTT_PUB = bytearray(b"\x30\0")
# Variable CONNECT header [MQTT 3.1.2]
MQTT_VAR_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0")
MQTT_PUB = bytearray(b"\x30")
MQTT_DISCONNECT = b"\xe0\0"

# Variable CONNECT header [MQTT 3.1.2]
MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")


CONNACK_ERRORS = {
const(0x01): "Connection Refused - Incorrect Protocol Version",
const(0x02): "Connection Refused - ID Rejected",
Expand Down Expand Up @@ -301,8 +303,11 @@ def connect(self, clean_session=True):
fixed_header = bytearray()
fixed_header.append(0x10)

# NOTE: Variable header is
# MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0")
# because final 4 bytes are 4, 2, 0, 0
# Variable Header
var_header = MQTT_VAR_HEADER
var_header = MQTT_HDR_CONNECT
var_header[6] = clean_session << 1

# Set up variable header and remaining_length
Expand Down Expand Up @@ -427,7 +432,6 @@ def publish(self, topic, msg, retain=False, qos=0):
.. code-block:: python

mqtt_client.publish('topics/piVal', 'threepointonefour')

"""
self.is_connected()
self._check_topic(topic)
Expand All @@ -443,42 +447,52 @@ def publish(self, topic, msg, retain=False, qos=0):
else:
raise MMQTTException("Invalid message data type.")
if len(msg) > MQTT_MSG_MAX_SZ:
raise MMQTTException("Message size larger than %db." % MQTT_MSG_MAX_SZ)
self._check_qos(qos)
pkt = MQTT_PUB
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
raise MMQTTException("Message size larger than %d bytes." % MQTT_MSG_MAX_SZ)
assert (
0 <= qos <= 1
), "Quality of Service Level 2 is unsupported by this library."

pub_hdr_fixed = bytearray() # fixed header
pub_hdr_fixed.extend(MQTT_PUB)
pub_hdr_fixed[0] |= retain | qos << 1 # [3.3.1.2], [3.3.1.3]

pub_hdr_var = bytearray() # variable header
pub_hdr_var.append(len(topic) >> 8) # Topic length, MSB
pub_hdr_var.append(len(topic) & 0xFF) # Topic length, LSB
pub_hdr_var.extend(topic.encode("utf-8")) # Topic name

remaining_length = 2 + len(msg) + len(topic)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7F:
pkt[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
# packet identifier where QoS level is 1 or 2. [3.3.2.2]
pid = self._pid
remaining_length += 2
pub_hdr_var.append(0x00)
pub_hdr_var.append(self._pid)
self._pid += 1

# Calculate remaining length [2.2.3]
if remaining_length > 0x7F:
while remaining_length > 0:
encoded_byte = remaining_length % 0x80
remaining_length = remaining_length // 0x80
if remaining_length > 0:
encoded_byte |= 0x80
pub_hdr_fixed.append(encoded_byte)
else:
pub_hdr_fixed.append(remaining_length)

if self.logger is not None:
self.logger.debug(
"Sending PUBLISH\nTopic: {0}\nMsg: {1}\
\nQoS: {2}\nRetain? {3}".format(
topic, msg, qos, retain
)
)
self._sock.send(pkt)
self._send_str(topic)
if qos == 0:
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, self._pid)
if qos > 0:
self._pid += 1
pid = self._pid
struct.pack_into("!H", pkt, 0, pid)
self._sock.send(pkt)
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, pid)
if self.logger is not None:
self.logger.debug("Sending PUBACK")
self._sock.send(pub_hdr_fixed)
self._sock.send(pub_hdr_var)
self._sock.send(msg)
if qos == 0 and self.on_publish is not None:
self.on_publish(self, self.user_data, topic, self._pid)
if qos == 1:
while True:
op = self._wait_for_msg()
Expand All @@ -491,10 +505,6 @@ def publish(self, topic, msg, retain=False, qos=0):
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, rcv_pid)
return
elif qos == 2:
assert 0
if self.on_publish is not None:
self.on_publish(self, self.user_data, topic, rcv_pid)

def subscribe(self, topic, qos=0):
"""Subscribes to a topic on the MQTT Broker.
Expand Down