Skip to content

Commit bcd0b77

Browse files
committed
Execute iterable operations as a separate task
Fixes future operation requests that were being blocked
1 parent 59872a2 commit bcd0b77

File tree

2 files changed

+17
-10
lines changed

2 files changed

+17
-10
lines changed

graphql_ws/django/consumers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ class GraphQLSubscriptionConsumer(AsyncJsonWebsocketConsumer):
77
async def connect(self):
88
self.connection_context = None
99
if WS_PROTOCOL in self.scope["subprotocols"]:
10-
self.connection_context = await subscription_server.handle(self, self.scope)
10+
self.connection_context = await subscription_server.handle(
11+
ws=self, request_context=self.scope)
1112
await self.accept(subprotocol=WS_PROTOCOL)
1213
else:
1314
await self.close()

graphql_ws/django/subscriptions.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from asyncio import ensure_future
12
from inspect import isawaitable
23
from graphene_django.settings import graphene_settings
34
from graphql.execution.executors.asyncio import AsyncioExecutor
@@ -68,16 +69,21 @@ async def on_start(self, connection_context, op_id, params):
6869
await self.send_execution_result(
6970
connection_context, op_id, execution_result
7071
)
71-
else:
72-
iterator = await execution_result.__aiter__()
73-
connection_context.register_operation(op_id, iterator)
74-
async for single_result in iterator:
75-
if not connection_context.has_operation(op_id):
76-
break
77-
await self.send_execution_result(
78-
connection_context, op_id, single_result
79-
)
8072
await self.send_message(connection_context, op_id, GQL_COMPLETE)
73+
return
74+
75+
iterator = await execution_result.__aiter__()
76+
ensure_future(self.run_op(connection_context, op_id, iterator))
77+
78+
async def run_op(self, connection_context, op_id, iterator):
79+
connection_context.register_operation(op_id, iterator)
80+
async for single_result in iterator:
81+
if not connection_context.has_operation(op_id):
82+
break
83+
await self.send_execution_result(
84+
connection_context, op_id, single_result
85+
)
86+
await self.send_message(connection_context, op_id, GQL_COMPLETE)
8187

8288
async def on_close(self, connection_context):
8389
remove_operations = list(connection_context.operations.keys())

0 commit comments

Comments
 (0)