60
60
MQTT_PINGREQ = b"\xc0 \0 "
61
61
MQTT_PINGRESP = const (0xD0 )
62
62
MQTT_PUBLISH = const (0x30 )
63
- MQTT_SUB = b" \x82 "
63
+ MQTT_SUB = const ( 0x82 )
64
64
MQTT_UNSUB = b"\xA2 "
65
65
MQTT_DISCONNECT = b"\xe0 \0 "
66
66
@@ -594,18 +594,7 @@ def _connect(
594
594
var_header [6 ] |= 0x4 | (self ._lw_qos & 0x1 ) << 3 | (self ._lw_qos & 0x2 ) << 3
595
595
var_header [6 ] |= self ._lw_retain << 5
596
596
597
- # Remaining length calculation
598
- large_rel_length = False
599
- if remaining_length > 0x7F :
600
- large_rel_length = True
601
- # Calculate Remaining Length [2.2.3]
602
- while remaining_length > 0 :
603
- encoded_byte = remaining_length % 0x80
604
- remaining_length = remaining_length // 0x80
605
- # if there is more data to encode, set the top bit of the byte
606
- if remaining_length > 0 :
607
- encoded_byte |= 0x80
608
- fixed_header .append (encoded_byte )
597
+ large_rel_length = self .encode_remaining_length (fixed_header , remaining_length )
609
598
if large_rel_length :
610
599
fixed_header .append (0x00 )
611
600
else :
@@ -648,6 +637,25 @@ def _connect(
648
637
f"No data received from broker for { self ._recv_timeout } seconds."
649
638
)
650
639
640
+ # pylint: disable=no-self-use
641
+ def encode_remaining_length (self , fixed_header , remaining_length ):
642
+ """
643
+ Encode Remaining Length [2.2.3]
644
+ """
645
+ # Remaining length calculation
646
+ large_rel_length = False
647
+ if remaining_length > 0x7F :
648
+ large_rel_length = True
649
+ while remaining_length > 0 :
650
+ encoded_byte = remaining_length % 0x80
651
+ remaining_length = remaining_length // 0x80
652
+ # if there is more data to encode, set the top bit of the byte
653
+ if remaining_length > 0 :
654
+ encoded_byte |= 0x80
655
+ fixed_header .append (encoded_byte )
656
+
657
+ return large_rel_length
658
+
651
659
def disconnect (self ) -> None :
652
660
"""Disconnects the MiniMQTT client from the MQTT broker."""
653
661
self ._connected ()
@@ -780,7 +788,7 @@ def publish(
780
788
781
789
def subscribe (self , topic : str , qos : int = 0 ) -> None :
782
790
"""Subscribes to a topic on the MQTT Broker.
783
- This method can subscribe to one topics or multiple topics.
791
+ This method can subscribe to one topic or multiple topics.
784
792
785
793
:param str|tuple|list topic: Unique MQTT topic identifier string. If
786
794
this is a `tuple`, then the tuple should
@@ -810,20 +818,27 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
810
818
self ._valid_topic (t )
811
819
topics .append ((t , q ))
812
820
# Assemble packet
821
+ self .logger .debug ("Sending SUBSCRIBE to broker..." )
822
+ fixed_header = bytearray ([MQTT_SUB ])
813
823
packet_length = 2 + (2 * len (topics )) + (1 * len (topics ))
814
824
packet_length += sum (len (topic .encode ("utf-8" )) for topic , qos in topics )
815
- packet_length_byte = packet_length .to_bytes (1 , "big" )
825
+ self .encode_remaining_length (fixed_header , remaining_length = packet_length )
826
+ self .logger .debug (f"Fixed Header: { fixed_header } " )
827
+ self ._sock .send (fixed_header )
816
828
self ._pid = self ._pid + 1 if self ._pid < 0xFFFF else 1
817
829
packet_id_bytes = self ._pid .to_bytes (2 , "big" )
818
- # Packet with variable and fixed headers
819
- packet = MQTT_SUB + packet_length_byte + packet_id_bytes
830
+ var_header = packet_id_bytes
831
+ self .logger .debug (f"Variable Header: { var_header } " )
832
+ self ._sock .send (var_header )
820
833
# attaching topic and QOS level to the packet
834
+ packet = bytes ()
821
835
for t , q in topics :
822
836
topic_size = len (t .encode ("utf-8" )).to_bytes (2 , "big" )
823
837
qos_byte = q .to_bytes (1 , "big" )
824
838
packet += topic_size + t .encode () + qos_byte
825
839
for t , q in topics :
826
840
self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
841
+ self .logger .debug (f"packet: { packet } " )
827
842
self ._sock .send (packet )
828
843
stamp = time .monotonic ()
829
844
while True :
@@ -837,7 +852,7 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
837
852
if op == 0x90 :
838
853
rc = self ._sock_exact_recv (3 )
839
854
# Check packet identifier.
840
- assert rc [1 ] == packet [ 2 ] and rc [2 ] == packet [ 3 ]
855
+ assert rc [1 ] == var_header [ 0 ] and rc [2 ] == var_header [ 1 ]
841
856
remaining_len = rc [0 ] - 2
842
857
assert remaining_len > 0
843
858
rc = self ._sock_exact_recv (remaining_len )
0 commit comments