@@ -784,22 +784,30 @@ def subscribe(self, topic, qos=0):
784
784
stamp = time .monotonic ()
785
785
while True :
786
786
op = self ._wait_for_msg ()
787
- if op == 0x90 :
788
- rc = self ._sock_exact_recv (4 )
789
- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
790
- if rc [3 ] == 0x80 :
791
- raise MMQTTException ("SUBACK Failure!" )
792
- for t , q in topics :
793
- if self .on_subscribe is not None :
794
- self .on_subscribe (self , self ._user_data , t , q )
795
- self ._subscribed_topics .append (t )
796
- return
797
-
798
787
if op is None :
799
788
if time .monotonic () - stamp > self ._recv_timeout :
800
789
raise MMQTTException (
801
790
f"No data received from broker for { self ._recv_timeout } seconds."
802
791
)
792
+ else :
793
+ if op == 0x90 :
794
+ rc = self ._sock_exact_recv (3 )
795
+ # Check packet identifier.
796
+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
797
+ remaining_len = rc [0 ] - 2
798
+ assert remaining_len > 0
799
+ rc = self ._sock_exact_recv (remaining_len )
800
+ for i in range (0 , remaining_len ):
801
+ if rc [i ] not in [0 , 1 , 2 ]:
802
+ raise MMQTTException (f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} " )
803
+
804
+ for t , q in topics :
805
+ if self .on_subscribe is not None :
806
+ self .on_subscribe (self , self ._user_data , t , q )
807
+ self ._subscribed_topics .append (t )
808
+ return
809
+ else :
810
+ raise MMQTTException (f"invalid message received as response to SUBSCRIBE: { hex (op )} " )
803
811
804
812
def unsubscribe (self , topic ):
805
813
"""Unsubscribes from a MQTT topic.
@@ -838,22 +846,24 @@ def unsubscribe(self, topic):
838
846
while True :
839
847
stamp = time .monotonic ()
840
848
op = self ._wait_for_msg ()
841
- if op == 176 :
842
- rc = self ._sock_exact_recv (3 )
843
- assert rc [0 ] == 0x02
844
- # [MQTT-3.32]
845
- assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
846
- for t in topics :
847
- if self .on_unsubscribe is not None :
848
- self .on_unsubscribe (self , self ._user_data , t , self ._pid )
849
- self ._subscribed_topics .remove (t )
850
- return
851
-
852
849
if op is None :
853
850
if time .monotonic () - stamp > self ._recv_timeout :
854
851
raise MMQTTException (
855
852
f"No data received from broker for { self ._recv_timeout } seconds."
856
853
)
854
+ else :
855
+ if op == 176 :
856
+ rc = self ._sock_exact_recv (3 )
857
+ assert rc [0 ] == 0x02
858
+ # [MQTT-3.32]
859
+ assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
860
+ for t in topics :
861
+ if self .on_unsubscribe is not None :
862
+ self .on_unsubscribe (self , self ._user_data , t , self ._pid )
863
+ self ._subscribed_topics .remove (t )
864
+ return
865
+ else :
866
+ raise MMQTTException (f"invalid message received as response to UNSUBSCRIBE: { hex (op )} " )
857
867
858
868
def _recompute_reconnect_backoff (self ):
859
869
"""
@@ -992,6 +1002,7 @@ def _wait_for_msg(self, timeout=0.1):
992
1002
return MQTT_PINGRESP
993
1003
994
1004
if res [0 ] & MQTT_PKT_TYPE_MASK != MQTT_PUBLISH :
1005
+ self .logger .debug (f"Got message type: { hex (res [0 ])} " )
995
1006
return res [0 ]
996
1007
997
1008
# Handle only the PUBLISH packet type from now on.
0 commit comments