|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from asyncio import Future, Queue, ensure_future |
| 4 | +from asyncio.tasks import FIRST_COMPLETED, ensure_future, gather, wait |
| 5 | +from contextlib import asynccontextmanager |
| 6 | +from logging import getLogger |
| 7 | +from typing import ( |
| 8 | + Any, |
| 9 | + AsyncIterator, |
| 10 | + Awaitable, |
| 11 | + Callable, |
| 12 | + Dict, |
| 13 | + List, |
| 14 | + NamedTuple, |
| 15 | + Sequence, |
| 16 | + Tuple, |
| 17 | + cast, |
| 18 | +) |
| 19 | +from weakref import WeakSet |
| 20 | + |
| 21 | +from anyio import create_task_group |
| 22 | + |
| 23 | +from idom.utils import Ref |
| 24 | + |
| 25 | +from ._fixed_jsonpatch import apply_patch, make_patch # type: ignore |
| 26 | +from .layout import LayoutEvent, LayoutUpdate |
| 27 | +from .types import LayoutType, VdomJson |
| 28 | + |
| 29 | + |
| 30 | +logger = getLogger(__name__) |
| 31 | + |
| 32 | + |
| 33 | +SendCoroutine = Callable[["VdomJsonPatch"], Awaitable[None]] |
| 34 | +"""Send model patches given by a dispatcher""" |
| 35 | + |
| 36 | +RecvCoroutine = Callable[[], Awaitable[LayoutEvent]] |
| 37 | +"""Called by a dispatcher to return a :class:`idom.core.layout.LayoutEvent` |
| 38 | +
|
| 39 | +The event will then trigger an :class:`idom.core.proto.EventHandlerType` in a layout. |
| 40 | +""" |
| 41 | + |
| 42 | + |
| 43 | +class Stop(BaseException): |
| 44 | + """Stop serving changes and events |
| 45 | +
|
| 46 | + Raising this error will tell dispatchers to gracefully exit. Typically this is |
| 47 | + called by code running inside a layout to tell it to stop rendering. |
| 48 | + """ |
| 49 | + |
| 50 | + |
| 51 | +async def serve_json_patch( |
| 52 | + layout: LayoutType[LayoutUpdate, LayoutEvent], |
| 53 | + send: SendCoroutine, |
| 54 | + recv: RecvCoroutine, |
| 55 | +) -> None: |
| 56 | + """Run a dispatch loop for a single view instance""" |
| 57 | + with layout: |
| 58 | + try: |
| 59 | + async with create_task_group() as task_group: |
| 60 | + task_group.start_soon(_single_outgoing_loop, layout, send) |
| 61 | + task_group.start_soon(_single_incoming_loop, layout, recv) |
| 62 | + except Stop: |
| 63 | + logger.info("Stopped dispatch task") |
| 64 | + |
| 65 | + |
| 66 | +async def render_json_patch(layout: LayoutType[LayoutUpdate, Any]) -> VdomJsonPatch: |
| 67 | + """Render a class:`VdomJsonPatch` from a layout""" |
| 68 | + return VdomJsonPatch.create_from(await layout.render()) |
| 69 | + |
| 70 | + |
| 71 | +class VdomJsonPatch(NamedTuple): |
| 72 | + """An object describing an update to a :class:`Layout` in the form of a JSON patch""" |
| 73 | + |
| 74 | + path: str |
| 75 | + """The path where changes should be applied""" |
| 76 | + |
| 77 | + changes: List[Dict[str, Any]] |
| 78 | + """A list of JSON patches to apply at the given path""" |
| 79 | + |
| 80 | + def apply_to(self, model: VdomJson) -> VdomJson: |
| 81 | + """Return the model resulting from the changes in this update""" |
| 82 | + return cast( |
| 83 | + VdomJson, |
| 84 | + apply_patch( |
| 85 | + model, [{**c, "path": self.path + c["path"]} for c in self.changes] |
| 86 | + ), |
| 87 | + ) |
| 88 | + |
| 89 | + @classmethod |
| 90 | + def create_from(cls, update: LayoutUpdate) -> VdomJsonPatch: |
| 91 | + """Return a patch given an layout update""" |
| 92 | + return cls(update.path, make_patch(update.old or {}, update.new).patch) |
| 93 | + |
| 94 | + |
| 95 | +async def _single_outgoing_loop( |
| 96 | + layout: LayoutType[LayoutUpdate, LayoutEvent], send: SendCoroutine |
| 97 | +) -> None: |
| 98 | + while True: |
| 99 | + await send(await render_json_patch(layout)) |
| 100 | + |
| 101 | + |
| 102 | +async def _single_incoming_loop( |
| 103 | + layout: LayoutType[LayoutUpdate, LayoutEvent], recv: RecvCoroutine |
| 104 | +) -> None: |
| 105 | + while True: |
| 106 | + # We need to fire and forget here so that we avoid waiting on the completion |
| 107 | + # of this event handler before receiving and running the next one. |
| 108 | + ensure_future(layout.deliver(await recv())) |
0 commit comments