From 9db4fa447445a7d7339ba0600194cfd929e5c508 Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 08:01:15 -0600 Subject: [PATCH 01/10] first pass for mqtt_trigger --- custom_components/pyscript/__init__.py | 2 ++ custom_components/pyscript/eval.py | 3 +++ custom_components/pyscript/trigger.py | 24 +++++++++++++++++++++++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/custom_components/pyscript/__init__.py b/custom_components/pyscript/__init__.py index f3e6514..34ebb2a 100644 --- a/custom_components/pyscript/__init__.py +++ b/custom_components/pyscript/__init__.py @@ -32,6 +32,7 @@ ) from .eval import AstEval from .event import Event +from .mqtt import Mqtt from .function import Function from .global_ctx import GlobalContext, GlobalContextMgr from .jupyter_kernel import Kernel @@ -118,6 +119,7 @@ async def async_setup_entry(hass, config_entry): Function.init(hass) Event.init(hass) + Mqtt.init(hass) TrigTime.init(hass) State.init(hass) State.register_functions() diff --git a/custom_components/pyscript/eval.py b/custom_components/pyscript/eval.py index 96f9d40..ad57d57 100644 --- a/custom_components/pyscript/eval.py +++ b/custom_components/pyscript/eval.py @@ -278,11 +278,13 @@ async def trigger_init(self): "time_trigger", "state_trigger", "event_trigger", + "mqtt_trigger", } trig_decorators = { "time_trigger", "state_trigger", "event_trigger", + "mqtt_trigger", "state_active", "time_active", "task_unique", @@ -393,6 +395,7 @@ async def do_service_call(func, ast_ctx, data): # arg_check = { "event_trigger": {"arg_cnt": {1, 2}}, + "mqtt_trigger": {"arg_cnt": {1, 2}}, "state_active": {"arg_cnt": {1}}, "state_trigger": {"arg_cnt": {"*"}, "type": {list, set}}, "task_unique": {"arg_cnt": {1}}, diff --git a/custom_components/pyscript/trigger.py b/custom_components/pyscript/trigger.py index 6537e74..b82b55b 100644 --- a/custom_components/pyscript/trigger.py +++ b/custom_components/pyscript/trigger.py @@ -16,6 +16,7 @@ from .const import LOGGER_PATH from .eval import AstEval from .event import Event +from .mqtt import Mqtt from .function import Function from .state import STATE_VIRTUAL_ATTRS, State @@ -149,6 +150,7 @@ async def wait_until( state_check_now=True, time_trigger=None, event_trigger=None, + mqtt_trigger=None, timeout=None, state_hold=None, state_hold_false=None, @@ -260,6 +262,8 @@ async def wait_until( State.notify_del(state_trig_ident, notify_q) raise exc Event.notify_add(event_trigger[0], notify_q) + if mqtt_trigger is not None: + await Mqtt.notify_add(mqtt_trigger[0], notify_q) time0 = time.monotonic() if __test_handshake__: @@ -297,7 +301,7 @@ async def wait_until( this_timeout = time_left state_trig_timeout = True if this_timeout is None: - if state_trigger is None and event_trigger is None: + if state_trigger is None and event_trigger is None and mqtt_trigger is None: _LOGGER.debug( "trigger %s wait_until no next time - returning with none", ast_ctx.name, ) @@ -403,6 +407,9 @@ async def wait_until( if event_trig_ok: ret = notify_info break + elif notify_type == "mqtt": + ret = notify_info + break else: _LOGGER.error( "trigger %s wait_until got unexpected queue message %s", ast_ctx.name, notify_type, @@ -412,6 +419,8 @@ async def wait_until( State.notify_del(state_trig_ident, notify_q) if event_trigger is not None: Event.notify_del(event_trigger[0], notify_q) + if mqtt_trigger is not None: + Mqtt.notify_del(mqtt_trigger[0], notify_q) if exc: raise exc return ret @@ -641,6 +650,7 @@ def __init__( self.state_check_now = self.state_trigger_kwargs.get("state_check_now", False) self.time_trigger = trig_cfg.get("time_trigger", {}).get("args", None) self.event_trigger = trig_cfg.get("event_trigger", {}).get("args", None) + self.mqtt_trigger = trig_cfg.get("mqtt_trigger", {}).get("args", None) self.state_active = trig_cfg.get("state_active", {}).get("args", None) self.time_active = trig_cfg.get("time_active", {}).get("args", None) self.time_active_hold_off = trig_cfg.get("time_active", {}).get("kwargs", {}).get("hold_off", None) @@ -726,6 +736,9 @@ def __init__( return self.have_trigger = True + if self.mqtt_trigger is not None: + self.have_trigger = True + self.setup_ok = True def stop(self): @@ -736,6 +749,8 @@ def stop(self): State.notify_del(self.state_trig_ident, self.notify_q) if self.event_trigger is not None: Event.notify_del(self.event_trigger[0], self.notify_q) + if self.mqtt_trigger is not None: + Mqtt.notify_del(self.mqtt_trigger[0], self.notify_q) if self.task: Function.task_cancel(self.task) @@ -765,6 +780,9 @@ async def trigger_watch(self): if self.event_trigger is not None: _LOGGER.debug("trigger %s adding event_trigger %s", self.name, self.event_trigger[0]) Event.notify_add(self.event_trigger[0], self.notify_q) + if self.mqtt_trigger is not None: + _LOGGER.debug("trigger %s adding mqtt_trigger %s", self.name, self.mqtt_trigger[0]) + await Mqtt.notify_add(self.mqtt_trigger[0], self.notify_q) last_trig_time = None last_state_trig_time = None @@ -924,6 +942,8 @@ async def trigger_watch(self): func_args = notify_info if self.event_trig_expr: trig_ok = await self.event_trig_expr.eval(notify_info) + elif notify_type == "mqtt": + func_args = notify_info else: func_args = notify_info @@ -1038,4 +1058,6 @@ async def do_func_call(func, ast_ctx, task_unique, task_unique_func, hass_contex State.notify_del(self.state_trig_ident, self.notify_q) if self.event_trigger is not None: Event.notify_del(self.event_trigger[0], self.notify_q) + if self.mqtt_trigger is not None: + Mqtt.notify_del(self.mqtt_trigger[0], self.notify_q) return From f805f495be1e6070624af1552e5315c771f1b413 Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 08:01:30 -0600 Subject: [PATCH 02/10] first pass for mqtt_trigger --- custom_components/pyscript/mqtt.py | 86 ++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 custom_components/pyscript/mqtt.py diff --git a/custom_components/pyscript/mqtt.py b/custom_components/pyscript/mqtt.py new file mode 100644 index 0000000..179083c --- /dev/null +++ b/custom_components/pyscript/mqtt.py @@ -0,0 +1,86 @@ +"""Handles event firing and notification.""" + +import logging +import json +from homeassistant.components import mqtt + + +from .const import LOGGER_PATH + +_LOGGER = logging.getLogger(LOGGER_PATH + ".mqtt") + + +class Mqtt: + """Define mqtt functions.""" + + # + # Global hass instance + # + hass = None + + # + # notify message queues by event type + # + notify = {} + notify_remove = {} + + def __init__(self): + """Warn on Mqtt instantiation.""" + _LOGGER.error("Mqtt class is not meant to be instantiated") + + @classmethod + def init(cls, hass): + """Initialize Mqtt.""" + + cls.hass = hass + + @classmethod + async def mqtt_message_handler(cls, mqttmsg): + """Listen for MQTT messages.""" + func_args = { + "trigger_type": "mqtt", + "topic": mqttmsg.topic, + "payload": mqttmsg.payload, + "qos": mqttmsg.qos, + } + + try: + func_args["payload_json"] = json.loads(mqttmsg.payload) + except ValueError: + pass + + await cls.update(mqttmsg.topic, func_args) + + @classmethod + async def notify_add(cls, topic, queue): + """Register to notify for mqtt message of given topic to be sent to queue.""" + + if topic not in cls.notify: + cls.notify[topic] = set() + _LOGGER.debug("mqtt.notify_add(%s) -> adding mqtt subscription", topic) + cls.notify_remove[topic] = await mqtt.async_subscribe( + cls.hass, topic, cls.mqtt_message_handler, encoding='utf-8', qos=0 + ) + cls.notify[topic].add(queue) + + @classmethod + def notify_del(cls, topic, queue): + """Unregister to notify for events of given type for given queue.""" + + if topic not in cls.notify or queue not in cls.notify[topic]: + return + cls.notify[topic].discard(queue) + if len(cls.notify[topic]) == 0: + cls.notify_remove[topic]() + _LOGGER.debug("mqtt.notify_del(%s) -> removing mqtt subscription", topic) + del cls.notify[topic] + del cls.notify_remove[topic] + + @classmethod + async def update(cls, topic, func_args): + """Deliver all notifications for an event of the given type.""" + + _LOGGER.debug("mqtt.update(%s, %s, %s)", topic, vars, func_args) + if topic in cls.notify: + for queue in cls.notify[topic]: + await queue.put(["mqtt", func_args]) From f1641e7e440b017126af6f7dec9b5e5e43c1e728 Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 08:08:04 -0600 Subject: [PATCH 03/10] rename docstrings from copy/paste --- custom_components/pyscript/mqtt.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/custom_components/pyscript/mqtt.py b/custom_components/pyscript/mqtt.py index 179083c..2d041ea 100644 --- a/custom_components/pyscript/mqtt.py +++ b/custom_components/pyscript/mqtt.py @@ -1,4 +1,4 @@ -"""Handles event firing and notification.""" +"""Handles mqtt messages and notification.""" import logging import json @@ -19,7 +19,7 @@ class Mqtt: hass = None # - # notify message queues by event type + # notify message queues by mqtt message topic # notify = {} notify_remove = {} @@ -53,7 +53,7 @@ async def mqtt_message_handler(cls, mqttmsg): @classmethod async def notify_add(cls, topic, queue): - """Register to notify for mqtt message of given topic to be sent to queue.""" + """Register to notify for mqtt messages of given topic to be sent to queue.""" if topic not in cls.notify: cls.notify[topic] = set() @@ -65,7 +65,7 @@ async def notify_add(cls, topic, queue): @classmethod def notify_del(cls, topic, queue): - """Unregister to notify for events of given type for given queue.""" + """Unregister to notify for mqtt messages of given topic for given queue.""" if topic not in cls.notify or queue not in cls.notify[topic]: return @@ -78,7 +78,7 @@ def notify_del(cls, topic, queue): @classmethod async def update(cls, topic, func_args): - """Deliver all notifications for an event of the given type.""" + """Deliver all notifications for an mqtt message on the given topic.""" _LOGGER.debug("mqtt.update(%s, %s, %s)", topic, vars, func_args) if topic in cls.notify: From dd5028b6ef5b468c772bf6cadf209fe50f7b459b Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 08:39:38 -0600 Subject: [PATCH 04/10] support conditionals in mqtt_trigger --- custom_components/pyscript/trigger.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/custom_components/pyscript/trigger.py b/custom_components/pyscript/trigger.py index b82b55b..42de804 100644 --- a/custom_components/pyscript/trigger.py +++ b/custom_components/pyscript/trigger.py @@ -666,6 +666,7 @@ def __init__( self.state_trig_ident = None self.state_trig_ident_any = set() self.event_trig_expr = None + self.mqtt_trig_expr = None self.have_trigger = False self.setup_ok = False self.run_on_startup = False @@ -737,6 +738,16 @@ def __init__( self.have_trigger = True if self.mqtt_trigger is not None: + if len(self.mqtt_trigger) == 2: + self.mqtt_trig_expr = AstEval( + f"{self.name} @mqtt_trigger()", self.global_ctx, logger_name=self.name, + ) + Function.install_ast_funcs(self.mqtt_trig_expr) + self.mqtt_trig_expr.parse(self.mqtt_trigger[1], mode="eval") + exc = self.mqtt_trig_expr.get_exception_long() + if exc is not None: + self.mqtt_trig_expr.get_logger().error(exc) + return self.have_trigger = True self.setup_ok = True @@ -944,6 +955,8 @@ async def trigger_watch(self): trig_ok = await self.event_trig_expr.eval(notify_info) elif notify_type == "mqtt": func_args = notify_info + if self.mqtt_trig_expr: + trig_ok = await self.mqtt_trig_expr.eval(notify_info) else: func_args = notify_info From 1f36234bc937d5a1fe084e4d2aad881c34083b23 Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 08:50:58 -0600 Subject: [PATCH 05/10] handle wildcards properly --- custom_components/pyscript/mqtt.py | 36 +++++++++++++++++------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/custom_components/pyscript/mqtt.py b/custom_components/pyscript/mqtt.py index 2d041ea..f0a8d59 100644 --- a/custom_components/pyscript/mqtt.py +++ b/custom_components/pyscript/mqtt.py @@ -35,21 +35,25 @@ def init(cls, hass): cls.hass = hass @classmethod - async def mqtt_message_handler(cls, mqttmsg): - """Listen for MQTT messages.""" - func_args = { - "trigger_type": "mqtt", - "topic": mqttmsg.topic, - "payload": mqttmsg.payload, - "qos": mqttmsg.qos, - } - - try: - func_args["payload_json"] = json.loads(mqttmsg.payload) - except ValueError: - pass - - await cls.update(mqttmsg.topic, func_args) + def mqtt_message_handler_maker(cls, subscribed_topic): + + async def mqtt_message_handler(mqttmsg): + """Listen for MQTT messages.""" + func_args = { + "trigger_type": "mqtt", + "topic": mqttmsg.topic, + "payload": mqttmsg.payload, + "qos": mqttmsg.qos, + } + + try: + func_args["payload_json"] = json.loads(mqttmsg.payload) + except ValueError: + pass + + await cls.update(subscribed_topic, func_args) + + return mqtt_message_handler @classmethod async def notify_add(cls, topic, queue): @@ -59,7 +63,7 @@ async def notify_add(cls, topic, queue): cls.notify[topic] = set() _LOGGER.debug("mqtt.notify_add(%s) -> adding mqtt subscription", topic) cls.notify_remove[topic] = await mqtt.async_subscribe( - cls.hass, topic, cls.mqtt_message_handler, encoding='utf-8', qos=0 + cls.hass, topic, cls.mqtt_message_handler_maker(topic), encoding='utf-8', qos=0 ) cls.notify[topic].add(queue) From d7987b2a40fc513c2c12f9f3ef9d881d19b6bda3 Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 09:36:53 -0600 Subject: [PATCH 06/10] make task.wait_until work --- custom_components/pyscript/trigger.py | 30 ++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/custom_components/pyscript/trigger.py b/custom_components/pyscript/trigger.py index 42de804..e4a73c1 100644 --- a/custom_components/pyscript/trigger.py +++ b/custom_components/pyscript/trigger.py @@ -157,7 +157,7 @@ async def wait_until( __test_handshake__=None, ): """Wait for zero or more triggers, until an optional timeout.""" - if state_trigger is None and time_trigger is None and event_trigger is None: + if state_trigger is None and time_trigger is None and event_trigger is None and mqtt_trigger is None: if timeout is not None: await asyncio.sleep(timeout) return {"trigger_type": "timeout"} @@ -166,6 +166,7 @@ async def wait_until( state_trig_ident_any = set() state_trig_eval = None event_trig_expr = None + mqtt_trig_expr = None exc = None notify_q = asyncio.Queue(0) @@ -263,6 +264,21 @@ async def wait_until( raise exc Event.notify_add(event_trigger[0], notify_q) if mqtt_trigger is not None: + if isinstance(mqtt_trigger, str): + mqtt_trigger = [mqtt_trigger] + if len(mqtt_trigger) > 1: + mqtt_trig_expr = AstEval( + f"{ast_ctx.name} mqtt_trigger", + ast_ctx.get_global_ctx(), + logger_name=ast_ctx.get_logger_name(), + ) + Function.install_ast_funcs(mqtt_trig_expr) + mqtt_trig_expr.parse(mqtt_trigger[1], mode="eval") + exc = mqtt_trig_expr.get_exception_obj() + if exc is not None: + if len(state_trig_ident) > 0: + State.notify_del(state_trig_ident, notify_q) + raise exc await Mqtt.notify_add(mqtt_trigger[0], notify_q) time0 = time.monotonic() @@ -408,8 +424,16 @@ async def wait_until( ret = notify_info break elif notify_type == "mqtt": - ret = notify_info - break + if mqtt_trig_expr is None: + ret = notify_info + break + mqtt_trig_ok = await mqtt_trig_expr.eval(notify_info) + exc = mqtt_trig_expr.get_exception_obj() + if exc is not None: + break + if mqtt_trig_ok: + ret = notify_info + break else: _LOGGER.error( "trigger %s wait_until got unexpected queue message %s", ast_ctx.name, notify_type, From fce2e11110c5e06d2a3e608d82d148f7f32f921e Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 10:15:56 -0600 Subject: [PATCH 07/10] fix tests --- tests/test_decorator_errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_decorator_errors.py b/tests/test_decorator_errors.py index 7cd60b1..94e80ca 100644 --- a/tests/test_decorator_errors.py +++ b/tests/test_decorator_errors.py @@ -179,7 +179,7 @@ def func_wrapup(): ) assert "SyntaxError: invalid syntax (file.hello.func3 @state_active(), line 1)" in caplog.text assert ( - "func4 defined in file.hello: needs at least one trigger decorator (ie: event_trigger, state_trigger, time_trigger)" + "func4 defined in file.hello: needs at least one trigger decorator (ie: event_trigger, mqtt_trigger, state_trigger, time_trigger)" in caplog.text ) assert ( From d1e63ea765f96c98cc9f3f44d5593fd70d4573f8 Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 10:18:22 -0600 Subject: [PATCH 08/10] add docstring --- custom_components/pyscript/mqtt.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/custom_components/pyscript/mqtt.py b/custom_components/pyscript/mqtt.py index f0a8d59..0d78aeb 100644 --- a/custom_components/pyscript/mqtt.py +++ b/custom_components/pyscript/mqtt.py @@ -36,7 +36,8 @@ def init(cls, hass): @classmethod def mqtt_message_handler_maker(cls, subscribed_topic): - + """closure for mqtt_message_handler""" + async def mqtt_message_handler(mqttmsg): """Listen for MQTT messages.""" func_args = { From f3a663fd92611290343c0ae65f89229ee5d97a9a Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 20:46:42 -0600 Subject: [PATCH 09/10] docs for mqtt_trigger --- docs/reference.rst | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/docs/reference.rst b/docs/reference.rst index 6aaad13..0abe530 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -569,6 +569,33 @@ see that the ``EVENT_CALL_SERVICE`` event has parameters ``domain`` set to ``lig This `wiki page `__ gives more examples of built-in and user events and how to create triggers for them. +@mqtt_trigger +^^^^^^^^^^^^^ + +.. code:: python + + @mqtt_trigger(topic, str_expr=None) + +``@mqtt_trigger`` subscribes to the given MQTT ``topic`` and triggers whenever a message is received +on that topic. An optional ``str_expr`` can be used to match the MQTT message data, and the trigger +will only occur if that expression evaluates to ``True`` or non-zero. This expression has available +these four variables: + +- ``trigger_type`` is set to “mqtt” +- ``topic`` is set to the topic the message was received on +- ``payload`` is set to the string payload of the message +- ``payload_json`` if the payload was valid JSON, this will be set to the native python object + representing that payload. + +When the ``@mqtt_trigger`` occurs, those same variables are passed as keyword arguments to the +function in case it needs them. + +Wildcards in topics are supported. The ``topic`` variables will be set to the full expanded topic +the message arrived on. + +NOTE: The `MQTT Integration in Home Assistant `__ +must be set up to use ``@mqtt_trigger``. + @task_unique ^^^^^^^^^^^^ @@ -859,6 +886,9 @@ It takes the following keyword arguments (all are optional): - ``event_trigger=None`` can be set to a string or list of two strings, just like ``@event_trigger``. The first string is the name of the event, and the second string (when the setting is a two-element list) is an expression based on the event parameters. +- ``mqtt_trigger=None`` can be set to a string or list of two strings, just like + ``@mqtt_trigger``. The first string is the MQTT topic, and the second string + (when the setting is a two-element list) is an expression based on the message variables. - ``timeout=None`` an overall timeout in seconds, which can be floating point. - ``state_check_now=True`` if set, ``task.wait_until()`` checks any ``state_trigger`` immediately to see if it is already ``True``, and will return immediately if so. If From 63bb84352f8579c73509442e718d262a03f66721 Mon Sep 17 00:00:00 2001 From: Daniel Lashua Date: Tue, 1 Dec 2020 21:38:39 -0600 Subject: [PATCH 10/10] rename payload_json to payload_obj --- custom_components/pyscript/mqtt.py | 2 +- docs/reference.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/custom_components/pyscript/mqtt.py b/custom_components/pyscript/mqtt.py index 0d78aeb..50c5158 100644 --- a/custom_components/pyscript/mqtt.py +++ b/custom_components/pyscript/mqtt.py @@ -48,7 +48,7 @@ async def mqtt_message_handler(mqttmsg): } try: - func_args["payload_json"] = json.loads(mqttmsg.payload) + func_args["payload_obj"] = json.loads(mqttmsg.payload) except ValueError: pass diff --git a/docs/reference.rst b/docs/reference.rst index 0abe530..f5a6c14 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -584,7 +584,7 @@ these four variables: - ``trigger_type`` is set to “mqtt” - ``topic`` is set to the topic the message was received on - ``payload`` is set to the string payload of the message -- ``payload_json`` if the payload was valid JSON, this will be set to the native python object +- ``payload_obj`` if the payload was valid JSON, this will be set to the native python object representing that payload. When the ``@mqtt_trigger`` occurs, those same variables are passed as keyword arguments to the