@@ -627,6 +627,7 @@ def _connect(
627
627
self ._send_str (self ._username )
628
628
self ._send_str (self ._password )
629
629
self .logger .debug ("Receiving CONNACK packet from broker" )
630
+ stamp = time .monotonic ()
630
631
while True :
631
632
op = self ._wait_for_msg ()
632
633
if op == 32 :
@@ -641,6 +642,12 @@ def _connect(
641
642
642
643
return result
643
644
645
+ if op is None :
646
+ if time .monotonic () - stamp > self ._recv_timeout :
647
+ raise MMQTTException (
648
+ f"No data received from broker for { self ._recv_timeout } seconds."
649
+ )
650
+
644
651
def disconnect (self ) -> None :
645
652
"""Disconnects the MiniMQTT client from the MQTT broker."""
646
653
self ._connected ()
@@ -752,6 +759,7 @@ def publish(
752
759
if qos == 0 and self .on_publish is not None :
753
760
self .on_publish (self , self ._user_data , topic , self ._pid )
754
761
if qos == 1 :
762
+ stamp = time .monotonic ()
755
763
while True :
756
764
op = self ._wait_for_msg ()
757
765
if op == 0x40 :
@@ -764,6 +772,12 @@ def publish(
764
772
self .on_publish (self , self ._user_data , topic , rcv_pid )
765
773
return
766
774
775
+ if op is None :
776
+ if time .monotonic () - stamp > self ._recv_timeout :
777
+ raise MMQTTException (
778
+ f"No data received from broker for { self ._recv_timeout } seconds."
779
+ )
780
+
767
781
def subscribe (self , topic : str , qos : int = 0 ) -> None :
768
782
"""Subscribes to a topic on the MQTT Broker.
769
783
This method can subscribe to one topics or multiple topics.
@@ -811,26 +825,34 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
811
825
for t , q in topics :
812
826
self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
813
827
self ._sock .send (packet )
828
+ stamp = time .monotonic ()
814
829
while True :
815
830
op = self ._wait_for_msg ()
816
- if op == 0x90 :
817
- rc = self ._sock_exact_recv (3 )
818
- # Check packet identifier.
819
- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
820
- remaining_len = rc [0 ] - 2
821
- assert remaining_len > 0
822
- rc = self ._sock_exact_recv (remaining_len )
823
- for i in range (0 , remaining_len ):
824
- if rc [i ] not in [0 , 1 , 2 ]:
825
- raise MMQTTException (
826
- f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} "
827
- )
828
-
829
- for t , q in topics :
830
- if self .on_subscribe is not None :
831
- self .on_subscribe (self , self ._user_data , t , q )
832
- self ._subscribed_topics .append (t )
831
+ if op is None :
832
+ if time .monotonic () - stamp > self ._recv_timeout :
833
+ raise MMQTTException (
834
+ f"No data received from broker for { self ._recv_timeout } seconds."
835
+ )
833
836
else :
837
+ if op == 0x90 :
838
+ rc = self ._sock_exact_recv (3 )
839
+ # Check packet identifier.
840
+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
841
+ remaining_len = rc [0 ] - 2
842
+ assert remaining_len > 0
843
+ rc = self ._sock_exact_recv (remaining_len )
844
+ for i in range (0 , remaining_len ):
845
+ if rc [i ] not in [0 , 1 , 2 ]:
846
+ raise MMQTTException (
847
+ f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} "
848
+ )
849
+
850
+ for t , q in topics :
851
+ if self .on_subscribe is not None :
852
+ self .on_subscribe (self , self ._user_data , t , q )
853
+ self ._subscribed_topics .append (t )
854
+ return
855
+
834
856
raise MMQTTException (
835
857
f"invalid message received as response to SUBSCRIBE: { hex (op )} "
836
858
)
@@ -870,20 +892,27 @@ def unsubscribe(self, topic: str) -> None:
870
892
self ._sock .send (packet )
871
893
self .logger .debug ("Waiting for UNSUBACK..." )
872
894
while True :
895
+ stamp = time .monotonic ()
873
896
op = self ._wait_for_msg ()
874
- if op == 176 :
875
- rc = self ._sock_exact_recv (3 )
876
- assert rc [0 ] == 0x02
877
- # [MQTT-3.32]
878
- assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
879
- for t in topics :
880
- if self .on_unsubscribe is not None :
881
- self .on_unsubscribe (self , self ._user_data , t , self ._pid )
882
- self ._subscribed_topics .remove (t )
897
+ if op is None :
898
+ if time .monotonic () - stamp > self ._recv_timeout :
899
+ raise MMQTTException (
900
+ f"No data received from broker for { self ._recv_timeout } seconds."
901
+ )
883
902
else :
884
- raise MMQTTException (
885
- f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
886
- )
903
+ if op == 176 :
904
+ rc = self ._sock_exact_recv (3 )
905
+ assert rc [0 ] == 0x02
906
+ # [MQTT-3.32]
907
+ assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
908
+ for t in topics :
909
+ if self .on_unsubscribe is not None :
910
+ self .on_unsubscribe (self , self ._user_data , t , self ._pid )
911
+ self ._subscribed_topics .remove (t )
912
+ else :
913
+ raise MMQTTException (
914
+ f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
915
+ )
887
916
888
917
def _recompute_reconnect_backoff (self ) -> None :
889
918
"""
0 commit comments