@@ -187,7 +187,7 @@ def __init__(
187
187
self ._is_connected = False
188
188
self ._msg_size_lim = MQTT_MSG_SZ_LIM
189
189
self ._pid = 0
190
- self ._timestamp : float = 0
190
+ self ._last_msg_sent_timestamp : float = 0
191
191
self .logger = NullLogger ()
192
192
"""An optional logging attribute that can be set with with a Logger
193
193
to enable debug logging."""
@@ -537,6 +537,7 @@ def _connect(
537
537
if self ._username is not None :
538
538
self ._send_str (self ._username )
539
539
self ._send_str (self ._password )
540
+ self ._last_msg_sent_timestamp = self .get_monotonic_time ()
540
541
self .logger .debug ("Receiving CONNACK packet from broker" )
541
542
stamp = self .get_monotonic_time ()
542
543
while True :
@@ -591,6 +592,7 @@ def disconnect(self) -> None:
591
592
self ._connection_manager .free_socket (self ._sock )
592
593
self ._is_connected = False
593
594
self ._subscribed_topics = []
595
+ self ._last_msg_sent_timestamp = 0
594
596
if self .on_disconnect is not None :
595
597
self .on_disconnect (self , self .user_data , 0 )
596
598
@@ -604,6 +606,7 @@ def ping(self) -> list[int]:
604
606
self ._sock .send (MQTT_PINGREQ )
605
607
ping_timeout = self .keep_alive
606
608
stamp = self .get_monotonic_time ()
609
+ self ._last_msg_sent_timestamp = stamp
607
610
rc , rcs = None , []
608
611
while rc != MQTT_PINGRESP :
609
612
rc = self ._wait_for_msg ()
@@ -678,6 +681,7 @@ def publish(
678
681
self ._sock .send (pub_hdr_fixed )
679
682
self ._sock .send (pub_hdr_var )
680
683
self ._sock .send (msg )
684
+ self ._last_msg_sent_timestamp = self .get_monotonic_time ()
681
685
if qos == 0 and self .on_publish is not None :
682
686
self .on_publish (self , self .user_data , topic , self ._pid )
683
687
if qos == 1 :
@@ -755,6 +759,7 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
755
759
self .logger .debug (f"payload: { payload } " )
756
760
self ._sock .send (payload )
757
761
stamp = self .get_monotonic_time ()
762
+ self ._last_msg_sent_timestamp = stamp
758
763
while True :
759
764
op = self ._wait_for_msg ()
760
765
if op is None :
@@ -830,6 +835,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
830
835
for t in topics :
831
836
self .logger .debug (f"UNSUBSCRIBING from topic { t } " )
832
837
self ._sock .send (payload )
838
+ self ._last_msg_sent_timestamp = self .get_monotonic_time ()
833
839
self .logger .debug ("Waiting for UNSUBACK..." )
834
840
while True :
835
841
stamp = self .get_monotonic_time ()
@@ -919,31 +925,41 @@ def reconnect(self, resub_topics: bool = True) -> int:
919
925
return ret
920
926
921
927
def loop (self , timeout : float = 0 ) -> Optional [list [int ]]:
922
- # pylint: disable = too-many-return-statements
923
928
"""Non-blocking message loop. Use this method to check for incoming messages.
924
929
Returns list of packet types of any messages received or None.
925
930
926
931
:param float timeout: return after this timeout, in seconds.
927
932
928
933
"""
934
+ if timeout < self ._socket_timeout :
935
+ raise MMQTTException (
936
+ # pylint: disable=consider-using-f-string
937
+ "loop timeout ({}) must be bigger " .format (timeout )
938
+ + "than socket timeout ({}))" .format (self ._socket_timeout )
939
+ )
940
+
929
941
self ._connected ()
930
942
self .logger .debug (f"waiting for messages for { timeout } seconds" )
931
- if self ._timestamp == 0 :
932
- self ._timestamp = self .get_monotonic_time ()
933
- current_time = self .get_monotonic_time ()
934
- if current_time - self ._timestamp >= self .keep_alive :
935
- self ._timestamp = 0
936
- # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
937
- self .logger .debug (
938
- "KeepAlive period elapsed - requesting a PINGRESP from the server..."
939
- )
940
- rcs = self .ping ()
941
- return rcs
942
943
943
944
stamp = self .get_monotonic_time ()
944
945
rcs = []
945
946
946
947
while True :
948
+ if (
949
+ self .get_monotonic_time () - self ._last_msg_sent_timestamp
950
+ >= self .keep_alive
951
+ ):
952
+ # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
953
+ self .logger .debug (
954
+ "KeepAlive period elapsed - requesting a PINGRESP from the server..."
955
+ )
956
+ rcs .extend (self .ping ())
957
+ # ping() itself contains a _wait_for_msg() loop which might have taken a while,
958
+ # so check here as well.
959
+ if self .get_monotonic_time () - stamp > timeout :
960
+ self .logger .debug (f"Loop timed out after { timeout } seconds" )
961
+ break
962
+
947
963
rc = self ._wait_for_msg ()
948
964
if rc is not None :
949
965
rcs .append (rc )
@@ -953,11 +969,13 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
953
969
954
970
return rcs if rcs else None
955
971
956
- def _wait_for_msg (self ) -> Optional [int ]:
972
+ def _wait_for_msg (self , timeout : Optional [ float ] = None ) -> Optional [int ]:
957
973
# pylint: disable = too-many-return-statements
958
974
959
975
"""Reads and processes network events.
960
976
Return the packet type or None if there is nothing to be received.
977
+
978
+ :param float timeout: return after this timeout, in seconds.
961
979
"""
962
980
# CPython socket module contains a timeout attribute
963
981
if hasattr (self ._socket_pool , "timeout" ):
@@ -967,7 +985,7 @@ def _wait_for_msg(self) -> Optional[int]:
967
985
return None
968
986
else : # socketpool, esp32spi
969
987
try :
970
- res = self ._sock_exact_recv (1 )
988
+ res = self ._sock_exact_recv (1 , timeout = timeout )
971
989
except OSError as error :
972
990
if error .errno in (errno .ETIMEDOUT , errno .EAGAIN ):
973
991
# raised by a socket timeout if 0 bytes were present
@@ -1036,7 +1054,9 @@ def _decode_remaining_length(self) -> int:
1036
1054
return n
1037
1055
sh += 7
1038
1056
1039
- def _sock_exact_recv (self , bufsize : int ) -> bytearray :
1057
+ def _sock_exact_recv (
1058
+ self , bufsize : int , timeout : Optional [float ] = None
1059
+ ) -> bytearray :
1040
1060
"""Reads _exact_ number of bytes from the connected socket. Will only return
1041
1061
bytearray with the exact number of bytes requested.
1042
1062
@@ -1047,6 +1067,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1047
1067
bytes is returned or trigger a timeout exception.
1048
1068
1049
1069
:param int bufsize: number of bytes to receive
1070
+ :param float timeout: timeout, in seconds. Defaults to keep_alive
1050
1071
:return: byte array
1051
1072
"""
1052
1073
stamp = self .get_monotonic_time ()
@@ -1058,7 +1079,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1058
1079
to_read = bufsize - recv_len
1059
1080
if to_read < 0 :
1060
1081
raise MMQTTException (f"negative number of bytes to read: { to_read } " )
1061
- read_timeout = self .keep_alive
1082
+ read_timeout = timeout if timeout is not None else self .keep_alive
1062
1083
mv = mv [recv_len :]
1063
1084
while to_read > 0 :
1064
1085
recv_len = self ._sock .recv_into (mv , to_read )
0 commit comments