diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 100df2df17..1e16bca287 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -161,7 +161,12 @@ class ProtocolVersion(object): V5 = 5 """ - v5, in beta from 3.x+ + v5, in beta from 3.x+. Finalised in 4.0-beta5 + """ + + V6 = 6 + """ + v6, in beta from 4.0-beta5 """ DSE_V1 = 0x41 @@ -174,12 +179,12 @@ class ProtocolVersion(object): DSE private protocol v2, supported in DSE 6.0+ """ - SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V5, V4, V3, V2, V1) + SUPPORTED_VERSIONS = (DSE_V2, DSE_V1, V6, V5, V4, V3, V2, V1) """ A tuple of all supported protocol versions """ - BETA_VERSIONS = (V5,) + BETA_VERSIONS = (V6,) """ A tuple of all beta protocol versions """ diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 45e1fb410b..7e101afba8 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -63,7 +63,7 @@ BatchMessage, RESULT_KIND_PREPARED, RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS, RESULT_KIND_SCHEMA_CHANGE, ProtocolHandler, - RESULT_KIND_VOID) + RESULT_KIND_VOID, ProtocolException) from cassandra.metadata import Metadata, protect_name, murmur3, _NodeInfo from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy, ExponentialReconnectionPolicy, HostDistance, @@ -3548,6 +3548,14 @@ def _try_connect(self, host): break except ProtocolVersionUnsupported as e: self._cluster.protocol_downgrade(host.endpoint, e.startup_version) + except ProtocolException as e: + # protocol v5 is out of beta in C* >=4.0-beta5 and is now the default driver + # protocol version. If the protocol version was not explicitly specified, + # and that the server raises a beta protocol error, we should downgrade. + if not self._cluster._protocol_version_explicit and e.is_beta_protocol_error: + self._cluster.protocol_downgrade(host.endpoint, self._cluster.protocol_version) + else: + raise log.debug("[control connection] Established new connection %r, " "registering watchers and refreshing schema and topology", diff --git a/cassandra/connection.py b/cassandra/connection.py index 477eaf2f28..48b3caefed 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -897,6 +897,10 @@ def _connect_socket(self): for args in self.sockopts: self._socket.setsockopt(*args) + def _enable_compression(self): + if self._compressor: + self.compressor = self._compressor + def _enable_checksumming(self): self._io_buffer.set_checksumming_buffer() self._is_checksumming_enabled = True @@ -1328,8 +1332,7 @@ def _handle_startup_response(self, startup_response, did_authenticate=False): self.authenticator.__class__.__name__) log.debug("Got ReadyMessage on new connection (%s) from %s", id(self), self.endpoint) - if self._compressor: - self.compressor = self._compressor + self._enable_compression() if ProtocolVersion.has_checksumming_support(self.protocol_version): self._enable_checksumming() @@ -1345,6 +1348,10 @@ def _handle_startup_response(self, startup_response, did_authenticate=False): "if DSE authentication is configured with transitional mode" % (self.host,)) raise AuthenticationFailed('Remote end requires authentication') + self._enable_compression() + if ProtocolVersion.has_checksumming_support(self.protocol_version): + self._enable_checksumming() + if isinstance(self.authenticator, dict): log.debug("Sending credentials-based auth response on %s", self) cm = CredentialsMessage(creds=self.authenticator) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index c454824637..ed92a76679 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -180,6 +180,10 @@ class ProtocolException(ErrorMessageSub): summary = 'Protocol error' error_code = 0x000A + @property + def is_beta_protocol_error(self): + return 'USE_BETA flag is unset' in str(self) + class BadCredentials(ErrorMessageSub): summary = 'Bad credentials' diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 1e1f582804..9d350af707 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -207,8 +207,6 @@ def get_default_protocol(): if DSE_VERSION: return ProtocolVersion.DSE_V2 else: - global ALLOW_BETA_PROTOCOL - ALLOW_BETA_PROTOCOL = True return ProtocolVersion.V5 if CASSANDRA_VERSION >= Version('3.10'): if DSE_VERSION: @@ -234,9 +232,12 @@ def get_supported_protocol_versions(): 3.X -> 4, 3 3.10(C*) -> 5(beta),4,3 3.10(DSE) -> DSE_V1,4,3 - 4.0(C*) -> 5(beta),4,3 + 4.0(C*) -> 6(beta),5,4,3 4.0(DSE) -> DSE_v2, DSE_V1,4,3 ` """ + if CASSANDRA_VERSION >= Version('4.0-beta5'): + if not DSE_VERSION: + return (3, 4, 5, 6) if CASSANDRA_VERSION >= Version('4.0-a'): if DSE_VERSION: return (3, 4, ProtocolVersion.DSE_V1, ProtocolVersion.DSE_V2) @@ -316,7 +317,7 @@ def _id_and_mark(f): notprotocolv1 = unittest.skipUnless(PROTOCOL_VERSION > 1, 'Protocol v1 not supported') lessthenprotocolv4 = unittest.skipUnless(PROTOCOL_VERSION < 4, 'Protocol versions 4 or greater not supported') greaterthanprotocolv3 = unittest.skipUnless(PROTOCOL_VERSION >= 4, 'Protocol versions less than 4 are not supported') -protocolv5 = unittest.skipUnless(5 in get_supported_protocol_versions(), 'Protocol versions less than 5 are not supported') +protocolv6 = unittest.skipUnless(6 in get_supported_protocol_versions(), 'Protocol versions less than 6 are not supported') greaterthancass20 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.1'), 'Cassandra version 2.1 or greater required') greaterthancass21 = unittest.skipUnless(CASSANDRA_VERSION >= Version('2.2'), 'Cassandra version 2.2 or greater required') greaterthanorequalcass30 = unittest.skipUnless(CASSANDRA_VERSION >= Version('3.0'), 'Cassandra version 3.0 or greater required') diff --git a/tests/integration/simulacron/test_empty_column.py b/tests/integration/simulacron/test_empty_column.py index bd7fe6ead0..91c76985e1 100644 --- a/tests/integration/simulacron/test_empty_column.py +++ b/tests/integration/simulacron/test_empty_column.py @@ -27,8 +27,8 @@ from cassandra.cqlengine.connection import set_session from cassandra.cqlengine.models import Model -from tests.integration import PROTOCOL_VERSION, requiressimulacron -from tests.integration.simulacron import SimulacronCluster +from tests.integration import requiressimulacron +from tests.integration.simulacron import PROTOCOL_VERSION, SimulacronCluster from tests.integration.simulacron.utils import PrimeQuery, prime_request diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index cdb6f1f3b7..c7d8266fd9 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -42,7 +42,7 @@ from tests import notwindows from tests.integration import use_singledc, get_server_versions, CASSANDRA_VERSION, \ execute_until_pass, execute_with_long_wait_retry, get_node, MockLoggingHandler, get_unsupported_lower_protocol, \ - get_unsupported_upper_protocol, protocolv5, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \ + get_unsupported_upper_protocol, protocolv6, local, CASSANDRA_IP, greaterthanorequalcass30, lessthanorequalcass40, \ DSE_VERSION, TestCluster, PROTOCOL_VERSION from tests.integration.util import assert_quiescent_pool_state import sys @@ -261,6 +261,18 @@ def test_protocol_negotiation(self): elif DSE_VERSION and DSE_VERSION >= Version("5.1"): self.assertEqual(updated_protocol_version, cassandra.ProtocolVersion.DSE_V1) self.assertEqual(updated_cluster_version, cassandra.ProtocolVersion.DSE_V1) + elif CASSANDRA_VERSION >= Version('4.0-beta5'): + self.assertEqual(updated_protocol_version, cassandra.ProtocolVersion.V5) + self.assertEqual(updated_cluster_version, cassandra.ProtocolVersion.V5) + elif CASSANDRA_VERSION >= Version('4.0-a'): + self.assertEqual(updated_protocol_version, cassandra.ProtocolVersion.V4) + self.assertEqual(updated_cluster_version, cassandra.ProtocolVersion.V4) + elif CASSANDRA_VERSION >= Version('3.11'): + self.assertEqual(updated_protocol_version, cassandra.ProtocolVersion.V4) + self.assertEqual(updated_cluster_version, cassandra.ProtocolVersion.V4) + elif CASSANDRA_VERSION >= Version('3.0'): + self.assertEqual(updated_protocol_version, cassandra.ProtocolVersion.V4) + self.assertEqual(updated_cluster_version, cassandra.ProtocolVersion.V4) elif CASSANDRA_VERSION >= Version('2.2'): self.assertEqual(updated_protocol_version, 4) self.assertEqual(updated_cluster_version, 4) @@ -1473,42 +1485,42 @@ def test_prepare_on_ignored_hosts(self): cluster.shutdown() -@protocolv5 +@protocolv6 class BetaProtocolTest(unittest.TestCase): - @protocolv5 + @protocolv6 def test_invalid_protocol_version_beta_option(self): """ - Test cluster connection with protocol v5 and beta flag not set + Test cluster connection with protocol v6 and beta flag not set @since 3.7.0 - @jira_ticket PYTHON-614 - @expected_result client shouldn't connect with V5 and no beta flag set + @jira_ticket PYTHON-614, PYTHON-1232 + @expected_result client shouldn't connect with V6 and no beta flag set @test_category connection """ - cluster = TestCluster(protocol_version=cassandra.ProtocolVersion.V5, allow_beta_protocol_version=False) + cluster = TestCluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=False) try: with self.assertRaises(NoHostAvailable): cluster.connect() except Exception as e: self.fail("Unexpected error encountered {0}".format(e.message)) - @protocolv5 + @protocolv6 def test_valid_protocol_version_beta_options_connect(self): """ Test cluster connection with protocol version 5 and beta flag set @since 3.7.0 - @jira_ticket PYTHON-614 - @expected_result client should connect with protocol v5 and beta flag set. + @jira_ticket PYTHON-614, PYTHON-1232 + @expected_result client should connect with protocol v6 and beta flag set. @test_category connection """ - cluster = Cluster(protocol_version=cassandra.ProtocolVersion.V5, allow_beta_protocol_version=True) + cluster = Cluster(protocol_version=cassandra.ProtocolVersion.V6, allow_beta_protocol_version=True) session = cluster.connect() - self.assertEqual(cluster.protocol_version, cassandra.ProtocolVersion.V5) + self.assertEqual(cluster.protocol_version, cassandra.ProtocolVersion.V6) self.assertTrue(session.execute("select release_version from system.local")[0]) cluster.shutdown() diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 249c0a17cc..620f642084 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -209,6 +209,8 @@ def test_protocol_downgrade_test(self): lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V2) self.assertEqual(ProtocolVersion.DSE_V1, lower) lower = ProtocolVersion.get_lower_supported(ProtocolVersion.DSE_V1) + self.assertEqual(ProtocolVersion.V5,lower) + lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V5) self.assertEqual(ProtocolVersion.V4,lower) lower = ProtocolVersion.get_lower_supported(ProtocolVersion.V4) self.assertEqual(ProtocolVersion.V3,lower)