@@ -627,7 +627,6 @@ def _connect(
627
627
self ._send_str (self ._username )
628
628
self ._send_str (self ._password )
629
629
self .logger .debug ("Receiving CONNACK packet from broker" )
630
- stamp = time .monotonic ()
631
630
while True :
632
631
op = self ._wait_for_msg ()
633
632
if op == 32 :
@@ -642,12 +641,6 @@ def _connect(
642
641
643
642
return result
644
643
645
- if op is None :
646
- if time .monotonic () - stamp > self ._recv_timeout :
647
- raise MMQTTException (
648
- f"No data received from broker for { self ._recv_timeout } seconds."
649
- )
650
-
651
644
def disconnect (self ) -> None :
652
645
"""Disconnects the MiniMQTT client from the MQTT broker."""
653
646
self ._connected ()
@@ -759,7 +752,6 @@ def publish(
759
752
if qos == 0 and self .on_publish is not None :
760
753
self .on_publish (self , self ._user_data , topic , self ._pid )
761
754
if qos == 1 :
762
- stamp = time .monotonic ()
763
755
while True :
764
756
op = self ._wait_for_msg ()
765
757
if op == 0x40 :
@@ -772,12 +764,6 @@ def publish(
772
764
self .on_publish (self , self ._user_data , topic , rcv_pid )
773
765
return
774
766
775
- if op is None :
776
- if time .monotonic () - stamp > self ._recv_timeout :
777
- raise MMQTTException (
778
- f"No data received from broker for { self ._recv_timeout } seconds."
779
- )
780
-
781
767
def subscribe (self , topic : str , qos : int = 0 ) -> None :
782
768
"""Subscribes to a topic on the MQTT Broker.
783
769
This method can subscribe to one topics or multiple topics.
@@ -825,34 +811,26 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
825
811
for t , q in topics :
826
812
self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
827
813
self ._sock .send (packet )
828
- stamp = time .monotonic ()
829
814
while True :
830
815
op = self ._wait_for_msg ()
831
- if op is None :
832
- if time .monotonic () - stamp > self ._recv_timeout :
833
- raise MMQTTException (
834
- f"No data received from broker for { self ._recv_timeout } seconds."
835
- )
836
- else :
837
- if op == 0x90 :
838
- rc = self ._sock_exact_recv (3 )
839
- # Check packet identifier.
840
- assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
841
- remaining_len = rc [0 ] - 2
842
- assert remaining_len > 0
843
- rc = self ._sock_exact_recv (remaining_len )
844
- for i in range (0 , remaining_len ):
845
- if rc [i ] not in [0 , 1 , 2 ]:
846
- raise MMQTTException (
847
- f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} "
848
- )
849
-
850
- for t , q in topics :
851
- if self .on_subscribe is not None :
852
- self .on_subscribe (self , self ._user_data , t , q )
853
- self ._subscribed_topics .append (t )
854
- return
816
+ if op == 0x90 :
817
+ rc = self ._sock_exact_recv (3 )
818
+ # Check packet identifier.
819
+ assert rc [1 ] == packet [2 ] and rc [2 ] == packet [3 ]
820
+ remaining_len = rc [0 ] - 2
821
+ assert remaining_len > 0
822
+ rc = self ._sock_exact_recv (remaining_len )
823
+ for i in range (0 , remaining_len ):
824
+ if rc [i ] not in [0 , 1 , 2 ]:
825
+ raise MMQTTException (
826
+ f"SUBACK Failure for topic { topics [i ][0 ]} : { hex (rc [i ])} "
827
+ )
855
828
829
+ for t , q in topics :
830
+ if self .on_subscribe is not None :
831
+ self .on_subscribe (self , self ._user_data , t , q )
832
+ self ._subscribed_topics .append (t )
833
+ else :
856
834
raise MMQTTException (
857
835
f"invalid message received as response to SUBSCRIBE: { hex (op )} "
858
836
)
@@ -892,14 +870,8 @@ def unsubscribe(self, topic: str) -> None:
892
870
self ._sock .send (packet )
893
871
self .logger .debug ("Waiting for UNSUBACK..." )
894
872
while True :
895
- stamp = time .monotonic ()
896
873
op = self ._wait_for_msg ()
897
- if op is None :
898
- if time .monotonic () - stamp > self ._recv_timeout :
899
- raise MMQTTException (
900
- f"No data received from broker for { self ._recv_timeout } seconds."
901
- )
902
- else :
874
+ if op :
903
875
if op == 176 :
904
876
rc = self ._sock_exact_recv (3 )
905
877
assert rc [0 ] == 0x02
@@ -910,10 +882,10 @@ def unsubscribe(self, topic: str) -> None:
910
882
self .on_unsubscribe (self , self ._user_data , t , self ._pid )
911
883
self ._subscribed_topics .remove (t )
912
884
return
913
-
914
- raise MMQTTException (
915
- f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
916
- )
885
+ else :
886
+ raise MMQTTException (
887
+ f"invalid message received as response to UNSUBSCRIBE: { hex (op )} "
888
+ )
917
889
918
890
def _recompute_reconnect_backoff (self ) -> None :
919
891
"""
@@ -1115,7 +1087,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1115
1087
to_read = bufsize - recv_len
1116
1088
if to_read < 0 :
1117
1089
raise MMQTTException (f"negative number of bytes to read: { to_read } " )
1118
- read_timeout = self .keep_alive
1090
+ read_timeout = self ._recv_timeout
1119
1091
mv = mv [recv_len :]
1120
1092
while to_read > 0 :
1121
1093
recv_len = self ._sock .recv_into (mv , to_read )
@@ -1136,7 +1108,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1136
1108
# or raise exception if wait longer than read_timeout
1137
1109
to_read = bufsize - len (rc )
1138
1110
assert to_read >= 0
1139
- read_timeout = self .keep_alive
1111
+ read_timeout = self ._recv_timeout
1140
1112
while to_read > 0 :
1141
1113
recv = self ._sock .recv (to_read )
1142
1114
to_read -= len (recv )
0 commit comments