Skip to content

Commit cfbada0

Browse files
authored
Merge pull request #105 from dlashua/mqtt_trigger
add @mqtt_trigger
2 parents b380b9d + 63bb843 commit cfbada0

File tree

6 files changed

+188
-3
lines changed

6 files changed

+188
-3
lines changed

custom_components/pyscript/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
)
3333
from .eval import AstEval
3434
from .event import Event
35+
from .mqtt import Mqtt
3536
from .function import Function
3637
from .global_ctx import GlobalContext, GlobalContextMgr
3738
from .jupyter_kernel import Kernel
@@ -118,6 +119,7 @@ async def async_setup_entry(hass, config_entry):
118119

119120
Function.init(hass)
120121
Event.init(hass)
122+
Mqtt.init(hass)
121123
TrigTime.init(hass)
122124
State.init(hass)
123125
State.register_functions()

custom_components/pyscript/eval.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,13 @@ async def trigger_init(self):
278278
"time_trigger",
279279
"state_trigger",
280280
"event_trigger",
281+
"mqtt_trigger",
281282
}
282283
trig_decorators = {
283284
"time_trigger",
284285
"state_trigger",
285286
"event_trigger",
287+
"mqtt_trigger",
286288
"state_active",
287289
"time_active",
288290
"task_unique",
@@ -393,6 +395,7 @@ async def do_service_call(func, ast_ctx, data):
393395
#
394396
arg_check = {
395397
"event_trigger": {"arg_cnt": {1, 2}},
398+
"mqtt_trigger": {"arg_cnt": {1, 2}},
396399
"state_active": {"arg_cnt": {1}},
397400
"state_trigger": {"arg_cnt": {"*"}, "type": {list, set}},
398401
"task_unique": {"arg_cnt": {1}},

custom_components/pyscript/mqtt.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""Handles mqtt messages and notification."""
2+
3+
import logging
4+
import json
5+
from homeassistant.components import mqtt
6+
7+
8+
from .const import LOGGER_PATH
9+
10+
_LOGGER = logging.getLogger(LOGGER_PATH + ".mqtt")
11+
12+
13+
class Mqtt:
14+
"""Define mqtt functions."""
15+
16+
#
17+
# Global hass instance
18+
#
19+
hass = None
20+
21+
#
22+
# notify message queues by mqtt message topic
23+
#
24+
notify = {}
25+
notify_remove = {}
26+
27+
def __init__(self):
28+
"""Warn on Mqtt instantiation."""
29+
_LOGGER.error("Mqtt class is not meant to be instantiated")
30+
31+
@classmethod
32+
def init(cls, hass):
33+
"""Initialize Mqtt."""
34+
35+
cls.hass = hass
36+
37+
@classmethod
38+
def mqtt_message_handler_maker(cls, subscribed_topic):
39+
"""closure for mqtt_message_handler"""
40+
41+
async def mqtt_message_handler(mqttmsg):
42+
"""Listen for MQTT messages."""
43+
func_args = {
44+
"trigger_type": "mqtt",
45+
"topic": mqttmsg.topic,
46+
"payload": mqttmsg.payload,
47+
"qos": mqttmsg.qos,
48+
}
49+
50+
try:
51+
func_args["payload_obj"] = json.loads(mqttmsg.payload)
52+
except ValueError:
53+
pass
54+
55+
await cls.update(subscribed_topic, func_args)
56+
57+
return mqtt_message_handler
58+
59+
@classmethod
60+
async def notify_add(cls, topic, queue):
61+
"""Register to notify for mqtt messages of given topic to be sent to queue."""
62+
63+
if topic not in cls.notify:
64+
cls.notify[topic] = set()
65+
_LOGGER.debug("mqtt.notify_add(%s) -> adding mqtt subscription", topic)
66+
cls.notify_remove[topic] = await mqtt.async_subscribe(
67+
cls.hass, topic, cls.mqtt_message_handler_maker(topic), encoding='utf-8', qos=0
68+
)
69+
cls.notify[topic].add(queue)
70+
71+
@classmethod
72+
def notify_del(cls, topic, queue):
73+
"""Unregister to notify for mqtt messages of given topic for given queue."""
74+
75+
if topic not in cls.notify or queue not in cls.notify[topic]:
76+
return
77+
cls.notify[topic].discard(queue)
78+
if len(cls.notify[topic]) == 0:
79+
cls.notify_remove[topic]()
80+
_LOGGER.debug("mqtt.notify_del(%s) -> removing mqtt subscription", topic)
81+
del cls.notify[topic]
82+
del cls.notify_remove[topic]
83+
84+
@classmethod
85+
async def update(cls, topic, func_args):
86+
"""Deliver all notifications for an mqtt message on the given topic."""
87+
88+
_LOGGER.debug("mqtt.update(%s, %s, %s)", topic, vars, func_args)
89+
if topic in cls.notify:
90+
for queue in cls.notify[topic]:
91+
await queue.put(["mqtt", func_args])

custom_components/pyscript/trigger.py

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from .const import LOGGER_PATH
1717
from .eval import AstEval
1818
from .event import Event
19+
from .mqtt import Mqtt
1920
from .function import Function
2021
from .state import STATE_VIRTUAL_ATTRS, State
2122

@@ -149,13 +150,14 @@ async def wait_until(
149150
state_check_now=True,
150151
time_trigger=None,
151152
event_trigger=None,
153+
mqtt_trigger=None,
152154
timeout=None,
153155
state_hold=None,
154156
state_hold_false=None,
155157
__test_handshake__=None,
156158
):
157159
"""Wait for zero or more triggers, until an optional timeout."""
158-
if state_trigger is None and time_trigger is None and event_trigger is None:
160+
if state_trigger is None and time_trigger is None and event_trigger is None and mqtt_trigger is None:
159161
if timeout is not None:
160162
await asyncio.sleep(timeout)
161163
return {"trigger_type": "timeout"}
@@ -164,6 +166,7 @@ async def wait_until(
164166
state_trig_ident_any = set()
165167
state_trig_eval = None
166168
event_trig_expr = None
169+
mqtt_trig_expr = None
167170
exc = None
168171
notify_q = asyncio.Queue(0)
169172

@@ -260,6 +263,23 @@ async def wait_until(
260263
State.notify_del(state_trig_ident, notify_q)
261264
raise exc
262265
Event.notify_add(event_trigger[0], notify_q)
266+
if mqtt_trigger is not None:
267+
if isinstance(mqtt_trigger, str):
268+
mqtt_trigger = [mqtt_trigger]
269+
if len(mqtt_trigger) > 1:
270+
mqtt_trig_expr = AstEval(
271+
f"{ast_ctx.name} mqtt_trigger",
272+
ast_ctx.get_global_ctx(),
273+
logger_name=ast_ctx.get_logger_name(),
274+
)
275+
Function.install_ast_funcs(mqtt_trig_expr)
276+
mqtt_trig_expr.parse(mqtt_trigger[1], mode="eval")
277+
exc = mqtt_trig_expr.get_exception_obj()
278+
if exc is not None:
279+
if len(state_trig_ident) > 0:
280+
State.notify_del(state_trig_ident, notify_q)
281+
raise exc
282+
await Mqtt.notify_add(mqtt_trigger[0], notify_q)
263283
time0 = time.monotonic()
264284

265285
if __test_handshake__:
@@ -297,7 +317,7 @@ async def wait_until(
297317
this_timeout = time_left
298318
state_trig_timeout = True
299319
if this_timeout is None:
300-
if state_trigger is None and event_trigger is None:
320+
if state_trigger is None and event_trigger is None and mqtt_trigger is None:
301321
_LOGGER.debug(
302322
"trigger %s wait_until no next time - returning with none", ast_ctx.name,
303323
)
@@ -403,6 +423,17 @@ async def wait_until(
403423
if event_trig_ok:
404424
ret = notify_info
405425
break
426+
elif notify_type == "mqtt":
427+
if mqtt_trig_expr is None:
428+
ret = notify_info
429+
break
430+
mqtt_trig_ok = await mqtt_trig_expr.eval(notify_info)
431+
exc = mqtt_trig_expr.get_exception_obj()
432+
if exc is not None:
433+
break
434+
if mqtt_trig_ok:
435+
ret = notify_info
436+
break
406437
else:
407438
_LOGGER.error(
408439
"trigger %s wait_until got unexpected queue message %s", ast_ctx.name, notify_type,
@@ -412,6 +443,8 @@ async def wait_until(
412443
State.notify_del(state_trig_ident, notify_q)
413444
if event_trigger is not None:
414445
Event.notify_del(event_trigger[0], notify_q)
446+
if mqtt_trigger is not None:
447+
Mqtt.notify_del(mqtt_trigger[0], notify_q)
415448
if exc:
416449
raise exc
417450
return ret
@@ -641,6 +674,7 @@ def __init__(
641674
self.state_check_now = self.state_trigger_kwargs.get("state_check_now", False)
642675
self.time_trigger = trig_cfg.get("time_trigger", {}).get("args", None)
643676
self.event_trigger = trig_cfg.get("event_trigger", {}).get("args", None)
677+
self.mqtt_trigger = trig_cfg.get("mqtt_trigger", {}).get("args", None)
644678
self.state_active = trig_cfg.get("state_active", {}).get("args", None)
645679
self.time_active = trig_cfg.get("time_active", {}).get("args", None)
646680
self.time_active_hold_off = trig_cfg.get("time_active", {}).get("kwargs", {}).get("hold_off", None)
@@ -656,6 +690,7 @@ def __init__(
656690
self.state_trig_ident = None
657691
self.state_trig_ident_any = set()
658692
self.event_trig_expr = None
693+
self.mqtt_trig_expr = None
659694
self.have_trigger = False
660695
self.setup_ok = False
661696
self.run_on_startup = False
@@ -726,6 +761,19 @@ def __init__(
726761
return
727762
self.have_trigger = True
728763

764+
if self.mqtt_trigger is not None:
765+
if len(self.mqtt_trigger) == 2:
766+
self.mqtt_trig_expr = AstEval(
767+
f"{self.name} @mqtt_trigger()", self.global_ctx, logger_name=self.name,
768+
)
769+
Function.install_ast_funcs(self.mqtt_trig_expr)
770+
self.mqtt_trig_expr.parse(self.mqtt_trigger[1], mode="eval")
771+
exc = self.mqtt_trig_expr.get_exception_long()
772+
if exc is not None:
773+
self.mqtt_trig_expr.get_logger().error(exc)
774+
return
775+
self.have_trigger = True
776+
729777
self.setup_ok = True
730778

731779
def stop(self):
@@ -736,6 +784,8 @@ def stop(self):
736784
State.notify_del(self.state_trig_ident, self.notify_q)
737785
if self.event_trigger is not None:
738786
Event.notify_del(self.event_trigger[0], self.notify_q)
787+
if self.mqtt_trigger is not None:
788+
Mqtt.notify_del(self.mqtt_trigger[0], self.notify_q)
739789
if self.task:
740790
Function.task_cancel(self.task)
741791

@@ -765,6 +815,9 @@ async def trigger_watch(self):
765815
if self.event_trigger is not None:
766816
_LOGGER.debug("trigger %s adding event_trigger %s", self.name, self.event_trigger[0])
767817
Event.notify_add(self.event_trigger[0], self.notify_q)
818+
if self.mqtt_trigger is not None:
819+
_LOGGER.debug("trigger %s adding mqtt_trigger %s", self.name, self.mqtt_trigger[0])
820+
await Mqtt.notify_add(self.mqtt_trigger[0], self.notify_q)
768821

769822
last_trig_time = None
770823
last_state_trig_time = None
@@ -924,6 +977,10 @@ async def trigger_watch(self):
924977
func_args = notify_info
925978
if self.event_trig_expr:
926979
trig_ok = await self.event_trig_expr.eval(notify_info)
980+
elif notify_type == "mqtt":
981+
func_args = notify_info
982+
if self.mqtt_trig_expr:
983+
trig_ok = await self.mqtt_trig_expr.eval(notify_info)
927984

928985
else:
929986
func_args = notify_info
@@ -1038,4 +1095,6 @@ async def do_func_call(func, ast_ctx, task_unique, task_unique_func, hass_contex
10381095
State.notify_del(self.state_trig_ident, self.notify_q)
10391096
if self.event_trigger is not None:
10401097
Event.notify_del(self.event_trigger[0], self.notify_q)
1098+
if self.mqtt_trigger is not None:
1099+
Mqtt.notify_del(self.mqtt_trigger[0], self.notify_q)
10411100
return

docs/reference.rst

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,33 @@ see that the ``EVENT_CALL_SERVICE`` event has parameters ``domain`` set to ``lig
569569
This `wiki page <https://github.com/custom-components/pyscript/wiki/Event-based-triggers>`__ gives
570570
more examples of built-in and user events and how to create triggers for them.
571571

572+
@mqtt_trigger
573+
^^^^^^^^^^^^^
574+
575+
.. code:: python
576+
577+
@mqtt_trigger(topic, str_expr=None)
578+
579+
``@mqtt_trigger`` subscribes to the given MQTT ``topic`` and triggers whenever a message is received
580+
on that topic. An optional ``str_expr`` can be used to match the MQTT message data, and the trigger
581+
will only occur if that expression evaluates to ``True`` or non-zero. This expression has available
582+
these four variables:
583+
584+
- ``trigger_type`` is set to “mqtt”
585+
- ``topic`` is set to the topic the message was received on
586+
- ``payload`` is set to the string payload of the message
587+
- ``payload_obj`` if the payload was valid JSON, this will be set to the native python object
588+
representing that payload.
589+
590+
When the ``@mqtt_trigger`` occurs, those same variables are passed as keyword arguments to the
591+
function in case it needs them.
592+
593+
Wildcards in topics are supported. The ``topic`` variables will be set to the full expanded topic
594+
the message arrived on.
595+
596+
NOTE: The `MQTT Integration in Home Assistant <https://www.home-assistant.io/integrations/mqtt/>`__
597+
must be set up to use ``@mqtt_trigger``.
598+
572599
@task_unique
573600
^^^^^^^^^^^^
574601

@@ -862,6 +889,9 @@ It takes the following keyword arguments (all are optional):
862889
- ``event_trigger=None`` can be set to a string or list of two strings, just like
863890
``@event_trigger``. The first string is the name of the event, and the second string
864891
(when the setting is a two-element list) is an expression based on the event parameters.
892+
- ``mqtt_trigger=None`` can be set to a string or list of two strings, just like
893+
``@mqtt_trigger``. The first string is the MQTT topic, and the second string
894+
(when the setting is a two-element list) is an expression based on the message variables.
865895
- ``timeout=None`` an overall timeout in seconds, which can be floating point.
866896
- ``state_check_now=True`` if set, ``task.wait_until()`` checks any ``state_trigger``
867897
immediately to see if it is already ``True``, and will return immediately if so. If

tests/test_decorator_errors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def func_wrapup():
179179
)
180180
assert "SyntaxError: invalid syntax (file.hello.func3 @state_active(), line 1)" in caplog.text
181181
assert (
182-
"func4 defined in file.hello: needs at least one trigger decorator (ie: event_trigger, state_trigger, time_trigger)"
182+
"func4 defined in file.hello: needs at least one trigger decorator (ie: event_trigger, mqtt_trigger, state_trigger, time_trigger)"
183183
in caplog.text
184184
)
185185
assert (

0 commit comments

Comments
 (0)