PYTHON-1419 Connection failure to SNI endpoint when first host is unavailable #1243
base: master
@@ -15,13 +15,16 @@
import logging
import socket
import uuid

from unittest.mock import patch, Mock
from unittest.mock import patch, Mock, MagicMock

from cassandra import ConsistencyLevel, DriverException, Timeout, Unavailable, RequestExecutionException, ReadTimeout, WriteTimeout, CoordinationFailure, ReadFailure, WriteFailure, FunctionFailure, AlreadyExists,\
    InvalidRequest, Unauthorized, AuthenticationFailed, OperationTimedOut, UnsupportedOperation, RequestValidationException, ConfigurationException, ProtocolVersion
from cassandra.cluster import _Scheduler, Session, Cluster, default_lbp_factory, \
    ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT
    ExecutionProfile, _ConfigMode, EXEC_PROFILE_DEFAULT, ControlConnection
from cassandra.connection import SniEndPoint, Connection, SniEndPointFactory
from cassandra.datastax.cloud import CloudConfig
from cassandra.pool import Host
from cassandra.policies import HostDistance, RetryPolicy, RoundRobinPolicy, DowngradingConsistencyRetryPolicy, SimpleConvictionPolicy
from cassandra.query import SimpleStatement, named_tuple_factory, tuple_factory
@@ -31,6 +34,7 @@

log = logging.getLogger(__name__)


class ExceptionTypeTest(unittest.TestCase):

    def test_exception_types(self):
@@ -85,6 +89,12 @@ def test_exception_types(self):
        self.assertTrue(issubclass(UnsupportedOperation, DriverException))


class MockOrderedPolicy(RoundRobinPolicy):
    all_hosts = set()

    def make_query_plan(self, working_keyspace=None, query=None):
        return sorted(self.all_hosts, key=lambda x: x.endpoint.ssl_options['server_hostname'])


class ClusterTest(unittest.TestCase):

    def test_tuple_for_contact_points(self):
@@ -119,6 +129,66 @@ def test_requests_in_flight_threshold(self):
        for n in (0, mn, 128):
            self.assertRaises(ValueError, c.set_max_requests_per_connection, d, n)

    # Test verifies that the driver can connect to an SNI endpoint even when one of the IPs
    # returned by DNS resolution of the SNI proxy raises an error. The mocked SNI resolution
    # method returns two IPs; trying to connect to the first one always fails
    # with a socket exception.
    def test_sni_round_robin_dns_resolution(self):
        def _mocked_cloud_config(cloud_config, create_pyopenssl_context):
            config = CloudConfig.from_dict({})
            config.sni_host = 'proxy.datastax.com'
            config.sni_port = 9042
            # for 2e25021d-8d72-41a7-a247-3da85c5d92d2 we return IP 127.0.0.1, to which connection fails
            config.host_ids = ['2e25021d-8d72-41a7-a247-3da85c5d92d2', '8c4b6ed7-f505-4226-b7a4-41f322520c1f']
            return config

        def _mocked_proxy_dns_resolution(self):
            return [
                (socket.AF_UNIX, socket.SOCK_STREAM, 0, None, ('127.0.0.1', 9042)),
                (socket.AF_UNIX, socket.SOCK_STREAM, 0, None, ('127.0.0.2', 9042))
            ]

        def _mocked_try_connect(self, host):
            address, port = host.endpoint.resolve()
            if address == '127.0.0.1':
                raise socket.error
            return MagicMock(spec=Connection)

        with patch('cassandra.datastax.cloud.get_cloud_config', _mocked_cloud_config):
            with patch.object(SniEndPoint, '_resolve_proxy_addresses', _mocked_proxy_dns_resolution):
                cloud_config = {
                    'secure_connect_bundle': '/path/to/secure-connect-dbname.zip'
                }
                cluster = Cluster(cloud=cloud_config)
                lbp = MockOrderedPolicy()
                cluster.load_balancing_policy = lbp
                with patch.object(ControlConnection, '_try_connect', _mocked_try_connect):
                    for endpoint in cluster.endpoints_resolved:
                        host, new = cluster.add_host(endpoint, signal=False)
                        lbp.all_hosts.add(host)
                    # No NoHostAvailable indicates that the test passed.
                    cluster.control_connection.connect()
                cluster.shutdown()
There's not... really an assertion in this test though, right? I mean the assertion is basically "did you raise socket.error or not"? I'm kinda wondering if it isn't enough just to validate this with a unit test that confirms that any query plan we get off of an LBP built with SNI end points now does the right thing:

# Validate that at least the default LBP can create a query plan with end points that resolve to different addresses
# initially. This may not be exactly how things play out in practice (the control connection will muck with this even
# if nothing else does) but it should be a pretty good approximation.
def test_query_plan_for_sni_contains_unique_addresses(self):
    node_cnt = 5

    def _mocked_proxy_dns_resolution(self):
        return [(socket.AF_UNIX, socket.SOCK_STREAM, 0, None, ('127.0.0.%s' % (i,), 9042)) for i in range(node_cnt)]

    c = Cluster(protocol_version=2)
    lbp = c.load_balancing_policy
    lbp.local_dc = "dc1"
    factory = SniEndPointFactory("proxy.foo.bar", 9042)
    for host in (Host(factory.create({"host_id": uuid.uuid4().hex, "dc": "dc1"}), SimpleConvictionPolicy) for i in range(node_cnt)):
        lbp.on_up(host)
    with patch.object(SniEndPoint, '_resolve_proxy_addresses', _mocked_proxy_dns_resolution):
        addrs = [host.endpoint.resolve() for host in lbp.make_query_plan()]
        self.assertEqual(len(addrs), len(set(addrs)))

I've validated that this test passes with the other changes in this PR and fails with "5 != 1" when run against what's in master now. Both of those results are entirely expected.
    # Validate that at least the default LBP can create a query plan with end points that resolve
    # to different addresses initially. This may not be exactly how things play out in practice
    # (the control connection will muck with this even if nothing else does) but it should be
    # a pretty good approximation.
    def test_query_plan_for_sni_contains_unique_addresses(self):
        node_cnt = 5

        def _mocked_proxy_dns_resolution(self):
            return [(socket.AF_UNIX, socket.SOCK_STREAM, 0, None, ('127.0.0.%s' % (i,), 9042)) for i in range(node_cnt)]

        c = Cluster()
        lbp = c.load_balancing_policy
        lbp.local_dc = "dc1"
        factory = SniEndPointFactory("proxy.foo.bar", 9042)
        for host in (Host(factory.create({"host_id": uuid.uuid4().hex, "dc": "dc1"}), SimpleConvictionPolicy) for _ in range(node_cnt)):
            lbp.on_up(host)
        with patch.object(SniEndPoint, '_resolve_proxy_addresses', _mocked_proxy_dns_resolution):
            addrs = [host.endpoint.resolve() for host in lbp.make_query_plan()]
            self.assertEqual(len(addrs), len(set(addrs)))


class SchedulerTest(unittest.TestCase):
    # TODO: this suite could be expanded; for now just adding a test covering a ticket
It's worthwhile being clear on the consequence of this change. If our proxy hostname resolves to N IP addresses, we're basically exchanging a 1-in-N chance of complete failure when the first node is unavailable (because all our endpoints would return a common IP address) for an essentially guaranteed failure of the connection to one of our nodes (since with the code in this PR at least one of our nodes will return the failing IP address from resolve()).
I'm not saying it's a problem that we're making this exchange; it probably is better to have a failure with connections to one of our nodes rather than to fail completely at connect time. But it is worth pointing out that this isn't a zero-cost abstraction.
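To make that trade-off concrete, here is a minimal sketch of per-endpoint round-robin selection over the proxy's resolved addresses. This is an illustration only, not the driver's actual implementation: the class name, the shared counter, and the method bodies are assumptions made for the example.

# Hypothetical sketch: stagger which resolved proxy address each SNI endpoint returns.
# Names here (RoundRobinSniEndPoint, _position) are illustrative, not the driver's API.
import itertools
import socket


class RoundRobinSniEndPoint(object):
    # Shared counter so consecutive endpoints start at different resolved addresses.
    # With N proxy IPs and one of them unreachable, roughly 1 endpoint in N hands out
    # the bad address, instead of every endpoint handing out the same first address.
    _position = itertools.count()

    def __init__(self, proxy_address, port):
        self._proxy_address = proxy_address
        self._port = port
        self._index = next(RoundRobinSniEndPoint._position)

    def _resolve_proxy_addresses(self):
        # Resolve the proxy hostname to all of its address records.
        return socket.getaddrinfo(self._proxy_address, self._port,
                                  socket.AF_UNSPEC, socket.SOCK_STREAM)

    def resolve(self):
        addrs = self._resolve_proxy_addresses()
        # Old behavior would always pick addrs[0]; here we offset by the endpoint's index.
        family, socktype, proto, canonname, sockaddr = addrs[self._index % len(addrs)]
        return sockaddr[0], self._port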