From 6dd4e35d0173ea52cc6f68233b649ce8cd6b5d7a Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 30 Jan 2023 17:29:04 +0000 Subject: [PATCH 1/2] asyncio_net/net_asyncio.py: async/await syntax for python 3.11.1 compatibility --- rethinkdb/asyncio_net/net_asyncio.py | 126 ++++++++++++++++----------- 1 file changed, 74 insertions(+), 52 deletions(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 73cbcdd..12d2e96 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -39,13 +39,14 @@ pQuery = ql2_pb2.Query.QueryType -@asyncio.coroutine -def _read_until(streamreader, delimiter): +# @asyncio.coroutine +async def _read_until(streamreader, delimiter): """Naive implementation of reading until a delimiter""" buffer = bytearray() while True: - c = yield from streamreader.read(1) + # c = yield from streamreader.read(1) + c = await streamreader.read(1) if c == b"": break # EOF buffer.append(c[0]) @@ -69,13 +70,14 @@ def reusable_waiter(loop, timeout): else: deadline = None - @asyncio.coroutine - def wait(future): + # @asyncio.coroutine + async def wait(future): if deadline is not None: new_timeout = max(deadline - loop.time(), 0) else: new_timeout = None - return (yield from asyncio.wait_for(future, new_timeout)) + # return (yield from asyncio.wait_for(future, new_timeout)) + return (await asyncio.wait_for(future, new_timeout)) return wait @@ -101,20 +103,22 @@ def __init__(self, *args, **kwargs): def __aiter__(self): return self - @asyncio.coroutine - def __anext__(self): + # @asyncio.coroutine + async def __anext__(self): try: - return (yield from self._get_next(None)) + # return (yield from self._get_next(None)) + return (await self._get_next(None)) except ReqlCursorEmpty: raise StopAsyncIteration - @asyncio.coroutine - def close(self): + # @asyncio.coroutine + async def close(self): if self.error is None: self.error = self._empty_error() if self.conn.is_open(): self.outstanding_requests += 1 - yield from self.conn._parent._stop(self) + # yield from self.conn._parent._stop(self) + await self.conn._parent._stop(self) def _extend(self, res_buf): Cursor._extend(self, res_buf) @@ -123,8 +127,8 @@ def _extend(self, res_buf): # Convenience function so users know when they've hit the end of the cursor # without having to catch an exception - @asyncio.coroutine - def fetch_next(self, wait=True): + # @asyncio.coroutine + async def fetch_next(self, wait=True): timeout = Cursor._wait_to_timeout(wait) waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0 and self.error is None: @@ -132,7 +136,8 @@ def fetch_next(self, wait=True): if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + # yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) # If there is a (non-empty) error to be received, we return True, so the # user will receive it on the next `next` call. return len(self.items) != 0 or not isinstance(self.error, RqlCursorEmpty) @@ -142,15 +147,16 @@ def _empty_error(self): # with mechanisms to return from a coroutine. return RqlCursorEmpty() - @asyncio.coroutine - def _get_next(self, timeout): + # @asyncio.coroutine + async def _get_next(self, timeout): waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0: self._maybe_fetch_batch() if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + # yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) return self.items.popleft() def _maybe_fetch_batch(self): @@ -186,8 +192,8 @@ def client_address(self): if self.is_open(): return self._streamwriter.get_extra_info("sockname")[0] - @asyncio.coroutine - def connect(self, timeout): + # @asyncio.coroutine + async def connect(self, timeout): try: ssl_context = None if len(self._parent.ssl) > 0: @@ -199,7 +205,8 @@ def connect(self, timeout): ssl_context.check_hostname = True # redundant with match_hostname ssl_context.load_verify_locations(self._parent.ssl["ca_certs"]) - self._streamreader, self._streamwriter = yield from asyncio.open_connection( + # self._streamreader, self._streamwriter = yield from asyncio.open_connection( + self._streamreader, self._streamwriter = await asyncio.open_connection( self._parent.host, self._parent.port, ssl=ssl_context, @@ -229,22 +236,26 @@ def connect(self, timeout): if request != "": self._streamwriter.write(request) - response = yield from asyncio.wait_for( + # response = yield from asyncio.wait_for( + response = await asyncio.wait_for( _read_until(self._streamreader, b"\0"), timeout, ) response = response[:-1] except ReqlAuthError: - yield from self.close() + # yield from self.close() + await self.close() raise except ReqlTimeoutError as err: - yield from self.close() + # yield from self.close() + await self.close() raise ReqlDriverError( "Connection interrupted during handshake with %s:%s. Error: %s" % (self._parent.host, self._parent.port, str(err)) ) except Exception as err: - yield from self.close() + # yield from self.close() + await self.close() raise ReqlDriverError( "Could not connect to %s:%s. Error: %s" % (self._parent.host, self._parent.port, str(err)) @@ -258,8 +269,8 @@ def connect(self, timeout): def is_open(self): return not (self._closing or self._streamreader.at_eof()) - @asyncio.coroutine - def close(self, noreply_wait=False, token=None, exception=None): + # @asyncio.coroutine + async def close(self, noreply_wait=False, token=None, exception=None): self._closing = True if exception is not None: err_message = "Connection is closed (%s)." % str(exception) @@ -279,38 +290,43 @@ def close(self, noreply_wait=False, token=None, exception=None): if noreply_wait: noreply = Query(pQuery.NOREPLY_WAIT, token, None, None) - yield from self.run_query(noreply, False) + # yield from self.run_query(noreply, False) + await self.run_query(noreply, False) self._streamwriter.close() # We must not wait for the _reader_task if we got an exception, because that # means that we were called from it. Waiting would lead to a deadlock. if self._reader_task and exception is None: - yield from self._reader_task + # yield from self._reader_task + await self._reader_task return None - @asyncio.coroutine - def run_query(self, query, noreply): + # @asyncio.coroutine + async def run_query(self, query, noreply): self._streamwriter.write(query.serialize(self._parent._get_json_encoder(query))) if noreply: return None response_future = asyncio.Future() self._user_queries[query.token] = (query, response_future) - return (yield from response_future) + # return (yield from response_future) + return (await response_future) # The _reader coroutine runs in parallel, reading responses # off of the socket and forwarding them to the appropriate Future or Cursor. # This is shut down as a consequence of closing the stream, or an error in the # socket/protocol from the server. Unexpected errors in this coroutine will # close the ConnectionInstance and be passed to any open Futures or Cursors. - @asyncio.coroutine - def _reader(self): + # @asyncio.coroutine + async def _reader(self): try: while True: - buf = yield from self._streamreader.readexactly(12) + # buf = yield from self._streamreader.readexactly(12) + buf = await self._streamreader.readexactly(12) (token, length,) = struct.unpack(" Date: Wed, 4 Oct 2023 14:12:17 -0700 Subject: [PATCH 2/2] Remove commented pre-3.11 asyncio lines --- rethinkdb/asyncio_net/net_asyncio.py | 37 ---------------------------- 1 file changed, 37 deletions(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 12d2e96..e9bec3e 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -39,13 +39,11 @@ pQuery = ql2_pb2.Query.QueryType -# @asyncio.coroutine async def _read_until(streamreader, delimiter): """Naive implementation of reading until a delimiter""" buffer = bytearray() while True: - # c = yield from streamreader.read(1) c = await streamreader.read(1) if c == b"": break # EOF @@ -70,13 +68,11 @@ def reusable_waiter(loop, timeout): else: deadline = None - # @asyncio.coroutine async def wait(future): if deadline is not None: new_timeout = max(deadline - loop.time(), 0) else: new_timeout = None - # return (yield from asyncio.wait_for(future, new_timeout)) return (await asyncio.wait_for(future, new_timeout)) return wait @@ -103,21 +99,17 @@ def __init__(self, *args, **kwargs): def __aiter__(self): return self - # @asyncio.coroutine async def __anext__(self): try: - # return (yield from self._get_next(None)) return (await self._get_next(None)) except ReqlCursorEmpty: raise StopAsyncIteration - # @asyncio.coroutine async def close(self): if self.error is None: self.error = self._empty_error() if self.conn.is_open(): self.outstanding_requests += 1 - # yield from self.conn._parent._stop(self) await self.conn._parent._stop(self) def _extend(self, res_buf): @@ -127,7 +119,6 @@ def _extend(self, res_buf): # Convenience function so users know when they've hit the end of the cursor # without having to catch an exception - # @asyncio.coroutine async def fetch_next(self, wait=True): timeout = Cursor._wait_to_timeout(wait) waiter = reusable_waiter(self.conn._io_loop, timeout) @@ -136,7 +127,6 @@ async def fetch_next(self, wait=True): if self.error is not None: raise self.error with translate_timeout_errors(): - # yield from waiter(asyncio.shield(self.new_response)) await waiter(asyncio.shield(self.new_response)) # If there is a (non-empty) error to be received, we return True, so the # user will receive it on the next `next` call. @@ -147,7 +137,6 @@ def _empty_error(self): # with mechanisms to return from a coroutine. return RqlCursorEmpty() - # @asyncio.coroutine async def _get_next(self, timeout): waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0: @@ -155,7 +144,6 @@ async def _get_next(self, timeout): if self.error is not None: raise self.error with translate_timeout_errors(): - # yield from waiter(asyncio.shield(self.new_response)) await waiter(asyncio.shield(self.new_response)) return self.items.popleft() @@ -192,7 +180,6 @@ def client_address(self): if self.is_open(): return self._streamwriter.get_extra_info("sockname")[0] - # @asyncio.coroutine async def connect(self, timeout): try: ssl_context = None @@ -205,7 +192,6 @@ async def connect(self, timeout): ssl_context.check_hostname = True # redundant with match_hostname ssl_context.load_verify_locations(self._parent.ssl["ca_certs"]) - # self._streamreader, self._streamwriter = yield from asyncio.open_connection( self._streamreader, self._streamwriter = await asyncio.open_connection( self._parent.host, self._parent.port, @@ -236,25 +222,21 @@ async def connect(self, timeout): if request != "": self._streamwriter.write(request) - # response = yield from asyncio.wait_for( response = await asyncio.wait_for( _read_until(self._streamreader, b"\0"), timeout, ) response = response[:-1] except ReqlAuthError: - # yield from self.close() await self.close() raise except ReqlTimeoutError as err: - # yield from self.close() await self.close() raise ReqlDriverError( "Connection interrupted during handshake with %s:%s. Error: %s" % (self._parent.host, self._parent.port, str(err)) ) except Exception as err: - # yield from self.close() await self.close() raise ReqlDriverError( "Could not connect to %s:%s. Error: %s" @@ -269,7 +251,6 @@ async def connect(self, timeout): def is_open(self): return not (self._closing or self._streamreader.at_eof()) - # @asyncio.coroutine async def close(self, noreply_wait=False, token=None, exception=None): self._closing = True if exception is not None: @@ -290,19 +271,16 @@ async def close(self, noreply_wait=False, token=None, exception=None): if noreply_wait: noreply = Query(pQuery.NOREPLY_WAIT, token, None, None) - # yield from self.run_query(noreply, False) await self.run_query(noreply, False) self._streamwriter.close() # We must not wait for the _reader_task if we got an exception, because that # means that we were called from it. Waiting would lead to a deadlock. if self._reader_task and exception is None: - # yield from self._reader_task await self._reader_task return None - # @asyncio.coroutine async def run_query(self, query, noreply): self._streamwriter.write(query.serialize(self._parent._get_json_encoder(query))) if noreply: @@ -310,7 +288,6 @@ async def run_query(self, query, noreply): response_future = asyncio.Future() self._user_queries[query.token] = (query, response_future) - # return (yield from response_future) return (await response_future) # The _reader coroutine runs in parallel, reading responses @@ -318,14 +295,11 @@ async def run_query(self, query, noreply): # This is shut down as a consequence of closing the stream, or an error in the # socket/protocol from the server. Unexpected errors in this coroutine will # close the ConnectionInstance and be passed to any open Futures or Cursors. - # @asyncio.coroutine async def _reader(self): try: while True: - # buf = yield from self._streamreader.readexactly(12) buf = await self._streamreader.readexactly(12) (token, length,) = struct.unpack("