Skip to content

Cancel asyncio iteration task on unsubscription #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion graphql/execution/executors/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ def execute(self, fn, *args, **kwargs):
self.futures.append(future)
return Promise.resolve(future)
elif isasyncgen(result):
return asyncgen_to_observable(result)
return asyncgen_to_observable(result, loop=self.loop)
return result
136 changes: 17 additions & 119 deletions graphql/execution/executors/asyncio_utils.py
Original file line number Diff line number Diff line change
@@ -1,135 +1,33 @@
from inspect import isasyncgen
from asyncio import ensure_future
from rx import Observable, AnonymousObserver
from rx.core import ObservableBase, Disposable, ObserverBase
from asyncio import ensure_future, wait, CancelledError
from rx import AnonymousObservable

from rx.concurrency import current_thread_scheduler

from rx.core import Observer, Observable, Disposable
from rx.core.anonymousobserver import AnonymousObserver
from rx.core.autodetachobserver import AutoDetachObserver


# class AsyncgenDisposable(Disposable):
# """Represents a Disposable that disposes the asyncgen automatically."""

# def __init__(self, asyncgen):
# """Initializes a new instance of the AsyncgenDisposable class."""

# self.asyncgen = asyncgen
# self.is_disposed = False

# super(AsyncgenDisposable, self).__init__()

# def dispose(self):
# """Sets the status to disposed"""
# self.asyncgen.aclose()
# self.is_disposed = True


class AsyncgenObserver(AutoDetachObserver):
def __init__(self, asyncgen, *args, **kwargs):
self._asyncgen = asyncgen
self.is_disposed = False
super(AsyncgenObserver, self).__init__(*args, **kwargs)

async def dispose_asyncgen(self):
if self.is_disposed:
return

try:
# await self._asyncgen.aclose()
await self._asyncgen.athrow(StopAsyncIteration)
self.is_disposed = True
except:
pass

def dispose(self):
if self.is_disposed:
return
disposed = super(AsyncgenObserver, self).dispose()
# print("DISPOSE observer!", disposed)
ensure_future(self.dispose_asyncgen())


class AsyncgenObservable(ObservableBase):
"""Class to create an Observable instance from a delegate-based
implementation of the Subscribe method."""

def __init__(self, subscribe, asyncgen):
"""Creates an observable sequence object from the specified
subscription function.

Keyword arguments:
:param types.FunctionType subscribe: Subscribe method implementation.
"""

self._subscribe = subscribe
self._asyncgen = asyncgen
super(AsyncgenObservable, self).__init__()

def _subscribe_core(self, observer):
# print("GET SUBSCRIBER", observer)
return self._subscribe(observer)
# print("SUBSCRIBER RESULT", subscriber)
# return subscriber

def subscribe(self, on_next=None, on_error=None, on_completed=None, observer=None):

if isinstance(on_next, Observer):
observer = on_next
elif hasattr(on_next, "on_next") and callable(on_next.on_next):
observer = on_next
elif not observer:
observer = AnonymousObserver(on_next, on_error, on_completed)

auto_detach_observer = AsyncgenObserver(self._asyncgen, observer)

def fix_subscriber(subscriber):
"""Fixes subscriber to make sure it returns a Disposable instead
of None or a dispose function"""

if not hasattr(subscriber, "dispose"):
subscriber = Disposable.create(subscriber)

return subscriber

def set_disposable(scheduler=None, value=None):
try:
subscriber = self._subscribe_core(auto_detach_observer)
except Exception as ex:
if not auto_detach_observer.fail(ex):
raise
else:
auto_detach_observer.disposable = fix_subscriber(subscriber)
def asyncgen_to_observable(asyncgen, loop=None):
def emit(observer):
task = ensure_future(
iterate_asyncgen(asyncgen, observer),
loop=loop)

# Subscribe needs to set up the trampoline before for subscribing.
# Actually, the first call to Subscribe creates the trampoline so
# that it may assign its disposable before any observer executes
# OnNext over the CurrentThreadScheduler. This enables single-
# threaded cancellation
# https://social.msdn.microsoft.com/Forums/en-US/eb82f593-9684-4e27-
# 97b9-8b8886da5c33/whats-the-rationale-behind-how-currentthreadsche
# dulerschedulerequired-behaves?forum=rx
if current_thread_scheduler.schedule_required():
current_thread_scheduler.schedule(set_disposable)
else:
set_disposable()
def dispose():
async def await_task():
await task

# Hide the identity of the auto detach observer
return Disposable.create(auto_detach_observer.dispose)
task.cancel()
ensure_future(await_task(), loop=loop)

return dispose

def asyncgen_to_observable(asyncgen):
def emit(observer):
ensure_future(iterate_asyncgen(asyncgen, observer))
return AsyncgenObservable(emit, asyncgen)
return AnonymousObservable(emit)


async def iterate_asyncgen(asyncgen, observer):
try:
async for item in asyncgen:
observer.on_next(item)
observer.on_completed()
except CancelledError:
pass
except Exception as e:
observer.on_error(e)