Skip to content

Commit cd8d954

Browse files
author
brentru
committed
pull in _sock_exact_recv and modify it for a cpython/socketpool implementation, tested with _connect. Removing the shared buffer overhead which may or may not work in favor of dynamically sizing rc buffer like in master
1 parent 65afbd5 commit cd8d954

File tree

1 file changed

+43
-19
lines changed

1 file changed

+43
-19
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# The MIT License (MIT)
22
#
3-
# Copyright (c) 2019 Brent Rubell for Adafruit Industries
3+
# Copyright (c) 2019-2021 Brent Rubell for Adafruit Industries
44
#
55
# Original Work Copyright (c) 2016 Paul Sokolovsky, uMQTT
66
# Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt
@@ -36,14 +36,6 @@
3636
3737
Adapted from https://github.com/micropython/micropython-lib/tree/master/umqtt.simple/umqtt
3838
39-
micropython-lib consists of multiple modules from different sources and
40-
authors. Each module comes under its own licensing terms. Short name of
41-
a license can be found in a file within a module directory (usually
42-
metadata.txt or setup.py). Complete text of each license used is provided
43-
at https://github.com/micropython/micropython-lib/blob/master/LICENSE
44-
45-
author='Paul Sokolovsky'
46-
license='MIT'
4739
**Software and Dependencies:**
4840
4941
* Adafruit CircuitPython firmware for the supported boards:
@@ -95,8 +87,7 @@ class MMQTTException(Exception):
9587
# pass
9688

9789

98-
# Legacy Socket API
99-
90+
# Legacy ESP32SPI Socket API
10091
def set_socket(sock, iface=None):
10192
"""Legacy API for setting the socket and network interface, use a `Session` instead.
10293
@@ -237,8 +228,6 @@ def __init__(
237228
self.on_subscribe = None
238229
self.on_unsubscribe = None
239230

240-
# Shared buffer
241-
self._rx_length = 0
242231
self._rx_buffer = bytearray(32)
243232

244233
# Socket helpers
@@ -516,14 +505,14 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
516505
while True:
517506
op = self._wait_for_msg()
518507
if op == 32:
519-
self._recv_into(buf, 3)
520-
assert buf[0] == 0x02
521-
if buf[2] != 0x00:
522-
raise MMQTTException(CONNACK_ERRORS[buf[2]])
508+
rc = self._sock_exact_recv(3)
509+
assert rc[0] == 0x02
510+
if rc[2] != 0x00:
511+
raise MMQTTException(CONNACK_ERRORS[rc[2]])
523512
self._is_connected = True
524-
result = buf[0] & 1
513+
result = rc[0] & 1
525514
if self.on_connect is not None:
526-
self.on_connect(self, self._user_data, result, buf[2])
515+
self.on_connect(self, self._user_data, result, rc[2])
527516
return result
528517

529518
def disconnect(self):
@@ -945,6 +934,41 @@ def _recv_into(self, buf, size=0):
945934
return read_size
946935
return self._sock.recv_into(buf, size)
947936

937+
def _sock_exact_recv(self, bufsize):
938+
"""Reads _exact_ number of bytes from the connected socket. Will only return
939+
string with the exact number of bytes requested.
940+
941+
The semantics of native socket receive is that it returns no more than the
942+
specified number of bytes (i.e. max size). However, it makes no guarantees in
943+
terms of the minimum size of the buffer, which could be 1 byte. This is a
944+
wrapper for socket recv() to ensure that no less than the expected number of
945+
bytes is returned or trigger a timeout exception.
946+
:param int bufsize: number of bytes to receive
947+
948+
"""
949+
if not self._backwards_compatible_sock:
950+
# CPython/Socketpool Impl.
951+
rc = bytearray(bufsize)
952+
self._sock.recv_into(rc, bufsize)
953+
else: # ESP32SPI Impl.
954+
stamp = time.monotonic()
955+
read_timeout = self.keep_alive
956+
rc = self._sock.recv(bufsize)
957+
to_read = bufsize - len(rc)
958+
assert to_read >= 0
959+
read_timeout = self.keep_alive
960+
while to_read > 0:
961+
recv = self._sock.recv(to_read)
962+
to_read -= len(recv)
963+
rc += recv
964+
if time.monotonic() - stamp > read_timeout:
965+
raise MMQTTException(
966+
"Unable to receive {} bytes within {} seconds.".format(
967+
to_read, read_timeout
968+
)
969+
)
970+
return rc
971+
948972
def _send_str(self, string):
949973
"""Packs and encodes a string to a socket.
950974

0 commit comments

Comments
 (0)