From 469c152db6240e257e34434c598d23fdff2f3a61 Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Fri, 24 Nov 2023 19:20:10 +0100 Subject: [PATCH 1/5] make user_data "public" fixes #178 --- adafruit_minimqtt/adafruit_minimqtt.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index cbeb8f6c..6faa6845 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -169,6 +169,8 @@ class MQTT: :param int connect_retries: How many times to try to connect to the broker before giving up on connect or reconnect. Exponential backoff will be used for the retries. :param class user_data: arbitrary data to pass as a second argument to the callbacks. + This works with all callbacks but "on_message"; there, it is necessary to extract + the user_data from the MQTT object (passed as 1st argument) using the 'user_data' member. """ @@ -205,7 +207,7 @@ def __init__( self._recv_timeout = recv_timeout self.keep_alive = keep_alive - self._user_data = user_data + self.user_data = user_data self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM self._pid = 0 @@ -638,7 +640,7 @@ def _connect( self._is_connected = True result = rc[0] & 1 if self.on_connect is not None: - self.on_connect(self, self._user_data, result, rc[2]) + self.on_connect(self, self.user_data, result, rc[2]) return result @@ -661,7 +663,7 @@ def disconnect(self) -> None: self._is_connected = False self._subscribed_topics = [] if self.on_disconnect is not None: - self.on_disconnect(self, self._user_data, 0) + self.on_disconnect(self, self.user_data, 0) def ping(self) -> list[int]: """Pings the MQTT Broker to confirm if the broker is alive or if @@ -757,7 +759,7 @@ def publish( self._sock.send(pub_hdr_var) self._sock.send(msg) if qos == 0 and self.on_publish is not None: - self.on_publish(self, self._user_data, topic, self._pid) + self.on_publish(self, self.user_data, topic, self._pid) if qos == 1: stamp = time.monotonic() while True: @@ -769,7 +771,7 @@ def publish( rcv_pid = rcv_pid_buf[0] << 0x08 | rcv_pid_buf[1] if self._pid == rcv_pid: if self.on_publish is not None: - self.on_publish(self, self._user_data, topic, rcv_pid) + self.on_publish(self, self.user_data, topic, rcv_pid) return if op is None: @@ -849,7 +851,7 @@ def subscribe(self, topic: str, qos: int = 0) -> None: for t, q in topics: if self.on_subscribe is not None: - self.on_subscribe(self, self._user_data, t, q) + self.on_subscribe(self, self.user_data, t, q) self._subscribed_topics.append(t) return @@ -907,7 +909,7 @@ def unsubscribe(self, topic: str) -> None: assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1] for t in topics: if self.on_unsubscribe is not None: - self.on_unsubscribe(self, self._user_data, t, self._pid) + self.on_unsubscribe(self, self.user_data, t, self._pid) self._subscribed_topics.remove(t) return From 8ead04c7af4486e0ed0b3cec4647270c02d77459 Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Fri, 24 Nov 2023 19:34:25 +0100 Subject: [PATCH 2/5] improve documentation --- adafruit_minimqtt/adafruit_minimqtt.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 6faa6845..76ef083f 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -168,9 +168,10 @@ class MQTT: in seconds. :param int connect_retries: How many times to try to connect to the broker before giving up on connect or reconnect. Exponential backoff will be used for the retries. - :param class user_data: arbitrary data to pass as a second argument to the callbacks. - This works with all callbacks but "on_message"; there, it is necessary to extract - the user_data from the MQTT object (passed as 1st argument) using the 'user_data' member. + :param class user_data: arbitrary data to pass as a second argument to most of the callbacks. + This works with all callbacks but the "on_message" and those added via add_topic_callback(); + for those, to get access to the user_data use the 'user_data' member of the MQTT object + passed as 1st argument. """ @@ -415,6 +416,9 @@ def add_topic_callback(self, mqtt_topic: str, callback_method) -> None: :param str mqtt_topic: MQTT topic identifier. :param function callback_method: The callback method. + + Expected method signature is ``on_message(client, topic, message)`` + To get access to the user_data, use the client argument. """ if mqtt_topic is None or callback_method is None: raise ValueError("MQTT topic and callback method must both be defined.") @@ -439,6 +443,7 @@ def on_message(self): """Called when a new message has been received on a subscribed topic. Expected method signature is ``on_message(client, topic, message)`` + To get access to the user_data, use the client argument. """ return self._on_message From 40fbdae5b62f6ded69e8cad7e9f3b8d9cf058f0a Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Fri, 24 Nov 2023 22:00:03 +0100 Subject: [PATCH 3/5] add example code --- examples/cpython/user_data.py | 98 +++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 examples/cpython/user_data.py diff --git a/examples/cpython/user_data.py b/examples/cpython/user_data.py new file mode 100644 index 00000000..1523f0e9 --- /dev/null +++ b/examples/cpython/user_data.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 + +# pylint: disable=logging-fstring-interpolation + +""" +Demonstrate on how to use user_data for various callbacks. +""" + +import logging +import socket +import ssl +import sys + +import adafruit_minimqtt.adafruit_minimqtt as MQTT + + +# pylint: disable=unused-argument +def on_connect(mqtt_client, user_data, flags, ret_code): + """ + connect callback + """ + logger = logging.getLogger(__name__) + logger.debug("Connected to MQTT Broker!") + logger.debug(f"Flags: {flags}\n RC: {ret_code}") + + +# pylint: disable=unused-argument +def on_subscribe(mqtt_client, user_data, topic, granted_qos): + """ + subscribe callback + """ + logger = logging.getLogger(__name__) + logger.debug(f"Subscribed to {topic} with QOS level {granted_qos}") + + +def on_message(client, topic, message): + """ + received message callback + """ + logger = logging.getLogger(__name__) + logger.debug(f"New message on topic {topic}: {message}") + + messages = client.user_data + if not messages.get(topic): + messages[topic] = [] + messages[topic].append(message) + + +# pylint: disable=too-many-statements,too-many-locals +def main(): + """ + Main loop. + """ + + logging.basicConfig() + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) + + # dictionary/map of topic to list of messages + messages = {} + + # connect to MQTT broker + mqtt = MQTT.MQTT( + broker="172.40.0.3", + port=1883, + socket_pool=socket, + ssl_context=ssl.create_default_context(), + user_data=messages, + ) + + mqtt.on_connect = on_connect + mqtt.on_subscribe = on_subscribe + mqtt.on_message = on_message + + logger.info("Connecting to MQTT broker") + mqtt.connect() + logger.info("Subscribing") + mqtt.subscribe("foo/#", qos=0) + mqtt.add_topic_callback("foo/bar", on_message) + + i = 0 + while True: + i += 1 + logger.debug(f"Loop {i}") + # Make sure to stay connected to the broker e.g. in case of keep alive. + mqtt.loop(1) + + for topic, msg_list in messages.items(): + logger.info(f"Got {len(msg_list)} messages from topic {topic}") + for msg_cnt, msg in enumerate(msg_list): + logger.debug(f"#{msg_cnt}: {msg}") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + sys.exit(0) From 9a1ecba332dff67d51ace79736dfdae58f7443b5 Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Fri, 24 Nov 2023 22:00:10 +0100 Subject: [PATCH 4/5] improve doc --- adafruit_minimqtt/adafruit_minimqtt.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 76ef083f..0d092e4f 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -419,6 +419,8 @@ def add_topic_callback(self, mqtt_topic: str, callback_method) -> None: Expected method signature is ``on_message(client, topic, message)`` To get access to the user_data, use the client argument. + + If a callback is called for the topic, then any "on_message" callback will not be called. """ if mqtt_topic is None or callback_method is None: raise ValueError("MQTT topic and callback method must both be defined.") From 66309c1269c750bfdee7562e50e0a825ed3692c7 Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Fri, 24 Nov 2023 22:02:49 +0100 Subject: [PATCH 5/5] add header --- examples/cpython/user_data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/cpython/user_data.py b/examples/cpython/user_data.py index 1523f0e9..dd19c275 100644 --- a/examples/cpython/user_data.py +++ b/examples/cpython/user_data.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python3 +# SPDX-FileCopyrightText: 2023 VladimĂ­r Kotal +# SPDX-License-Identifier: Unlicense # pylint: disable=logging-fstring-interpolation