Skip to content

Commit a3e58ca

Browse files
committed
refactor dispatchers
Simplify them greatly by making them normal functions. This avoids a lot of indirection and complexity caused by inheritance. In addition this fixes some impropper usages of TaskGroups which was causing problems in the SharedClientStateServer implementations. This refactor also allowed us to get rid of the clunky HasAsyncResources util class. In the end it just added confusion.
1 parent 13fca66 commit a3e58ca

File tree

18 files changed

+433
-641
lines changed

18 files changed

+433
-641
lines changed

docs/source/core-concepts.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ ever be removed from the model. Then you'll just need to call and await a
6363

6464
.. testcode::
6565

66-
async with idom.Layout(ClickCount()) as layout:
66+
with idom.Layout(ClickCount()) as layout:
6767
patch = await layout.render()
6868

6969
The layout also handles the triggering of event handlers. Normally these are
@@ -88,7 +88,7 @@ which we can re-render and see what changed:
8888

8989
return idom.html.button({"onClick": handler}, [f"Click count: {count}"])
9090

91-
async with idom.Layout(ClickCount()) as layout:
91+
with idom.Layout(ClickCount()) as layout:
9292
patch_1 = await layout.render()
9393

9494
fake_event = LayoutEvent(target=static_handler.target, data=[{}])

requirements/pkg-deps.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
typing-extensions >=3.7.4
22
mypy-extensions >=0.4.3
33
anyio >=2.0
4-
async_generator >=1.10; python_version<"3.7"
54
async_exit_stack >=1.0.1; python_version<"3.7"
65
jsonpatch >=1.26
76
typer >=0.3.2

src/idom/core/__init__.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +0,0 @@
1-
from .component import AbstractComponent, Component, ComponentConstructor, component
2-
from .dispatcher import AbstractDispatcher, SharedViewDispatcher, SingleViewDispatcher
3-
from .events import EventHandler, Events, event
4-
from .layout import Layout
5-
from .vdom import vdom
6-
7-
8-
__all__ = [
9-
"AbstractComponent",
10-
"Layout",
11-
"AbstractDispatcher",
12-
"component",
13-
"Component",
14-
"EventHandler",
15-
"ComponentConstructor",
16-
"event",
17-
"Events",
18-
"hooks",
19-
"Layout",
20-
"vdom",
21-
"SharedViewDispatcher",
22-
"SingleViewDispatcher",
23-
]

src/idom/core/dispatcher.py

Lines changed: 79 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
1-
import abc
2-
import asyncio
1+
from __future__ import annotations
2+
3+
import sys
4+
from asyncio import Future
5+
from asyncio import Queue as AsyncQueue
6+
from asyncio.tasks import ensure_future
37
from logging import getLogger
4-
from typing import Any, AsyncIterator, Awaitable, Callable, Dict
8+
from typing import Any, AsyncIterator, Awaitable, Callable, Tuple
9+
from weakref import WeakSet
510

611
from anyio import create_task_group
7-
from anyio.abc import TaskGroup
12+
13+
from idom.utils import Ref
814

915
from .layout import Layout, LayoutEvent, LayoutUpdate
10-
from .utils import HasAsyncResources, async_resource
16+
17+
18+
if sys.version_info >= (3, 7): # pragma: no cover
19+
from contextlib import asynccontextmanager # noqa
20+
else: # pragma: no cover
21+
from async_generator import asynccontextmanager
1122

1223

1324
logger = getLogger(__name__)
@@ -16,136 +27,85 @@
1627
RecvCoroutine = Callable[[], Awaitable[LayoutEvent]]
1728

1829

19-
class AbstractDispatcher(HasAsyncResources, abc.ABC):
20-
"""A base class for implementing :class:`~idom.core.layout.Layout` dispatchers."""
30+
async def dispatch_single_view(
31+
layout: Layout, send: SendCoroutine, recv: RecvCoroutine
32+
) -> None:
33+
with layout:
34+
async with create_task_group() as task_group:
35+
task_group.start_soon(_single_outgoing_loop, layout, send)
36+
task_group.start_soon(_single_incoming_loop, layout, recv)
2137

