Skip to content

Sync with 3.29.0 #406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
3.29.0
======
December 19, 2023

Features
--------
* Add support for Python 3.9 through 3.12, drop support for 3.7 (PYTHON-1283)
* Removal of dependency on six module (PR 1172)
* Raise explicit exception when deserializing a vector with a subtype that isn’t a constant size (PYTHON-1371)

Others
------
* Remove outdated Python pre-3.7 references (PR 1186)
* Remove backup(.bak) files (PR 1185)
* Fix doc typo in add_callbacks (PR 1177)

3.28.0
======
June 5, 2023
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from cassandra.io.libevreactor import LibevConnection
have_libev = True
supported_reactors.append(LibevConnection)
except ImportError as exc:
except cassandra.DependencyException as exc:
pass

have_asyncio = False
Expand Down
26 changes: 25 additions & 1 deletion cassandra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def emit(self, record):

logging.getLogger('cassandra').addHandler(NullHandler())

__version_info__ = (3, 28, 2)
__version_info__ = (3, 29, 0)
__version__ = '.'.join(map(str, __version_info__))


Expand Down Expand Up @@ -735,6 +735,7 @@ class OperationType(Enum):
Read = 0
Write = 1


class RateLimitReached(ConfigurationException):
'''
Rate limit was exceeded for a partition affected by the request.
Expand All @@ -747,3 +748,26 @@ def __init__(self, op_type=None, rejected_by_coordinator=False):
self.rejected_by_coordinator = rejected_by_coordinator
message = f"[request_error_rate_limit_reached OpType={op_type.name} RejectedByCoordinator={rejected_by_coordinator}]"
Exception.__init__(self, message)


class DependencyException(Exception):
    """
    Specific exception class for handling issues with driver dependencies
    """

    excs = []
    """
    A sequence of child exceptions
    """

    def __init__(self, msg, excs=None):
        """
        Build the exception from a summary message and optional child exceptions.

        :param msg: human-readable description of the dependency problem
        :param excs: optional iterable of exceptions observed while loading
            dependencies; ``None`` (the default) or an empty iterable means
            there are no child exceptions
        """
        # Keep a per-instance copy so the documented ``excs`` attribute really
        # carries the children, and so no list is shared between instances
        # (the previous mutable ``[]`` default and the class-level list were
        # both shared).
        self.excs = list(excs) if excs else []
        complete_msg = msg
        if self.excs:
            complete_msg += ("\nThe following exceptions were observed: \n - " + '\n - '.join(str(e) for e in self.excs))
        Exception.__init__(self, complete_msg)

class VectorDeserializationFailure(DriverException):
    """
    Signals that the driver could not deserialize a given vector
    """
85 changes: 61 additions & 24 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from collections.abc import Mapping
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait as wait_futures
from copy import copy
from functools import partial, wraps
from functools import partial, reduce, wraps
from itertools import groupby, count, chain
import json
import logging
Expand All @@ -45,7 +45,7 @@
from cassandra import (ConsistencyLevel, AuthenticationFailed, InvalidRequest,
OperationTimedOut, UnsupportedOperation,
SchemaTargetType, DriverException, ProtocolVersion,
UnresolvableContactPoints)
UnresolvableContactPoints, DependencyException)
from cassandra.auth import _proxy_execute_key, PlainTextAuthProvider
from cassandra.connection import (ConnectionException, ConnectionShutdown,
ConnectionHeartbeat, ProtocolVersionUnsupported,
Expand Down Expand Up @@ -113,6 +113,19 @@
except ImportError:
from cassandra.util import WeakSet # NOQA

def _is_gevent_monkey_patched():
if 'gevent.monkey' not in sys.modules:
return False
import gevent.socket
return socket.socket is gevent.socket.socket

def _try_gevent_import():
    """
    Return ``(GeventConnection, None)`` when ``_is_gevent_monkey_patched()``
    is true, otherwise ``(None, None)``.

    The second tuple slot is the "exception" slot shared by all of the
    ``_try_*_import`` helpers; this helper never fills it.
    """
    if not _is_gevent_monkey_patched():
        return (None, None)
    from cassandra.io.geventreactor import GeventConnection as connection_cls
    return (connection_cls, None)

def _is_eventlet_monkey_patched():
if 'eventlet.patcher' not in sys.modules:
return False
Expand All @@ -124,28 +137,46 @@ def _is_eventlet_monkey_patched():
# TODO: remove it when eventlet issue would be fixed
return False

def _is_gevent_monkey_patched():
if 'gevent.monkey' not in sys.modules:
return False
import gevent.socket
return socket.socket is gevent.socket.socket
def _try_eventlet_import():
    """
    Return ``(EventletConnection, None)`` when ``_is_eventlet_monkey_patched()``
    is true, otherwise ``(None, None)``.

    The second tuple slot is the "exception" slot shared by all of the
    ``_try_*_import`` helpers; this helper never fills it.
    """
    if not _is_eventlet_monkey_patched():
        return (None, None)
    from cassandra.io.eventletreactor import EventletConnection as connection_cls
    return (connection_cls, None)

def _try_libev_import():
    """
    Attempt to import the libev reactor.

    Returns ``(LibevConnection, None)`` on success, or ``(None, exc)`` when the
    import raises ``DependencyException``. Any other exception propagates.
    """
    try:
        from cassandra.io.libevreactor import LibevConnection as connection_cls
    except DependencyException as dep_error:
        return (None, dep_error)
    else:
        return (connection_cls, None)

# default to gevent when we are monkey patched with gevent, eventlet when
# monkey patched with eventlet, otherwise if libev is available, use that as
# the default because it's fastest. Otherwise, use asyncore.
if _is_gevent_monkey_patched():
from cassandra.io.geventreactor import GeventConnection as DefaultConnection
elif _is_eventlet_monkey_patched():
from cassandra.io.eventletreactor import EventletConnection as DefaultConnection
else:
def _try_asyncore_import():
try:
from cassandra.io.libevreactor import LibevConnection as DefaultConnection # NOQA
except ImportError:
try:
from cassandra.io.asyncorereactor import AsyncoreConnection as DefaultConnection # NOQA
except ImportError:
from cassandra.io.asyncioreactor import AsyncioConnection as DefaultConnection # NOQA
from cassandra.io.asyncorereactor import AsyncoreConnection
return (AsyncoreConnection,None)
except DependencyException as e:
return (None, e)

def _try_asyncio_import():
    """
    Import the asyncio reactor and return ``(AsyncioConnection, None)``.

    No exception is caught here, so an import failure propagates to the caller.
    """
    from cassandra.io.asyncioreactor import AsyncioConnection as connection_cls
    return (connection_cls, None)

def _connection_reduce_fn(val,import_fn):
(rv, excs) = val
# If we've already found a workable Connection class return immediately
if rv:
return val
(import_result, exc) = import_fn()
if exc:
excs.append(exc)
return (rv or import_result, excs)

# Import helpers in priority order; the first one that yields a connection
# class supplies DefaultConnection, and exceptions reported by the helpers
# tried before it are collected along the way.
conn_fns = (
    _try_gevent_import,
    _try_eventlet_import,
    _try_libev_import,
    _try_asyncore_import,
    _try_asyncio_import,
)
(conn_class, excs) = reduce(_connection_reduce_fn, conn_fns, (None, []))
if conn_class:
    DefaultConnection = conn_class
else:
    raise DependencyException("Exception loading connection class dependencies", excs)

# Forces load of utf8 encoding module to avoid deadlock that occurs
# if code that is being imported tries to import the module in a separate
Expand Down Expand Up @@ -802,9 +833,9 @@ def default_retry_policy(self, policy):
Using ssl_options without ssl_context is deprecated and will be removed in the
next major release.

An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket`` (or
``ssl.wrap_socket()`` if used without ssl_context) when new sockets are created.
This should be used when client encryption is enabled in Cassandra.
An optional dict which will be used as kwargs for ``ssl.SSLContext.wrap_socket``
when new sockets are created. This should be used when client encryption is enabled
in Cassandra.

The following documentation only applies when ssl_options is used without ssl_context.

Expand All @@ -820,6 +851,12 @@ def default_retry_policy(self, policy):
should almost always require the option ``'cert_reqs': ssl.CERT_REQUIRED``. Note also that this functionality was not built into
Python standard library until (2.7.9, 3.2). To enable this mechanism in earlier versions, patch ``ssl.match_hostname``
with a custom or `back-ported function <https://pypi.org/project/backports.ssl_match_hostname/>`_.

.. versionchanged:: 3.29.0

``ssl.match_hostname`` has been deprecated since Python 3.7 (and removed in Python 3.12). This functionality is now implemented
via ``ssl.SSLContext.check_hostname``. All options specified above (including ``check_hostname``) should continue to behave in a
way that is consistent with prior implementations.
"""

ssl_context = None
Expand Down
85 changes: 64 additions & 21 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,6 @@ class Connection(object):
_socket = None

_socket_impl = socket
_ssl_impl = ssl

_check_hostname = False
_product_type = None
Expand Down Expand Up @@ -780,7 +779,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
self.endpoint = host if isinstance(host, EndPoint) else DefaultEndPoint(host, port)

self.authenticator = authenticator
self.ssl_options = ssl_options.copy() if ssl_options else None
self.ssl_options = ssl_options.copy() if ssl_options else {}
self.ssl_context = ssl_context
self.sockopts = sockopts
self.compression = compression
Expand All @@ -800,15 +799,20 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
self._on_orphaned_stream_released = on_orphaned_stream_released

if ssl_options:
self._check_hostname = bool(self.ssl_options.pop('check_hostname', False))
if self._check_hostname:
if not getattr(ssl, 'match_hostname', None):
raise RuntimeError("ssl_options specify 'check_hostname', but ssl.match_hostname is not provided. "
"Patch or upgrade Python to use this option.")
self.ssl_options.update(self.endpoint.ssl_options or {})
elif self.endpoint.ssl_options:
self.ssl_options = self.endpoint.ssl_options

# PYTHON-1331
#
# We always use SSLContext.wrap_socket() now but legacy configs may have other params that were passed to ssl.wrap_socket()...
# and either could have 'check_hostname'. Read these params out of ssl_options and use them to build an SSLContext if
# we need to do so.
#
# Note that the params are read with get(), not removed: ssl_options keeps every key it was given, and
# _wrap_socket_from_context() selects by name only the args that apply to the ssl_context.wrap_socket() call.
if not self.ssl_context and self.ssl_options:
self.ssl_context = self._build_ssl_context_from_options()

if protocol_version >= 3:
self.max_request_id = min(self.max_in_flight - 1, (2 ** 15) - 1)
Expand Down Expand Up @@ -882,15 +886,48 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs):
else:
return conn

def _build_ssl_context_from_options(self):

# Extract a subset of names from self.ssl_options which apply to SSLContext creation
ssl_context_opt_names = ['ssl_version', 'cert_reqs', 'check_hostname', 'keyfile', 'certfile', 'ca_certs', 'ciphers']
opts = {k:self.ssl_options.get(k, None) for k in ssl_context_opt_names if k in self.ssl_options}

# Python >= 3.10 requires either PROTOCOL_TLS_CLIENT or PROTOCOL_TLS_SERVER so we'll get ahead of things by always
# being explicit
ssl_version = opts.get('ssl_version', None) or ssl.PROTOCOL_TLS_CLIENT
cert_reqs = opts.get('cert_reqs', None) or ssl.CERT_REQUIRED
rv = ssl.SSLContext(protocol=int(ssl_version))
rv.check_hostname = bool(opts.get('check_hostname', False))
rv.options = int(cert_reqs)

certfile = opts.get('certfile', None)
keyfile = opts.get('keyfile', None)
if certfile:
rv.load_cert_chain(certfile, keyfile)
ca_certs = opts.get('ca_certs', None)
if ca_certs:
rv.load_verify_locations(ca_certs)
ciphers = opts.get('ciphers', None)
if ciphers:
rv.set_ciphers(ciphers)

return rv

def _wrap_socket_from_context(self):
ssl_options = self.ssl_options or {}

# Extract a subset of names from self.ssl_options which apply to SSLContext.wrap_socket (or at least the parts
# of it that don't involve building an SSLContext under the covers)
wrap_socket_opt_names = ['server_side', 'do_handshake_on_connect', 'suppress_ragged_eofs', 'server_hostname']
opts = {k:self.ssl_options.get(k, None) for k in wrap_socket_opt_names if k in self.ssl_options}

# PYTHON-1186: set the server_hostname only if the SSLContext has
# check_hostname enabled and it is not already provided by the EndPoint ssl options
if (self.ssl_context.check_hostname and
'server_hostname' not in ssl_options):
ssl_options = ssl_options.copy()
ssl_options['server_hostname'] = self.endpoint.address
self._socket = self.ssl_context.wrap_socket(self._socket, **ssl_options)
#opts['server_hostname'] = self.endpoint.address
if (self.ssl_context.check_hostname and 'server_hostname' not in opts):
server_hostname = self.endpoint.address
opts['server_hostname'] = server_hostname

return self.ssl_context.wrap_socket(self._socket, **opts)

def _initiate_connection(self, sockaddr):
if self.features.shard_id is not None:
Expand All @@ -904,8 +941,11 @@ def _initiate_connection(self, sockaddr):

self._socket.connect(sockaddr)

def _match_hostname(self):
ssl.match_hostname(self._socket.getpeercert(), self.endpoint.address)
# PYTHON-1331
#
# Allow implementations specific to an event loop to add additional behaviours
def _validate_hostname(self):
pass

def _get_socket_addresses(self):
address, port = self.endpoint.resolve()
Expand All @@ -927,18 +967,21 @@ def _connect_socket(self):
try:
self._socket = self._socket_impl.socket(af, socktype, proto)
if self.ssl_context:
self._wrap_socket_from_context()
elif self.ssl_options:
if not self._ssl_impl:
raise RuntimeError("This version of Python was not compiled with SSL support")
self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
self._socket = self._wrap_socket_from_context()
self._socket.settimeout(self.connect_timeout)
self._initiate_connection(sockaddr)
self._socket.settimeout(None)

local_addr = self._socket.getsockname()
log.debug("Connection %s: '%s' -> '%s'", id(self), local_addr, sockaddr)

# PYTHON-1331
#
# Most checking is done via the check_hostname param on the SSLContext.
# Subclasses can add additional behaviours via _validate_hostname() so
# run that here.
if self._check_hostname:
self._match_hostname()
self._validate_hostname()
sockerr = None
break
except socket.error as err:
Expand Down
Loading
Loading