Skip to content

Commit aa5b0d6

Browse files
lsabilsabigabor-boros
authored
Upgrade @asyncio.coroutine to async (#181)
* Upgrade @asyncio.coroutine to async * 🚧 2 tests do not pass yet * 🚧 1 test error (nursery not found WTF) and 1 failing but don't know why..yet * Remove commented code * Use ImportError instead of Exception Co-authored-by: lsabi <luca.sabiucciu@gmail.com> Co-authored-by: Boros Gábor <gabor.brs@gmail.com>
1 parent 66a6485 commit aa5b0d6

File tree

3 files changed

+68
-73
lines changed

3 files changed

+68
-73
lines changed

rethinkdb/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
# limitations under the License.
1414
# The builtins here defends against re-importing something obscuring `object`.
1515
import builtins
16-
import imp
16+
try:
17+
import imp
18+
except ImportError:
19+
import importlib as imp
20+
1721
import os
1822

1923
import pkg_resources

rethinkdb/asyncio_net/net_asyncio.py

Lines changed: 55 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,19 @@
3939
pQuery = ql2_pb2.Query.QueryType
4040

4141

42-
@asyncio.coroutine
43-
def _read_until(streamreader, delimiter):
42+
async def _read_until(streamreader, delimiter):
4443
"""Naive implementation of reading until a delimiter"""
4544
buffer = bytearray()
4645

4746
while True:
48-
c = yield from streamreader.read(1)
47+
c = yield streamreader.read(1)
4948
if c == b"":
5049
break # EOF
5150
buffer.append(c[0])
5251
if c == delimiter:
5352
break
5453

55-
return bytes(buffer)
54+
yield bytes(buffer)
5655

5756

5857
def reusable_waiter(loop, timeout):
@@ -62,22 +61,22 @@ def reusable_waiter(loop, timeout):
6261
6362
waiter = reusable_waiter(event_loop, 10.0)
6463
while some_condition:
65-
yield from waiter(some_future)
64+
yield waiter(some_future)
6665
"""
6766
if timeout is not None:
6867
deadline = loop.time() + timeout
6968
else:
7069
deadline = None
7170

72-
@asyncio.coroutine
73-
def wait(future):
71+
async def wait(future):
7472
if deadline is not None:
7573
new_timeout = max(deadline - loop.time(), 0)
7674
else:
7775
new_timeout = None
78-
return (yield from asyncio.wait_for(future, new_timeout, loop=loop))
76+
yield asyncio.wait_for(future, new_timeout, loop=loop)
77+
return
7978

80-
return wait
79+
yield wait
8180

8281

8382
@contextlib.contextmanager
@@ -101,20 +100,18 @@ def __init__(self, *args, **kwargs):
101100
def __aiter__(self):
102101
return self
103102

104-
@asyncio.coroutine
105-
def __anext__(self):
103+
async def __anext__(self):
106104
try:
107-
return (yield from self._get_next(None))
105+
yield self._get_next(None)
108106
except ReqlCursorEmpty:
109107
raise StopAsyncIteration
110108

111-
@asyncio.coroutine
112-
def close(self):
109+
async def close(self):
113110
if self.error is None:
114111
self.error = self._empty_error()
115112
if self.conn.is_open():
116113
self.outstanding_requests += 1
117-
yield from self.conn._parent._stop(self)
114+
yield self.conn._parent._stop(self)
118115

119116
def _extend(self, res_buf):
120117
Cursor._extend(self, res_buf)
@@ -123,35 +120,35 @@ def _extend(self, res_buf):
123120

124121
# Convenience function so users know when they've hit the end of the cursor
125122
# without having to catch an exception
126-
@asyncio.coroutine
127-
def fetch_next(self, wait=True):
123+
async def fetch_next(self, wait=True):
128124
timeout = Cursor._wait_to_timeout(wait)
129125
waiter = reusable_waiter(self.conn._io_loop, timeout)
130126
while len(self.items) == 0 and self.error is None:
131127
self._maybe_fetch_batch()
132128
if self.error is not None:
133129
raise self.error
134130
with translate_timeout_errors():
135-
yield from waiter(asyncio.shield(self.new_response))
131+
yield waiter(asyncio.shield(self.new_response))
136132
# If there is a (non-empty) error to be received, we return True, so the
137133
# user will receive it on the next `next` call.
138-
return len(self.items) != 0 or not isinstance(self.error, RqlCursorEmpty)
134+
yield len(self.items) != 0 or not isinstance(self.error, RqlCursorEmpty)
135+
return
139136

140137
def _empty_error(self):
141138
# We do not have RqlCursorEmpty inherit from StopIteration as that interferes
142139
# with mechanisms to return from a coroutine.
143140
return RqlCursorEmpty()
144141

145-
@asyncio.coroutine
146-
def _get_next(self, timeout):
142+
async def _get_next(self, timeout):
147143
waiter = reusable_waiter(self.conn._io_loop, timeout)
148144
while len(self.items) == 0:
149145
self._maybe_fetch_batch()
150146
if self.error is not None:
151147
raise self.error
152148
with translate_timeout_errors():
153-
yield from waiter(asyncio.shield(self.new_response))
154-
return self.items.popleft()
149+
yield waiter(asyncio.shield(self.new_response))
150+
yield self.items.popleft()
151+
return
155152

156153
def _maybe_fetch_batch(self):
157154
if (
@@ -186,8 +183,7 @@ def client_address(self):
186183
if self.is_open():
187184
return self._streamwriter.get_extra_info("sockname")[0]
188185

189-
@asyncio.coroutine
190-
def connect(self, timeout):
186+
async def connect(self, timeout):
191187
try:
192188
ssl_context = None
193189
if len(self._parent.ssl) > 0:
@@ -199,7 +195,7 @@ def connect(self, timeout):
199195
ssl_context.check_hostname = True # redundant with match_hostname
200196
ssl_context.load_verify_locations(self._parent.ssl["ca_certs"])
201197

202-
self._streamreader, self._streamwriter = yield from asyncio.open_connection(
198+
self._streamreader, self._streamwriter = yield asyncio.open_connection(
203199
self._parent.host,
204200
self._parent.port,
205201
loop=self._io_loop,
@@ -230,23 +226,23 @@ def connect(self, timeout):
230226
if request is not "":
231227
self._streamwriter.write(request)
232228

233-
response = yield from asyncio.wait_for(
229+
response = yield asyncio.wait_for(
234230
_read_until(self._streamreader, b"\0"),
235231
timeout,
236232
loop=self._io_loop,
237233
)
238234
response = response[:-1]
239235
except ReqlAuthError:
240-
yield from self.close()
236+
yield self.close()
241237
raise
242238
except ReqlTimeoutError as err:
243-
yield from self.close()
239+
yield self.close()
244240
raise ReqlDriverError(
245241
"Connection interrupted during handshake with %s:%s. Error: %s"
246242
% (self._parent.host, self._parent.port, str(err))
247243
)
248244
except Exception as err:
249-
yield from self.close()
245+
yield self.close()
250246
raise ReqlDriverError(
251247
"Could not connect to %s:%s. Error: %s"
252248
% (self._parent.host, self._parent.port, str(err))
@@ -255,13 +251,13 @@ def connect(self, timeout):
255251
# Start a parallel function to perform reads
256252
# store a reference to it so it doesn't get destroyed
257253
self._reader_task = asyncio.ensure_future(self._reader(), loop=self._io_loop)
258-
return self._parent
254+
yield self._parent
255+
return
259256

260257
def is_open(self):
261258
return not (self._closing or self._streamreader.at_eof())
262259

263-
@asyncio.coroutine
264-
def close(self, noreply_wait=False, token=None, exception=None):
260+
async def close(self, noreply_wait=False, token=None, exception=None):
265261
self._closing = True
266262
if exception is not None:
267263
err_message = "Connection is closed (%s)." % str(exception)
@@ -281,38 +277,37 @@ def close(self, noreply_wait=False, token=None, exception=None):
281277

282278
if noreply_wait:
283279
noreply = Query(pQuery.NOREPLY_WAIT, token, None, None)
284-
yield from self.run_query(noreply, False)
280+
yield self.run_query(noreply, False)
285281

286282
self._streamwriter.close()
287283
# We must not wait for the _reader_task if we got an exception, because that
288284
# means that we were called from it. Waiting would lead to a deadlock.
289285
if self._reader_task and exception is None:
290-
yield from self._reader_task
286+
yield self._reader_task
291287

292-
return None
288+
return
293289

294-
@asyncio.coroutine
295-
def run_query(self, query, noreply):
290+
async def run_query(self, query, noreply):
296291
self._streamwriter.write(query.serialize(self._parent._get_json_encoder(query)))
297292
if noreply:
298-
return None
293+
return
299294

300295
response_future = asyncio.Future()
301296
self._user_queries[query.token] = (query, response_future)
302-
return (yield from response_future)
297+
yield response_future
298+
return
303299

304300
# The _reader coroutine runs in parallel, reading responses
305301
# off of the socket and forwarding them to the appropriate Future or Cursor.
306302
# This is shut down as a consequence of closing the stream, or an error in the
307303
# socket/protocol from the server. Unexpected errors in this coroutine will
308304
# close the ConnectionInstance and be passed to any open Futures or Cursors.
309-
@asyncio.coroutine
310-
def _reader(self):
305+
async def _reader(self):
311306
try:
312307
while True:
313-
buf = yield from self._streamreader.readexactly(12)
308+
buf = yield self._streamreader.readexactly(12)
314309
(token, length,) = struct.unpack("<qL", buf)
315-
buf = yield from self._streamreader.readexactly(length)
310+
buf = yield self._streamreader.readexactly(length)
316311

317312
cursor = self._cursor_cache.get(token)
318313
if cursor is not None:
@@ -341,7 +336,7 @@ def _reader(self):
341336
raise ReqlDriverError("Unexpected response received.")
342337
except Exception as ex:
343338
if not self._closing:
344-
yield from self.close(exception=ex)
339+
yield self.close(exception=ex)
345340

346341

347342
class Connection(ConnectionBase):
@@ -354,30 +349,29 @@ def __init__(self, *args, **kwargs):
354349
"Could not convert port %s to an integer." % self.port
355350
)
356351

357-
@asyncio.coroutine
358-
def __aenter__(self):
352+
async def __aenter__(self):
359353
return self
360354

361-
@asyncio.coroutine
362-
def __aexit__(self, exception_type, exception_val, traceback):
363-
yield from self.close(False)
355+
async def __aexit__(self, exception_type, exception_val, traceback):
356+
yield self.close(False)
364357

365-
@asyncio.coroutine
366-
def _stop(self, cursor):
358+
async def _stop(self, cursor):
367359
self.check_open()
368360
q = Query(pQuery.STOP, cursor.query.token, None, None)
369-
return (yield from self._instance.run_query(q, True))
361+
yield self._instance.run_query(q, True)
362+
return
370363

371-
@asyncio.coroutine
372-
def reconnect(self, noreply_wait=True, timeout=None):
364+
async def reconnect(self, noreply_wait=True, timeout=None):
373365
# We close before reconnect so reconnect doesn't try to close us
374366
# and then fail to return the Future (this is a little awkward).
375-
yield from self.close(noreply_wait)
367+
yield self.close(noreply_wait)
376368
self._instance = self._conn_type(self, **self._child_kwargs)
377-
return (yield from self._instance.connect(timeout))
369+
yield self._instance.connect(timeout)
370+
return
378371

379-
@asyncio.coroutine
380-
def close(self, noreply_wait=True):
372+
async def close(self, noreply_wait=True):
381373
if self._instance is None:
382-
return None
383-
return (yield from ConnectionBase.close(self, noreply_wait=noreply_wait))
374+
yield None
375+
return
376+
yield ConnectionBase.close(self, noreply_wait=noreply_wait)
377+
return

tests/integration/test_asyncio.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import sys
2-
from asyncio import coroutine
3-
42
import pytest
53

64
from tests.helpers import INTEGRATION_TEST_DB, IntegrationTestCaseBase
@@ -22,21 +20,20 @@ def teardown_method(self):
2220
super(TestAsyncio, self).teardown_method()
2321
self.r.set_loop_type(None)
2422

25-
@coroutine
26-
def test_flow_coroutine_paradigm(self):
27-
connection = yield from self.conn
23+
async def test_flow_coroutine_paradigm(self):
24+
connection = yield self.conn
2825

29-
yield from self.r.table_create(self.table_name).run(connection)
26+
yield self.r.table_create(self.table_name).run(connection)
3027

3128
table = self.r.table(self.table_name)
32-
yield from table.insert(
29+
yield table.insert(
3330
{"id": 1, "name": "Iron Man", "first_appearance": "Tales of Suspense #39"}
3431
).run(connection)
3532

36-
cursor = yield from table.run(connection)
33+
cursor = yield table.run(connection)
3734

38-
while (yield from cursor.fetch_next()):
39-
hero = yield from cursor.__anext__()
35+
while (yield cursor.fetch_next()):
36+
hero = yield cursor.__anext__()
4037
assert hero["name"] == "Iron Man"
4138

42-
yield from connection.close()
39+
yield connection.close()

0 commit comments

Comments
 (0)