|
1 | 1 | from asgiref.sync import async_to_sync
|
2 |
| -from channels.generic.websocket import AsyncJsonWebsocketConsumer |
3 | 2 | from graphene_django.settings import graphene_settings
|
4 | 3 | from graphql.execution.executors.asyncio import AsyncioExecutor
|
5 |
| -from django.urls import path |
6 | 4 | from rx import Observer, Observable
|
7 | 5 | from ..base import BaseConnectionContext, BaseSubscriptionServer
|
8 |
| -from ..constants import GQL_CONNECTION_ACK, GQL_CONNECTION_ERROR, WS_PROTOCOL |
| 6 | +from ..constants import GQL_CONNECTION_ACK, GQL_CONNECTION_ERROR |
| 7 | + |
| 8 | + |
| 9 | +class SubscriptionObserver(Observer): |
| 10 | + def __init__( |
| 11 | + self, connection_context, op_id, send_execution_result, send_error, on_close |
| 12 | + ): |
| 13 | + self.connection_context = connection_context |
| 14 | + self.op_id = op_id |
| 15 | + self.send_execution_result = send_execution_result |
| 16 | + self.send_error = send_error |
| 17 | + self.on_close = on_close |
| 18 | + |
| 19 | + def on_next(self, value): |
| 20 | + self.send_execution_result(self.connection_context, self.op_id, value) |
| 21 | + |
| 22 | + def on_completed(self): |
| 23 | + self.on_close(self.connection_context) |
| 24 | + |
| 25 | + def on_error(self, error): |
| 26 | + self.send_error(self.connection_context, self.op_id, error) |
9 | 27 |
|
10 | 28 |
|
11 | 29 | class ChannelsConnectionContext(BaseConnectionContext):
|
@@ -87,43 +105,3 @@ async def on_stop(self, connection_context, op_id):
|
87 | 105 |
|
88 | 106 |
|
89 | 107 | subscription_server = ChannelsSubscriptionServer(schema=graphene_settings.SCHEMA)
|
90 |
| - |
91 |
| - |
92 |
| -class GraphQLSubscriptionConsumer(AsyncJsonWebsocketConsumer): |
93 |
| - async def connect(self): |
94 |
| - self.connection_context = None |
95 |
| - if WS_PROTOCOL in self.scope["subprotocols"]: |
96 |
| - self.connection_context = await subscription_server.handle(self, self.scope) |
97 |
| - await self.accept(subprotocol=WS_PROTOCOL) |
98 |
| - else: |
99 |
| - await self.close() |
100 |
| - |
101 |
| - async def disconnect(self, code): |
102 |
| - if self.connection_context: |
103 |
| - await subscription_server.on_close(self.connection_context) |
104 |
| - |
105 |
| - async def receive_json(self, content): |
106 |
| - await subscription_server.on_message(self.connection_context, content) |
107 |
| - |
108 |
| - |
109 |
| -class SubscriptionObserver(Observer): |
110 |
| - def __init__( |
111 |
| - self, connection_context, op_id, send_execution_result, send_error, on_close |
112 |
| - ): |
113 |
| - self.connection_context = connection_context |
114 |
| - self.op_id = op_id |
115 |
| - self.send_execution_result = send_execution_result |
116 |
| - self.send_error = send_error |
117 |
| - self.on_close = on_close |
118 |
| - |
119 |
| - def on_next(self, value): |
120 |
| - self.send_execution_result(self.connection_context, self.op_id, value) |
121 |
| - |
122 |
| - def on_completed(self): |
123 |
| - self.on_close(self.connection_context) |
124 |
| - |
125 |
| - def on_error(self, error): |
126 |
| - self.send_error(self.connection_context, self.op_id, error) |
127 |
| - |
128 |
| - |
129 |
| -websocket_urlpatterns = [path("subscriptions", GraphQLSubscriptionConsumer)] |
0 commit comments