39
39
pQuery = ql2_pb2 .Query .QueryType
40
40
41
41
42
- @asyncio .coroutine
43
- def _read_until (streamreader , delimiter ):
42
+ # @asyncio.coroutine
43
+ async def _read_until (streamreader , delimiter ):
44
44
"""Naive implementation of reading until a delimiter"""
45
45
buffer = bytearray ()
46
46
47
47
while True :
48
- c = yield from streamreader .read (1 )
48
+ # c = yield from streamreader.read(1)
49
+ c = await streamreader .read (1 )
49
50
if c == b"" :
50
51
break # EOF
51
52
buffer .append (c [0 ])
@@ -69,13 +70,14 @@ def reusable_waiter(loop, timeout):
69
70
else :
70
71
deadline = None
71
72
72
- @asyncio .coroutine
73
- def wait (future ):
73
+ # @asyncio.coroutine
74
+ async def wait (future ):
74
75
if deadline is not None :
75
76
new_timeout = max (deadline - loop .time (), 0 )
76
77
else :
77
78
new_timeout = None
78
- return (yield from asyncio .wait_for (future , new_timeout ))
79
+ # return (yield from asyncio.wait_for(future, new_timeout))
80
+ return (await asyncio .wait_for (future , new_timeout ))
79
81
80
82
return wait
81
83
@@ -101,20 +103,22 @@ def __init__(self, *args, **kwargs):
101
103
def __aiter__ (self ):
102
104
return self
103
105
104
- @asyncio .coroutine
105
- def __anext__ (self ):
106
+ # @asyncio.coroutine
107
+ async def __anext__ (self ):
106
108
try :
107
- return (yield from self ._get_next (None ))
109
+ # return (yield from self._get_next(None))
110
+ return (await self ._get_next (None ))
108
111
except ReqlCursorEmpty :
109
112
raise StopAsyncIteration
110
113
111
- @asyncio .coroutine
112
- def close (self ):
114
+ # @asyncio.coroutine
115
+ async def close (self ):
113
116
if self .error is None :
114
117
self .error = self ._empty_error ()
115
118
if self .conn .is_open ():
116
119
self .outstanding_requests += 1
117
- yield from self .conn ._parent ._stop (self )
120
+ # yield from self.conn._parent._stop(self)
121
+ await self .conn ._parent ._stop (self )
118
122
119
123
def _extend (self , res_buf ):
120
124
Cursor ._extend (self , res_buf )
@@ -123,16 +127,17 @@ def _extend(self, res_buf):
123
127
124
128
# Convenience function so users know when they've hit the end of the cursor
125
129
# without having to catch an exception
126
- @asyncio .coroutine
127
- def fetch_next (self , wait = True ):
130
+ # @asyncio.coroutine
131
+ async def fetch_next (self , wait = True ):
128
132
timeout = Cursor ._wait_to_timeout (wait )
129
133
waiter = reusable_waiter (self .conn ._io_loop , timeout )
130
134
while len (self .items ) == 0 and self .error is None :
131
135
self ._maybe_fetch_batch ()
132
136
if self .error is not None :
133
137
raise self .error
134
138
with translate_timeout_errors ():
135
- yield from waiter (asyncio .shield (self .new_response ))
139
+ # yield from waiter(asyncio.shield(self.new_response))
140
+ await waiter (asyncio .shield (self .new_response ))
136
141
# If there is a (non-empty) error to be received, we return True, so the
137
142
# user will receive it on the next `next` call.
138
143
return len (self .items ) != 0 or not isinstance (self .error , RqlCursorEmpty )
@@ -142,15 +147,16 @@ def _empty_error(self):
142
147
# with mechanisms to return from a coroutine.
143
148
return RqlCursorEmpty ()
144
149
145
- @asyncio .coroutine
146
- def _get_next (self , timeout ):
150
+ # @asyncio.coroutine
151
+ async def _get_next (self , timeout ):
147
152
waiter = reusable_waiter (self .conn ._io_loop , timeout )
148
153
while len (self .items ) == 0 :
149
154
self ._maybe_fetch_batch ()
150
155
if self .error is not None :
151
156
raise self .error
152
157
with translate_timeout_errors ():
153
- yield from waiter (asyncio .shield (self .new_response ))
158
+ # yield from waiter(asyncio.shield(self.new_response))
159
+ await waiter (asyncio .shield (self .new_response ))
154
160
return self .items .popleft ()
155
161
156
162
def _maybe_fetch_batch (self ):
@@ -186,8 +192,8 @@ def client_address(self):
186
192
if self .is_open ():
187
193
return self ._streamwriter .get_extra_info ("sockname" )[0 ]
188
194
189
- @asyncio .coroutine
190
- def connect (self , timeout ):
195
+ # @asyncio.coroutine
196
+ async def connect (self , timeout ):
191
197
try :
192
198
ssl_context = None
193
199
if len (self ._parent .ssl ) > 0 :
@@ -199,7 +205,8 @@ def connect(self, timeout):
199
205
ssl_context .check_hostname = True # redundant with match_hostname
200
206
ssl_context .load_verify_locations (self ._parent .ssl ["ca_certs" ])
201
207
202
- self ._streamreader , self ._streamwriter = yield from asyncio .open_connection (
208
+ # self._streamreader, self._streamwriter = yield from asyncio.open_connection(
209
+ self ._streamreader , self ._streamwriter = await asyncio .open_connection (
203
210
self ._parent .host ,
204
211
self ._parent .port ,
205
212
ssl = ssl_context ,
@@ -229,22 +236,26 @@ def connect(self, timeout):
229
236
if request != "" :
230
237
self ._streamwriter .write (request )
231
238
232
- response = yield from asyncio .wait_for (
239
+ # response = yield from asyncio.wait_for(
240
+ response = await asyncio .wait_for (
233
241
_read_until (self ._streamreader , b"\0 " ),
234
242
timeout ,
235
243
)
236
244
response = response [:- 1 ]
237
245
except ReqlAuthError :
238
- yield from self .close ()
246
+ # yield from self.close()
247
+ await self .close ()
239
248
raise
240
249
except ReqlTimeoutError as err :
241
- yield from self .close ()
250
+ # yield from self.close()
251
+ await self .close ()
242
252
raise ReqlDriverError (
243
253
"Connection interrupted during handshake with %s:%s. Error: %s"
244
254
% (self ._parent .host , self ._parent .port , str (err ))
245
255
)
246
256
except Exception as err :
247
- yield from self .close ()
257
+ # yield from self.close()
258
+ await self .close ()
248
259
raise ReqlDriverError (
249
260
"Could not connect to %s:%s. Error: %s"
250
261
% (self ._parent .host , self ._parent .port , str (err ))
@@ -258,8 +269,8 @@ def connect(self, timeout):
258
269
def is_open (self ):
259
270
return not (self ._closing or self ._streamreader .at_eof ())
260
271
261
- @asyncio .coroutine
262
- def close (self , noreply_wait = False , token = None , exception = None ):
272
+ # @asyncio.coroutine
273
+ async def close (self , noreply_wait = False , token = None , exception = None ):
263
274
self ._closing = True
264
275
if exception is not None :
265
276
err_message = "Connection is closed (%s)." % str (exception )
@@ -279,38 +290,43 @@ def close(self, noreply_wait=False, token=None, exception=None):
279
290
280
291
if noreply_wait :
281
292
noreply = Query (pQuery .NOREPLY_WAIT , token , None , None )
282
- yield from self .run_query (noreply , False )
293
+ # yield from self.run_query(noreply, False)
294
+ await self .run_query (noreply , False )
283
295
284
296
self ._streamwriter .close ()
285
297
# We must not wait for the _reader_task if we got an exception, because that
286
298
# means that we were called from it. Waiting would lead to a deadlock.
287
299
if self ._reader_task and exception is None :
288
- yield from self ._reader_task
300
+ # yield from self._reader_task
301
+ await self ._reader_task
289
302
290
303
return None
291
304
292
- @asyncio .coroutine
293
- def run_query (self , query , noreply ):
305
+ # @asyncio.coroutine
306
+ async def run_query (self , query , noreply ):
294
307
self ._streamwriter .write (query .serialize (self ._parent ._get_json_encoder (query )))
295
308
if noreply :
296
309
return None
297
310
298
311
response_future = asyncio .Future ()
299
312
self ._user_queries [query .token ] = (query , response_future )
300
- return (yield from response_future )
313
+ # return (yield from response_future)
314
+ return (await response_future )
301
315
302
316
# The _reader coroutine runs in parallel, reading responses
303
317
# off of the socket and forwarding them to the appropriate Future or Cursor.
304
318
# This is shut down as a consequence of closing the stream, or an error in the
305
319
# socket/protocol from the server. Unexpected errors in this coroutine will
306
320
# close the ConnectionInstance and be passed to any open Futures or Cursors.
307
- @asyncio .coroutine
308
- def _reader (self ):
321
+ # @asyncio.coroutine
322
+ async def _reader (self ):
309
323
try :
310
324
while True :
311
- buf = yield from self ._streamreader .readexactly (12 )
325
+ # buf = yield from self._streamreader.readexactly(12)
326
+ buf = await self ._streamreader .readexactly (12 )
312
327
(token , length ,) = struct .unpack ("<qL" , buf )
313
- buf = yield from self ._streamreader .readexactly (length )
328
+ # buf = yield from self._streamreader.readexactly(length)
329
+ buf = await self ._streamreader .readexactly (length )
314
330
315
331
cursor = self ._cursor_cache .get (token )
316
332
if cursor is not None :
@@ -339,7 +355,8 @@ def _reader(self):
339
355
raise ReqlDriverError ("Unexpected response received." )
340
356
except Exception as ex :
341
357
if not self ._closing :
342
- yield from self .close (exception = ex )
358
+ # yield from self.close(exception=ex)
359
+ await self .close (exception = ex )
343
360
344
361
345
362
class Connection (ConnectionBase ):
@@ -352,30 +369,35 @@ def __init__(self, *args, **kwargs):
352
369
"Could not convert port %s to an integer." % self .port
353
370
)
354
371
355
- @asyncio .coroutine
356
- def __aenter__ (self ):
372
+ # @asyncio.coroutine
373
+ async def __aenter__ (self ):
357
374
return self
358
375
359
- @asyncio .coroutine
360
- def __aexit__ (self , exception_type , exception_val , traceback ):
361
- yield from self .close (False )
376
+ # @asyncio.coroutine
377
+ async def __aexit__ (self , exception_type , exception_val , traceback ):
378
+ # yield from self.close(False)
379
+ await self .close (False )
362
380
363
- @asyncio .coroutine
364
- def _stop (self , cursor ):
381
+ # @asyncio.coroutine
382
+ async def _stop (self , cursor ):
365
383
self .check_open ()
366
384
q = Query (pQuery .STOP , cursor .query .token , None , None )
367
- return (yield from self ._instance .run_query (q , True ))
385
+ # return (yield from self._instance.run_query(q, True))
386
+ return (await self ._instance .run_query (q , True ))
368
387
369
- @asyncio .coroutine
370
- def reconnect (self , noreply_wait = True , timeout = None ):
388
+ # @asyncio.coroutine
389
+ async def reconnect (self , noreply_wait = True , timeout = None ):
371
390
# We close before reconnect so reconnect doesn't try to close us
372
391
# and then fail to return the Future (this is a little awkward).
373
- yield from self .close (noreply_wait )
392
+ # yield from self.close(noreply_wait)
393
+ await self .close (noreply_wait )
374
394
self ._instance = self ._conn_type (self , ** self ._child_kwargs )
375
- return (yield from self ._instance .connect (timeout ))
395
+ # return (yield from self._instance.connect(timeout))
396
+ return (await self ._instance .connect (timeout ))
376
397
377
- @asyncio .coroutine
378
- def close (self , noreply_wait = True ):
398
+ # @asyncio.coroutine
399
+ async def close (self , noreply_wait = True ):
379
400
if self ._instance is None :
380
401
return None
381
- return (yield from ConnectionBase .close (self , noreply_wait = noreply_wait ))
402
+ # return (yield from ConnectionBase.close(self, noreply_wait=noreply_wait))
403
+ return (await ConnectionBase .close (self , noreply_wait = noreply_wait ))
0 commit comments