Skip to content

Commit 801b3a3

Browse files
authored
Merge pull request #4 from brentru/master
Improve network handling and implement time-based KeepAlive
2 parents f3c0cdf + 4d4d6a5 commit 801b3a3

File tree

1 file changed

+91
-44
lines changed

1 file changed

+91
-44
lines changed

adafruit_minimqtt.py

Lines changed: 91 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,12 @@ class MQTT:
9090
:param str client_id: Optional client identifier, defaults to a unique, generated string.
9191
:param bool is_ssl: Sets a secure or insecure connection with the broker.
9292
:param bool log: Attaches a logger to the MQTT client, defaults to logging level INFO.
93+
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
9394
"""
9495
# pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
9596
def __init__(self, socket, broker, port=None, username=None,
96-
password=None, network_manager=None, client_id=None, is_ssl=True, log=False):
97+
password=None, network_manager=None, client_id=None,
98+
is_ssl=True, log=False, keep_alive=60):
9799
# network management
98100
self._socket = socket
99101
network_manager_type = str(type(network_manager))
@@ -137,9 +139,11 @@ def __init__(self, socket, broker, port=None, username=None,
137139
self._is_connected = False
138140
self._msg_size_lim = MQTT_MSG_SZ_LIM
139141
self.packet_id = 0
140-
self._keep_alive = 60
142+
self._keep_alive = keep_alive
141143
self._pid = 0
142144
self._user_data = None
145+
self._timestamp = 0
146+
# List of subscribed topics, used for tracking
143147
self._subscribed_topics = []
144148
# Server callbacks
145149
self.on_message = None
@@ -180,34 +184,6 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
180184
self._lw_msg = message
181185
self._lw_retain = retain
182186

183-
def reconnect(self, retries=30, resub_topics=True):
184-
"""Attempts to reconnect to the MQTT broker.
185-
:param int retries: Amount of retries before resetting the network interface.
186-
:param bool resub_topics: Resubscribe to previously subscribed topics.
187-
"""
188-
retries = 0
189-
while not self._is_connected:
190-
if self._logger is not None:
191-
self._logger.debug('Attempting to reconnect to broker')
192-
try:
193-
self.connect()
194-
if self._logger is not None:
195-
self._logger.debug('Reconnected to broker')
196-
if resub_topics:
197-
if self._logger is not None:
198-
self._logger.debug('Attempting to resubscribe to prv. subscribed topics.')
199-
while self._subscribed_topics:
200-
feed = self._subscribed_topics.pop()
201-
self.subscribe(feed)
202-
except OSError as e:
203-
if self._logger is not None:
204-
self._logger.debug('Lost connection, reconnecting and resubscribing...', e)
205-
retries += 1
206-
if retries >= 30:
207-
retries = 0
208-
time.sleep(1)
209-
continue
210-
211187
# pylint: disable=too-many-branches, too-many-statements
212188
def connect(self, clean_session=True):
213189
"""Initiates connection with the MQTT Broker.
@@ -564,29 +540,100 @@ def unsubscribe(self, topic):
564540
self._subscribed_topics.remove(t)
565541
return
566542

543+
@property
544+
def is_wifi_connected(self):
545+
"""Returns if the ESP module is connected to
546+
an access point, resets module if False"""
547+
if self._wifi:
548+
return self._wifi.esp.is_connected
549+
raise MMQTTException("MiniMQTT Client does not use a WiFi NetworkManager.")
550+
551+
# pylint: disable=line-too-long, protected-access
552+
@property
553+
def is_sock_connected(self):
554+
"""Returns if the socket is connected."""
555+
return self.is_wifi_connected and self._sock and self._wifi.esp.socket_connected(self._sock._socknum)
556+
557+
def reconnect_socket(self):
558+
"""Re-establishes the socket's connection with the MQTT broker.
559+
"""
560+
try:
561+
if self._logger is not None:
562+
self._logger.debug("Attempting to reconnect with MQTT Broker...")
563+
self.reconnect()
564+
except RuntimeError as err:
565+
if self._logger is not None:
566+
self._logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
567+
time.sleep(1)
568+
self.reconnect_socket()
569+
570+
def reconnect_wifi(self):
571+
"""Reconnects to WiFi Access Point and socket, if disconnected.
572+
"""
573+
while not self.is_wifi_connected:
574+
try:
575+
if self._logger is not None:
576+
self._logger.debug('Connecting to WiFi AP...')
577+
self._wifi.connect()
578+
except (RuntimeError, ValueError):
579+
if self._logger is not None:
580+
self._logger.debug('Failed to reset WiFi module, retrying...')
581+
time.sleep(1)
582+
# we just reconnected, is the socket still connected?
583+
if not self.is_sock_connected:
584+
self.reconnect_socket()
585+
586+
def reconnect(self, resub_topics=True):
587+
"""Attempts to reconnect to the MQTT broker.
588+
:param bool resub_topics: Resubscribe to previously subscribed topics.
589+
"""
590+
if self._logger is not None:
591+
self._logger.debug('Attempting to reconnect with MQTT broker')
592+
self.connect()
593+
if self._logger is not None:
594+
self._logger.debug('Reconnected with broker')
595+
if resub_topics:
596+
if self._logger is not None:
597+
self._logger.debug('Attempting to resubscribe to previously subscribed topics.')
598+
while self._subscribed_topics:
599+
feed = self._subscribed_topics.pop()
600+
self.subscribe(feed)
601+
567602
def loop_forever(self):
568603
"""Starts a blocking message loop. Use this
569604
method if you want to run a program forever.
605+
Code below a call to this method will NOT execute.
570606
Network reconnection is handled within this call.
571-
Your code will not execute anything below this call.
607+
572608
"""
573-
run = True
574-
while run:
575-
if self._is_connected:
576-
self._wait_for_msg(0.0)
577-
else:
578-
if self._logger is not None:
579-
self._logger.debug('Lost connection, reconnecting and resubscribing...')
580-
self.reconnect(resub_topics=True)
581-
if self._logger is not None:
582-
self._logger.debug('Connection restored, continuing to loop forever...')
609+
while True:
610+
# Check WiFi and socket status
611+
if self.is_sock_connected:
612+
try:
613+
self.loop()
614+
except (RuntimeError, ValueError):
615+
if self._wifi:
616+
# Reconnect the WiFi module and the socket
617+
self.reconnect_wifi()
618+
continue
583619

584620
def loop(self):
585621
"""Non-blocking message loop. Use this method to
586-
check incoming subscription messages. Does not handle
587-
network reconnection like loop_forever - reconnection must
588-
be handled within your code.
622+
check incoming subscription messages.
623+
624+
This method does NOT handle networking or
625+
network hardware management, use loop_forever
626+
or handle in code instead.
589627
"""
628+
if self._timestamp == 0:
629+
self._timestamp = time.monotonic()
630+
current_time = time.monotonic()
631+
if current_time - self._timestamp >= self._keep_alive:
632+
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
633+
if self._logger is not None:
634+
self._logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
635+
self.ping()
636+
self._timestamp = 0
590637
self._sock.settimeout(0.1)
591638
return self._wait_for_msg()
592639

0 commit comments

Comments
 (0)