22-
__slots__ = "_layout"
2338

24-
def __init__(self, layout: Layout) -> None:
25-
super().__init__()
26-
self._layout = layout
39+
_SharedDispatchCoro = Callable[[SendCoroutine, RecvCoroutine], Awaitable[None]]
2740

28-
async def start(self) -> None:
29-
await self.__aenter__()
3041

31-
async def stop(self) -> None:
32-
await self.task_group.cancel_scope.cancel()
33-
await self.__aexit__(None, None, None)
42+
@asynccontextmanager
43+
async def create_shared_view_dispatcher(
44+
layout: Layout,
45+
) -> AsyncIterator[_SharedDispatchCoro]:
46+
model_state: Ref[Any] = Ref({})
47+
update_queues: WeakSet[AsyncQueue[LayoutUpdate]] = WeakSet()
3448

35-
@async_resource
36-
async def layout(self) -> AsyncIterator[Layout]:
37-
async with self._layout as layout:
38-
yield layout
39-
40-
@async_resource
41-
async def task_group(self) -> AsyncIterator[TaskGroup]:
42-
async with create_task_group() as group:
43-
yield group
44-
45-
async def run(self, send: SendCoroutine, recv: RecvCoroutine, context: Any) -> None:
46-
"""Start an unending loop which will drive the layout.
47-
48-
This will call :meth:`AbstractLayout.render` and :meth:`Layout.dispatch`
49-
to render new models and execute events respectively.
50-
"""
51-
await self.task_group.spawn(self._outgoing_loop, send, context)
52-
await self.task_group.spawn(self._incoming_loop, recv, context)
49+
async def dispatch_shared_view(
50+
send: SendCoroutine,
51+
recv: RecvCoroutine,
52+
) -> None:
53+
queue = AsyncQueue()
54+
update_queues.add(queue)
55+
async with create_task_group() as inner_task_group:
56+
await send(LayoutUpdate.create_from({}, model_state.current))
57+
inner_task_group.start_soon(_single_incoming_loop, layout, recv)
58+
inner_task_group.start_soon(_shared_outgoing_loop, send, queue)
5359
return None
5460

55-
async def _outgoing_loop(self, send: SendCoroutine, context: Any) -> None:
56-
try:
57-
while True:
58-
await send(await self._outgoing(self.layout, context))
59-
except Exception:
60-
logger.info("Failed to send outgoing update", exc_info=True)
61-
raise
61+
with layout:
62+
async with create_task_group() as task_group:
63+
task_group.start_soon(
64+
_shared_render_loop,
65+
layout,
66+
model_state,
67+
update_queues,
68+
)
69+
yield dispatch_shared_view
6270

63-
async def _incoming_loop(self, recv: RecvCoroutine, context: Any) -> None:
64-
try:
65-
while True:
66-
await self._incoming(self.layout, context, await recv())
67-
except Exception:
68-
logger.info("Failed to receive incoming event", exc_info=True)
69-
raise
7071

71-
@abc.abstractmethod
72-
async def _outgoing(self, layout: Layout, context: Any) -> Any:
73-
...
72+
async def create_shared_view_dispatcher_future(
73+
layout: Layout,
74+
) -> Tuple[Future, _SharedDispatchCoro]:
75+
queue: AsyncQueue[_SharedDispatchCoro] = AsyncQueue()
7476

75-
@abc.abstractmethod
76-
async def _incoming(self, layout: Layout, context: Any, message: Any) -> None:
77-
...
77+
async def task():
78+
async with create_shared_view_dispatcher(layout) as dispatch:
79+
queue.put_nowait(dispatch)
7880

81+
return ensure_future(task()), (await queue.get())
7982

80-
class SingleViewDispatcher(AbstractDispatcher):
81-
"""Each client of the dispatcher will get its own model.
8283

83-
..note::
84-
The ``context`` parameter of :meth:`SingleViewDispatcher.run` should just
85-
be ``None`` since it's not used.
86-
"""
84+
async def _single_outgoing_loop(layout: Layout, send: SendCoroutine) -> None:
85+
while True:
86+
await send(await layout.render())
8787

