@@ -128,6 +128,7 @@ class MQTT:
128
128
:param str client_id: Optional client identifier, defaults to a unique, generated string.
129
129
:param bool is_ssl: Sets a secure or insecure connection with the broker.
130
130
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
131
+ :param int recv_timeout: receive timeout, in seconds.
131
132
:param socket socket_pool: A pool of socket resources available for the given radio.
132
133
:param ssl_context: SSL context for long-lived SSL connections.
133
134
:param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks.
@@ -146,6 +147,7 @@ def __init__(
146
147
client_id = None ,
147
148
is_ssl = True ,
148
149
keep_alive = 60 ,
150
+ recv_timeout = 10 ,
149
151
socket_pool = None ,
150
152
ssl_context = None ,
151
153
use_binary_mode = False ,
@@ -160,6 +162,7 @@ def __init__(
160
162
self ._socket_timeout = socket_timeout
161
163
162
164
self .keep_alive = keep_alive
165
+ self ._recv_timeout = recv_timeout
163
166
self ._user_data = None
164
167
self ._is_connected = False
165
168
self ._msg_size_lim = MQTT_MSG_SZ_LIM
@@ -522,6 +525,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
522
525
self ._send_str (self ._password )
523
526
if self .logger is not None :
524
527
self .logger .debug ("Receiving CONNACK packet from broker" )
528
+ stamp = time .monotonic ()
525
529
while True :
526
530
op = self ._wait_for_msg ()
527
531
if op == 32 :
@@ -535,6 +539,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
535
539
self .on_connect (self , self ._user_data , result , rc [2 ])
536
540
return result
537
541
542
+ if op is None :
543
+ if time .monotonic () - stamp > self ._recv_timeout :
544
+ raise MMQTTException (
545
+ f"No data received from broker for { self ._recv_timeout } seconds."
546
+ )
547
+
538
548
def disconnect (self ):
539
549
"""Disconnects the MiniMQTT client from the MQTT broker."""
540
550
self .is_connected ()
@@ -645,6 +655,7 @@ def publish(self, topic, msg, retain=False, qos=0):
645
655
if qos == 0 and self .on_publish is not None :
646
656
self .on_publish (self , self ._user_data , topic , self ._pid )
647
657
if qos == 1 :
658
+ stamp = time .monotonic ()
648
659
while True :
649
660
op = self ._wait_for_msg ()
650
661
if op == 0x40 :
@@ -657,6 +668,12 @@ def publish(self, topic, msg, retain=False, qos=0):
657
668
self .on_publish (self , self ._user_data , topic , rcv_pid )
658
669
return
659
670
671
+ if op is None :
672
+ if time .monotonic () - stamp > self ._recv_timeout :
673
+ raise MMQTTException (
674
+ f"No data received from broker for { self ._recv_timeout } seconds."
675
+ )
676
+
660
677
def subscribe (self , topic , qos = 0 ):
661
678
"""Subscribes to a topic on the MQTT Broker.
662
679
This method can subscribe to one topics or multiple topics.
@@ -705,6 +722,7 @@ def subscribe(self, topic, qos=0):
705
722
for t , q in topics :
706
723
self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
707
724
self ._sock .send (packet )
725
+ stamp = time .monotonic ()
708
726
while True :
709
727
op = self ._wait_for_msg ()
710
728
if op == 0x90 :
@@ -718,6 +736,12 @@ def subscribe(self, topic, qos=0):
718
736
self ._subscribed_topics .append (t )
719
737
return
720
738
739
+ if op is None :
740
+ if time .monotonic () - stamp > self ._recv_timeout :
741
+ raise MMQTTException (
742
+ f"No data received from broker for { self ._recv_timeout } seconds."
743
+ )
744
+
721
745
def unsubscribe (self , topic ):
722
746
"""Unsubscribes from a MQTT topic.
723
747
@@ -755,6 +779,7 @@ def unsubscribe(self, topic):
755
779
if self .logger is not None :
756
780
self .logger .debug ("Waiting for UNSUBACK..." )
757
781
while True :
782
+ stamp = time .monotonic ()
758
783
op = self ._wait_for_msg ()
759
784
if op == 176 :
760
785
rc = self ._sock_exact_recv (3 )
@@ -767,6 +792,12 @@ def unsubscribe(self, topic):
767
792
self ._subscribed_topics .remove (t )
768
793
return
769
794
795
+ if op is None :
796
+ if time .monotonic () - stamp > self ._recv_timeout :
797
+ raise MMQTTException (
798
+ f"No data received from broker for { self ._recv_timeout } seconds."
799
+ )
800
+
770
801
def reconnect (self , resub_topics = True ):
771
802
"""Attempts to reconnect to the MQTT broker.
772
803
0 commit comments