39
39
pQuery = ql2_pb2 .Query .QueryType
40
40
41
41
42
- @asyncio .coroutine
43
- def _read_until (streamreader , delimiter ):
42
+ # @asyncio.coroutine
43
+ async def _read_until (streamreader , delimiter ):
44
44
"""Naive implementation of reading until a delimiter"""
45
45
buffer = bytearray ()
46
46
47
47
while True :
48
- c = yield from streamreader .read (1 )
48
+ # c = yield from streamreader.read(1)
49
+ c = await streamreader .read (1 )
49
50
if c == b"" :
50
51
break # EOF
51
52
buffer .append (c [0 ])
@@ -69,13 +70,14 @@ def reusable_waiter(loop, timeout):
69
70
else :
70
71
deadline = None
71
72
72
- @asyncio .coroutine
73
- def wait (future ):
73
+ # @asyncio.coroutine
74
+ async def wait (future ):
74
75
if deadline is not None :
75
76
new_timeout = max (deadline - loop .time (), 0 )
76
77
else :
77
78
new_timeout = None
78
- return (yield from asyncio .wait_for (future , new_timeout ))
79
+ # return (yield from asyncio.wait_for(future, new_timeout))
80
+ return (await asyncio .wait_for (future , new_timeout ))
79
81
80
82
return wait
81
83
@@ -101,20 +103,22 @@ def __init__(self, *args, **kwargs):
101
103
def __aiter__ (self ):
102
104
return self
103
105
104
- @asyncio .coroutine
105
- def __anext__ (self ):
106
+ # @asyncio.coroutine
107
+ async def __anext__ (self ):
106
108
try :
107
- return (yield from self ._get_next (None ))
109
+ # return (yield from self._get_next(None))
110
+ return (await self ._get_next (None ))
108
111
except ReqlCursorEmpty :
109
112
raise StopAsyncIteration
110
113
111
- @asyncio .coroutine
112
- def close (self ):
114
+ # @asyncio.coroutine
115
+ async def close (self ):
113
116
if self .error is None :
114
117
self .error = self ._empty_error ()
115
118
if self .conn .is_open ():
116
119
self .outstanding_requests += 1
117
- yield from self .conn ._parent ._stop (self )
120
+ # yield from self.conn._parent._stop(self)
121
+ await self .conn ._parent ._stop (self )
118
122
119
123
def _extend (self , res_buf ):
120
124
Cursor ._extend (self , res_buf )
@@ -123,16 +127,17 @@ def _extend(self, res_buf):
123
127
124
128
# Convenience function so users know when they've hit the end of the cursor
125
129
# without having to catch an exception
126
- @asyncio .coroutine
127
- def fetch_next (self , wait = True ):
130
+ # @asyncio.coroutine
131
+ async def fetch_next (self , wait = True ):
128
132
timeout = Cursor ._wait_to_timeout (wait )
129
133
waiter = reusable_waiter (self .conn ._io_loop , timeout )
130
134
while len (self .items ) == 0 and self .error is None :
131
135
self ._maybe_fetch_batch ()
132
136
if self .error is not None :
133
137
raise self .error
134
138
with translate_timeout_errors ():
135
- yield from waiter (asyncio .shield (self .new_response ))
139
+ # yield from waiter(asyncio.shield(self.new_response))
140
+ await waiter (asyncio .shield (self .new_response ))
136
141
# If there is a (non-empty) error to be received, we return True, so the
137
142
# user will receive it on the next `next` call.
138
143
return len (self .items ) != 0 or not isinstance (self .error , RqlCursorEmpty )
@@ -142,15 +147,16 @@ def _empty_error(self):
142
147
# with mechanisms to return from a coroutine.
143
148
return RqlCursorEmpty ()
144
149
145
- @asyncio .coroutine
146
- def _get_next (self , timeout ):
150
+ # @asyncio.coroutine
151
+ async def _get_next (self , timeout ):
147
152
waiter = reusable_waiter (self .conn ._io_loop , timeout )
148
153
while len (self .items ) == 0 :
149
154
self ._maybe_fetch_batch ()
150
155
if self .error is not None :
151
156
raise self .error
152
157
with translate_timeout_errors ():
153
- yield from waiter (asyncio .shield (self .new_response ))
158
+ # yield from waiter(asyncio.shield(self.new_response))
159
+ await waiter (asyncio .shield (self .new_response ))
154
160
return self .items .popleft ()
155
161
156
162
def _maybe_fetch_batch (self ):
@@ -187,8 +193,8 @@ def client_address(self):
187
193
if self .is_open ():
188
194
return self ._streamwriter .get_extra_info ("sockname" )[0 ]
189
195
190
- @asyncio .coroutine
191
- def connect (self , timeout ):
196
+ # @asyncio.coroutine
197
+ async def connect (self , timeout ):
192
198
try :
193
199
ssl_context = None
194
200
if len (self ._parent .ssl ) > 0 :
@@ -200,7 +206,8 @@ def connect(self, timeout):
200
206
ssl_context .check_hostname = True # redundant with match_hostname
201
207
ssl_context .load_verify_locations (self ._parent .ssl ["ca_certs" ])
202
208
203
- self ._streamreader , self ._streamwriter = yield from asyncio .open_connection (
209
+ # self._streamreader, self._streamwriter = yield from asyncio.open_connection(
210
+ self ._streamreader , self ._streamwriter = await asyncio .open_connection (
204
211
self ._parent .host ,
205
212
self ._parent .port ,
206
213
ssl = ssl_context ,
@@ -230,22 +237,26 @@ def connect(self, timeout):
230
237
if request is not "" :
231
238
self ._streamwriter .write (request )
232
239
233
- response = yield from asyncio .wait_for (
240
+ # response = yield from asyncio.wait_for(
241
+ response = await asyncio .wait_for (
234
242
_read_until (self ._streamreader , b"\0 " ),
235
243
timeout ,
236
244
)
237
245
response = response [:- 1 ]
238
246
except ReqlAuthError :
239
- yield from self .close ()
247
+ # yield from self.close()
248
+ await self .close ()
240
249
raise
241
250
except ReqlTimeoutError as err :
242
- yield from self .close ()
251
+ # yield from self.close()
252
+ await self .close ()
243
253
raise ReqlDriverError (
244
254
"Connection interrupted during handshake with %s:%s. Error: %s"
245
255
% (self ._parent .host , self ._parent .port , str (err ))
246
256
)
247
257
except Exception as err :
248
- yield from self .close ()
258
+ # yield from self.close()
259
+ await self .close ()
249
260
raise ReqlDriverError (
250
261
"Could not connect to %s:%s. Error: %s"
251
262
% (self ._parent .host , self ._parent .port , str (err ))
@@ -259,8 +270,8 @@ def connect(self, timeout):
259
270
def is_open (self ):
260
271
return not (self ._closing or self ._streamreader .at_eof ())
261
272
262
- @asyncio .coroutine
263
- def close (self , noreply_wait = False , token = None , exception = None ):
273
+ # @asyncio.coroutine
274
+ async def close (self , noreply_wait = False , token = None , exception = None ):
264
275
self ._closing = True
265
276
if exception is not None :
266
277
err_message = "Connection is closed (%s)." % str (exception )
@@ -280,38 +291,43 @@ def close(self, noreply_wait=False, token=None, exception=None):
280
291
281
292
if noreply_wait :
282
293
noreply = Query (pQuery .NOREPLY_WAIT , token , None , None )
283
- yield from self .run_query (noreply , False )
294
+ # yield from self.run_query(noreply, False)
295
+ await self .run_query (noreply , False )
284
296
285
297
self ._streamwriter .close ()
286
298
# We must not wait for the _reader_task if we got an exception, because that
287
299
# means that we were called from it. Waiting would lead to a deadlock.
288
300
if self ._reader_task and exception is None :
289
- yield from self ._reader_task
301
+ # yield from self._reader_task
302
+ await self ._reader_task
290
303
291
304
return None
292
305
293
- @asyncio .coroutine
294
- def run_query (self , query , noreply ):
306
+ # @asyncio.coroutine
307
+ async def run_query (self , query , noreply ):
295
308
self ._streamwriter .write (query .serialize (self ._parent ._get_json_encoder (query )))
296
309
if noreply :
297
310
return None
298
311
299
312
response_future = asyncio .Future ()
300
313
self ._user_queries [query .token ] = (query , response_future )
301
- return (yield from response_future )
314
+ # return (yield from response_future)
315
+ return (await response_future )
302
316
303
317
# The _reader coroutine runs in parallel, reading responses
304
318
# off of the socket and forwarding them to the appropriate Future or Cursor.
305
319
# This is shut down as a consequence of closing the stream, or an error in the
306
320
# socket/protocol from the server. Unexpected errors in this coroutine will
307
321
# close the ConnectionInstance and be passed to any open Futures or Cursors.
308
- @asyncio .coroutine
309
- def _reader (self ):
322
+ # @asyncio.coroutine
323
+ async def _reader (self ):
310
324
try :
311
325
while True :
312
- buf = yield from self ._streamreader .readexactly (12 )
326
+ # buf = yield from self._streamreader.readexactly(12)
327
+ buf = await self ._streamreader .readexactly (12 )
313
328
(token , length ,) = struct .unpack ("<qL" , buf )
314
- buf = yield from self ._streamreader .readexactly (length )
329
+ # buf = yield from self._streamreader.readexactly(length)
330
+ buf = await self ._streamreader .readexactly (length )
315
331
316
332
cursor = self ._cursor_cache .get (token )
317
333
if cursor is not None :
@@ -340,7 +356,8 @@ def _reader(self):
340
356
raise ReqlDriverError ("Unexpected response received." )
341
357
except Exception as ex :
342
358
if not self ._closing :
343
- yield from self .close (exception = ex )
359
+ # yield from self.close(exception=ex)
360
+ await self .close (exception = ex )
344
361
345
362
346
363
class Connection (ConnectionBase ):
@@ -353,30 +370,35 @@ def __init__(self, *args, **kwargs):
353
370
"Could not convert port %s to an integer." % self .port
354
371
)
355
372
356
- @asyncio .coroutine
357
- def __aenter__ (self ):
373
+ # @asyncio.coroutine
374
+ async def __aenter__ (self ):
358
375
return self
359
376
360
- @asyncio .coroutine
361
- def __aexit__ (self , exception_type , exception_val , traceback ):
362
- yield from self .close (False )
377
+ # @asyncio.coroutine
378
+ async def __aexit__ (self , exception_type , exception_val , traceback ):
379
+ # yield from self.close(False)
380
+ await self .close (False )
363
381
364
- @asyncio .coroutine
365
- def _stop (self , cursor ):
382
+ # @asyncio.coroutine
383
+ async def _stop (self , cursor ):
366
384
self .check_open ()
367
385
q = Query (pQuery .STOP , cursor .query .token , None , None )
368
- return (yield from self ._instance .run_query (q , True ))
386
+ # return (yield from self._instance.run_query(q, True))
387
+ return (await self ._instance .run_query (q , True ))
369
388
370
- @asyncio .coroutine
371
- def reconnect (self , noreply_wait = True , timeout = None ):
389
+ # @asyncio.coroutine
390
+ async def reconnect (self , noreply_wait = True , timeout = None ):
372
391
# We close before reconnect so reconnect doesn't try to close us
373
392
# and then fail to return the Future (this is a little awkward).
374
- yield from self .close (noreply_wait )
393
+ # yield from self.close(noreply_wait)
394
+ await self .close (noreply_wait )
375
395
self ._instance = self ._conn_type (self , ** self ._child_kwargs )
376
- return (yield from self ._instance .connect (timeout ))
396
+ # return (yield from self._instance.connect(timeout))
397
+ return (await self ._instance .connect (timeout ))
377
398
378
- @asyncio .coroutine
379
- def close (self , noreply_wait = True ):
399
+ # @asyncio.coroutine
400
+ async def close (self , noreply_wait = True ):
380
401
if self ._instance is None :
381
402
return None
382
- return (yield from ConnectionBase .close (self , noreply_wait = noreply_wait ))
403
+ # return (yield from ConnectionBase.close(self, noreply_wait=noreply_wait))
404
+ return (await ConnectionBase .close (self , noreply_wait = noreply_wait ))
0 commit comments