diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 73182d17..53e98011 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -36,6 +36,7 @@ from random import randint from adafruit_connection_manager import get_connection_manager +from adafruit_ticks import ticks_ms, ticks_diff try: from typing import List, Optional, Tuple, Type, Union @@ -133,8 +134,6 @@ class MQTT: This works with all callbacks but the "on_message" and those added via add_topic_callback(); for those, to get access to the user_data use the 'user_data' member of the MQTT object passed as 1st argument. - :param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set - this to True in order to operate correctly over more than 24 days or so """ @@ -156,7 +155,6 @@ def __init__( socket_timeout: int = 1, connect_retries: int = 5, user_data=None, - use_imprecise_time: Optional[bool] = None, ) -> None: self._connection_manager = get_connection_manager(socket_pool) self._socket_pool = socket_pool @@ -165,20 +163,6 @@ def __init__( self._backwards_compatible_sock = False self._use_binary_mode = use_binary_mode - self.use_monotonic_ns = False - try: - time.monotonic_ns() - self.use_monotonic_ns = True - except AttributeError: - if use_imprecise_time: - self.use_monotonic_ns = False - else: - raise MMQTTException( # pylint: disable=raise-missing-from - "time.monotonic_ns() is not available. " - "Will use imprecise time however only if the" - "use_imprecise_time argument is set to True." - ) - if recv_timeout <= socket_timeout: raise MMQTTException( "recv_timeout must be strictly greater than socket_timeout" @@ -191,7 +175,7 @@ def __init__( self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM self._pid = 0 - self._last_msg_sent_timestamp: float = 0 + self._last_msg_sent_timestamp: int = 0 self.logger = NullLogger() """An optional logging attribute that can be set with with a Logger to enable debug logging.""" @@ -230,7 +214,7 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - time_int = int(self.get_monotonic_time() * 100) % 1000 + time_int = int(ticks_ms() / 10) % 1000 self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: @@ -254,17 +238,6 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None - def get_monotonic_time(self) -> float: - """ - Provide monotonic time in seconds. Based on underlying implementation - this might result in imprecise time, that will result in the library - not being able to operate if running contiguously for more than 24 days or so. - """ - if self.use_monotonic_ns: - return time.monotonic_ns() / 1000000000 - - return time.monotonic() - def __enter__(self): return self @@ -546,9 +519,9 @@ def _connect( if self._username is not None: self._send_str(self._username) self._send_str(self._password) - self._last_msg_sent_timestamp = self.get_monotonic_time() + self._last_msg_sent_timestamp = ticks_ms() self.logger.debug("Receiving CONNACK packet from broker") - stamp = self.get_monotonic_time() + stamp = ticks_ms() while True: op = self._wait_for_msg() if op == 32: @@ -564,7 +537,7 @@ def _connect( return result if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -618,15 +591,16 @@ def ping(self) -> list[int]: self._connected() self.logger.debug("Sending PINGREQ") self._sock.send(MQTT_PINGREQ) - ping_timeout = self._recv_timeout - stamp = self.get_monotonic_time() + ping_timeout = self.keep_alive + stamp = ticks_ms() + self._last_msg_sent_timestamp = stamp rc, rcs = None, [] while rc != MQTT_PINGRESP: rc = self._wait_for_msg() if rc: rcs.append(rc) - if self.get_monotonic_time() - stamp > ping_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > ping_timeout: raise MMQTTException( f"PINGRESP not returned from broker within {ping_timeout} seconds." ) @@ -697,11 +671,11 @@ def publish( self._sock.send(pub_hdr_fixed) self._sock.send(pub_hdr_var) self._sock.send(msg) - self._last_msg_sent_timestamp = self.get_monotonic_time() + self._last_msg_sent_timestamp = ticks_ms() if qos == 0 and self.on_publish is not None: self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: - stamp = self.get_monotonic_time() + stamp = ticks_ms() while True: op = self._wait_for_msg() if op == 0x40: @@ -715,7 +689,7 @@ def publish( return if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -774,12 +748,12 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N self.logger.debug(f"SUBSCRIBING to topic {t} with QoS {q}") self.logger.debug(f"payload: {payload}") self._sock.send(payload) - stamp = self.get_monotonic_time() + stamp = ticks_ms() self._last_msg_sent_timestamp = stamp while True: op = self._wait_for_msg() if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -851,13 +825,13 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None: for t in topics: self.logger.debug(f"UNSUBSCRIBING from topic {t}") self._sock.send(payload) - self._last_msg_sent_timestamp = self.get_monotonic_time() + self._last_msg_sent_timestamp = ticks_ms() self.logger.debug("Waiting for UNSUBACK...") while True: - stamp = self.get_monotonic_time() + stamp = ticks_ms() op = self._wait_for_msg() if op is None: - if self.get_monotonic_time() - stamp > self._recv_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -957,12 +931,12 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: self._connected() self.logger.debug(f"waiting for messages for {timeout} seconds") - stamp = self.get_monotonic_time() + stamp = ticks_ms() rcs = [] while True: if ( - self.get_monotonic_time() - self._last_msg_sent_timestamp + ticks_diff(ticks_ms(), self._last_msg_sent_timestamp) / 1000 >= self.keep_alive ): # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server @@ -972,14 +946,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: rcs.extend(self.ping()) # ping() itself contains a _wait_for_msg() loop which might have taken a while, # so check here as well. - if self.get_monotonic_time() - stamp > timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break rc = self._wait_for_msg() if rc is not None: rcs.append(rc) - if self.get_monotonic_time() - stamp > timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break @@ -987,7 +961,6 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]: # pylint: disable = too-many-return-statements - """Reads and processes network events. Return the packet type or None if there is nothing to be received. @@ -1086,7 +1059,7 @@ def _sock_exact_recv( :param float timeout: timeout, in seconds. Defaults to keep_alive :return: byte array """ - stamp = self.get_monotonic_time() + stamp = ticks_ms() if not self._backwards_compatible_sock: # CPython, socketpool, esp32spi, wiznet5k rc = bytearray(bufsize) @@ -1101,7 +1074,7 @@ def _sock_exact_recv( recv_len = self._sock.recv_into(mv, to_read) to_read -= recv_len mv = mv[recv_len:] - if self.get_monotonic_time() - stamp > read_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) @@ -1121,7 +1094,7 @@ def _sock_exact_recv( recv = self._sock.recv(to_read) to_read -= len(recv) rc += recv - if self.get_monotonic_time() - stamp > read_timeout: + if ticks_diff(ticks_ms(), stamp) / 1000 > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) diff --git a/docs/conf.py b/docs/conf.py index eb5696d4..3b989959 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -26,7 +26,7 @@ # Uncomment the below if you use native CircuitPython modules such as # digitalio, micropython and busio. List the modules you use. Without it, the # autodoc module docs will fail to generate with a warning. -autodoc_mock_imports = ["micropython", "microcontroller", "random"] +autodoc_mock_imports = ["microcontroller", "random"] intersphinx_mapping = { diff --git a/requirements.txt b/requirements.txt index 25052887..8075f629 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ Adafruit-Blinka Adafruit-Circuitpython-ConnectionManager +adafruit-circuitpython-ticks diff --git a/tests/test_loop.py b/tests/test_loop.py index 4d79b65b..6666d86f 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -142,7 +142,7 @@ def test_loop_basic(self) -> None: time_before = time.monotonic() timeout = random.randint(3, 8) # pylint: disable=protected-access - mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time() + mqtt_client._last_msg_sent_timestamp = MQTT.ticks_ms() rcs = mqtt_client.loop(timeout=timeout) time_after = time.monotonic() @@ -220,10 +220,10 @@ def test_loop_ping_timeout(self): mqtt_client._sock = mocket start = time.monotonic() - res = mqtt_client.loop(timeout=2 * keep_alive_timeout) + res = mqtt_client.loop(timeout=2 * keep_alive_timeout + recv_timeout) assert time.monotonic() - start >= 2 * keep_alive_timeout assert len(mocket.sent) > 0 - assert len(res) == 2 + assert len(res) == 3 assert set(res) == {int(0xD0)} # pylint: disable=no-self-use diff --git a/tests/test_recv_timeout.py b/tests/test_recv_timeout.py index f65dc403..b291dd83 100644 --- a/tests/test_recv_timeout.py +++ b/tests/test_recv_timeout.py @@ -49,7 +49,7 @@ def test_recv_timeout_vs_keepalive(self) -> None: socket_mock.recv_into.assert_called() now = time.monotonic() - assert recv_timeout <= (now - start) < keep_alive + assert recv_timeout <= (now - start) <= (keep_alive + 0.1) if __name__ == "__main__":