Skip to content

Commit eb87156

Browse files
committed
clean up patch queues after exit
also does some internal code cleanup and adds a dedicated Stop exception.
1 parent dc8281a commit eb87156

File tree

3 files changed

+105
-31
lines changed

3 files changed

+105
-31
lines changed

src/idom/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from . import config, html, log, web
22
from .core import hooks
33
from .core.component import Component, component
4+
from .core.dispatcher import Stop
45
from .core.events import EventHandler, event
56
from .core.layout import Layout
67
from .core.vdom import vdom
@@ -27,6 +28,7 @@
2728
"multiview",
2829
"Ref",
2930
"run",
31+
"Stop",
3032
"vdom",
3133
"web",
3234
]

src/idom/core/dispatcher.py

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,21 @@
3636

3737

3838
SendCoroutine = Callable[["VdomJsonPatch"], Awaitable[None]]
39+
"""Send model patches given by a dispatcher"""
40+
3941
RecvCoroutine = Callable[[], Awaitable[LayoutEvent]]
42+
"""Called by a dispatcher to return a :class:`idom.core.layout.LayoutEvent`
43+
44+
The event will then trigger an :class:`idom.core.proto.EventHandlerType` in a layout.
45+
"""
46+
47+
48+
class Stop(BaseException):
49+
"""Stop dispatching changes and events
50+
51+
Raising this error will tell dispatchers to gracefully exit. Typically this is
52+
called by code running inside a layout to tell it to stop rendering.
53+
"""
4054

4155

