diff --git a/rethinkdb/_dump.py b/rethinkdb/_dump.py index ec8a714b..7078af1d 100755 --- a/rethinkdb/_dump.py +++ b/rethinkdb/_dump.py @@ -172,13 +172,14 @@ def main(argv=None, prog=None): options = parse_options(argv or sys.argv[1:], prog=prog) try: if not options.quiet: + pass #ssmith, remove options # Print a warning about the capabilities of dump, so no one is confused (hopefully) - print( - """\ - NOTE: 'rethinkdb-dump' saves data, secondary indexes, and write hooks, but does *not* save - cluster metadata. You will need to recreate your cluster setup yourself after - you run 'rethinkdb-restore'.""" - ) + # print( + # """\ + # NOTE: 'rethinkdb-dump' saves data, secondary indexes, and write hooks, but does *not* save + # cluster metadata. You will need to recreate your cluster setup yourself after + # you run 'rethinkdb-restore'.""" + # ) try: start_time = time.time() diff --git a/rethinkdb/_export.py b/rethinkdb/_export.py index 01bae2f4..709721f4 100755 --- a/rethinkdb/_export.py +++ b/rethinkdb/_export.py @@ -28,7 +28,7 @@ import optparse import os import platform -import signal +# import signal import sys import tempfile import time @@ -273,9 +273,9 @@ def export_table( hook_counter, exit_event, ): - signal.signal( - signal.SIGINT, signal.SIG_DFL - ) # prevent signal handlers from being set in child processes + # signal.signal( + # signal.SIGINT, signal.SIG_DFL + # ) # prevent signal handlers from being set in child processes writer = None @@ -470,9 +470,9 @@ def run_clients(options, workingDir, db_table_set): sindex_counter = multiprocessing.Value(ctypes.c_longlong, 0) hook_counter = multiprocessing.Value(ctypes.c_longlong, 0) - signal.signal( - signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event) - ) + # signal.signal( + # signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event) + # ) errors = [] try: @@ -552,7 +552,8 @@ def plural(num, text, plural_text): ) ) finally: - signal.signal(signal.SIGINT, signal.SIG_DFL) + pass + # signal.signal(signal.SIGINT, signal.SIG_DFL) if interrupt_event.is_set(): raise RuntimeError("Interrupted") diff --git a/rethinkdb/_import.py b/rethinkdb/_import.py index 0ce90bfc..3dea65b8 100755 --- a/rethinkdb/_import.py +++ b/rethinkdb/_import.py @@ -29,7 +29,7 @@ import multiprocessing import optparse import os -import signal +# import signal import sys import time import traceback @@ -441,10 +441,10 @@ def read_to_queue( ignore_signals=True, batch_size=None, ): - if ( - ignore_signals - ): # ToDo: work out when we are in a worker process automatically - signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these + # if ( + # ignore_signals + # ): # ToDo: work out when we are in a worker process automatically + # signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these if batch_size is None: batch_size = utils_common.default_batch_size @@ -1078,7 +1078,7 @@ def parse_options(argv, prog=None): def table_writer( tables, options, work_queue, error_queue, warning_queue, exit_event, timing_queue ): - signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these + # signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should ignore these db = table = batch = None try: @@ -1188,7 +1188,7 @@ def table_writer( def update_progress(tables, debug, exit_event, sleep=0.2): - signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should not get these + # signal.signal(signal.SIGINT, signal.SIG_IGN) # workers should not get these # give weights to each of the tables based on file size totalSize = sum([x.bytes_size for x in tables]) @@ -1269,7 +1269,7 @@ def import_tables(options, sources, files_ignored=None): progress_bar_sleep = 0.2 # - setup KeyboardInterupt handler - signal.signal(signal.SIGINT, lambda a, b: utils_common.abort(pools, exit_event)) + # signal.signal(signal.SIGINT, lambda a, b: utils_common.abort(pools, exit_event)) # - queue draining def drain_queues(): @@ -1494,7 +1494,8 @@ def plural(num, text): for key, value in sorted(timing_sums.items(), key=lambda x: x[0]): print(" %s: %.2f" % (key, value)) finally: - signal.signal(signal.SIGINT, signal.SIG_DFL) + pass + # signal.signal(signal.SIGINT, signal.SIG_DFL) drain_queues() diff --git a/rethinkdb/ast.py b/rethinkdb/ast.py index 7bd73b0c..7d0ae4a2 100644 --- a/rethinkdb/ast.py +++ b/rethinkdb/ast.py @@ -20,21 +20,13 @@ import base64 import binascii +import collections import datetime import json -import sys import threading from rethinkdb import ql2_pb2 -from rethinkdb.errors import (QueryPrinter, ReqlDriverCompileError, - ReqlDriverError, T) - -if sys.version_info < (3, 3): - # python < 3.3 uses collections - import collections -else: - # but collections is deprecated from python >= 3.3 - import collections.abc as collections +from rethinkdb.errors import QueryPrinter, ReqlDriverCompileError, ReqlDriverError, T P_TERM = ql2_pb2.Term.TermType @@ -48,6 +40,13 @@ except NameError: xrange = range +try: + collections.abc.Callable +except AttributeError: + collections.abc.Callable = collections.Callable + collections.abc.Mapping = collections.Mapping + collections.abc.Iterable = collections.Iterable + def dict_items(dictionary): return list(dictionary.items()) @@ -82,7 +81,7 @@ def clear(cls): def expr(val, nesting_depth=20): """ - Convert a Python primitive into a RQL primitive value + Convert a Python primitive into a RQL primitive value """ if not isinstance(nesting_depth, int): raise ReqlDriverCompileError("Second argument to `r.expr` must be a number.") @@ -92,7 +91,7 @@ def expr(val, nesting_depth=20): if isinstance(val, RqlQuery): return val - elif isinstance(val, collections.Callable): + elif isinstance(val, collections.abc.Callable): return Func(val) elif isinstance(val, (datetime.datetime, datetime.date)): if not hasattr(val, "tzinfo") or not val.tzinfo: @@ -113,14 +112,14 @@ def expr(val, nesting_depth=20): return Datum(val) elif isinstance(val, bytes): return Binary(val) - elif isinstance(val, collections.Mapping): + elif isinstance(val, collections.abc.Mapping): # MakeObj doesn't take the dict as a keyword args to avoid # conflicting with the `self` parameter. obj = {} for k, v in dict_items(val): obj[k] = expr(v, nesting_depth - 1) return MakeObj(obj) - elif isinstance(val, collections.Iterable): + elif isinstance(val, collections.abc.Iterable): val = [expr(v, nesting_depth - 1) for v in val] return MakeArray(*val) else: @@ -767,7 +766,7 @@ def recursively_make_hashable(obj): class ReQLEncoder(json.JSONEncoder): """ - Default JSONEncoder subclass to handle query conversion. + Default JSONEncoder subclass to handle query conversion. """ def __init__(self): @@ -787,7 +786,7 @@ def default(self, obj): class ReQLDecoder(json.JSONDecoder): """ - Default JSONDecoder subclass to handle pseudo-type conversion. + Default JSONDecoder subclass to handle pseudo-type conversion. """ def __init__(self, reql_format_opts=None): diff --git a/rethinkdb/asyncio_net/net_asyncio.py b/rethinkdb/asyncio_net/net_asyncio.py index 781081e5..526a4506 100644 --- a/rethinkdb/asyncio_net/net_asyncio.py +++ b/rethinkdb/asyncio_net/net_asyncio.py @@ -39,13 +39,14 @@ pQuery = ql2_pb2.Query.QueryType -@asyncio.coroutine -def _read_until(streamreader, delimiter): +# @asyncio.coroutine +async def _read_until(streamreader, delimiter): """Naive implementation of reading until a delimiter""" buffer = bytearray() while True: - c = yield from streamreader.read(1) + # c = yield from streamreader.read(1) + c = await streamreader.read(1) if c == b"": break # EOF buffer.append(c[0]) @@ -69,13 +70,14 @@ def reusable_waiter(loop, timeout): else: deadline = None - @asyncio.coroutine - def wait(future): + # @asyncio.coroutine + async def wait(future): if deadline is not None: new_timeout = max(deadline - loop.time(), 0) else: new_timeout = None - return (yield from asyncio.wait_for(future, new_timeout, loop=loop)) + # return (yield from asyncio.wait_for(future, new_timeout)) + return (await asyncio.wait_for(future, new_timeout)) return wait @@ -101,20 +103,22 @@ def __init__(self, *args, **kwargs): def __aiter__(self): return self - @asyncio.coroutine - def __anext__(self): + # @asyncio.coroutine + async def __anext__(self): try: - return (yield from self._get_next(None)) + # return (yield from self._get_next(None)) + return (await self._get_next(None)) except ReqlCursorEmpty: raise StopAsyncIteration - @asyncio.coroutine - def close(self): + # @asyncio.coroutine + async def close(self): if self.error is None: self.error = self._empty_error() if self.conn.is_open(): self.outstanding_requests += 1 - yield from self.conn._parent._stop(self) + # yield from self.conn._parent._stop(self) + await self.conn._parent._stop(self) def _extend(self, res_buf): Cursor._extend(self, res_buf) @@ -123,8 +127,8 @@ def _extend(self, res_buf): # Convenience function so users know when they've hit the end of the cursor # without having to catch an exception - @asyncio.coroutine - def fetch_next(self, wait=True): + # @asyncio.coroutine + async def fetch_next(self, wait=True): timeout = Cursor._wait_to_timeout(wait) waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0 and self.error is None: @@ -132,7 +136,8 @@ def fetch_next(self, wait=True): if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + # yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) # If there is a (non-empty) error to be received, we return True, so the # user will receive it on the next `next` call. return len(self.items) != 0 or not isinstance(self.error, RqlCursorEmpty) @@ -142,15 +147,16 @@ def _empty_error(self): # with mechanisms to return from a coroutine. return RqlCursorEmpty() - @asyncio.coroutine - def _get_next(self, timeout): + # @asyncio.coroutine + async def _get_next(self, timeout): waiter = reusable_waiter(self.conn._io_loop, timeout) while len(self.items) == 0: self._maybe_fetch_batch() if self.error is not None: raise self.error with translate_timeout_errors(): - yield from waiter(asyncio.shield(self.new_response)) + # yield from waiter(asyncio.shield(self.new_response)) + await waiter(asyncio.shield(self.new_response)) return self.items.popleft() def _maybe_fetch_batch(self): @@ -176,7 +182,8 @@ def __init__(self, parent, io_loop=None): self._ready = asyncio.Future() self._io_loop = io_loop if self._io_loop is None: - self._io_loop = asyncio.get_event_loop() + # self._io_loop = asyncio.get_event_loop() + self._io_loop = asyncio.get_running_loop() def client_port(self): if self.is_open(): @@ -186,8 +193,8 @@ def client_address(self): if self.is_open(): return self._streamwriter.get_extra_info("sockname")[0] - @asyncio.coroutine - def connect(self, timeout): + # @asyncio.coroutine + async def connect(self, timeout): try: ssl_context = None if len(self._parent.ssl) > 0: @@ -199,10 +206,10 @@ def connect(self, timeout): ssl_context.check_hostname = True # redundant with match_hostname ssl_context.load_verify_locations(self._parent.ssl["ca_certs"]) - self._streamreader, self._streamwriter = yield from asyncio.open_connection( + # self._streamreader, self._streamwriter = yield from asyncio.open_connection( + self._streamreader, self._streamwriter = await asyncio.open_connection( self._parent.host, self._parent.port, - loop=self._io_loop, ssl=ssl_context, ) self._streamwriter.get_extra_info("socket").setsockopt( @@ -227,26 +234,30 @@ def connect(self, timeout): break # This may happen in the `V1_0` protocol where we send two requests as # an optimization, then need to read each separately - if request is not "": + # if request is not "": # SyntaxWarning: "is not" with a literal. Did you mean "!="? + if request != "": self._streamwriter.write(request) - response = yield from asyncio.wait_for( + # response = yield from asyncio.wait_for( + response = await asyncio.wait_for( _read_until(self._streamreader, b"\0"), timeout, - loop=self._io_loop, ) response = response[:-1] except ReqlAuthError: - yield from self.close() + # yield from self.close() + await self.close() raise except ReqlTimeoutError as err: - yield from self.close() + # yield from self.close() + await self.close() raise ReqlDriverError( "Connection interrupted during handshake with %s:%s. Error: %s" % (self._parent.host, self._parent.port, str(err)) ) except Exception as err: - yield from self.close() + # yield from self.close() + await self.close() raise ReqlDriverError( "Could not connect to %s:%s. Error: %s" % (self._parent.host, self._parent.port, str(err)) @@ -254,14 +265,14 @@ def connect(self, timeout): # Start a parallel function to perform reads # store a reference to it so it doesn't get destroyed - self._reader_task = asyncio.ensure_future(self._reader(), loop=self._io_loop) + self._reader_task = asyncio.ensure_future(self._reader()) return self._parent def is_open(self): return not (self._closing or self._streamreader.at_eof()) - @asyncio.coroutine - def close(self, noreply_wait=False, token=None, exception=None): + # @asyncio.coroutine + async def close(self, noreply_wait=False, token=None, exception=None): self._closing = True if exception is not None: err_message = "Connection is closed (%s)." % str(exception) @@ -281,38 +292,43 @@ def close(self, noreply_wait=False, token=None, exception=None): if noreply_wait: noreply = Query(pQuery.NOREPLY_WAIT, token, None, None) - yield from self.run_query(noreply, False) + # yield from self.run_query(noreply, False) + await self.run_query(noreply, False) self._streamwriter.close() # We must not wait for the _reader_task if we got an exception, because that # means that we were called from it. Waiting would lead to a deadlock. if self._reader_task and exception is None: - yield from self._reader_task + # yield from self._reader_task + await self._reader_task return None - @asyncio.coroutine - def run_query(self, query, noreply): + # @asyncio.coroutine + async def run_query(self, query, noreply): self._streamwriter.write(query.serialize(self._parent._get_json_encoder(query))) if noreply: return None response_future = asyncio.Future() self._user_queries[query.token] = (query, response_future) - return (yield from response_future) + # return (yield from response_future) + return (await response_future) # The _reader coroutine runs in parallel, reading responses # off of the socket and forwarding them to the appropriate Future or Cursor. # This is shut down as a consequence of closing the stream, or an error in the # socket/protocol from the server. Unexpected errors in this coroutine will # close the ConnectionInstance and be passed to any open Futures or Cursors. - @asyncio.coroutine - def _reader(self): + # @asyncio.coroutine + async def _reader(self): try: while True: - buf = yield from self._streamreader.readexactly(12) + # buf = yield from self._streamreader.readexactly(12) + buf = await self._streamreader.readexactly(12) (token, length,) = struct.unpack("