Skip to content

Commit 7d5570a

Browse files
authored
Merge pull request #195 from graphql-python/features/decider-backend
Improved Decider threaded backend
2 parents fa4eeda + 8e210f9 commit 7d5570a

File tree

6 files changed

+529
-61
lines changed

6 files changed

+529
-61
lines changed

graphql/backend/cache.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ def get_unique_document_id(query_str):
3939

4040

4141
class GraphQLCachedBackend(GraphQLBackend):
42+
"""GraphQLCachedBackend will cache the document response from the backend
43+
given a key for that document"""
44+
4245
def __init__(
4346
self,
4447
backend, # type: GraphQLBackend

graphql/backend/core.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ def execute_and_validate(
3535

3636

3737
class GraphQLCoreBackend(GraphQLBackend):
38+
"""GraphQLCoreBackend will return a document using the default
39+
graphql executor"""
40+
3841
def __init__(self, executor=None):
3942
# type: (Optional[Any]) -> None
4043
self.execute_params = {"executor": executor}

graphql/backend/decider.py

Lines changed: 197 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,209 @@
1+
import atexit
2+
import logging
3+
import threading
4+
import os
5+
from time import sleep, time
6+
7+
18
from .base import GraphQLBackend, GraphQLDocument
9+
from .cache import GraphQLCachedBackend
10+
from ..pyutils.compat import Queue, check_threads
211

312
# Necessary for static type checking
413
if False: # flake8: noqa
5-
from typing import List, Union, Any, Optional
14+
from typing import List, Union, Any, Optional, Hashable, Dict, Tuple
615
from ..type.schema import GraphQLSchema
716

817

9-
class GraphQLDeciderBackend(GraphQLBackend):
10-
def __init__(self, backends):
11-
# type: (List[GraphQLBackend], ) -> None
12-
if not backends:
18+
DEFAULT_TIMEOUT = 10
19+
20+
logger = logging.getLogger("graphql.errors")
21+
22+
23+
# Code shamelessly taken from
24+
# https://github.com/getsentry/raven-python/blob/master/raven/transport/threaded.py
25+
# Why to create when we can take something that works?
26+
# Attributions to the Sentry team :)
27+
class AsyncWorker(object):
28+
_terminator = object()
29+
30+
def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT):
31+
check_threads()
32+
self._queue = Queue(-1)
33+
self._lock = threading.Lock()
34+
self._thread = None
35+
self._thread_for_pid = None
36+
self.options = {"shutdown_timeout": shutdown_timeout}
37+
self.start()
38+
39+
def is_alive(self):
40+
if self._thread_for_pid != os.getpid():
41+
return False
42+
return self._thread and self._thread.is_alive()
43+
44+
def _ensure_thread(self):
45+
if self.is_alive():
46+
return
47+
self.start()
48+
49+
def main_thread_terminated(self):
50+
with self._lock:
51+
if not self.is_alive():
52+
# thread not started or already stopped - nothing to do
53+
return
54+
55+
# wake the processing thread up
56+
self._queue.put_nowait(self._terminator)
57+
58+
timeout = self.options["shutdown_timeout"]
59+
60+
# wait briefly, initially
61+
initial_timeout = min(0.1, timeout)
62+
63+
if not self._timed_queue_join(initial_timeout):
64+
# if that didn't work, wait a bit longer
65+
# NB that size is an approximation, because other threads may
66+
# add or remove items
67+
size = self._queue.qsize()
68+
69+
print("Sentry is attempting to send %i pending error messages" % size)
70+
print("Waiting up to %s seconds" % timeout)
71+
72+
if os.name == "nt":
73+
print("Press Ctrl-Break to quit")
74+
else:
75+
print("Press Ctrl-C to quit")
76+
77+
self._timed_queue_join(timeout - initial_timeout)
78+
79+
self._thread = None
80+
81+
def _timed_queue_join(self, timeout):
82+
"""
83+
implementation of Queue.join which takes a 'timeout' argument
84+
returns true on success, false on timeout
85+
"""
86+
deadline = time() + timeout
87+
queue = self._queue
88+
89+
queue.all_tasks_done.acquire()
90+
try:
91+
while queue.unfinished_tasks:
92+
delay = deadline - time()
93+
if delay <= 0:
94+
# timed out
95+
return False
96+
97+
queue.all_tasks_done.wait(timeout=delay)
98+
99+
return True
100+
101+
finally:
102+
queue.all_tasks_done.release()
103+
104+
def start(self):
105+
"""
106+
Starts the task thread.
107+
"""
108+
self._lock.acquire()
109+
try:
110+
if not self.is_alive():
111+
self._thread = threading.Thread(
112+
target=self._target, name="graphql.AsyncWorker"
113+
)
114+
self._thread.setDaemon(True)
115+
self._thread.start()
116+
self._thread_for_pid = os.getpid()
117+
finally:
118+
self._lock.release()
119+
atexit.register(self.main_thread_terminated)
120+
121+
def stop(self, timeout=None):
122+
"""
123+
Stops the task thread. Synchronous!
124+
"""
125+
with self._lock:
126+
if self._thread:
127+
self._queue.put_nowait(self._terminator)
128+
self._thread.join(timeout=timeout)
129+
self._thread = None
130+
self._thread_for_pid = None
131+
132+
def queue(self, callback, *args, **kwargs):
133+
self._ensure_thread()
134+
self._queue.put_nowait((callback, args, kwargs))
135+
136+
def _target(self):
137+
while True:
138+
record = self._queue.get()
139+
try:
140+
if record is self._terminator:
141+
break
142+
callback, args, kwargs = record
143+
try:
144+
callback(*args, **kwargs)
145+
except Exception:
146+
logger.error("Failed processing job", exc_info=True)
147+
finally:
148+
self._queue.task_done()
149+
150+
sleep(0)
151+
152+
153+
class GraphQLDeciderBackend(GraphQLCachedBackend):
154+
"""GraphQLDeciderBackend will offload the document generation to the
155+
main backend in a new thread, serving meanwhile the document from the fallback
156+
backend"""
157+
158+
_worker = None
159+
fallback_backend = None # type: GraphQLBackend
160+
# _in_queue = object()
161+
162+
def __init__(
163+
self,
164+
backend, # type: Union[List[GraphQLBackend], Tuple[GraphQLBackend, GraphQLBackend], GraphQLBackend]
165+
fallback_backend=None, # type: Optional[GraphQLBackend]
166+
cache_map=None, # type: Optional[Dict[Hashable, GraphQLDocument]]
167+
use_consistent_hash=False, # type: bool
168+
):
169+
# type: (...) -> None
170+
if not backend:
13171
raise Exception("Need to provide backends to decide into.")
14-
if not isinstance(backends, (list, tuple)):
15-
raise Exception("Provided backends need to be a list or tuple.")
16-
self.backends = backends
17-
super(GraphQLDeciderBackend, self).__init__()
172+
if isinstance(backend, (list, tuple)):
173+
if fallback_backend:
174+
raise Exception("Can't set a fallback backend and backends as array")
175+
if len(backend) != 2:
176+
raise Exception("Only two backends are supported for now")
177+
backend, fallback_backend = backend[0], backend[1] # type: ignore
178+
else:
179+
if not fallback_backend:
180+
raise Exception("Need to provide a fallback backend")
181+
182+
self.fallback_backend = fallback_backend # type: ignore
183+
super(GraphQLDeciderBackend, self).__init__(
184+
backend, cache_map=cache_map, use_consistent_hash=use_consistent_hash
185+
)
186+
187+
def queue_backend(self, key, schema, request_string):
188+
# type: (Hashable, GraphQLSchema, str) -> None
189+
self.cache_map[key] = self.backend.document_from_string(schema, request_string)
190+
191+
def get_worker(self):
192+
# type: () -> AsyncWorker
193+
if self._worker is None or not self._worker.is_alive():
194+
self._worker = AsyncWorker()
195+
return self._worker
18196

19197
def document_from_string(self, schema, request_string):
20198
# type: (GraphQLSchema, str) -> GraphQLDocument
21-
for backend in self.backends:
22-
try:
23-
return backend.document_from_string(schema, request_string)
24-
except Exception:
25-
continue
26-
27-
raise Exception(
28-
"GraphQLDeciderBackend was not able to retrieve a document. Backends tried: {}".format(
29-
repr(self.backends)
199+
"""This method returns a GraphQLQuery (from cache if present)"""
200+
key = self.get_key_for_schema_and_document_string(schema, request_string)
201+
if key not in self.cache_map:
202+
# We return from the fallback
203+
self.cache_map[key] = self.fallback_backend.document_from_string(
204+
schema, request_string
30205
)
31-
)
206+
# We ensure the main backend response is in the queue
207+
self.get_worker().queue(self.queue_backend, key, schema, request_string)
208+
209+
return self.cache_map[key]

graphql/backend/tests/test_core.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
from ..base import GraphQLBackend, GraphQLDocument
99
from ..core import GraphQLCoreBackend
1010
from .schema import schema
11-
from typing import Any
11+
12+
if False:
13+
from typing import Any
1214

1315

1416
def test_core_backend():

0 commit comments

Comments
 (0)