39
39
pQuery = ql2_pb2 .Query .QueryType
40
40
41
41
42
- # @asyncio.coroutine
43
42
async def _read_until (streamreader , delimiter ):
44
43
"""Naive implementation of reading until a delimiter"""
45
44
buffer = bytearray ()
46
45
47
46
while True :
48
- # c = yield from streamreader.read(1)
49
47
c = await streamreader .read (1 )
50
48
if c == b"" :
51
49
break # EOF
@@ -70,13 +68,11 @@ def reusable_waiter(loop, timeout):
70
68
else :
71
69
deadline = None
72
70
73
- # @asyncio.coroutine
74
71
async def wait (future ):
75
72
if deadline is not None :
76
73
new_timeout = max (deadline - loop .time (), 0 )
77
74
else :
78
75
new_timeout = None
79
- # return (yield from asyncio.wait_for(future, new_timeout))
80
76
return (await asyncio .wait_for (future , new_timeout ))
81
77
82
78
return wait
@@ -103,21 +99,17 @@ def __init__(self, *args, **kwargs):
103
99
def __aiter__ (self ):
104
100
return self
105
101
106
- # @asyncio.coroutine
107
102
async def __anext__ (self ):
108
103
try :
109
- # return (yield from self._get_next(None))
110
104
return (await self ._get_next (None ))
111
105
except ReqlCursorEmpty :
112
106
raise StopAsyncIteration
113
107
114
- # @asyncio.coroutine
115
108
async def close (self ):
116
109
if self .error is None :
117
110
self .error = self ._empty_error ()
118
111
if self .conn .is_open ():
119
112
self .outstanding_requests += 1
120
- # yield from self.conn._parent._stop(self)
121
113
await self .conn ._parent ._stop (self )
122
114
123
115
def _extend (self , res_buf ):
@@ -127,7 +119,6 @@ def _extend(self, res_buf):
127
119
128
120
# Convenience function so users know when they've hit the end of the cursor
129
121
# without having to catch an exception
130
- # @asyncio.coroutine
131
122
async def fetch_next (self , wait = True ):
132
123
timeout = Cursor ._wait_to_timeout (wait )
133
124
waiter = reusable_waiter (self .conn ._io_loop , timeout )
@@ -136,7 +127,6 @@ async def fetch_next(self, wait=True):
136
127
if self .error is not None :
137
128
raise self .error
138
129
with translate_timeout_errors ():
139
- # yield from waiter(asyncio.shield(self.new_response))
140
130
await waiter (asyncio .shield (self .new_response ))
141
131
# If there is a (non-empty) error to be received, we return True, so the
142
132
# user will receive it on the next `next` call.
@@ -147,15 +137,13 @@ def _empty_error(self):
147
137
# with mechanisms to return from a coroutine.
148
138
return RqlCursorEmpty ()
149
139
150
- # @asyncio.coroutine
151
140
async def _get_next (self , timeout ):
152
141
waiter = reusable_waiter (self .conn ._io_loop , timeout )
153
142
while len (self .items ) == 0 :
154
143
self ._maybe_fetch_batch ()
155
144
if self .error is not None :
156
145
raise self .error
157
146
with translate_timeout_errors ():
158
- # yield from waiter(asyncio.shield(self.new_response))
159
147
await waiter (asyncio .shield (self .new_response ))
160
148
return self .items .popleft ()
161
149
@@ -192,7 +180,6 @@ def client_address(self):
192
180
if self .is_open ():
193
181
return self ._streamwriter .get_extra_info ("sockname" )[0 ]
194
182
195
- # @asyncio.coroutine
196
183
async def connect (self , timeout ):
197
184
try :
198
185
ssl_context = None
@@ -205,7 +192,6 @@ async def connect(self, timeout):
205
192
ssl_context .check_hostname = True # redundant with match_hostname
206
193
ssl_context .load_verify_locations (self ._parent .ssl ["ca_certs" ])
207
194
208
- # self._streamreader, self._streamwriter = yield from asyncio.open_connection(
209
195
self ._streamreader , self ._streamwriter = await asyncio .open_connection (
210
196
self ._parent .host ,
211
197
self ._parent .port ,
@@ -236,25 +222,21 @@ async def connect(self, timeout):
236
222
if request != "" :
237
223
self ._streamwriter .write (request )
238
224
239
- # response = yield from asyncio.wait_for(
240
225
response = await asyncio .wait_for (
241
226
_read_until (self ._streamreader , b"\0 " ),
242
227
timeout ,
243
228
)
244
229
response = response [:- 1 ]
245
230
except ReqlAuthError :
246
- # yield from self.close()
247
231
await self .close ()
248
232
raise
249
233
except ReqlTimeoutError as err :
250
- # yield from self.close()
251
234
await self .close ()
252
235
raise ReqlDriverError (
253
236
"Connection interrupted during handshake with %s:%s. Error: %s"
254
237
% (self ._parent .host , self ._parent .port , str (err ))
255
238
)
256
239
except Exception as err :
257
- # yield from self.close()
258
240
await self .close ()
259
241
raise ReqlDriverError (
260
242
"Could not connect to %s:%s. Error: %s"
@@ -269,7 +251,6 @@ async def connect(self, timeout):
269
251
def is_open (self ):
270
252
return not (self ._closing or self ._streamreader .at_eof ())
271
253
272
- # @asyncio.coroutine
273
254
async def close (self , noreply_wait = False , token = None , exception = None ):
274
255
self ._closing = True
275
256
if exception is not None :
@@ -290,42 +271,35 @@ async def close(self, noreply_wait=False, token=None, exception=None):
290
271
291
272
if noreply_wait :
292
273
noreply = Query (pQuery .NOREPLY_WAIT , token , None , None )
293
- # yield from self.run_query(noreply, False)
294
274
await self .run_query (noreply , False )
295
275
296
276
self ._streamwriter .close ()
297
277
# We must not wait for the _reader_task if we got an exception, because that
298
278
# means that we were called from it. Waiting would lead to a deadlock.
299
279
if self ._reader_task and exception is None :
300
- # yield from self._reader_task
301
280
await self ._reader_task
302
281
303
282
return None
304
283
305
- # @asyncio.coroutine
306
284
async def run_query (self , query , noreply ):
307
285
self ._streamwriter .write (query .serialize (self ._parent ._get_json_encoder (query )))
308
286
if noreply :
309
287
return None
310
288
311
289
response_future = asyncio .Future ()
312
290
self ._user_queries [query .token ] = (query , response_future )
313
- # return (yield from response_future)
314
291
return (await response_future )
315
292
316
293
# The _reader coroutine runs in parallel, reading responses
317
294
# off of the socket and forwarding them to the appropriate Future or Cursor.
318
295
# This is shut down as a consequence of closing the stream, or an error in the
319
296
# socket/protocol from the server. Unexpected errors in this coroutine will
320
297
# close the ConnectionInstance and be passed to any open Futures or Cursors.
321
- # @asyncio.coroutine
322
298
async def _reader (self ):
323
299
try :
324
300
while True :
325
- # buf = yield from self._streamreader.readexactly(12)
326
301
buf = await self ._streamreader .readexactly (12 )
327
302
(token , length ,) = struct .unpack ("<qL" , buf )
328
- # buf = yield from self._streamreader.readexactly(length)
329
303
buf = await self ._streamreader .readexactly (length )
330
304
331
305
cursor = self ._cursor_cache .get (token )
@@ -355,7 +329,6 @@ async def _reader(self):
355
329
raise ReqlDriverError ("Unexpected response received." )
356
330
except Exception as ex :
357
331
if not self ._closing :
358
- # yield from self.close(exception=ex)
359
332
await self .close (exception = ex )
360
333
361
334
@@ -369,35 +342,25 @@ def __init__(self, *args, **kwargs):
369
342
"Could not convert port %s to an integer." % self .port
370
343
)
371
344
372
- # @asyncio.coroutine
373
345
async def __aenter__ (self ):
374
346
return self
375
347
376
- # @asyncio.coroutine
377
348
async def __aexit__ (self , exception_type , exception_val , traceback ):
378
- # yield from self.close(False)
379
349
await self .close (False )
380
350
381
- # @asyncio.coroutine
382
351
async def _stop (self , cursor ):
383
352
self .check_open ()
384
353
q = Query (pQuery .STOP , cursor .query .token , None , None )
385
- # return (yield from self._instance.run_query(q, True))
386
354
return (await self ._instance .run_query (q , True ))
387
355
388
- # @asyncio.coroutine
389
356
async def reconnect (self , noreply_wait = True , timeout = None ):
390
357
# We close before reconnect so reconnect doesn't try to close us
391
358
# and then fail to return the Future (this is a little awkward).
392
- # yield from self.close(noreply_wait)
393
359
await self .close (noreply_wait )
394
360
self ._instance = self ._conn_type (self , ** self ._child_kwargs )
395
- # return (yield from self._instance.connect(timeout))
396
361
return (await self ._instance .connect (timeout ))
397
362
398
- # @asyncio.coroutine
399
363
async def close (self , noreply_wait = True ):
400
364
if self ._instance is None :
401
365
return None
402
- # return (yield from ConnectionBase.close(self, noreply_wait=noreply_wait))
403
366
return (await ConnectionBase .close (self , noreply_wait = noreply_wait ))
0 commit comments