Skip to content

Commit c5e451b

Browse files
committed
Fix exception of task during gather
This is based on adafruit/circuitpython@90aaf2d
1 parent fbdb77d commit c5e451b

File tree

2 files changed

+75
-25
lines changed

2 files changed

+75
-25
lines changed

asyncio/core.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,11 @@ def __next__(self):
114114
self.state = None
115115
return None
116116
else:
117-
self.exc.__traceback__ = None
118-
raise self.exc
117+
self.exc.__traceback__ = None
118+
raise self.exc
119119

120-
_never = _Never()
121120

121+
_never = _Never()
122122

123123
################################################################################
124124
# Queue and poller for stream IO
@@ -260,6 +260,11 @@ def run_until_complete(main_task=None):
260260
if t.state is True:
261261
# "None" indicates that the task is complete and not await'ed on (yet).
262262
t.state = None
263+
elif callable(t.state):
264+
# The task has a callback registered to be called on completion.
265+
t.state(t, er)
266+
t.state = False
267+
waiting = True
263268
else:
264269
# Schedule any other tasks waiting on the completion of this task.
265270
while t.state.peek():
@@ -281,7 +286,6 @@ def run_until_complete(main_task=None):
281286
_exc_context["future"] = t
282287
Loop.call_exception_handler(_exc_context)
283288

284-
285289
# Create a new task from a coroutine and run it until it finishes
286290
def run(coro):
287291
"""Create a new task from the given coroutine and run it until it completes.

asyncio/funcs.py

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -85,30 +85,76 @@ def wait_for_ms(aw, timeout):
8585
return wait_for(aw, timeout, core.sleep_ms)
8686

8787

88-
async def gather(*aws, return_exceptions=False):
89-
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
90-
are promoted to tasks.
88+
class _Remove:
89+
@staticmethod
90+
def remove(t):
91+
pass
9192

92-
Returns a list of return values of all *aws*
9393

94-
This is a coroutine.
95-
"""
94+
async def gather(*aws, return_exceptions=False):
95+
if not aws:
96+
return []
97+
98+
def done(t, er):
99+
# Sub-task "t" has finished, with exception "er".
100+
nonlocal state
101+
if gather_task.data is not _Remove:
102+
# The main gather task has already been scheduled, so do nothing.
103+
# This happens if another sub-task already raised an exception and
104+
# woke the main gather task (via this done function), or if the main
105+
# gather task was cancelled externally.
106+
return
107+
elif not return_exceptions and not isinstance(er, StopIteration):
108+
# A sub-task raised an exception, indicate that to the gather task.
109+
state = er
110+
else:
111+
state -= 1
112+
if state:
113+
# Still some sub-tasks running.
114+
return
115+
# Gather waiting is done, schedule the main gather task.
116+
core._task_queue.push_head(gather_task)
96117

97118
ts = [core._promote_to_task(aw) for aw in aws]
98119
for i in range(len(ts)):
99-
try:
100-
# TODO handle cancel of gather itself
101-
# if ts[i].coro:
102-
# iter(ts[i]).waiting.push_head(cur_task)
103-
# try:
104-
# yield
105-
# except CancelledError as er:
106-
# # cancel all waiting tasks
107-
# raise er
108-
ts[i] = await ts[i]
109-
except (core.CancelledError, Exception) as er:
110-
if return_exceptions:
111-
ts[i] = er
112-
else:
113-
raise er
120+
if ts[i].state is not True:
121+
# Task is not running, gather not currently supported for this case.
122+
raise RuntimeError("can't gather")
123+
# Register the callback to call when the task is done.
124+
ts[i].state = done
125+
126+
# Set the state for execution of the gather.
127+
gather_task = core.cur_task
128+
state = len(ts)
129+
cancel_all = False
130+
131+
# Wait for the a sub-task to need attention.
132+
gather_task.data = _Remove
133+
try:
134+
core._never.state = False
135+
await core._never
136+
except core.CancelledError as er:
137+
cancel_all = True
138+
state = er
139+
140+
# Clean up tasks.
141+
for i in range(len(ts)):
142+
if ts[i].state is done:
143+
# Sub-task is still running, deregister the callback and cancel if needed.
144+
ts[i].state = True
145+
if cancel_all:
146+
ts[i].cancel()
147+
elif isinstance(ts[i].data, StopIteration):
148+
# Sub-task ran to completion, get its return value.
149+
ts[i] = ts[i].data.value
150+
else:
151+
# Sub-task had an exception with return_exceptions==True, so get its exception.
152+
ts[i] = ts[i].data
153+
154+
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
155+
# return_exceptions==False, so reraise the exception here.
156+
if state is not 0:
157+
raise state
158+
159+
# Return the list of return values of each sub-task.
114160
return ts

0 commit comments

Comments
 (0)