diff --git a/asyncio/core.py b/asyncio/core.py index dcd4b24..079d86a 100644 --- a/asyncio/core.py +++ b/asyncio/core.py @@ -114,11 +114,11 @@ def __next__(self): self.state = None return None else: - self.exc.__traceback__ = None - raise self.exc + self.exc.__traceback__ = None + raise self.exc -_never = _Never() +_never = _Never() ################################################################################ # Queue and poller for stream IO @@ -260,6 +260,11 @@ def run_until_complete(main_task=None): if t.state is True: # "None" indicates that the task is complete and not await'ed on (yet). t.state = None + elif callable(t.state): + # The task has a callback registered to be called on completion. + t.state(t, er) + t.state = False + waiting = True else: # Schedule any other tasks waiting on the completion of this task. while t.state.peek(): @@ -281,7 +286,6 @@ def run_until_complete(main_task=None): _exc_context["future"] = t Loop.call_exception_handler(_exc_context) - # Create a new task from a coroutine and run it until it finishes def run(coro): """Create a new task from the given coroutine and run it until it completes. diff --git a/asyncio/funcs.py b/asyncio/funcs.py index 8718118..1ee4b6a 100644 --- a/asyncio/funcs.py +++ b/asyncio/funcs.py @@ -85,30 +85,76 @@ def wait_for_ms(aw, timeout): return wait_for(aw, timeout, core.sleep_ms) -async def gather(*aws, return_exceptions=False): - """Run all *aws* awaitables concurrently. Any *aws* that are not tasks - are promoted to tasks. +class _Remove: + @staticmethod + def remove(t): + pass - Returns a list of return values of all *aws* - This is a coroutine. - """ +async def gather(*aws, return_exceptions=False): + if not aws: + return [] + + def done(t, er): + # Sub-task "t" has finished, with exception "er". + nonlocal state + if gather_task.data is not _Remove: + # The main gather task has already been scheduled, so do nothing. + # This happens if another sub-task already raised an exception and + # woke the main gather task (via this done function), or if the main + # gather task was cancelled externally. + return + elif not return_exceptions and not isinstance(er, StopIteration): + # A sub-task raised an exception, indicate that to the gather task. + state = er + else: + state -= 1 + if state: + # Still some sub-tasks running. + return + # Gather waiting is done, schedule the main gather task. + core._task_queue.push_head(gather_task) ts = [core._promote_to_task(aw) for aw in aws] for i in range(len(ts)): - try: - # TODO handle cancel of gather itself - # if ts[i].coro: - # iter(ts[i]).waiting.push_head(cur_task) - # try: - # yield - # except CancelledError as er: - # # cancel all waiting tasks - # raise er - ts[i] = await ts[i] - except (core.CancelledError, Exception) as er: - if return_exceptions: - ts[i] = er - else: - raise er + if ts[i].state is not True: + # Task is not running, gather not currently supported for this case. + raise RuntimeError("can't gather") + # Register the callback to call when the task is done. + ts[i].state = done + + # Set the state for execution of the gather. + gather_task = core.cur_task + state = len(ts) + cancel_all = False + + # Wait for the a sub-task to need attention. + gather_task.data = _Remove + try: + core._never.state = False + await core._never + except core.CancelledError as er: + cancel_all = True + state = er + + # Clean up tasks. + for i in range(len(ts)): + if ts[i].state is done: + # Sub-task is still running, deregister the callback and cancel if needed. + ts[i].state = True + if cancel_all: + ts[i].cancel() + elif isinstance(ts[i].data, StopIteration): + # Sub-task ran to completion, get its return value. + ts[i] = ts[i].data.value + else: + # Sub-task had an exception with return_exceptions==True, so get its exception. + ts[i] = ts[i].data + + # Either this gather was cancelled, or one of the sub-tasks raised an exception with + # return_exceptions==False, so reraise the exception here. + if state is not 0: + raise state + + # Return the list of return values of each sub-task. return ts