diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 38531974..08953f0d 100755 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -62,11 +62,13 @@ MQTT_PINGRESP = const(0xD0) MQTT_SUB = b"\x82" MQTT_UNSUB = b"\xA2" -MQTT_PUB = bytearray(b"\x30\0") -# Variable CONNECT header [MQTT 3.1.2] -MQTT_VAR_HEADER = bytearray(b"\x04MQTT\x04\x02\0\0") +MQTT_PUB = bytearray(b"\x30") MQTT_DISCONNECT = b"\xe0\0" +# Variable CONNECT header [MQTT 3.1.2] +MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0") + + CONNACK_ERRORS = { const(0x01): "Connection Refused - Incorrect Protocol Version", const(0x02): "Connection Refused - ID Rejected", @@ -301,8 +303,11 @@ def connect(self, clean_session=True): fixed_header = bytearray() fixed_header.append(0x10) + # NOTE: Variable header is + # MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0") + # because final 4 bytes are 4, 2, 0, 0 # Variable Header - var_header = MQTT_VAR_HEADER + var_header = MQTT_HDR_CONNECT var_header[6] = clean_session << 1 # Set up variable header and remaining_length @@ -427,7 +432,6 @@ def publish(self, topic, msg, retain=False, qos=0): .. code-block:: python mqtt_client.publish('topics/piVal', 'threepointonefour') - """ self.is_connected() self._check_topic(topic) @@ -443,20 +447,40 @@ def publish(self, topic, msg, retain=False, qos=0): else: raise MMQTTException("Invalid message data type.") if len(msg) > MQTT_MSG_MAX_SZ: - raise MMQTTException("Message size larger than %db." % MQTT_MSG_MAX_SZ) - self._check_qos(qos) - pkt = MQTT_PUB - pkt[0] |= qos << 1 | retain - sz = 2 + len(topic) + len(msg) + raise MMQTTException("Message size larger than %d bytes." % MQTT_MSG_MAX_SZ) + assert ( + 0 <= qos <= 1 + ), "Quality of Service Level 2 is unsupported by this library." + + pub_hdr_fixed = bytearray() # fixed header + pub_hdr_fixed.extend(MQTT_PUB) + pub_hdr_fixed[0] |= retain | qos << 1 # [3.3.1.2], [3.3.1.3] + + pub_hdr_var = bytearray() # variable header + pub_hdr_var.append(len(topic) >> 8) # Topic length, MSB + pub_hdr_var.append(len(topic) & 0xFF) # Topic length, LSB + pub_hdr_var.extend(topic.encode("utf-8")) # Topic name + + remaining_length = 2 + len(msg) + len(topic) if qos > 0: - sz += 2 - assert sz < 2097152 - i = 1 - while sz > 0x7F: - pkt[i] = (sz & 0x7F) | 0x80 - sz >>= 7 - i += 1 - pkt[i] = sz + # packet identifier where QoS level is 1 or 2. [3.3.2.2] + pid = self._pid + remaining_length += 2 + pub_hdr_var.append(0x00) + pub_hdr_var.append(self._pid) + self._pid += 1 + + # Calculate remaining length [2.2.3] + if remaining_length > 0x7F: + while remaining_length > 0: + encoded_byte = remaining_length % 0x80 + remaining_length = remaining_length // 0x80 + if remaining_length > 0: + encoded_byte |= 0x80 + pub_hdr_fixed.append(encoded_byte) + else: + pub_hdr_fixed.append(remaining_length) + if self.logger is not None: self.logger.debug( "Sending PUBLISH\nTopic: {0}\nMsg: {1}\ @@ -464,21 +488,11 @@ def publish(self, topic, msg, retain=False, qos=0): topic, msg, qos, retain ) ) - self._sock.send(pkt) - self._send_str(topic) - if qos == 0: - if self.on_publish is not None: - self.on_publish(self, self.user_data, topic, self._pid) - if qos > 0: - self._pid += 1 - pid = self._pid - struct.pack_into("!H", pkt, 0, pid) - self._sock.send(pkt) - if self.on_publish is not None: - self.on_publish(self, self.user_data, topic, pid) - if self.logger is not None: - self.logger.debug("Sending PUBACK") + self._sock.send(pub_hdr_fixed) + self._sock.send(pub_hdr_var) self._sock.send(msg) + if qos == 0 and self.on_publish is not None: + self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: while True: op = self._wait_for_msg() @@ -491,10 +505,6 @@ def publish(self, topic, msg, retain=False, qos=0): if self.on_publish is not None: self.on_publish(self, self.user_data, topic, rcv_pid) return - elif qos == 2: - assert 0 - if self.on_publish is not None: - self.on_publish(self, self.user_data, topic, rcv_pid) def subscribe(self, topic, qos=0): """Subscribes to a topic on the MQTT Broker.