diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 29bcaa9d77..150f96a624 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -2,6 +2,7 @@ APM ARGV BFCommands CacheImpl +CAS CFCommands CMSCommands ClusterNode diff --git a/.gitignore b/.gitignore index 5f77dcfde4..7184ad4e20 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ vagrant/.vagrant .cache .eggs .idea +.vscode .coverage env venv diff --git a/CHANGES b/CHANGES index 50126a86d7..1a1f4eca11 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Support transactions in ClusterPipeline * Removing support for RedisGraph module. RedisGraph support is deprecated since Redis Stack 7.2 (https://redis.com/blog/redisgraph-eol/) * Fix lock.extend() typedef to accept float TTL extension * Update URL in the readme linking to Redis University diff --git a/docs/advanced_features.rst b/docs/advanced_features.rst index 603e728e84..11ab8af716 100644 --- a/docs/advanced_features.rst +++ b/docs/advanced_features.rst @@ -167,6 +167,7 @@ the server. .. code:: python + >>> rc = RedisCluster() >>> with rc.pipeline() as pipe: ... pipe.set('foo', 'value1') ... pipe.set('bar', 'value2') @@ -177,20 +178,110 @@ the server. ... pipe.set('foo1', 'bar1').get('foo1').execute() [True, b'bar1'] -Please note: - RedisCluster pipelines currently only support key-based -commands. - The pipeline gets its ‘read_from_replicas’ value from the -cluster’s parameter. Thus, if read from replications is enabled in the -cluster instance, the pipeline will also direct read commands to -replicas. - The ‘transaction’ option is NOT supported in cluster-mode. -In non-cluster mode, the ‘transaction’ option is available when -executing pipelines. This wraps the pipeline commands with MULTI/EXEC -commands, and effectively turns the pipeline commands into a single -transaction block. This means that all commands are executed -sequentially without any interruptions from other clients. However, in -cluster-mode this is not possible, because commands are partitioned -according to their respective destination nodes. This means that we can -not turn the pipeline commands into one transaction block, because in -most cases they are split up into several smaller pipelines. +Please note: + +- RedisCluster pipelines currently only support key-based commands. +- The pipeline gets its ‘load_balancing_strategy’ value from the + cluster’s parameter. Thus, if read from replications is enabled in + the cluster instance, the pipeline will also direct read commands to + replicas. + + +Transactions in clusters +~~~~~~~~~~~~~~~~~~~~~~~~ + +Transactions are supported in cluster-mode with one caveat: all keys of +all commands issued on a transaction pipeline must reside on the +same slot. This is similar to the limitation of multikey commands in +cluster. The reason behind this is that the Redis engine does not offer +a mechanism to block or exchange key data across nodes on the fly. A +client may add some logic to abstract engine limitations when running +on a cluster, such as the pipeline behavior explained on the previous +block, but there is no simple way that a client can enforce atomicity +across nodes on a distributed system. + +The compromise of limiting the transaction pipeline to same-slot keys +is exactly that: a compromise. While this behavior is different from +non-transactional cluster pipelines, it simplifies migration of clients +from standalone to cluster under some circumstances. 
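+
+Before moving on, note how the same-slot rule shows up in practice: a
+transaction whose keys hash to different slots is rejected when `execute`
+is called. This is a minimal sketch; the key names are only illustrative:
+
+.. code:: python
+
+    >>> with rc.pipeline(transaction=True) as pipe:
+    ...     pipe.set('foo', 'value1')    # hashes to one slot
+    ...     pipe.set('bar', 'value2')    # hashes to a different slot
+    ...     pipe.execute()               # raises CrossSlotTransactionError
+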
+Note that application code that issues MULTI/EXEC commands on a standalone
+client without wrapping them in a pipeline will eventually raise an
+'AttributeError'. With this approach, if the application already uses
+'client.pipeline(transaction=True)', switching the client to a
+cluster-aware instance requires fewer code changes (to some extent). This
+is especially true for application code that makes use of hash tags, since
+its transactions may already map all commands to the same slot.
+
+An alternative would be some kind of two-step commit solution, where slot
+validation runs before the actual commands. This could work during
+controlled node maintenance but does not cover single-node failures.
+
+Given these cluster limitations, a pipeline is not transactional by
+default. To enable the transactional context, set:
+
+.. code:: python
+
+    >>> p = rc.pipeline(transaction=True)
+
+After entering the transactional context, you can add commands to it in
+one of the following ways:
+
+.. code:: python
+
+    >>> p = rc.pipeline(transaction=True) # Chaining commands
+    >>> p.set("key", "value")
+    >>> p.get("key")
+    >>> response = p.execute()
+
+Or
+
+.. code:: python
+
+    >>> with rc.pipeline(transaction=True) as pipe: # Using context manager
+    ...     pipe.set("key", "value")
+    ...     pipe.get("key")
+    ...     response = pipe.execute()
+
+As you can see, there is no need to explicitly send `MULTI`/`EXEC` commands
+to mark the start and end of the context; `ClusterPipeline` takes care of
+that.
+
+To ensure that different keys map to the same hash slot on the server side,
+prefix them with the same hash tag, a technique that lets you control key
+distribution. See the hash tags section of the Redis cluster specification
+for more information.
+
+.. code:: python
+
+    >>> with rc.pipeline(transaction=True) as pipe:
+    ...     pipe.set("{tag}foo", "bar")
+    ...     pipe.set("{tag}bar", "foo")
+    ...     pipe.get("{tag}foo")
+    ...     pipe.get("{tag}bar")
+    ...     response = pipe.execute()
+
+CAS Transactions
+~~~~~~~~~~~~~~~~
+
+If you want to apply optimistic locking to certain keys, you have to
+execute the `WATCH` command in the transactional context. The `WATCH`
+command follows the same limitation as any other multi-key command: all
+keys must map to the same hash slot.
+
+The difference between a CAS transaction and a normal one is that you have
+to explicitly call the `MULTI` command to mark the start of the
+transactional context. The `WATCH` command itself, and any commands issued
+before `MULTI`, are executed immediately on the server side, so you can
+apply optimistic locking and fetch the data you need before the
+transaction executes.
+
+.. code:: python
+
+    >>> with rc.pipeline(transaction=True) as pipe:
+    ...     pipe.watch("mykey")         # Apply locking by immediately executing command
+    ...     val = pipe.get("mykey")     # Immediately retrieves value
+    ...     val = int(val) + 1          # Increment value
+    ...     pipe.multi()                # Starting transaction context
+    ...     pipe.set("mykey", val)      # Command will be pipelined
+    ...     
response = pipe.execute() # Returns OK or None if key was modified in the meantime + Publish / Subscribe ------------------- diff --git a/redis/__init__.py b/redis/__init__.py index f82a876b2d..14030205e3 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -16,11 +16,14 @@ BusyLoadingError, ChildDeadlockedError, ConnectionError, + CrossSlotTransactionError, DataError, + InvalidPipelineStack, InvalidResponse, OutOfMemoryError, PubSubError, ReadOnlyError, + RedisClusterException, RedisError, ResponseError, TimeoutError, @@ -56,15 +59,18 @@ def int_or_str(value): "ConnectionError", "ConnectionPool", "CredentialProvider", + "CrossSlotTransactionError", "DataError", "from_url", "default_backoff", + "InvalidPipelineStack", "InvalidResponse", "OutOfMemoryError", "PubSubError", "ReadOnlyError", "Redis", "RedisCluster", + "RedisClusterException", "RedisError", "ResponseError", "Sentinel", diff --git a/redis/client.py b/redis/client.py index 2ef95600c2..dc4f0f9d0c 100755 --- a/redis/client.py +++ b/redis/client.py @@ -34,6 +34,7 @@ from redis.commands.core import Script from redis.connection import ( AbstractConnection, + Connection, ConnectionPool, SSLConnection, UnixDomainSocketConnection, @@ -1297,9 +1298,15 @@ class Pipeline(Redis): UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} - def __init__(self, connection_pool, response_callbacks, transaction, shard_hint): + def __init__( + self, + connection_pool: ConnectionPool, + response_callbacks, + transaction, + shard_hint, + ): self.connection_pool = connection_pool - self.connection = None + self.connection: Optional[Connection] = None self.response_callbacks = response_callbacks self.transaction = transaction self.shard_hint = shard_hint @@ -1434,7 +1441,9 @@ def pipeline_execute_command(self, *args, **options) -> "Pipeline": self.command_stack.append((args, options)) return self - def _execute_transaction(self, connection, commands, raise_on_error) -> List: + def _execute_transaction( + self, connection: Connection, commands, raise_on_error + ) -> List: cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})]) all_cmds = connection.pack_commands( [args for args, options in cmds if EMPTY_RESPONSE not in options] diff --git a/redis/cluster.py b/redis/cluster.py index 6e3505404a..b614c598f9 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -3,18 +3,25 @@ import sys import threading import time +from abc import ABC, abstractmethod from collections import OrderedDict +from copy import copy from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from itertools import chain +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from redis._parsers import CommandsParser, Encoder from redis._parsers.helpers import parse_scan from redis.backoff import ExponentialWithJitterBackoff, NoBackoff from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface -from redis.client import CaseInsensitiveDict, PubSub, Redis +from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis from redis.commands import READ_COMMANDS, RedisClusterCommands from redis.commands.helpers import list_or_args -from redis.connection import ConnectionPool, parse_url +from redis.connection import ( + Connection, + ConnectionPool, + parse_url, +) from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.event import ( AfterPooledConnectionsInstantiationEvent, @@ -28,7 +35,10 @@ ClusterDownError, ClusterError, ConnectionError, + CrossSlotTransactionError, 
DataError, + ExecAbortError, + InvalidPipelineStack, MovedError, RedisClusterException, RedisError, @@ -36,6 +46,7 @@ SlotNotCoveredError, TimeoutError, TryAgainError, + WatchError, ) from redis.lock import Lock from redis.retry import Retry @@ -60,7 +71,7 @@ def get_node_name(host: str, port: Union[str, int]) -> str: reason="Use get_connection(redis_node) instead", version="5.3.0", ) -def get_connection(redis_node, *args, **options): +def get_connection(redis_node: Redis, *args, **options) -> Connection: return redis_node.connection or redis_node.connection_pool.get_connection() @@ -741,7 +752,7 @@ def on_connect(self, connection): if self.user_on_connect_func is not None: self.user_on_connect_func(connection) - def get_redis_connection(self, node): + def get_redis_connection(self, node: "ClusterNode") -> Redis: if not node.redis_connection: with self._lock: if not node.redis_connection: @@ -839,9 +850,6 @@ def pipeline(self, transaction=None, shard_hint=None): if shard_hint: raise RedisClusterException("shard_hint is deprecated in cluster mode") - if transaction: - raise RedisClusterException("transaction is deprecated in cluster mode") - return ClusterPipeline( nodes_manager=self.nodes_manager, commands_parser=self.commands_parser, @@ -854,6 +862,7 @@ def pipeline(self, transaction=None, shard_hint=None): reinitialize_steps=self.reinitialize_steps, retry=self.retry, lock=self._lock, + transaction=transaction, ) def lock( @@ -1015,7 +1024,7 @@ def _get_command_keys(self, *args): redis_conn = self.get_default_node().redis_connection return self.commands_parser.get_keys(redis_conn, *args) - def determine_slot(self, *args): + def determine_slot(self, *args) -> int: """ Figure out what slot to use based on args. @@ -1228,8 +1237,6 @@ def _execute_command(self, target_node, *args, **kwargs): except AuthenticationError: raise except (ConnectionError, TimeoutError) as e: - # Connection retries are being handled in the node's - # Retry object. # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. @@ -1330,6 +1337,28 @@ def load_external_module(self, funcname, func): """ setattr(self, funcname, func) + def transaction(self, func, *watches, **kwargs): + """ + Convenience method for executing the callable `func` as a transaction + while watching all keys specified in `watches`. The 'func' callable + should expect a single argument which is a Pipeline object. 
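+
+        A minimal usage sketch (the key name and the callable are
+        illustrative, not part of the library API):
+
+            def refill(pipe):
+                balance = int(pipe.get("{acct}:balance") or 0)
+                pipe.multi()
+                pipe.set("{acct}:balance", balance + 100)
+
+            rc.transaction(refill, "{acct}:balance")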
+ """ + shard_hint = kwargs.pop("shard_hint", None) + value_from_callable = kwargs.pop("value_from_callable", False) + watch_delay = kwargs.pop("watch_delay", None) + with self.pipeline(True, shard_hint) as pipe: + while True: + try: + if watches: + pipe.watch(*watches) + func_value = func(pipe) + exec_value = pipe.execute() + return func_value if value_from_callable else exec_value + except WatchError: + if watch_delay is not None and watch_delay > 0: + time.sleep(watch_delay) + continue + class ClusterNode: def __init__(self, host, port, server_type=None, redis_connection=None): @@ -1427,7 +1456,7 @@ def __init__( event_dispatcher: Optional[EventDispatcher] = None, **kwargs, ): - self.nodes_cache = {} + self.nodes_cache: Dict[str, Redis] = {} self.slots_cache = {} self.startup_nodes = {} self.default_node = None @@ -1527,7 +1556,7 @@ def get_node_from_slot( read_from_replicas=False, load_balancing_strategy=None, server_type=None, - ): + ) -> ClusterNode: """ Gets a node that servers this hash slot """ @@ -1823,6 +1852,16 @@ def remap_host_port(self, host: str, port: int) -> Tuple[str, int]: return self.address_remap((host, port)) return host, port + def find_connection_owner(self, connection: Connection) -> Optional[Redis]: + node_name = get_node_name(connection.host, connection.port) + for node in tuple(self.nodes_cache.values()): + if node.redis_connection: + conn_args = node.redis_connection.connection_pool.connection_kwargs + if node_name == get_node_name( + conn_args.get("host"), conn_args.get("port") + ): + return node + class ClusterPubSub(PubSub): """ @@ -2082,6 +2121,10 @@ class ClusterPipeline(RedisCluster): TryAgainError, ) + NO_SLOTS_COMMANDS = {"UNWATCH"} + IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} + UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} + @deprecated_args( args_to_warn=[ "cluster_error_retry_attempts", @@ -2102,6 +2145,7 @@ def __init__( reinitialize_steps: int = 5, retry: Optional[Retry] = None, lock=None, + transaction=False, **kwargs, ): """ """ @@ -2135,6 +2179,10 @@ def __init__( if lock is None: lock = threading.Lock() self._lock = lock + self.parent_execute_command = super().execute_command + self._execution_strategy: ExecutionStrategy = ( + PipelineStrategy(self) if not transaction else TransactionStrategy(self) + ) def __repr__(self): """ """ @@ -2156,7 +2204,7 @@ def __del__(self): def __len__(self): """ """ - return len(self.command_stack) + return len(self._execution_strategy.command_queue) def __bool__(self): "Pipeline instances should always evaluate to True on Python 3+" @@ -2166,45 +2214,35 @@ def execute_command(self, *args, **kwargs): """ Wrapper function for pipeline_execute_command """ - return self.pipeline_execute_command(*args, **kwargs) + return self._execution_strategy.execute_command(*args, **kwargs) def pipeline_execute_command(self, *args, **options): """ - Appends the executed command to the pipeline's command stack - """ - self.command_stack.append( - PipelineCommand(args, options, len(self.command_stack)) - ) - return self + Stage a command to be executed when execute() is next called - def raise_first_error(self, stack): - """ - Raise the first exception on the stack + Returns the current Pipeline object back so commands can be + chained together, such as: + + pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') + + At some other point, you can then run: pipe.execute(), + which will execute all commands queued in the pipe. 
""" - for c in stack: - r = c.result - if isinstance(r, Exception): - self.annotate_exception(r, c.position + 1, c.args) - raise r + return self._execution_strategy.execute_command(*args, **options) def annotate_exception(self, exception, number, command): """ Provides extra context to the exception prior to it being handled """ - cmd = " ".join(map(safe_str, command)) - msg = ( - f"Command # {number} ({truncate_text(cmd)}) of pipeline " - f"caused error: {exception.args[0]}" - ) - exception.args = (msg,) + exception.args[1:] + self._execution_strategy.annotate_exception(exception, number, command) def execute(self, raise_on_error: bool = True) -> List[Any]: """ Execute all the commands in the current pipeline """ - stack = self.command_stack + try: - return self.send_cluster_commands(stack, raise_on_error) + return self._execution_strategy.execute(raise_on_error) finally: self.reset() @@ -2212,312 +2250,53 @@ def reset(self): """ Reset back to empty pipeline. """ - self.command_stack = [] - - self.scripts = set() - - # TODO: Implement - # make sure to reset the connection state in the event that we were - # watching something - # if self.watching and self.connection: - # try: - # # call this manually since our unwatch or - # # immediate_execute_command methods can call reset() - # self.connection.send_command('UNWATCH') - # self.connection.read_response() - # except ConnectionError: - # # disconnect will also remove any previous WATCHes - # self.connection.disconnect() - - # clean up the other instance attributes - self.watching = False - self.explicit_transaction = False - - # TODO: Implement - # we can safely return the connection to the pool here since we're - # sure we're no longer WATCHing anything - # if self.connection: - # self.connection_pool.release(self.connection) - # self.connection = None + self._execution_strategy.reset() def send_cluster_commands( self, stack, raise_on_error=True, allow_redirections=True ): - """ - Wrapper for CLUSTERDOWN error handling. - - If the cluster reports it is down it is assumed that: - - connection_pool was disconnected - - connection_pool was reseted - - refereh_table_asap set to True - - It will try the number of times specified by - the retries in config option "self.retry" - which defaults to 3 unless manually configured. - - If it reaches the number of times, the command will - raises ClusterDownException. - """ - if not stack: - return [] - retry_attempts = self.retry.get_retries() - while True: - try: - return self._send_cluster_commands( - stack, - raise_on_error=raise_on_error, - allow_redirections=allow_redirections, - ) - except RedisCluster.ERRORS_ALLOW_RETRY as e: - if retry_attempts > 0: - # Try again with the new cluster setup. All other errors - # should be raised. - retry_attempts -= 1 - pass - else: - raise e - - def _send_cluster_commands( - self, stack, raise_on_error=True, allow_redirections=True - ): - """ - Send a bunch of cluster commands to the redis cluster. - - `allow_redirections` If the pipeline should follow - `ASK` & `MOVED` responses automatically. If set - to false it will raise RedisClusterException. - """ - # the first time sending the commands we send all of - # the commands that were queued up. - # if we have to run through it again, we only retry - # the commands that failed. 
- attempt = sorted(stack, key=lambda x: x.position) - is_default_node = False - # build a list of node objects based on node names we need to - nodes = {} - - # as we move through each command that still needs to be processed, - # we figure out the slot number that command maps to, then from - # the slot determine the node. - for c in attempt: - while True: - # refer to our internal node -> slot table that - # tells us where a given command should route to. - # (it might be possible we have a cached node that no longer - # exists in the cluster, which is why we do this in a loop) - passed_targets = c.options.pop("target_nodes", None) - if passed_targets and not self._is_nodes_flag(passed_targets): - target_nodes = self._parse_target_nodes(passed_targets) - else: - target_nodes = self._determine_nodes( - *c.args, node_flag=passed_targets - ) - if not target_nodes: - raise RedisClusterException( - f"No targets were found to execute {c.args} command on" - ) - if len(target_nodes) > 1: - raise RedisClusterException( - f"Too many targets for command {c.args}" - ) - - node = target_nodes[0] - if node == self.get_default_node(): - is_default_node = True - - # now that we know the name of the node - # ( it's just a string in the form of host:port ) - # we can build a list of commands for each node. - node_name = node.name - if node_name not in nodes: - redis_node = self.get_redis_connection(node) - try: - connection = get_connection(redis_node) - except (ConnectionError, TimeoutError): - for n in nodes.values(): - n.connection_pool.release(n.connection) - # Connection retries are being handled in the node's - # Retry object. Reinitialize the node -> slot table. - self.nodes_manager.initialize() - if is_default_node: - self.replace_default_node() - raise - nodes[node_name] = NodeCommands( - redis_node.parse_response, - redis_node.connection_pool, - connection, - ) - nodes[node_name].append(c) - break - - # send the commands in sequence. - # we write to all the open sockets for each node first, - # before reading anything - # this allows us to flush all the requests out across the - # network essentially in parallel - # so that we can read them all in parallel as they come back. - # we dont' multiplex on the sockets as they come available, - # but that shouldn't make too much difference. - node_commands = nodes.values() - try: - node_commands = nodes.values() - for n in node_commands: - n.write() - - for n in node_commands: - n.read() - finally: - # release all of the redis connections we allocated earlier - # back into the connection pool. - # we used to do this step as part of a try/finally block, - # but it is really dangerous to - # release connections back into the pool if for some - # reason the socket has data still left in it - # from a previous operation. The write and - # read operations already have try/catch around them for - # all known types of errors including connection - # and socket level errors. - # So if we hit an exception, something really bad - # happened and putting any oF - # these connections back into the pool is a very bad idea. - # the socket might have unread buffer still sitting in it, - # and then the next time we read from it we pass the - # buffered result back from a previous command and - # every single request after to that connection will always get - # a mismatched result. 
- for n in nodes.values(): - n.connection_pool.release(n.connection) - - # if the response isn't an exception it is a - # valid response from the node - # we're all done with that command, YAY! - # if we have more commands to attempt, we've run into problems. - # collect all the commands we are allowed to retry. - # (MOVED, ASK, or connection errors or timeout errors) - attempt = sorted( - ( - c - for c in attempt - if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) - ), - key=lambda x: x.position, + return self._execution_strategy.send_cluster_commands( + stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections ) - if attempt and allow_redirections: - # RETRY MAGIC HAPPENS HERE! - # send these remaining commands one at a time using `execute_command` - # in the main client. This keeps our retry logic - # in one place mostly, - # and allows us to be more confident in correctness of behavior. - # at this point any speed gains from pipelining have been lost - # anyway, so we might as well make the best - # attempt to get the correct behavior. - # - # The client command will handle retries for each - # individual command sequentially as we pass each - # one into `execute_command`. Any exceptions - # that bubble out should only appear once all - # retries have been exhausted. - # - # If a lot of commands have failed, we'll be setting the - # flag to rebuild the slots table from scratch. - # So MOVED errors should correct themselves fairly quickly. - self.reinitialize_counter += 1 - if self._should_reinitialized(): - self.nodes_manager.initialize() - if is_default_node: - self.replace_default_node() - for c in attempt: - try: - # send each command individually like we - # do in the main client. - c.result = super().execute_command(*c.args, **c.options) - except RedisError as e: - c.result = e - - # turn the response back into a simple flat array that corresponds - # to the sequence of commands issued in the stack in pipeline.execute() - response = [] - for c in sorted(stack, key=lambda x: x.position): - if c.args[0] in self.cluster_response_callbacks: - # Remove keys entry, it needs only for cache. - c.options.pop("keys", None) - c.result = self.cluster_response_callbacks[c.args[0]]( - c.result, **c.options - ) - response.append(c.result) - - if raise_on_error: - self.raise_first_error(stack) - - return response - - def _fail_on_redirect(self, allow_redirections): - """ """ - if not allow_redirections: - raise RedisClusterException( - "ASK & MOVED redirection not allowed in this pipeline" - ) def exists(self, *keys): - return self.execute_command("EXISTS", *keys) + return self._execution_strategy.exists(*keys) def eval(self): """ """ - raise RedisClusterException("method eval() is not implemented") + return self._execution_strategy.eval() def multi(self): - """ """ - raise RedisClusterException("method multi() is not implemented") - - def immediate_execute_command(self, *args, **options): - """ """ - raise RedisClusterException( - "method immediate_execute_command() is not implemented" - ) + """ + Start a transactional block of the pipeline after WATCH commands + are issued. End the transactional block with `execute`. 
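+
+        A minimal sketch (the key name is illustrative only):
+
+            pipe.watch("{tag}counter")            # runs immediately
+            current = pipe.get("{tag}counter")    # runs immediately
+            pipe.multi()                          # start buffering commands
+            pipe.set("{tag}counter", int(current or 0) + 1)
+            pipe.execute()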
+ """ + self._execution_strategy.multi() - def _execute_transaction(self, *args, **kwargs): + def load_scripts(self): """ """ - raise RedisClusterException("method _execute_transaction() is not implemented") + self._execution_strategy.load_scripts() - def load_scripts(self): + def discard(self): """ """ - raise RedisClusterException("method load_scripts() is not implemented") + self._execution_strategy.discard() def watch(self, *names): - """ """ - raise RedisClusterException("method watch() is not implemented") + """Watches the values at keys ``names``""" + self._execution_strategy.watch(*names) def unwatch(self): - """ """ - raise RedisClusterException("method unwatch() is not implemented") + """Unwatches all previously specified keys""" + self._execution_strategy.unwatch() def script_load_for_pipeline(self, *args, **kwargs): - """ """ - raise RedisClusterException( - "method script_load_for_pipeline() is not implemented" - ) + self._execution_strategy.script_load_for_pipeline(*args, **kwargs) def delete(self, *names): - """ - "Delete a key specified by ``names``" - """ - if len(names) != 1: - raise RedisClusterException( - "deleting multiple keys is not implemented in pipeline command" - ) - - return self.execute_command("DEL", names[0]) + self._execution_strategy.delete(*names) def unlink(self, *names): - """ - "Unlink a key specified by ``names``" - """ - if len(names) != 1: - raise RedisClusterException( - "unlinking multiple keys is not implemented in pipeline command" - ) - - return self.execute_command("UNLINK", names[0]) + self._execution_strategy.unlink(*names) def block_pipeline_command(name: str) -> Callable[..., Any]: @@ -2694,3 +2473,880 @@ def read(self): return except RedisError: c.result = sys.exc_info()[1] + + +class ExecutionStrategy(ABC): + @property + @abstractmethod + def command_queue(self): + pass + + @abstractmethod + def execute_command(self, *args, **kwargs): + """ + Execution flow for current execution strategy. + + See: ClusterPipeline.execute_command() + """ + pass + + @abstractmethod + def annotate_exception(self, exception, number, command): + """ + Annotate exception according to current execution strategy. + + See: ClusterPipeline.annotate_exception() + """ + pass + + @abstractmethod + def pipeline_execute_command(self, *args, **options): + """ + Pipeline execution flow for current execution strategy. + + See: ClusterPipeline.pipeline_execute_command() + """ + pass + + @abstractmethod + def execute(self, raise_on_error: bool = True) -> List[Any]: + """ + Executes current execution strategy. + + See: ClusterPipeline.execute() + """ + pass + + @abstractmethod + def send_cluster_commands( + self, stack, raise_on_error=True, allow_redirections=True + ): + """ + Sends commands according to current execution strategy. + + See: ClusterPipeline.send_cluster_commands() + """ + pass + + @abstractmethod + def reset(self): + """ + Resets current execution strategy. + + See: ClusterPipeline.reset() + """ + pass + + @abstractmethod + def exists(self, *keys): + pass + + @abstractmethod + def eval(self): + pass + + @abstractmethod + def multi(self): + """ + Starts transactional context. 
+ + See: ClusterPipeline.multi() + """ + pass + + @abstractmethod + def load_scripts(self): + pass + + @abstractmethod + def watch(self, *names): + pass + + @abstractmethod + def unwatch(self): + """ + Unwatches all previously specified keys + + See: ClusterPipeline.unwatch() + """ + pass + + @abstractmethod + def script_load_for_pipeline(self, *args, **kwargs): + pass + + @abstractmethod + def delete(self, *names): + """ + "Delete a key specified by ``names``" + + See: ClusterPipeline.delete() + """ + pass + + @abstractmethod + def unlink(self, *names): + """ + "Unlink a key specified by ``names``" + + See: ClusterPipeline.unlink() + """ + pass + + @abstractmethod + def discard(self): + pass + + +class AbstractStrategy(ExecutionStrategy): + def __init__( + self, + pipe: ClusterPipeline, + ): + self._command_queue: List[PipelineCommand] = [] + self._pipe = pipe + self._nodes_manager = self._pipe.nodes_manager + + @property + def command_queue(self): + return self._command_queue + + @command_queue.setter + def command_queue(self, queue: List[PipelineCommand]): + self._command_queue = queue + + @abstractmethod + def execute_command(self, *args, **kwargs): + pass + + def pipeline_execute_command(self, *args, **options): + self._command_queue.append( + PipelineCommand(args, options, len(self._command_queue)) + ) + return self._pipe + + @abstractmethod + def execute(self, raise_on_error: bool = True) -> List[Any]: + pass + + @abstractmethod + def send_cluster_commands( + self, stack, raise_on_error=True, allow_redirections=True + ): + pass + + @abstractmethod + def reset(self): + pass + + def exists(self, *keys): + return self.execute_command("EXISTS", *keys) + + def eval(self): + """ """ + raise RedisClusterException("method eval() is not implemented") + + def load_scripts(self): + """ """ + raise RedisClusterException("method load_scripts() is not implemented") + + def script_load_for_pipeline(self, *args, **kwargs): + """ """ + raise RedisClusterException( + "method script_load_for_pipeline() is not implemented" + ) + + def annotate_exception(self, exception, number, command): + """ + Provides extra context to the exception prior to it being handled + """ + cmd = " ".join(map(safe_str, command)) + msg = ( + f"Command # {number} ({truncate_text(cmd)}) of pipeline " + f"caused error: {exception.args[0]}" + ) + exception.args = (msg,) + exception.args[1:] + + +class PipelineStrategy(AbstractStrategy): + def __init__(self, pipe: ClusterPipeline): + super().__init__(pipe) + self.command_flags = pipe.command_flags + + def execute_command(self, *args, **kwargs): + return self.pipeline_execute_command(*args, **kwargs) + + def _raise_first_error(self, stack): + """ + Raise the first exception on the stack + """ + for c in stack: + r = c.result + if isinstance(r, Exception): + self.annotate_exception(r, c.position + 1, c.args) + raise r + + def execute(self, raise_on_error: bool = True) -> List[Any]: + stack = self._command_queue + if not stack: + return [] + + try: + return self.send_cluster_commands(stack, raise_on_error) + finally: + self.reset() + + def reset(self): + """ + Reset back to empty pipeline. + """ + self._command_queue = [] + + def send_cluster_commands( + self, stack, raise_on_error=True, allow_redirections=True + ): + """ + Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling. 
+ + If one of the retryable exceptions has been thrown we assume that: + - connection_pool was disconnected + - connection_pool was reseted + - refereh_table_asap set to True + + It will try the number of times specified by + the retries in config option "self.retry" + which defaults to 3 unless manually configured. + + If it reaches the number of times, the command will + raises ClusterDownException. + """ + if not stack: + return [] + retry_attempts = self._pipe.retry.get_retries() + while True: + try: + return self._send_cluster_commands( + stack, + raise_on_error=raise_on_error, + allow_redirections=allow_redirections, + ) + except RedisCluster.ERRORS_ALLOW_RETRY as e: + if retry_attempts > 0: + # Try again with the new cluster setup. All other errors + # should be raised. + retry_attempts -= 1 + pass + else: + raise e + + def _send_cluster_commands( + self, stack, raise_on_error=True, allow_redirections=True + ): + """ + Send a bunch of cluster commands to the redis cluster. + + `allow_redirections` If the pipeline should follow + `ASK` & `MOVED` responses automatically. If set + to false it will raise RedisClusterException. + """ + # the first time sending the commands we send all of + # the commands that were queued up. + # if we have to run through it again, we only retry + # the commands that failed. + attempt = sorted(stack, key=lambda x: x.position) + is_default_node = False + # build a list of node objects based on node names we need to + nodes = {} + + # as we move through each command that still needs to be processed, + # we figure out the slot number that command maps to, then from + # the slot determine the node. + for c in attempt: + while True: + # refer to our internal node -> slot table that + # tells us where a given command should route to. + # (it might be possible we have a cached node that no longer + # exists in the cluster, which is why we do this in a loop) + passed_targets = c.options.pop("target_nodes", None) + if passed_targets and not self._is_nodes_flag(passed_targets): + target_nodes = self._parse_target_nodes(passed_targets) + else: + target_nodes = self._determine_nodes( + *c.args, node_flag=passed_targets + ) + if not target_nodes: + raise RedisClusterException( + f"No targets were found to execute {c.args} command on" + ) + if len(target_nodes) > 1: + raise RedisClusterException( + f"Too many targets for command {c.args}" + ) + + node = target_nodes[0] + if node == self._pipe.get_default_node(): + is_default_node = True + + # now that we know the name of the node + # ( it's just a string in the form of host:port ) + # we can build a list of commands for each node. + node_name = node.name + if node_name not in nodes: + redis_node = self._pipe.get_redis_connection(node) + try: + connection = get_connection(redis_node) + except (ConnectionError, TimeoutError): + for n in nodes.values(): + n.connection_pool.release(n.connection) + # Connection retries are being handled in the node's + # Retry object. Reinitialize the node -> slot table. + self._nodes_manager.initialize() + if is_default_node: + self._pipe.replace_default_node() + raise + nodes[node_name] = NodeCommands( + redis_node.parse_response, + redis_node.connection_pool, + connection, + ) + nodes[node_name].append(c) + break + + # send the commands in sequence. + # we write to all the open sockets for each node first, + # before reading anything + # this allows us to flush all the requests out across the + # network + # so that we can read them from different sockets as they come back. 
+ # we dont' multiplex on the sockets as they come available, + # but that shouldn't make too much difference. + try: + node_commands = nodes.values() + for n in node_commands: + n.write() + + for n in node_commands: + n.read() + finally: + # release all of the redis connections we allocated earlier + # back into the connection pool. + # we used to do this step as part of a try/finally block, + # but it is really dangerous to + # release connections back into the pool if for some + # reason the socket has data still left in it + # from a previous operation. The write and + # read operations already have try/catch around them for + # all known types of errors including connection + # and socket level errors. + # So if we hit an exception, something really bad + # happened and putting any oF + # these connections back into the pool is a very bad idea. + # the socket might have unread buffer still sitting in it, + # and then the next time we read from it we pass the + # buffered result back from a previous command and + # every single request after to that connection will always get + # a mismatched result. + for n in nodes.values(): + n.connection_pool.release(n.connection) + + # if the response isn't an exception it is a + # valid response from the node + # we're all done with that command, YAY! + # if we have more commands to attempt, we've run into problems. + # collect all the commands we are allowed to retry. + # (MOVED, ASK, or connection errors or timeout errors) + attempt = sorted( + ( + c + for c in attempt + if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY) + ), + key=lambda x: x.position, + ) + if attempt and allow_redirections: + # RETRY MAGIC HAPPENS HERE! + # send these remaining commands one at a time using `execute_command` + # in the main client. This keeps our retry logic + # in one place mostly, + # and allows us to be more confident in correctness of behavior. + # at this point any speed gains from pipelining have been lost + # anyway, so we might as well make the best + # attempt to get the correct behavior. + # + # The client command will handle retries for each + # individual command sequentially as we pass each + # one into `execute_command`. Any exceptions + # that bubble out should only appear once all + # retries have been exhausted. + # + # If a lot of commands have failed, we'll be setting the + # flag to rebuild the slots table from scratch. + # So MOVED errors should correct themselves fairly quickly. + self._pipe.reinitialize_counter += 1 + if self._pipe._should_reinitialized(): + self._nodes_manager.initialize() + if is_default_node: + self._pipe.replace_default_node() + for c in attempt: + try: + # send each command individually like we + # do in the main client. + c.result = self._pipe.parent_execute_command(*c.args, **c.options) + except RedisError as e: + c.result = e + + # turn the response back into a simple flat array that corresponds + # to the sequence of commands issued in the stack in pipeline.execute() + response = [] + for c in sorted(stack, key=lambda x: x.position): + if c.args[0] in self._pipe.cluster_response_callbacks: + # Remove keys entry, it needs only for cache. 
+ c.options.pop("keys", None) + c.result = self._pipe.cluster_response_callbacks[c.args[0]]( + c.result, **c.options + ) + response.append(c.result) + + if raise_on_error: + self._raise_first_error(stack) + + return response + + def _is_nodes_flag(self, target_nodes): + return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags + + def _parse_target_nodes(self, target_nodes): + if isinstance(target_nodes, list): + nodes = target_nodes + elif isinstance(target_nodes, ClusterNode): + # Supports passing a single ClusterNode as a variable + nodes = [target_nodes] + elif isinstance(target_nodes, dict): + # Supports dictionaries of the format {node_name: node}. + # It enables to execute commands with multi nodes as follows: + # rc.cluster_save_config(rc.get_primaries()) + nodes = target_nodes.values() + else: + raise TypeError( + "target_nodes type can be one of the following: " + "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," + "ClusterNode, list, or dict. " + f"The passed type is {type(target_nodes)}" + ) + return nodes + + def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: + # Determine which nodes should be executed the command on. + # Returns a list of target nodes. + command = args[0].upper() + if ( + len(args) >= 2 + and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags + ): + command = f"{args[0]} {args[1]}".upper() + + nodes_flag = kwargs.pop("nodes_flag", None) + if nodes_flag is not None: + # nodes flag passed by the user + command_flag = nodes_flag + else: + # get the nodes group for this command if it was predefined + command_flag = self._pipe.command_flags.get(command) + if command_flag == self._pipe.RANDOM: + # return a random node + return [self._pipe.get_random_node()] + elif command_flag == self._pipe.PRIMARIES: + # return all primaries + return self._pipe.get_primaries() + elif command_flag == self._pipe.REPLICAS: + # return all replicas + return self._pipe.get_replicas() + elif command_flag == self._pipe.ALL_NODES: + # return all nodes + return self._pipe.get_nodes() + elif command_flag == self._pipe.DEFAULT_NODE: + # return the cluster's default node + return [self._nodes_manager.default_node] + elif command in self._pipe.SEARCH_COMMANDS[0]: + return [self._nodes_manager.default_node] + else: + # get the node that holds the key's slot + slot = self._pipe.determine_slot(*args) + node = self._nodes_manager.get_node_from_slot( + slot, + self._pipe.read_from_replicas and command in READ_COMMANDS, + self._pipe.load_balancing_strategy + if command in READ_COMMANDS + else None, + ) + return [node] + + def multi(self): + raise RedisClusterException( + "method multi() is not supported outside of transactional context" + ) + + def discard(self): + raise RedisClusterException( + "method discard() is not supported outside of transactional context" + ) + + def watch(self, *names): + raise RedisClusterException( + "method watch() is not supported outside of transactional context" + ) + + def unwatch(self, *names): + raise RedisClusterException( + "method unwatch() is not supported outside of transactional context" + ) + + def delete(self, *names): + if len(names) != 1: + raise RedisClusterException( + "deleting multiple keys is not implemented in pipeline command" + ) + + return self.execute_command("DEL", names[0]) + + def unlink(self, *names): + if len(names) != 1: + raise RedisClusterException( + "unlinking multiple keys is not implemented in pipeline command" + ) + + return self.execute_command("UNLINK", names[0]) + + +class 
TransactionStrategy(AbstractStrategy): + NO_SLOTS_COMMANDS = {"UNWATCH"} + IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"} + UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"} + SLOT_REDIRECT_ERRORS = (AskError, MovedError) + CONNECTION_ERRORS = ( + ConnectionError, + OSError, + ClusterDownError, + SlotNotCoveredError, + ) + + def __init__(self, pipe: ClusterPipeline): + super().__init__(pipe) + self._explicit_transaction = False + self._watching = False + self._pipeline_slots: Set[int] = set() + self._transaction_connection: Optional[Connection] = None + self._executing = False + self._retry = copy(self._pipe.retry) + self._retry.update_supported_errors( + RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS + ) + + def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]: + """ + Find a connection for a pipeline transaction. + + For running an atomic transaction, watch keys ensure that contents have not been + altered as long as the watch commands for those keys were sent over the same + connection. So once we start watching a key, we fetch a connection to the + node that owns that slot and reuse it. + """ + if not self._pipeline_slots: + raise RedisClusterException( + "At least a command with a key is needed to identify a node" + ) + + node: ClusterNode = self._nodes_manager.get_node_from_slot( + list(self._pipeline_slots)[0], False + ) + redis_node: Redis = self._pipe.get_redis_connection(node) + if self._transaction_connection: + if not redis_node.connection_pool.owns_connection( + self._transaction_connection + ): + previous_node = self._nodes_manager.find_connection_owner( + self._transaction_connection + ) + previous_node.connection_pool.release(self._transaction_connection) + self._transaction_connection = None + + if not self._transaction_connection: + self._transaction_connection = get_connection(redis_node) + + return redis_node, self._transaction_connection + + def execute_command(self, *args, **kwargs): + slot_number: Optional[int] = None + if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS: + slot_number = self._pipe.determine_slot(*args) + + if ( + self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS + ) and not self._explicit_transaction: + if args[0] == "WATCH": + self._validate_watch() + + if slot_number is not None: + if self._pipeline_slots and slot_number not in self._pipeline_slots: + raise CrossSlotTransactionError( + "Cannot watch or send commands on different slots" + ) + + self._pipeline_slots.add(slot_number) + elif args[0] not in self.NO_SLOTS_COMMANDS: + raise RedisClusterException( + f"Cannot identify slot number for command: {args[0]}," + "it cannot be triggered in a transaction" + ) + + return self._immediate_execute_command(*args, **kwargs) + else: + if slot_number is not None: + self._pipeline_slots.add(slot_number) + + return self.pipeline_execute_command(*args, **kwargs) + + def _validate_watch(self): + if self._explicit_transaction: + raise RedisError("Cannot issue a WATCH after a MULTI") + + self._watching = True + + def _immediate_execute_command(self, *args, **options): + return self._retry.call_with_retry( + lambda: self._get_connection_and_send_command(*args, **options), + self._reinitialize_on_error, + ) + + def _get_connection_and_send_command(self, *args, **options): + redis_node, connection = self._get_client_and_connection_for_transaction() + return self._send_command_parse_response( + connection, redis_node, args[0], *args, **options + ) + + def _send_command_parse_response( + self, conn, redis_node: 
Redis, command_name, *args, **options + ): + """ + Send a command and parse the response + """ + + conn.send_command(*args) + output = redis_node.parse_response(conn, command_name, **options) + + if command_name in self.UNWATCH_COMMANDS: + self._watching = False + return output + + def _reinitialize_on_error(self, error): + if self._watching: + if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing: + raise WatchError("Slot rebalancing occurred while watching keys") + + if ( + type(error) in self.SLOT_REDIRECT_ERRORS + or type(error) in self.CONNECTION_ERRORS + ): + if self._transaction_connection: + self._transaction_connection = None + + self._pipe.reinitialize_counter += 1 + if self._pipe._should_reinitialized(): + self._nodes_manager.initialize() + self.reinitialize_counter = 0 + else: + self._nodes_manager.update_moved_exception(error) + + self._executing = False + + def _raise_first_error(self, responses, stack): + """ + Raise the first exception on the stack + """ + for r, cmd in zip(responses, stack): + if isinstance(r, Exception): + self.annotate_exception(r, cmd.position + 1, cmd.args) + raise r + + def execute(self, raise_on_error: bool = True) -> List[Any]: + stack = self._command_queue + if not stack and (not self._watching or not self._pipeline_slots): + return [] + + return self._execute_transaction_with_retries(stack, raise_on_error) + + def _execute_transaction_with_retries( + self, stack: List["PipelineCommand"], raise_on_error: bool + ): + return self._retry.call_with_retry( + lambda: self._execute_transaction(stack, raise_on_error), + self._reinitialize_on_error, + ) + + def _execute_transaction( + self, stack: List["PipelineCommand"], raise_on_error: bool + ): + if len(self._pipeline_slots) > 1: + raise CrossSlotTransactionError( + "All keys involved in a cluster transaction must map to the same slot" + ) + + self._executing = True + + redis_node, connection = self._get_client_and_connection_for_transaction() + + stack = chain( + [PipelineCommand(("MULTI",))], + stack, + [PipelineCommand(("EXEC",))], + ) + commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options] + packed_commands = connection.pack_commands(commands) + connection.send_packed_command(packed_commands) + errors = [] + + # parse off the response for MULTI + # NOTE: we need to handle ResponseErrors here and continue + # so that we read all the additional command messages from + # the socket + try: + redis_node.parse_response(connection, "MULTI") + except ResponseError as e: + self.annotate_exception(e, 0, "MULTI") + errors.append(e) + except self.CONNECTION_ERRORS as cluster_error: + self.annotate_exception(cluster_error, 0, "MULTI") + raise + + # and all the other commands + for i, command in enumerate(self._command_queue): + if EMPTY_RESPONSE in command.options: + errors.append((i, command.options[EMPTY_RESPONSE])) + else: + try: + _ = redis_node.parse_response(connection, "_") + except self.SLOT_REDIRECT_ERRORS as slot_error: + self.annotate_exception(slot_error, i + 1, command.args) + errors.append(slot_error) + except self.CONNECTION_ERRORS as cluster_error: + self.annotate_exception(cluster_error, i + 1, command.args) + raise + except ResponseError as e: + self.annotate_exception(e, i + 1, command.args) + errors.append(e) + + response = None + # parse the EXEC. 
+ try: + response = redis_node.parse_response(connection, "EXEC") + except ExecAbortError: + if errors: + raise errors[0] + raise + + self._executing = False + + # EXEC clears any watched keys + self._watching = False + + if response is None: + raise WatchError("Watched variable changed.") + + # put any parse errors into the response + for i, e in errors: + response.insert(i, e) + + if len(response) != len(self._command_queue): + raise InvalidPipelineStack( + "Unexpected response length for cluster pipeline EXEC." + " Command stack was {} but response had length {}".format( + [c.args[0] for c in self._command_queue], len(response) + ) + ) + + # find any errors in the response and raise if necessary + if raise_on_error or len(errors) > 0: + self._raise_first_error( + response, + self._command_queue, + ) + + # We have to run response callbacks manually + data = [] + for r, cmd in zip(response, self._command_queue): + if not isinstance(r, Exception): + command_name = cmd.args[0] + if command_name in self._pipe.cluster_response_callbacks: + r = self._pipe.cluster_response_callbacks[command_name]( + r, **cmd.options + ) + data.append(r) + return data + + def reset(self): + self._command_queue = [] + + # make sure to reset the connection state in the event that we were + # watching something + if self._transaction_connection: + try: + # call this manually since our unwatch or + # immediate_execute_command methods can call reset() + self._transaction_connection.send_command("UNWATCH") + self._transaction_connection.read_response() + # we can safely return the connection to the pool here since we're + # sure we're no longer WATCHing anything + node = self._nodes_manager.find_connection_owner( + self._transaction_connection + ) + node.redis_connection.connection_pool.release( + self._transaction_connection + ) + self._transaction_connection = None + except self.CONNECTION_ERRORS: + # disconnect will also remove any previous WATCHes + if self._transaction_connection: + self._transaction_connection.disconnect() + + # clean up the other instance attributes + self._watching = False + self._explicit_transaction = False + self._pipeline_slots = set() + self._executing = False + + def send_cluster_commands( + self, stack, raise_on_error=True, allow_redirections=True + ): + raise NotImplementedError( + "send_cluster_commands cannot be executed in transactional context." + ) + + def multi(self): + if self._explicit_transaction: + raise RedisError("Cannot issue nested calls to MULTI") + if self._command_queue: + raise RedisError( + "Commands without an initial WATCH have already been issued" + ) + self._explicit_transaction = True + + def watch(self, *names): + if self._explicit_transaction: + raise RedisError("Cannot issue a WATCH after a MULTI") + + return self.execute_command("WATCH", *names) + + def unwatch(self): + if self._watching: + return self.execute_command("UNWATCH") + + return True + + def discard(self): + self.reset() + + def delete(self, *names): + return self.execute_command("DEL", *names) + + def unlink(self, *names): + return self.execute_command("UNLINK", *names) diff --git a/redis/exceptions.py b/redis/exceptions.py index bad447a086..a00ac65ac1 100644 --- a/redis/exceptions.py +++ b/redis/exceptions.py @@ -221,3 +221,21 @@ class SlotNotCoveredError(RedisClusterException): class MaxConnectionsError(ConnectionError): ... 
+ + +class CrossSlotTransactionError(RedisClusterException): + """ + Raised when a transaction or watch is triggered in a pipeline + and not all keys or all commands belong to the same slot. + """ + + pass + + +class InvalidPipelineStack(RedisClusterException): + """ + Raised on unexpected response length on pipelines. This is + most likely a handling error on the stack. + """ + + pass diff --git a/tests/test_cluster.py b/tests/test_cluster.py index d4e48e199b..d360ab07f7 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -3015,24 +3015,10 @@ def test_blocked_methods(self, r): They maybe implemented in the future. """ pipe = r.pipeline() - with pytest.raises(RedisClusterException): - pipe.multi() - - with pytest.raises(RedisClusterException): - pipe.immediate_execute_command() - - with pytest.raises(RedisClusterException): - pipe._execute_transaction(None, None, None) with pytest.raises(RedisClusterException): pipe.load_scripts() - with pytest.raises(RedisClusterException): - pipe.watch() - - with pytest.raises(RedisClusterException): - pipe.unwatch() - with pytest.raises(RedisClusterException): pipe.script_load_for_pipeline(None) @@ -3044,14 +3030,6 @@ def test_blocked_arguments(self, r): Currently some arguments is blocked when using in cluster mode. They maybe implemented in the future. """ - with pytest.raises(RedisClusterException) as ex: - r.pipeline(transaction=True) - - assert ( - str(ex.value).startswith("transaction is deprecated in cluster mode") - is True - ) - with pytest.raises(RedisClusterException) as ex: r.pipeline(shard_hint=True) @@ -3109,7 +3087,7 @@ def test_delete_single(self, r): pipe.delete("a") assert pipe.execute() == [1] - def test_multi_delete_unsupported(self, r): + def test_multi_delete_unsupported_cross_slot(self, r): """ Test that multi delete operation is unsupported """ @@ -3119,6 +3097,16 @@ def test_multi_delete_unsupported(self, r): with pytest.raises(RedisClusterException): pipe.delete("a", "b") + def test_multi_delete_supported_single_slot(self, r): + """ + Test that multi delete operation is supported when all keys are in the same hash slot + """ + with r.pipeline(transaction=True) as pipe: + r["{key}:a"] = 1 + r["{key}:b"] = 2 + pipe.delete("{key}:a", "{key}:b") + assert pipe.execute() + def test_unlink_single(self, r): """ Test a single unlink operation @@ -3374,6 +3362,87 @@ def test_empty_stack(self, r): result = p.execute() assert result == [] + @pytest.mark.onlycluster + def test_exec_error_in_response(self, r): + """ + an invalid pipeline command at exec time adds the exception instance + to the list of returned values + """ + hashkey = "{key}" + r[f"{hashkey}:c"] = "a" + with r.pipeline() as pipe: + pipe.set(f"{hashkey}:a", 1).set(f"{hashkey}:b", 2) + pipe.lpush(f"{hashkey}:c", 3).set(f"{hashkey}:d", 4) + result = pipe.execute(raise_on_error=False) + + assert result[0] + assert r[f"{hashkey}:a"] == b"1" + assert result[1] + assert r[f"{hashkey}:b"] == b"2" + + # we can't lpush to a key that's a string value, so this should + # be a ResponseError exception + assert isinstance(result[2], redis.ResponseError) + assert r[f"{hashkey}:c"] == b"a" + + # since this isn't a transaction, the other commands after the + # error are still executed + assert result[3] + assert r[f"{hashkey}:d"] == b"4" + + # make sure the pipe was restored to a working state + assert pipe.set(f"{hashkey}:z", "zzz").execute() == [True] + assert r[f"{hashkey}:z"] == b"zzz" + + def test_exec_error_in_no_transaction_pipeline(self, r): + r["a"] = 1 + with 
r.pipeline(transaction=False) as pipe: + pipe.llen("a") + pipe.expire("a", 100) + + with pytest.raises(redis.ResponseError) as ex: + pipe.execute() + + assert str(ex.value).startswith( + "Command # 1 (LLEN a) of pipeline caused error: " + ) + + assert r["a"] == b"1" + + @pytest.mark.onlycluster + @skip_if_server_version_lt("2.0.0") + def test_pipeline_discard(self, r): + hashkey = "{key}" + + # empty pipeline should raise an error + with r.pipeline() as pipe: + pipe.set(f"{hashkey}:key", "someval") + with pytest.raises(redis.exceptions.RedisClusterException) as ex: + pipe.discard() + + assert str(ex.value).startswith( + "method discard() is not supported outside of transactional context" + ) + + # setting a pipeline and discarding should do the same + with r.pipeline() as pipe: + pipe.set(f"{hashkey}:key", "someval") + pipe.set(f"{hashkey}:someotherkey", "val") + response = pipe.execute() + pipe.set(f"{hashkey}:key", "another value!") + with pytest.raises(redis.exceptions.RedisClusterException) as ex: + pipe.discard() + + assert str(ex.value).startswith( + "method discard() is not supported outside of transactional context" + ) + + pipe.set(f"{hashkey}:foo", "bar") + response = pipe.execute() + + assert response[0] + assert r.get(f"{hashkey}:foo") == b"bar" + @pytest.mark.onlycluster class TestReadOnlyPipeline: diff --git a/tests/test_cluster_transaction.py b/tests/test_cluster_transaction.py new file mode 100644 index 0000000000..0eb7a4f256 --- /dev/null +++ b/tests/test_cluster_transaction.py @@ -0,0 +1,392 @@ +import threading +from typing import Tuple +from unittest.mock import patch, Mock + +import pytest + +import redis +from redis import CrossSlotTransactionError, ConnectionPool, RedisClusterException +from redis.backoff import NoBackoff +from redis.client import Redis +from redis.cluster import PRIMARY, ClusterNode, NodesManager, RedisCluster +from redis.retry import Retry + +from .conftest import skip_if_server_version_lt + + +def _find_source_and_target_node_for_slot( + r: RedisCluster, slot: int +) -> Tuple[ClusterNode, ClusterNode]: + """Returns a pair of ClusterNodes, where the first node is the + one that owns the slot and the second is a possible target + for that slot, i.e. a primary node different from the first + one. 
+    """
+    node_migrating = r.nodes_manager.get_node_from_slot(slot)
+    assert node_migrating, f"No node could be found that owns slot #{slot}"
+
+    available_targets = [
+        n
+        for n in r.nodes_manager.startup_nodes.values()
+        if node_migrating.name != n.name and n.server_type == PRIMARY
+    ]
+
+    assert available_targets, f"No possible target nodes for slot #{slot}"
+    return node_migrating, available_targets[0]
+
+
+class TestClusterTransaction:
+    @pytest.mark.onlycluster
+    def test_pipeline_is_true(self, r):
+        "Ensure pipeline instances are not false-y"
+        with r.pipeline(transaction=True) as pipe:
+            assert pipe
+
+    @pytest.mark.onlycluster
+    def test_pipeline_empty_transaction(self, r):
+        r["a"] = 0
+
+        with r.pipeline(transaction=True) as pipe:
+            assert pipe.execute() == []
+
+    @pytest.mark.onlycluster
+    def test_executes_transaction_against_cluster(self, r):
+        with r.pipeline(transaction=True) as tx:
+            tx.set("{foo}bar", "value1")
+            tx.set("{foo}baz", "value2")
+            tx.set("{foo}bad", "value3")
+            tx.get("{foo}bar")
+            tx.get("{foo}baz")
+            tx.get("{foo}bad")
+            assert tx.execute() == [
+                b"OK",
+                b"OK",
+                b"OK",
+                b"value1",
+                b"value2",
+                b"value3",
+            ]
+
+        r.flushall()
+
+        tx = r.pipeline(transaction=True)
+        tx.set("{foo}bar", "value1")
+        tx.set("{foo}baz", "value2")
+        tx.set("{foo}bad", "value3")
+        tx.get("{foo}bar")
+        tx.get("{foo}baz")
+        tx.get("{foo}bad")
+        assert tx.execute() == [b"OK", b"OK", b"OK", b"value1", b"value2", b"value3"]
+
+    @pytest.mark.onlycluster
+    def test_throws_exception_on_different_hash_slots(self, r):
+        with r.pipeline(transaction=True) as tx:
+            tx.set("{foo}bar", "value1")
+            tx.set("{foobar}baz", "value2")
+
+            with pytest.raises(
+                CrossSlotTransactionError,
+                match="All keys involved in a cluster transaction must map to the same slot",
+            ):
+                tx.execute()
+
+    @pytest.mark.onlycluster
+    def test_throws_exception_with_watch_on_different_hash_slots(self, r):
+        with r.pipeline(transaction=True) as tx:
+            with pytest.raises(
+                RedisClusterException,
+                match="WATCH - all keys must map to the same key slot",
+            ):
+                tx.watch("key1", "key2")
+
+    @pytest.mark.onlycluster
+    def test_transaction_with_watched_keys(self, r):
+        r["a"] = 0
+
+        with r.pipeline(transaction=True) as pipe:
+            pipe.watch("a")
+            a = pipe.get("a")
+            pipe.multi()
+            pipe.set("a", int(a) + 1)
+            assert pipe.execute() == [b"OK"]
+
+    @pytest.mark.onlycluster
+    def test_retry_transaction_during_unfinished_slot_migration(self, r):
+        """
+        When a transaction is triggered during a migration, MovedError
+        or AskError may appear (depending on whether the key has already
+        been migrated or does not exist yet). The patch on parse_response
+        simulates such an error, but the slot cache is not updated
+        (meaning the migration is still ongoing), so the pipeline eventually
+        fails as if it had been retried while the migration was not yet complete.
+        """
+        key = "book"
+        slot = r.keyslot(key)
+        node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
+
+        with patch.object(Redis, "parse_response") as parse_response, patch.object(
+            NodesManager, "_update_moved_slots"
+        ) as manager_update_moved_slots:
+
+            def ask_redirect_effect(connection, *args, **options):
+                if "MULTI" in args:
+                    return
+                elif "EXEC" in args:
+                    raise redis.exceptions.ExecAbortError()
+
+                raise redis.exceptions.AskError(f"{slot} {node_importing.name}")
+
+            parse_response.side_effect = ask_redirect_effect
+
+            with r.pipeline(transaction=True) as pipe:
+                pipe.set(key, "val")
+                with pytest.raises(redis.exceptions.AskError) as ex:
+                    pipe.execute()
+
+                assert str(ex.value).startswith(
+                    "Command # 1 (SET book val) of pipeline caused error:"
+                    f" {slot} {node_importing.name}"
+                )
+
+            manager_update_moved_slots.assert_called()
+
+    @pytest.mark.onlycluster
+    def test_retry_transaction_during_slot_migration_successful(self, r):
+        """
+        If a MovedError or AskError appears when calling EXEC and no key is watched,
+        the pipeline is retried after updating the node manager slot table. If the
+        migration was completed, the transaction may then complete successfully.
+        """
+        key = "book"
+        slot = r.keyslot(key)
+        node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
+
+        with patch.object(Redis, "parse_response") as parse_response, patch.object(
+            NodesManager, "_update_moved_slots"
+        ) as manager_update_moved_slots:
+
+            def ask_redirect_effect(conn, *args, **options):
+                # first call should go here, we trigger an AskError
+                if f"{conn.host}:{conn.port}" == node_migrating.name:
+                    if "MULTI" in args:
+                        return
+                    elif "EXEC" in args:
+                        raise redis.exceptions.ExecAbortError()
+
+                    raise redis.exceptions.AskError(f"{slot} {node_importing.name}")
+                # if the slot table is updated, the next call will go here
+                elif f"{conn.host}:{conn.port}" == node_importing.name:
+                    if "EXEC" in args:
+                        return [
+                            "MOCK_OK"
+                        ]  # mock value to validate this section was called
+                    return
+                else:
+                    assert False, f"unexpected node {conn.host}:{conn.port} was called"
+
+            def update_moved_slot():  # simulate slot table update
+                ask_error = r.nodes_manager._moved_exception
+                assert ask_error is not None, "No AskError was previously triggered"
+                assert f"{ask_error.host}:{ask_error.port}" == node_importing.name
+                r.nodes_manager._moved_exception = None
+                r.nodes_manager.slots_cache[slot] = [node_importing]
+
+            parse_response.side_effect = ask_redirect_effect
+            manager_update_moved_slots.side_effect = update_moved_slot
+
+            result = None
+            with r.pipeline(transaction=True) as pipe:
+                pipe.multi()
+                pipe.set(key, "val")
+                result = pipe.execute()
+
+            assert result and "MOCK_OK" in result, "Target node was not called"
+
+    @pytest.mark.onlycluster
+    def test_retry_transaction_with_watch_after_slot_migration(self, r):
+        """
+        If a MovedError or AskError appears when calling WATCH, the client
+        must attempt to recover itself before proceeding and no WatchError
+        should appear.
+        """
+        key = "book"
+        slot = r.keyslot(key)
+        r.reinitialize_steps = 1
+
+        # force a MovedError on the first call to pipe.watch()
+        # by switching the node that owns the slot to another one
+        _node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
+        r.nodes_manager.slots_cache[slot] = [node_importing]
+
+        with r.pipeline(transaction=True) as pipe:
+            pipe.watch(key)
+            pipe.multi()
+            pipe.set(key, "val")
+            assert pipe.execute() == [b"OK"]
+
+    @pytest.mark.onlycluster
+    def test_retry_transaction_with_watch_during_slot_migration(self, r):
+        """
+        If a MovedError or AskError appears when calling EXEC and keys were
+        being watched before the migration started, a WatchError should appear.
+        These errors imply resetting the connection and connecting to a new node,
+        so watches are lost anyway and the client code must be notified.
+        """
+        key = "book"
+        slot = r.keyslot(key)
+        node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
+
+        with patch.object(Redis, "parse_response") as parse_response:
+
+            def ask_redirect_effect(conn, *args, **options):
+                if f"{conn.host}:{conn.port}" == node_migrating.name:
+                    # we simulate the watch was sent before the migration started
+                    if "WATCH" in args:
+                        return b"OK"
+                    # but the pipeline was triggered after the migration started
+                    elif "MULTI" in args:
+                        return
+                    elif "EXEC" in args:
+                        raise redis.exceptions.ExecAbortError()
+
+                    raise redis.exceptions.AskError(f"{slot} {node_importing.name}")
+                # we should not try to connect to any other node
+                else:
+                    assert False, f"unexpected node {conn.host}:{conn.port} was called"
+
+            parse_response.side_effect = ask_redirect_effect
+
+            with r.pipeline(transaction=True) as pipe:
+                pipe.watch(key)
+                pipe.multi()
+                pipe.set(key, "val")
+                with pytest.raises(redis.exceptions.WatchError) as ex:
+                    pipe.execute()
+
+                assert str(ex.value).startswith(
+                    "Slot rebalancing occurred while watching keys"
+                )
+
+    @pytest.mark.onlycluster
+    def test_retry_transaction_on_connection_error(self, r, mock_connection):
+        key = "book"
+        slot = r.keyslot(key)
+
+        mock_connection.read_response.side_effect = redis.exceptions.ConnectionError(
+            "Conn error"
+        )
+        mock_connection.retry = Retry(NoBackoff(), 0)
+        mock_pool = Mock(spec=ConnectionPool)
+        mock_pool.get_connection.return_value = mock_connection
+        mock_pool._available_connections = [mock_connection]
+        mock_pool._lock = threading.Lock()
+
+        _node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
+        node_importing.redis_connection.connection_pool = mock_pool
+        r.nodes_manager.slots_cache[slot] = [node_importing]
+        r.reinitialize_steps = 1
+
+        with r.pipeline(transaction=True) as pipe:
+            pipe.set(key, "val")
+            assert pipe.execute() == [b"OK"]
+
+    @pytest.mark.onlycluster
+    def test_retry_transaction_on_connection_error_with_watched_keys(
+        self, r, mock_connection
+    ):
+        key = "book"
+        slot = r.keyslot(key)
+
+        mock_connection.read_response.side_effect = redis.exceptions.ConnectionError(
+            "Conn error"
+        )
+        mock_connection.retry = Retry(NoBackoff(), 0)
+        mock_pool = Mock(spec=ConnectionPool)
+        mock_pool.get_connection.return_value = mock_connection
+        mock_pool._available_connections = [mock_connection]
+        mock_pool._lock = threading.Lock()
+
+        _node_migrating, node_importing = _find_source_and_target_node_for_slot(r, slot)
+        node_importing.redis_connection.connection_pool = mock_pool
+        r.nodes_manager.slots_cache[slot] = [node_importing]
+        r.reinitialize_steps = 1
+
+        with r.pipeline(transaction=True) as pipe:
+            pipe.watch(key)
+            pipe.multi()
+            pipe.set(key, "val")
+            assert pipe.execute() == [b"OK"]
+
+    @pytest.mark.onlycluster
+    def test_exec_error_raised(self, r):
+        hashkey = "{key}"
+        r[f"{hashkey}:c"] = "a"
+        with r.pipeline(transaction=True) as pipe:
+            pipe.set(f"{hashkey}:a", 1).set(f"{hashkey}:b", 2)
+            pipe.lpush(f"{hashkey}:c", 3).set(f"{hashkey}:d", 4)
+            with pytest.raises(redis.ResponseError) as ex:
+                pipe.execute()
+            assert str(ex.value).startswith(
+                "Command # 3 (LPUSH {key}:c 3) of pipeline caused error: "
+            )
+
+            # make sure the pipe was restored to a working state
+            assert pipe.set(f"{hashkey}:z", "zzz").execute() == [b"OK"]
+            assert r[f"{hashkey}:z"] == b"zzz"
+
+    @pytest.mark.onlycluster
+    def test_parse_error_raised(self, r):
+        hashkey = "{key}"
+        with r.pipeline(transaction=True) as pipe:
+            # the zrem is invalid because we don't pass any keys to it
+            pipe.set(f"{hashkey}:a", 1).zrem(f"{hashkey}:b").set(f"{hashkey}:b", 2)
+            with pytest.raises(redis.ResponseError) as ex:
+                pipe.execute()
+
+            assert str(ex.value).startswith(
+                "Command # 2 (ZREM {key}:b) of pipeline caused error: wrong number"
+            )
+
+            # make sure the pipe was restored to a working state
+            assert pipe.set(f"{hashkey}:z", "zzz").execute() == [b"OK"]
+            assert r[f"{hashkey}:z"] == b"zzz"
+
+    @pytest.mark.onlycluster
+    def test_transaction_callable(self, r):
+        hashkey = "{key}"
+        r[f"{hashkey}:a"] = 1
+        r[f"{hashkey}:b"] = 2
+        has_run = []
+
+        def my_transaction(pipe):
+            a_value = pipe.get(f"{hashkey}:a")
+            assert a_value in (b"1", b"2")
+            b_value = pipe.get(f"{hashkey}:b")
+            assert b_value == b"2"
+
+            # silly run-once code... incr's "a" so WatchError should be raised
+            # forcing this all to run again. this should incr "a" once to "2"
+            if not has_run:
+                r.incr(f"{hashkey}:a")
+                has_run.append("it has")
+
+            pipe.multi()
+            pipe.set(f"{hashkey}:c", int(a_value) + int(b_value))
+
+        result = r.transaction(my_transaction, f"{hashkey}:a", f"{hashkey}:b")
+        assert result == [b"OK"]
+        assert r[f"{hashkey}:c"] == b"4"
+
+    @pytest.mark.onlycluster
+    @skip_if_server_version_lt("2.0.0")
+    def test_transaction_discard(self, r):
+        hashkey = "{key}"
+
+        # pipelines enabled as transactions can be discarded at any point
+        with r.pipeline(transaction=True) as pipe:
+            pipe.watch(f"{hashkey}:key")
+            pipe.set(f"{hashkey}:key", "someval")
+            pipe.discard()
+
+            assert not pipe._execution_strategy._watching
+            assert not pipe.command_stack
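+
+    @pytest.mark.onlycluster
+    def test_cas_increment_with_hash_tag(self, r):
+        # A minimal sketch of the optimistic-locking (CAS) pattern exercised by
+        # the watch/multi tests above, assuming the usual watch -> get -> multi
+        # -> set -> execute flow; the key name and the retry loop are
+        # illustrative only.
+        key = "{tag}counter"
+        r[key] = 10
+
+        with r.pipeline(transaction=True) as pipe:
+            while True:
+                try:
+                    pipe.watch(key)  # executed immediately, arms the watch
+                    current = int(pipe.get(key))  # also executed immediately
+                    pipe.multi()  # start buffering the transaction
+                    pipe.set(key, current + 1)
+                    assert pipe.execute() == [b"OK"]
+                    break
+                except redis.WatchError:
+                    # another client touched the key; retry the whole CAS cycle
+                    continue
+
+        assert r[key] == b"11"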