88-
__slots__ = "_current_model_as_json"
8988

90-
def __init__(self, layout: Layout) -> None:
91-
super().__init__(layout)
92-
self._current_model_as_json = ""
89+
async def _single_incoming_loop(layout: Layout, recv: RecvCoroutine) -> None:
90+
while True:
91+
await layout.dispatch(await recv())
9392

94-
async def _outgoing(self, layout: Layout, context: Any) -> LayoutUpdate:
95-
return await layout.render()
9693

97-
async def _incoming(self, layout: Layout, context: Any, event: LayoutEvent) -> None:
98-
await layout.dispatch(event)
99-
return None
100-
101-
102-
class SharedViewDispatcher(SingleViewDispatcher):
103-
"""Each client of the dispatcher shares the same model.
104-
105-
The client's ID is indicated by the ``context`` argument of
106-
:meth:`SharedViewDispatcher.run`
107-
"""
94+
async def _shared_render_loop(
95+
layout: Layout,
96+
model_state: Ref[Any],
97+
update_queues: WeakSet[AsyncQueue[LayoutUpdate]],
98+
) -> None:
99+
while True:
100+
update = await layout.render()
101+
model_state.current = update.apply_to(model_state.current)
102+
# append updates to all other contexts
103+
for queue in update_queues:
104+
await queue.put(update)
108105

109-
__slots__ = "_update_queues", "_model_state"
110106

111-
def __init__(self, layout: Layout) -> None:
112-
super().__init__(layout)
113-
self._model_state: Any = {}
114-
self._update_queues: Dict[str, asyncio.Queue[LayoutUpdate]] = {}
115-
116-
@async_resource
117-
async def task_group(self) -> AsyncIterator[TaskGroup]:
118-
async with create_task_group() as group:
119-
await group.spawn(self._render_loop)
120-
yield group
121-
122-
async def run(
123-
self, send: SendCoroutine, recv: RecvCoroutine, context: str, join: bool = False
124-
) -> None:
125-
await super().run(send, recv, context)
126-
if join:
127-
await self._join_event.wait()
128-
129-
async def _render_loop(self) -> None:
130-
while True:
131-
update = await super()._outgoing(self.layout, None)
132-
self._model_state = update.apply_to(self._model_state)
133-
# append updates to all other contexts
134-
for queue in self._update_queues.values():
135-
await queue.put(update)
136-
137-
async def _outgoing_loop(self, send: SendCoroutine, context: Any) -> None:
138-
self._update_queues[context] = asyncio.Queue()
139-
await send(LayoutUpdate.create_from({}, self._model_state))
140-
await super()._outgoing_loop(send, context)
141-
142-
async def _outgoing(self, layout: Layout, context: str) -> LayoutUpdate:
143-
return await self._update_queues[context].get()
144-
145-
@async_resource
146-
async def _join_event(self) -> AsyncIterator[asyncio.Event]:
147-
event = asyncio.Event()
148-
try:
149-
yield event
150-
finally:
151-
event.set()
107+
async def _shared_outgoing_loop(
108+
send: SendCoroutine, queue: AsyncQueue[LayoutUpdate]
109+
) -> None:
110+
while True:
111+
await send(await queue.get())

src/idom/core/events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ async def __call__(self, data: List[Any]) -> Any:
192192
if self._coro_handlers:
193193
async with create_task_group() as group:
194194
for handler in self._coro_handlers:
195-
await group.spawn(handler, *data)
195+
group.start_soon(handler, *data)
196196
for handler in self._func_handlers:
197197
handler(*data)
198198

src/idom/core/hooks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,8 +387,8 @@ class LifeCycleHook:
387387

388388
def __init__(
389389
self,
390-
component: AbstractComponent,
391390
layout: idom.core.layout.Layout,
391+
component: AbstractComponent,
392392
) -> None:
393393
self.component = component
394394
self._layout = weakref.ref(layout)

0 commit comments

Comments
 (0)