diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 00bace2b..4fa0b90a 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -313,35 +313,6 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): self.deinit() - def _sock_exact_recv(self, bufsize): - """Reads _exact_ number of bytes from the connected socket. Will only return - string with the exact number of bytes requested. - - The semantics of native socket receive is that it returns no more than the - specified number of bytes (i.e. max size). However, it makes no guarantees in - terms of the minimum size of the buffer, which could be 1 byte. This is a - wrapper for socket recv() to ensure that no less than the expected number of - bytes is returned or trigger a timeout exception. - - :param int bufsize: number of bytes to receive - """ - stamp = time.monotonic() - rc = self._sock.recv(bufsize) - to_read = bufsize - len(rc) - assert to_read >= 0 - read_timeout = self.keep_alive - while to_read > 0: - recv = self._sock.recv(to_read) - to_read -= len(recv) - rc += recv - if time.monotonic() - stamp > read_timeout: - raise MMQTTException( - "Unable to receive {} bytes within {} seconds.".format( - to_read, read_timeout - ) - ) - return rc - def deinit(self): """De-initializes the MQTT client and disconnects from the mqtt broker.""" self.disconnect() @@ -985,15 +956,28 @@ def _sock_exact_recv(self, bufsize): bytes is returned or trigger a timeout exception. :param int bufsize: number of bytes to receive - + :return: byte array """ + stamp = time.monotonic() if not self._backwards_compatible_sock: # CPython/Socketpool Impl. rc = bytearray(bufsize) - self._sock.recv_into(rc, bufsize) - else: # ESP32SPI Impl. - stamp = time.monotonic() + mv = memoryview(rc) + recv_len = self._sock.recv_into(rc, bufsize) + to_read = bufsize - recv_len + if to_read < 0: + raise MMQTTException(f"negative number of bytes to read: {to_read}") read_timeout = self.keep_alive + mv = mv[recv_len:] + while to_read > 0: + recv_len = self._sock.recv_into(mv, to_read) + to_read -= recv_len + mv = mv[recv_len:] + if time.monotonic() - stamp > read_timeout: + raise MMQTTException( + f"Unable to receive {to_read} bytes within {read_timeout} seconds." + ) + else: # ESP32SPI Impl. # This will timeout with socket timeout (not keepalive timeout) rc = self._sock.recv(bufsize) if not rc: @@ -1012,9 +996,7 @@ def _sock_exact_recv(self, bufsize): rc += recv if time.monotonic() - stamp > read_timeout: raise MMQTTException( - "Unable to receive {} bytes within {} seconds.".format( - to_read, read_timeout - ) + f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) return rc