From 44a0875bf162b8903db6c7d1ca9ea4bcd305882d Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 1 Mar 2022 21:50:49 +0000 Subject: [PATCH 1/7] python 3.10 compatibility fixes --- rethinkdb/ast.py | 31 ++++++++++++++-------------- rethinkdb/asyncio_net/net_asyncio.py | 9 ++++---- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/rethinkdb/ast.py b/rethinkdb/ast.py index 7bd73b0c..7d0ae4a2 100644 --- a/rethinkdb/ast.py +++ b/rethinkdb/ast.py @@ -20,21 +20,13 @@ import base64 import binascii +import collections import datetime import json -import sys import threading from rethinkdb import ql2_pb2 -from rethinkdb.errors import (QueryPrinter, ReqlDriverCompileError, - ReqlDriverError, T) - -if sys.version_info < (3, 3): - # python < 3.3 uses collections - import collections -else: - # but collections is deprecated from python >= 3.3 - import collections.abc as collections +from rethinkdb.errors import QueryPrinter, ReqlDriverCompileError, ReqlDriverError, T P_TERM = ql2_pb2.Term.TermType @@ -48,6 +40,13 @@ except NameError: xrange = range +try: + collections.abc.Callable +except AttributeError: + collections.abc.Callable = collections.Callable + collections.abc.Mapping = collections.Mapping + collections.abc.Iterable = collections.Iterable + def dict_items(dictionary): return list(dictionary.items()) @@ -82,7 +81,7 @@ def clear(cls): def expr(val, nesting_depth=20): """ - Convert a Python primitive into a RQL primitive value + Convert a Python primitive into a RQL primitive value """ if not isinstance(nesting_depth, int): raise ReqlDriverCompileError("Second argument to `r.expr` must be a number.") @@ -92,7 +91,7 @@ def expr(val, nesting_depth=20): if isinstance(val, RqlQuery): return val - elif isinstance(val, collections.Callable): + elif isinstance(val, collections.abc.Callable): return Func(val) elif isinstance(val, (datetime.datetime, datetime.date)): if not hasattr(val, "tzinfo") or not val.tzinfo: @@ -113,14 +112,14 @@ def expr(val, nesting_depth=20): return Datum(val) elif isinstance(val, bytes): return Binary(val) - elif isinstance(val, collections.Mapping): + elif isinstance(val, collections.abc.Mapping): # MakeObj doesn't take the dict as a keyword args to avoid # conflicting with the `self` parameter. obj = {} for k, v in dict_items(val): obj[k] = expr(v, nesting_depth - 1) return MakeObj(obj) - elif isinstance(val, collections.Iterable): + elif isinstance(val, collections.abc.Iterable): val = [expr(v, nesting_depth - 1) for v in val] return MakeArray(*val) else: @@ -767,7 +766,7 @@ def recursively_make_hashable(obj): class ReQLEncoder(json.JSONEncoder): """ - Default JSONEncoder subclass to handle query conversion. + Default JSONEncoder subclass to handle query conversion. """ def __init__(self): @@ -787,7 +786,7 @@ def default(self, obj): class ReQLDecoder(json.JSONDecoder): """ - Default JSONDecoder subclass to handle pseudo-type conversion. + Default JSONDecoder subclass to handle pseudo-type conversion. 
""" def __init__(self, reql_format_opts=None): diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 781081e5..b92893d7 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -75,7 +75,7 @@ def wait(future): new_timeout = max(deadline - loop.time(), 0) else: new_timeout = None - return (yield from asyncio.wait_for(future, new_timeout, loop=loop)) + return (yield from asyncio.wait_for(future, new_timeout)) return wait @@ -176,7 +176,8 @@ def __init__(self, parent, io_loop=None): self._ready = asyncio.Future() self._io_loop = io_loop if self._io_loop is None: - self._io_loop = asyncio.get_event_loop() + # self._io_loop = asyncio.get_event_loop() + self._io_loop = asyncio.get_running_loop() def client_port(self): if self.is_open(): @@ -202,7 +203,6 @@ def connect(self, timeout): self._streamreader, self._streamwriter = yield from asyncio.open_connection( self._parent.host, self._parent.port, - loop=self._io_loop, ssl=ssl_context, ) self._streamwriter.get_extra_info("socket").setsockopt( @@ -233,7 +233,6 @@ def connect(self, timeout): response = yield from asyncio.wait_for( _read_until(self._streamreader, b"\0"), timeout, - loop=self._io_loop, ) response = response[:-1] except ReqlAuthError: @@ -254,7 +253,7 @@ def connect(self, timeout): # Start a parallel function to perform reads # store a reference to it so it doesn't get destroyed - self._reader_task = asyncio.ensure_future(self._reader(), loop=self._io_loop) + self._reader_task = asyncio.ensure_future(self._reader()) return self._parent def is_open(self): From c478079a56a37df04df00d805fd301fb328e6d7e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 1 Mar 2022 22:03:43 +0000 Subject: [PATCH 2/7] added rethinkdb/ql2_pb2.py --- rethinkdb/ql2_pb2.py | 266 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 rethinkdb/ql2_pb2.py diff --git a/rethinkdb/ql2_pb2.py b/rethinkdb/ql2_pb2.py new file mode 100644 index 00000000..d70f70d6 --- /dev/null +++ b/rethinkdb/ql2_pb2.py @@ -0,0 +1,266 @@ +# DO NOT EDIT +# Autogenerated by convert_protofile.py + +class VersionDummy: + class Version: + V0_1 = 1063369270 + V0_2 = 1915781601 + V0_3 = 1601562686 + V0_4 = 1074539808 + V1_0 = 885177795 + + class Protocol: + PROTOBUF = 656407617 + JSON = 2120839367 + +class Query: + class QueryType: + START = 1 + CONTINUE = 2 + STOP = 3 + NOREPLY_WAIT = 4 + SERVER_INFO = 5 + + class AssocPair: + pass + +class Frame: + class FrameType: + POS = 1 + OPT = 2 + +class Backtrace: + pass + +class Response: + class ResponseType: + SUCCESS_ATOM = 1 + SUCCESS_SEQUENCE = 2 + SUCCESS_PARTIAL = 3 + WAIT_COMPLETE = 4 + SERVER_INFO = 5 + CLIENT_ERROR = 16 + COMPILE_ERROR = 17 + RUNTIME_ERROR = 18 + + class ErrorType: + INTERNAL = 1000000 + RESOURCE_LIMIT = 2000000 + QUERY_LOGIC = 3000000 + NON_EXISTENCE = 3100000 + OP_FAILED = 4100000 + OP_INDETERMINATE = 4200000 + USER = 5000000 + PERMISSION_ERROR = 6000000 + + class ResponseNote: + SEQUENCE_FEED = 1 + ATOM_FEED = 2 + ORDER_BY_LIMIT_FEED = 3 + UNIONED_FEED = 4 + INCLUDES_STATES = 5 + +class Datum: + class DatumType: + R_NULL = 1 + R_BOOL = 2 + R_NUM = 3 + R_STR = 4 + R_ARRAY = 5 + R_OBJECT = 6 + R_JSON = 7 + + class AssocPair: + pass + +class Term: + class TermType: + DATUM = 1 + MAKE_ARRAY = 2 + MAKE_OBJ = 3 + VAR = 10 + JAVASCRIPT = 11 + UUID = 169 + HTTP = 153 + ERROR = 12 + IMPLICIT_VAR = 13 + DB = 14 + TABLE = 15 + GET = 16 + GET_ALL = 78 + EQ = 17 + NE = 18 + LT = 19 + LE = 20 + GT = 21 + GE = 22 + 
NOT = 23 + ADD = 24 + SUB = 25 + MUL = 26 + DIV = 27 + MOD = 28 + FLOOR = 183 + CEIL = 184 + ROUND = 185 + APPEND = 29 + PREPEND = 80 + DIFFERENCE = 95 + SET_INSERT = 88 + SET_INTERSECTION = 89 + SET_UNION = 90 + SET_DIFFERENCE = 91 + SLICE = 30 + SKIP = 70 + LIMIT = 71 + OFFSETS_OF = 87 + CONTAINS = 93 + GET_FIELD = 31 + KEYS = 94 + VALUES = 186 + OBJECT = 143 + HAS_FIELDS = 32 + WITH_FIELDS = 96 + PLUCK = 33 + WITHOUT = 34 + MERGE = 35 + BETWEEN_DEPRECATED = 36 + BETWEEN = 182 + REDUCE = 37 + MAP = 38 + FOLD = 187 + FILTER = 39 + CONCAT_MAP = 40 + ORDER_BY = 41 + DISTINCT = 42 + COUNT = 43 + IS_EMPTY = 86 + UNION = 44 + NTH = 45 + BRACKET = 170 + INNER_JOIN = 48 + OUTER_JOIN = 49 + EQ_JOIN = 50 + ZIP = 72 + RANGE = 173 + INSERT_AT = 82 + DELETE_AT = 83 + CHANGE_AT = 84 + SPLICE_AT = 85 + COERCE_TO = 51 + TYPE_OF = 52 + UPDATE = 53 + DELETE = 54 + REPLACE = 55 + INSERT = 56 + DB_CREATE = 57 + DB_DROP = 58 + DB_LIST = 59 + TABLE_CREATE = 60 + TABLE_DROP = 61 + TABLE_LIST = 62 + CONFIG = 174 + STATUS = 175 + WAIT = 177 + RECONFIGURE = 176 + REBALANCE = 179 + SYNC = 138 + GRANT = 188 + INDEX_CREATE = 75 + INDEX_DROP = 76 + INDEX_LIST = 77 + INDEX_STATUS = 139 + INDEX_WAIT = 140 + INDEX_RENAME = 156 + SET_WRITE_HOOK = 189 + GET_WRITE_HOOK = 190 + FUNCALL = 64 + BRANCH = 65 + OR = 66 + AND = 67 + FOR_EACH = 68 + FUNC = 69 + ASC = 73 + DESC = 74 + INFO = 79 + MATCH = 97 + UPCASE = 141 + DOWNCASE = 142 + SAMPLE = 81 + DEFAULT = 92 + JSON = 98 + ISO8601 = 99 + TO_ISO8601 = 100 + EPOCH_TIME = 101 + TO_EPOCH_TIME = 102 + NOW = 103 + IN_TIMEZONE = 104 + DURING = 105 + DATE = 106 + TIME_OF_DAY = 126 + TIMEZONE = 127 + YEAR = 128 + MONTH = 129 + DAY = 130 + DAY_OF_WEEK = 131 + DAY_OF_YEAR = 132 + HOURS = 133 + MINUTES = 134 + SECONDS = 135 + TIME = 136 + MONDAY = 107 + TUESDAY = 108 + WEDNESDAY = 109 + THURSDAY = 110 + FRIDAY = 111 + SATURDAY = 112 + SUNDAY = 113 + JANUARY = 114 + FEBRUARY = 115 + MARCH = 116 + APRIL = 117 + MAY = 118 + JUNE = 119 + JULY = 120 + AUGUST = 121 + SEPTEMBER = 122 + OCTOBER = 123 + NOVEMBER = 124 + DECEMBER = 125 + LITERAL = 137 + GROUP = 144 + SUM = 145 + AVG = 146 + MIN = 147 + MAX = 148 + SPLIT = 149 + UNGROUP = 150 + RANDOM = 151 + CHANGES = 152 + ARGS = 154 + BINARY = 155 + GEOJSON = 157 + TO_GEOJSON = 158 + POINT = 159 + LINE = 160 + POLYGON = 161 + DISTANCE = 162 + INTERSECTS = 163 + INCLUDES = 164 + CIRCLE = 165 + GET_INTERSECTING = 166 + FILL = 167 + GET_NEAREST = 168 + POLYGON_SUB = 171 + TO_JSON_STRING = 172 + MINVAL = 180 + MAXVAL = 181 + BIT_AND = 191 + BIT_OR = 192 + BIT_XOR = 193 + BIT_NOT = 194 + BIT_SAL = 195 + BIT_SAR = 196 + + class AssocPair: + pass From 29eb8a229b8fef397d17903674f8aab1a764cbd7 Mon Sep 17 00:00:00 2001 From: stephanelsmith Date: Fri, 21 Oct 2022 20:48:26 +0000 Subject: [PATCH 3/7] commented out signal to make _export/_import asyncio to_thread friendly --- rethinkdb/_export.py | 17 +++++++++-------- rethinkdb/_import.py | 19 ++++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/rethinkdb/_export.py b/rethinkdb/_export.py index 01bae2f4..709721f4 100755 --- a/rethinkdb/_export.py +++ b/rethinkdb/_export.py @@ -28,7 +28,7 @@ import optparse import os import platform -import signal +# import signal import sys import tempfile import time @@ -273,9 +273,9 @@ def export_table( hook_counter, exit_event, ): - signal.signal( - signal.SIGINT, signal.SIG_DFL - ) # prevent signal handlers from being set in child processes + # signal.signal( + # signal.SIGINT, signal.SIG_DFL + # ) # prevent signal handlers from being set 
in child processes writer = None @@ -470,9 +470,9 @@ def run_clients(options, workingDir, db_table_set): sindex_counter = multiprocessing.Value(ctypes.c_longlong, 0) hook_counter = multiprocessing.Value(ctypes.c_longlong, 0) - signal.signal( - signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event) - ) + # signal.signal( + # signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event) + # ) errors = [] try: @@ -552,7 +552,8 @@ def plural(num, text, plural_text): ) ) finally: - signal.signal(signal.SIGINT, signal.SIG_DFL) + pass + # signal.signal(signal.SIGINT, signal.SIG_DFL) if interrupt_event.is_set(): raise RuntimeError("Interrupted") diff --git a/rethinkdb/_import.py b/rethinkdb/_import.py index 0ce90bfc..3dea65b8 100755 --- a/rethinkdb/_import.py +++ b/rethinkdb/_import.py @@ -29,7 +29,7 @@ import multiprocessing import optparse import os -import signal +# import signal import sys import time import traceback @@ -441,10 +441,10 @@ def read_to_queue( ignore_signals=True, batch_size=None, ): - if ( - ignore_signals - ): # ToDo: work out when we are in a worker process automatically - signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these + # if ( + # ignore_signals + # ): # ToDo: work out when we are in a worker process automatically + # signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these if batch_size is None: batch_size = utils_common.default_batch_size @@ -1078,7 +1078,7 @@ def parse_options(argv, prog=None): def table_writer( tables, options, work_queue, error_queue, warning_queue, exit_event, timing_queue ): - signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these + # signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these db = table = batch = None try: @@ -1188,7 +1188,7 @@ def table_writer( def update_progress(tables, debug, exit_event, sleep=0.2): - signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should not get these + # signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should not get these # give weights to each of the tables based on file size totalSize = sum([x.bytes_size for x in tables]) @@ -1269,7 +1269,7 @@ def import_tables(options, sources, files_ignored=None): progress_bar_sleep = 0.2 # - setup KeyboardInterupt handler - signal.signal(signal.SIGINT, lambda a, b: utils_common.abort(pools, exit_event)) + # signal.signal(signal.SIGINT, lambda a, b: utils_common.abort(pools, exit_event)) # - queue draining def drain_queues(): @@ -1494,7 +1494,8 @@ def plural(num, text): for key, value in sorted(timing_sums.items(), key=lambda x: x[0]): print(" %s: %.2f" % (key, value)) finally: - signal.signal(signal.SIGINT, signal.SIG_DFL) + pass + # signal.signal(signal.SIGINT, signal.SIG_DFL) drain_queues() From 8cfeb678a37bdf32d98c2d7993117855ef2a2934 Mon Sep 17 00:00:00 2001 From: stephanelsmith Date: Fri, 21 Oct 2022 20:53:42 +0000 Subject: [PATCH 4/7] remove dump command extra verbose text --- rethinkdb/_dump.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rethinkdb/_dump.py b/rethinkdb/_dump.py index ec8a714b..6fec5450 100755 --- a/rethinkdb/_dump.py +++ b/rethinkdb/_dump.py @@ -173,12 +173,12 @@ def main(argv=None, prog=None): try: if not options.quiet: # Print a warning about the capabilities of dump, so no one is confused (hopefully) - print( - """\ - NOTE: 'rethinkdb-dump' saves data, secondary indexes, and write hooks, but does *not* save - cluster metadata. 
You will need to recreate your cluster setup yourself after - you run 'rethinkdb-restore'.""" - ) + # print( + # """\ + # NOTE: 'rethinkdb-dump' saves data, secondary indexes, and write hooks, but does *not* save + # cluster metadata. You will need to recreate your cluster setup yourself after + # you run 'rethinkdb-restore'.""" + # ) try: start_time = time.time() From 53b9da8f43058336ad82e11b3fadc604f90321ae Mon Sep 17 00:00:00 2001 From: stephanelsmith Date: Fri, 21 Oct 2022 20:56:06 +0000 Subject: [PATCH 5/7] fixed bug in last commit --- rethinkdb/_dump.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rethinkdb/_dump.py b/rethinkdb/_dump.py index 6fec5450..7078af1d 100755 --- a/rethinkdb/_dump.py +++ b/rethinkdb/_dump.py @@ -172,6 +172,7 @@ def main(argv=None, prog=None): options = parse_options(argv or sys.argv[1:], prog=prog) try: if not options.quiet: + pass #ssmith, remove options # Print a warning about the capabilities of dump, so no one is confused (hopefully) # print( # """\ From 918f9acbb27608d1675aa4380a5c1d520c881abc Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 30 Jan 2023 17:29:04 +0000 Subject: [PATCH 6/7] asyncio_net/net_asyncio.py: async/await syntax for python 3.11.1 compatibility --- rethinkdb/asyncio_net/net_asyncio.py | 126 ++++++++++++++++----------- 1 file changed, 74 insertions(+), 52 deletions(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index b92893d7..d0f9ca37 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -39,13 +39,14 @@ pQuery = ql2_pb2.Query.QueryType -@asyncio.coroutine -def _read_until(streamreader, delimiter): +# @asyncio.coroutine +async def _read_until(streamreader, delimiter): """Naive implementation of reading until a delimiter""" buffer = bytearray() while True: - c = yield from streamreader.read(1) + # c = yield from streamreader.read(1) + c = await streamreader.read(1) if c == b"": break # EOF buffer.append(c[0]) @@ -69,13 +70,14 @@ def reusable_waiter(loop, timeout): else: deadline = None - @asyncio.coroutine - def wait(future): + # @asyncio.coroutine + async def wait(future): if deadline is not None: new_timeout = max(deadline - loop.time(), 0) else: new_timeout = None - return (yield from asyncio.wait_for(future, new_timeout)) + # return (yield from asyncio.wait_for(future, new_timeout)) + return (await asyncio.wait_for(future, new_timeout)) return wait @@ -101,20 +103,22 @@ def __init__(self, *args, **kwargs): def __aiter__(self): return self - @asyncio.coroutine - def __anext__(self): + # @asyncio.coroutine + async def __anext__(self): try: - return (yield from self._get_next(None)) + # return (yield from self._get_next(None)) + return (await self._get_next(None)) except ReqlCursorEmpty: raise StopAsyncIteration - @asyncio.coroutine - def close(self): + # @asyncio.coroutine + async def close(self): if self.error is None: self.error = self._empty_error() if self.conn.is_open(): self.outstanding_requests += 1 - yield from self.conn._parent._stop(self) + # yield from self.conn._parent._stop(self) + await self.conn._parent._stop(self) def _extend(self, res_buf): Cursor._extend(self, res_buf) @@ -123,8 +127,8 @@ def _extend(self, res_buf): # Convenience function so users know when they've hit the end of the cursor # without having to catch an exception - @asyncio.coroutine - def fetch_next(self, wait=True): + # @asyncio.coroutine + async def fetch_next(self, wait=True): timeout = Cursor._wait_to_timeout(wait) waiter = 
reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0 and self.error is None: @@ -132,7 +136,8 @@ def fetch_next(self, wait=True): if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + # yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) # If there is a (non-empty) error to be received, we return True, so the # user will receive it on the next `next` call. return len(self.items) != 0 or not isinstance(self.error, RqlCursorEmpty) @@ -142,15 +147,16 @@ def _empty_error(self): # with mechanisms to return from a coroutine. return RqlCursorEmpty() - @asyncio.coroutine - def _get_next(self, timeout): + # @asyncio.coroutine + async def _get_next(self, timeout): waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0: self._maybe_fetch_batch() if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + # yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) return self.items.popleft() def _maybe_fetch_batch(self): @@ -187,8 +193,8 @@ def client_address(self): if self.is_open(): return self._streamwriter.get_extra_info("sockname")[0] - @asyncio.coroutine - def connect(self, timeout): + # @asyncio.coroutine + async def connect(self, timeout): try: ssl_context = None if len(self._parent.ssl) > 0: @@ -200,7 +206,8 @@ def connect(self, timeout): ssl_context.check_hostname = True # redundant with match_hostname ssl_context.load_verify_locations(self._parent.ssl["ca_certs"]) - self._streamreader, self._streamwriter = yield from asyncio.open_connection( + # self._streamreader, self._streamwriter = yield from asyncio.open_connection( + self._streamreader, self._streamwriter = await asyncio.open_connection( self._parent.host, self._parent.port, ssl=ssl_context, @@ -230,22 +237,26 @@ def connect(self, timeout): if request is not "": self._streamwriter.write(request) - response = yield from asyncio.wait_for( + # response = yield from asyncio.wait_for( + response = await asyncio.wait_for( _read_until(self._streamreader, b"\0"), timeout, ) response = response[:-1] except ReqlAuthError: - yield from self.close() + # yield from self.close() + await self.close() raise except ReqlTimeoutError as err: - yield from self.close() + # yield from self.close() + await self.close() raise ReqlDriverError( "Connection interrupted during handshake with %s:%s. Error: %s" % (self._parent.host, self._parent.port, str(err)) ) except Exception as err: - yield from self.close() + # yield from self.close() + await self.close() raise ReqlDriverError( "Could not connect to %s:%s. Error: %s" % (self._parent.host, self._parent.port, str(err)) @@ -259,8 +270,8 @@ def connect(self, timeout): def is_open(self): return not (self._closing or self._streamreader.at_eof()) - @asyncio.coroutine - def close(self, noreply_wait=False, token=None, exception=None): + # @asyncio.coroutine + async def close(self, noreply_wait=False, token=None, exception=None): self._closing = True if exception is not None: err_message = "Connection is closed (%s)." 
% str(exception) @@ -280,38 +291,43 @@ def close(self, noreply_wait=False, token=None, exception=None): if noreply_wait: noreply = Query(pQuery.NOREPLY_WAIT, token, None, None) - yield from self.run_query(noreply, False) + # yield from self.run_query(noreply, False) + await self.run_query(noreply, False) self._streamwriter.close() # We must not wait for the _reader_task if we got an exception, because that # means that we were called from it. Waiting would lead to a deadlock. if self._reader_task and exception is None: - yield from self._reader_task + # yield from self._reader_task + await self._reader_task return None - @asyncio.coroutine - def run_query(self, query, noreply): + # @asyncio.coroutine + async def run_query(self, query, noreply): self._streamwriter.write(query.serialize(self._parent._get_json_encoder(query))) if noreply: return None response_future = asyncio.Future() self._user_queries[query.token] = (query, response_future) - return (yield from response_future) + # return (yield from response_future) + return (await response_future) # The _reader coroutine runs in parallel, reading responses # off of the socket and forwarding them to the appropriate Future or Cursor. # This is shut down as a consequence of closing the stream, or an error in the # socket/protocol from the server. Unexpected errors in this coroutine will # close the ConnectionInstance and be passed to any open Futures or Cursors. - @asyncio.coroutine - def _reader(self): + # @asyncio.coroutine + async def _reader(self): try: while True: - buf = yield from self._streamreader.readexactly(12) + # buf = yield from self._streamreader.readexactly(12) + buf = await self._streamreader.readexactly(12) (token, length,) = struct.unpack(" Date: Tue, 31 Jan 2023 18:33:39 +0000 Subject: [PATCH 7/7] asyncio_net/net_asyncio.py: update for SyntaxWarning: "is not" with a literal. Did you mean "!="? --- rethinkdb/asyncio_net/net_asyncio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index d0f9ca37..526a4506 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -234,7 +234,8 @@ async def connect(self, timeout): break # This may happen in the `V1_0` protocol where we send two requests as # an optimization, then need to read each separately - if request is not "": + # if request is not "": # SyntaxWarning: "is not" with a literal. Did you mean "!="? + if request != "": self._streamwriter.write(request) # response = yield from asyncio.wait_for(
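
For context: the compatibility shim added to ast.py in patch 1 is needed because Python 3.10 removed the legacy ABC aliases collections.Callable, collections.Mapping, and collections.Iterable (they have lived in collections.abc since Python 3.3 and were deprecated ever since). The patch back-fills the collections.abc.* names when they are missing so the isinstance() dispatch in expr() can reference them unconditionally. A minimal sketch of an equivalent import-side spelling of the same shim, stdlib only:

    # Prefer the modern location; fall back for interpreters that predate
    # Python 3.3, where the ABCs lived directly in the collections package.
    try:
        from collections.abc import Callable, Iterable, Mapping
    except ImportError:
        from collections import Callable, Iterable, Mapping

    def classify(value):
        # Rough shape of the dispatch expr() performs; order matters,
        # since e.g. a Mapping is also an Iterable.
        if isinstance(value, Callable):
            return "function"
        if isinstance(value, Mapping):
            return "object"
        if isinstance(value, Iterable):
            return "array"
        return "datum"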
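Patches 1 and 6 track two separate asyncio API removals. Python 3.10 dropped the loop= keyword from asyncio.wait_for(), asyncio.open_connection(), and asyncio.ensure_future() (the running event loop is always used now), and Python 3.11 removed the @asyncio.coroutine decorator, which is why every generator-based "yield from" coroutine in net_asyncio.py is rewritten as a native "async def" coroutine. A before/after sketch of the pattern, with illustrative names rather than the driver's own:

    import asyncio

    # Removed style -- asyncio.coroutine is gone in 3.11, and wait_for()
    # rejects the loop= argument from 3.10 on:
    #
    #     @asyncio.coroutine
    #     def read_frame(reader, timeout, loop):
    #         return (yield from asyncio.wait_for(
    #             reader.readexactly(12), timeout, loop=loop))

    async def read_frame(reader: asyncio.StreamReader, timeout: float) -> bytes:
        # Native coroutine; if a deadline is still needed (as in the
        # driver's reusable_waiter), the running loop can be fetched.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        return await asyncio.wait_for(reader.readexactly(12),
                                      max(deadline - loop.time(), 0))

One behavioral note on the related change in patch 1: swapping asyncio.get_event_loop() for asyncio.get_running_loop() in ConnectionInstance.__init__() means a RuntimeError is raised when no loop is running, so connections must now be created from inside a coroutine (or behind asyncio.run()) rather than from synchronous startup code.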
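Patch 3 comments the signal.signal() calls out of _export.py and _import.py because signal.signal() may only be called from the main thread of the main interpreter; anywhere else it raises ValueError. That restriction is exactly what breaks these entry points when they are driven through asyncio.to_thread(), which runs the callable on a worker thread. A small sketch of the interaction and a guarded alternative (export_job is a hypothetical stand-in for the export entry point, not the driver's API):

    import asyncio
    import signal
    import threading

    def export_job():
        # Install the handler only where it is legal; on a to_thread()
        # worker this branch is skipped instead of raising ValueError.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGINT, signal.SIG_DFL)
        # ... actual export work would go here ...

    async def main():
        await asyncio.to_thread(export_job)   # runs off the main thread

    asyncio.run(main())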
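Patch 7 fixes a latent bug as much as it silences a warning: request is not "" compares object identity, which is only coincidentally true for interned strings, and CPython 3.8+ flags it with SyntaxWarning: "is not" with a literal. Equality is what the handshake loop actually means:

    request = ""                  # e.g. nothing left to send this round
    # if request is not "":      # identity test: SyntaxWarning on 3.8+,
    #                            # implementation-defined for str literals
    if request != "":             # value test: what the handshake means
        print("would send:", request)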