4256
async def dispatch_single_view(
@@ -46,9 +60,12 @@ async def dispatch_single_view(
4660
) -> None:
4761
"""Run a dispatch loop for a single view instance"""
4862
with layout:
49-
async with create_task_group() as task_group:
50-
task_group.start_soon(_single_outgoing_loop, layout, send)
51-
task_group.start_soon(_single_incoming_loop, layout, recv)
63+
try:
64+
async with create_task_group() as task_group:
65+
task_group.start_soon(_single_outgoing_loop, layout, send)
66+
task_group.start_soon(_single_incoming_loop, layout, recv)
67+
except Stop:
68+
logger.info("Stopped dispatch task")
5269

5370

5471
SharedViewDispatcher = Callable[[SendCoroutine, RecvCoroutine], Awaitable[None]]
@@ -63,9 +80,8 @@ async def create_shared_view_dispatcher(
6380
with layout:
6481
(
6582
dispatch_shared_view,
66-
model_state,
67-
all_patch_queues,
68-
) = await _make_shared_view_dispatcher(layout)
83+
send_patch,
84+
) = await _create_shared_view_dispatcher(layout)
6985

7086
dispatch_tasks: List[Future[None]] = []
7187

@@ -95,34 +111,35 @@ def dispatch_shared_view_soon(
95111
else:
96112
patch = VdomJsonPatch.create_from(update_future.result())
97113

98-
model_state.current = patch.apply_to(model_state.current)
99-
# push updates to all dispatcher callbacks
100-
for queue in all_patch_queues:
101-
queue.put_nowait(patch)
114+
send_patch(patch)
102115

103116

104117
def ensure_shared_view_dispatcher_future(
105118
layout: LayoutType[LayoutUpdate, LayoutEvent],
106119
) -> Tuple[Future[None], SharedViewDispatcher]:
107-
"""Ensure the future of a dispatcher created by :func:`create_shared_view_dispatcher`"""
120+
"""Ensure the future of a dispatcher made by :func:`create_shared_view_dispatcher`
121+
122+
This returns a future that can be awaited to block until all dispatch tasks have
123+
completed as well as the dispatcher coroutine itself which is used to start dispatch
124+
tasks.
125+
126+
This is required in situations where usage of the async context manager from
127+
:func:`create_shared_view_dispatcher` is not possible. Typically this happens when
128+
integrating IDOM with other frameworks, servers, or applications.
129+
"""
108130
dispatcher_future: Future[SharedViewDispatcher] = Future()
109131

110132
async def dispatch_shared_view_forever() -> None:
111133
with layout:
112134
(
113135
dispatch_shared_view,
114-
model_state,
115-
all_patch_queues,
116-
) = await _make_shared_view_dispatcher(layout)
136+
send_patch,
137+
) = await _create_shared_view_dispatcher(layout)
117138

118139
dispatcher_future.set_result(dispatch_shared_view)
119140

120141
while True:
121-
patch = await render_json_patch(layout)
122-
model_state.current = patch.apply_to(model_state.current)
123-
# push updates to all dispatcher callbacks
124-
for queue in all_patch_queues:
125-
queue.put_nowait(patch)
142+
send_patch(await render_json_patch(layout))
126143

127144
async def dispatch(send: SendCoroutine, recv: RecvCoroutine) -> None:
128145
await (await dispatcher_future)(send, recv)
@@ -159,28 +176,37 @@ def create_from(cls, update: LayoutUpdate) -> VdomJsonPatch:
159176
return cls(update.path, make_patch(update.old or {}, update.new).patch)
160177

161178

162-
async def _make_shared_view_dispatcher(
179+
async def _create_shared_view_dispatcher(
163180
layout: LayoutType[LayoutUpdate, LayoutEvent],
164-
) -> Tuple[SharedViewDispatcher, Ref[Any], WeakSet[Queue[VdomJsonPatch]]]:
181+
) -> Tuple[SharedViewDispatcher, Callable[[VdomJsonPatch], None]]:
165182
update = await layout.render()
166183
model_state = Ref(update.new)
167184

168185
# We push updates to queues instead of pushing directly to send() callbacks in
169-
# order to isolate the render loop from any errors dispatch callbacks might
170-
# raise.
186+
# order to isolate send_patch() from any errors send() callbacks might raise.
171187
all_patch_queues: WeakSet[Queue[VdomJsonPatch]] = WeakSet()
172188

173189
async def dispatch_shared_view(send: SendCoroutine, recv: RecvCoroutine) -> None:
174190
patch_queue: Queue[VdomJsonPatch] = Queue()
175-
async with create_task_group() as inner_task_group:
176-
all_patch_queues.add(patch_queue)
177-
effective_update = LayoutUpdate("", None, model_state.current)
178-
await send(VdomJsonPatch.create_from(effective_update))
179-
inner_task_group.start_soon(_single_incoming_loop, layout, recv)
180-
inner_task_group.start_soon(_shared_outgoing_loop, send, patch_queue)
191+
try:
192+
async with create_task_group() as inner_task_group:
193+
all_patch_queues.add(patch_queue)
194+
effective_update = LayoutUpdate("", None, model_state.current)
195+
await send(VdomJsonPatch.create_from(effective_update))
196+
inner_task_group.start_soon(_single_incoming_loop, layout, recv)
197+
inner_task_group.start_soon(_shared_outgoing_loop, send, patch_queue)
198+
except Stop:
199+
logger.info("Stopped dispatch task")
200+
finally:
201+
all_patch_queues.remove(patch_queue)
181202
return None
182203

183-
return dispatch_shared_view, model_state, all_patch_queues
204+
def send_patch(patch: VdomJsonPatch) -> None:
205+
model_state.current = patch.apply_to(model_state.current)
206+
for queue in all_patch_queues:
207+
queue.put_nowait(patch)
208+
209+
return dispatch_shared_view, send_patch
184210

185211

186212
async def _single_outgoing_loop(

tests/test_core/test_dispatcher.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import asyncio
2+
import sys
23
from typing import Any, Sequence
34

45
import pytest
56

67
import idom
78
from idom.core.dispatcher import (
89
VdomJsonPatch,
10+
_create_shared_view_dispatcher,
911
create_shared_view_dispatcher,
1012
dispatch_single_view,
1113
ensure_shared_view_dispatcher_future,
@@ -39,7 +41,7 @@ async def send(patch):
3941
changes.append(patch)
4042
sem.release()
4143
if not events_to_inject:
42-
raise asyncio.CancelledError()
44+
raise idom.Stop()
4345

4446
async def recv():
4547
await sem.acquire()
@@ -130,3 +132,47 @@ async def test_ensure_shared_view_dispatcher_future():
130132

131133
assert_changes_produce_expected_model(changes_1, model)
132134
assert_changes_produce_expected_model(changes_2, model)
135+
136+
137+
async def test_private_create_shared_view_dispatcher_cleans_up_patch_queues():
138+
"""Report an issue if this test breaks
139+
140+
Some internals of idom.core.dispatcher may need to be changed in order to make some
141+
internal state easier to introspect.
142+
143+
Ideally we would just check if patch queues are getting cleaned up more directly,
144+
but without having access to that, we use some side effects to try and infer whether
145+
it happens.
146+
"""
147+
148+
@idom.component
149+
def SomeComponent():
150+
return idom.html.div()
151+
152+
async def send(patch):
153+
raise idom.Stop()
154+
155+
async def recv():
156+
return LayoutEvent("something", [])
157+
158+
with idom.Layout(SomeComponent()) as layout:
159+
dispatch_shared_view, push_patch = await _create_shared_view_dispatcher(layout)
160+
161+
# Dispatch a view that should exit. After exiting its patch queue should be
162+
# cleaned up and removed. Since we only dispatched one view there should be
163+
# no patch queues.
164+
await dispatch_shared_view(send, recv) # this should stop immediately
165+
166+
# We create a patch and check its ref count. We will check this after attempting
167+
# to push out the change.
168+
patch = VdomJsonPatch("anything", [])
169+
patch_ref_count = sys.getrefcount(patch)
170+
171+
# We push out this change.
172+
push_patch(patch)
173+
174+
# Because there should be no patch queues, we expect that the ref count remains
175+
# the same. If the ref count had increased, then we would know that the patch
176+
# queue has not been cleaned up and that the patch we just pushed was added to
177+
# it.
178+
assert not sys.getrefcount(patch) > patch_ref_count

0 commit comments

Comments
 (0)