From ffd3b98efc28995c21b5b57aa6ffab1096059093 Mon Sep 17 00:00:00 2001 From: Taku Fukada Date: Sun, 3 Jun 2018 02:32:56 +0900 Subject: [PATCH] Cancel asyncio pending task on unsubscription --- graphql/execution/executors/asyncio.py | 2 +- graphql/execution/executors/asyncio_utils.py | 136 +++---------------- 2 files changed, 18 insertions(+), 120 deletions(-) diff --git a/graphql/execution/executors/asyncio.py b/graphql/execution/executors/asyncio.py index f20ddbf7..dfb3823c 100644 --- a/graphql/execution/executors/asyncio.py +++ b/graphql/execution/executors/asyncio.py @@ -59,5 +59,5 @@ def execute(self, fn, *args, **kwargs): self.futures.append(future) return Promise.resolve(future) elif isasyncgen(result): - return asyncgen_to_observable(result) + return asyncgen_to_observable(result, loop=self.loop) return result diff --git a/graphql/execution/executors/asyncio_utils.py b/graphql/execution/executors/asyncio_utils.py index 836d90d7..077f2299 100644 --- a/graphql/execution/executors/asyncio_utils.py +++ b/graphql/execution/executors/asyncio_utils.py @@ -1,129 +1,24 @@ from inspect import isasyncgen -from asyncio import ensure_future -from rx import Observable, AnonymousObserver -from rx.core import ObservableBase, Disposable, ObserverBase +from asyncio import ensure_future, wait, CancelledError +from rx import AnonymousObservable -from rx.concurrency import current_thread_scheduler -from rx.core import Observer, Observable, Disposable -from rx.core.anonymousobserver import AnonymousObserver -from rx.core.autodetachobserver import AutoDetachObserver - - -# class AsyncgenDisposable(Disposable): -# """Represents a Disposable that disposes the asyncgen automatically.""" - -# def __init__(self, asyncgen): -# """Initializes a new instance of the AsyncgenDisposable class.""" - -# self.asyncgen = asyncgen -# self.is_disposed = False - -# super(AsyncgenDisposable, self).__init__() - -# def dispose(self): -# """Sets the status to disposed""" -# self.asyncgen.aclose() -# self.is_disposed = True - - -class AsyncgenObserver(AutoDetachObserver): - def __init__(self, asyncgen, *args, **kwargs): - self._asyncgen = asyncgen - self.is_disposed = False - super(AsyncgenObserver, self).__init__(*args, **kwargs) - - async def dispose_asyncgen(self): - if self.is_disposed: - return - - try: - # await self._asyncgen.aclose() - await self._asyncgen.athrow(StopAsyncIteration) - self.is_disposed = True - except: - pass - - def dispose(self): - if self.is_disposed: - return - disposed = super(AsyncgenObserver, self).dispose() - # print("DISPOSE observer!", disposed) - ensure_future(self.dispose_asyncgen()) - - -class AsyncgenObservable(ObservableBase): - """Class to create an Observable instance from a delegate-based - implementation of the Subscribe method.""" - - def __init__(self, subscribe, asyncgen): - """Creates an observable sequence object from the specified - subscription function. - - Keyword arguments: - :param types.FunctionType subscribe: Subscribe method implementation. - """ - - self._subscribe = subscribe - self._asyncgen = asyncgen - super(AsyncgenObservable, self).__init__() - - def _subscribe_core(self, observer): - # print("GET SUBSCRIBER", observer) - return self._subscribe(observer) - # print("SUBSCRIBER RESULT", subscriber) - # return subscriber - - def subscribe(self, on_next=None, on_error=None, on_completed=None, observer=None): - - if isinstance(on_next, Observer): - observer = on_next - elif hasattr(on_next, "on_next") and callable(on_next.on_next): - observer = on_next - elif not observer: - observer = AnonymousObserver(on_next, on_error, on_completed) - - auto_detach_observer = AsyncgenObserver(self._asyncgen, observer) - - def fix_subscriber(subscriber): - """Fixes subscriber to make sure it returns a Disposable instead - of None or a dispose function""" - - if not hasattr(subscriber, "dispose"): - subscriber = Disposable.create(subscriber) - - return subscriber - - def set_disposable(scheduler=None, value=None): - try: - subscriber = self._subscribe_core(auto_detach_observer) - except Exception as ex: - if not auto_detach_observer.fail(ex): - raise - else: - auto_detach_observer.disposable = fix_subscriber(subscriber) +def asyncgen_to_observable(asyncgen, loop=None): + def emit(observer): + task = ensure_future( + iterate_asyncgen(asyncgen, observer), + loop=loop) - # Subscribe needs to set up the trampoline before for subscribing. - # Actually, the first call to Subscribe creates the trampoline so - # that it may assign its disposable before any observer executes - # OnNext over the CurrentThreadScheduler. This enables single- - # threaded cancellation - # https://social.msdn.microsoft.com/Forums/en-US/eb82f593-9684-4e27- - # 97b9-8b8886da5c33/whats-the-rationale-behind-how-currentthreadsche - # dulerschedulerequired-behaves?forum=rx - if current_thread_scheduler.schedule_required(): - current_thread_scheduler.schedule(set_disposable) - else: - set_disposable() + def dispose(): + async def await_task(): + await task - # Hide the identity of the auto detach observer - return Disposable.create(auto_detach_observer.dispose) + task.cancel() + ensure_future(await_task(), loop=loop) + return dispose -def asyncgen_to_observable(asyncgen): - def emit(observer): - ensure_future(iterate_asyncgen(asyncgen, observer)) - return AsyncgenObservable(emit, asyncgen) + return AnonymousObservable(emit) async def iterate_asyncgen(asyncgen, observer): @@ -131,5 +26,8 @@ async def iterate_asyncgen(asyncgen, observer): async for item in asyncgen: observer.on_next(item) observer.on_completed() + except CancelledError: + pass except Exception as e: observer.on_error(e) +