
Commit ef23a2a

Merge branch 'master' of https://github.com/datastax/python-driver into sync_with_upstream
* 'master' of https://github.com/datastax/python-driver:
  Merge pull request datastax#1126 from eamanu/fix-typos
  PYTHON-1294: Upgrade importlib-metadata to a much newer version
  Add tests for recent addition of execution profile support to cassandra.concurrent
  Merge pull request datastax#1122 from andy-slac/concurrent-execution-profiles
  Merge pull request datastax#1119 from datastax/python-1290
  Merge pull request datastax#1117 from datastax/remove_unittest2
  Removing file unexpectedly included in previous PR
  Merge pull request datastax#1114 from haaawk/stream_ids_fix
  Merge pull request datastax#1116 from Orenef11/fix_default_argument_value
  Comment update following off of datastax#1110
  Merge pull request datastax#1103 from numberly/fix_empty_paging
  Merge pull request datastax#1103 from psarna/fix_deprecation_in_tracing
  Fixes to the Travis build. (datastax#1111)
2 parents 93573ae + 9a645c5 commit ef23a2a

136 files changed: +247, -594 lines



cassandra/cluster.py

Lines changed: 7 additions & 1 deletion
@@ -2451,7 +2451,7 @@ def default_consistency_level(self, cl):
         *Deprecated:* use execution profiles instead
         """
         warn("Setting the consistency level at the session level will be removed in 4.0. Consider using "
-             "execution profiles and setting the desired consitency level to the EXEC_PROFILE_DEFAULT profile."
+             "execution profiles and setting the desired consistency level to the EXEC_PROFILE_DEFAULT profile."
             , DeprecationWarning)
         self._validate_set_legacy_config('default_consistency_level', cl)

@@ -5212,6 +5212,12 @@ def next(self):
             self._page_iter = iter(self._current_rows)
             return self.next()

+            # Some servers can return empty pages in this case; Scylla is known to do
+            # so in some circumstances. Guard against this by recursing to handle
+            # the next(iter) call. If we have an empty page in that case it will
+            # get handled by the StopIteration handler when we recurse.
+            return self.next()
+
         return next(self._page_iter)

     __next__ = next
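For context, the updated warning points users at execution profiles. A minimal sketch (not part of this commit) of setting the default consistency level through the EXEC_PROFILE_DEFAULT profile instead of the deprecated session attribute:

    from cassandra import ConsistencyLevel
    from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT

    # Configure the desired consistency level on the default execution profile
    # rather than through the deprecated Session.default_consistency_level setter.
    profile = ExecutionProfile(consistency_level=ConsistencyLevel.LOCAL_QUORUM)
    cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile})
    session = cluster.connect()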

cassandra/concurrent.py

Lines changed: 10 additions & 5 deletions
@@ -21,15 +21,15 @@
 from threading import Condition
 import sys

-from cassandra.cluster import ResultSet
+from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT

 import logging
 log = logging.getLogger(__name__)


 ExecutionResult = namedtuple('ExecutionResult', ['success', 'result_or_exc'])

-def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False):
+def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
     """
     Executes a sequence of (statement, parameters) tuples concurrently. Each
     ``parameters`` item must be a sequence or :const:`None`.

@@ -56,6 +56,9 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
     footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
     on-the-fly).

+    `execution_profile` argument is the execution profile to use for this
+    request, it is passed directly to :meth:`Session.execute_async`.
+
     A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
     in the same order that the statements were passed in. If ``success`` is :const:`False`,
     there was an error executing the statement, and ``result_or_exc`` will be

@@ -90,17 +93,19 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, rais
     if not statements_and_parameters:
         return []

-    executor = ConcurrentExecutorGenResults(session, statements_and_parameters) if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters)
+    executor = ConcurrentExecutorGenResults(session, statements_and_parameters, execution_profile) \
+        if results_generator else ConcurrentExecutorListResults(session, statements_and_parameters, execution_profile)
     return executor.execute(concurrency, raise_on_first_error)


 class _ConcurrentExecutor(object):

     max_error_recursion = 100

-    def __init__(self, session, statements_and_params):
+    def __init__(self, session, statements_and_params, execution_profile):
         self.session = session
         self._enum_statements = enumerate(iter(statements_and_params))
+        self._execution_profile = execution_profile
         self._condition = Condition()
         self._fail_fast = False
         self._results_queue = []

@@ -132,7 +137,7 @@ def _execute_next(self):
     def _execute(self, idx, statement, params):
         self._exec_depth += 1
         try:
-            future = self.session.execute_async(statement, params, timeout=None)
+            future = self.session.execute_async(statement, params, timeout=None, execution_profile=self._execution_profile)
             args = (future, idx)
             future.add_callbacks(
                 callback=self._on_success, callback_args=args,
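A short usage sketch of the new `execution_profile` parameter (the profile name, keyspace, and table below are illustrative, not taken from this commit):

    from cassandra.cluster import Cluster, ExecutionProfile
    from cassandra.concurrent import execute_concurrent

    # 'low_latency' is a hypothetical profile registered for this example.
    cluster = Cluster(execution_profiles={'low_latency': ExecutionProfile(request_timeout=2.0)})
    session = cluster.connect('examples')

    statements_and_params = [
        ("INSERT INTO kv (k, v) VALUES (%s, %s)", (i, str(i)))
        for i in range(100)
    ]
    # Every statement is executed with the named profile, which
    # execute_concurrent now forwards to Session.execute_async.
    results = execute_concurrent(session, statements_and_params,
                                 concurrency=50, execution_profile='low_latency')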

cassandra/connection.py

Lines changed: 7 additions & 5 deletions
@@ -769,6 +769,8 @@ class Connection(object):

     _is_checksumming_enabled = False

+    _on_orphaned_stream_released = None
+
     @property
     def _iobuf(self):
         # backward compatibility, to avoid any change in the reactors

@@ -778,8 +780,8 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
                  ssl_options=None, sockopts=None, compression=True,
                  cql_version=None, protocol_version=ProtocolVersion.MAX_SUPPORTED, is_control_connection=False,
                  user_type_map=None, connect_timeout=None, allow_beta_protocol_version=False, no_compact=False,
-                 ssl_context=None, owning_pool=None, shard_id=None, total_shards=None):
-
+                 ssl_context=None, owning_pool=None, shard_id=None, total_shards=None,
+                 on_orphaned_stream_released=None):
         # TODO next major rename host to endpoint and remove port kwarg.
         self.endpoint = host if isinstance(host, EndPoint) else DefaultEndPoint(host, port)

@@ -801,7 +803,7 @@ def __init__(self, host='127.0.0.1', port=9042, authenticator=None,
         self._continuous_paging_sessions = {}
         self._socket_writable = True
         self.orphaned_request_ids = set()
-        self._owning_pool = owning_pool
+        self._on_orphaned_stream_released = on_orphaned_stream_released

         if ssl_options:
             self._check_hostname = bool(self.ssl_options.pop('check_hostname', False))

@@ -1247,8 +1249,8 @@ def process_msg(self, header, body):
                 self.in_flight -= 1
                 self.orphaned_request_ids.remove(stream_id)
                 need_notify_of_release = True
-        if need_notify_of_release and self._owning_pool:
-            self._owning_pool.on_orphaned_stream_released()
+        if need_notify_of_release and self._on_orphaned_stream_released:
+            self._on_orphaned_stream_released()

         try:
             callback, decoder, result_metadata = self._requests.pop(stream_id)
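The net effect is that Connection no longer keeps a reference to its owning pool; it only invokes an injected callback when an orphaned stream id is released. A generic sketch of that decoupling pattern (illustrative class names, not driver code):

    class ExampleConnection:
        def __init__(self, on_orphaned_stream_released=None):
            # Hold only an optional callable instead of a pool object.
            self._on_orphaned_stream_released = on_orphaned_stream_released

        def _orphaned_stream_released(self):
            if self._on_orphaned_stream_released:
                self._on_orphaned_stream_released()


    class ExamplePool:
        def __init__(self):
            self.connection = ExampleConnection(
                on_orphaned_stream_released=self.on_orphaned_stream_released)

        def on_orphaned_stream_released(self):
            # The pool decides how to react, e.g. replace an over-orphaned connection.
            print("orphaned stream id released")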

cassandra/io/asyncioreactor.py

Lines changed: 13 additions & 16 deletions
@@ -46,9 +46,8 @@ def __init__(self, timeout, callback, loop):
         self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)

     @staticmethod
-    @asyncio.coroutine
-    def _call_delayed_coro(timeout, callback, loop):
-        yield from asyncio.sleep(timeout, loop=loop)
+    async def _call_delayed_coro(timeout, callback, loop):
+        await asyncio.sleep(timeout, loop=loop)
         return callback()

     def __lt__(self, other):

@@ -136,8 +135,7 @@ def close(self):
             self._close(), loop=self._loop
         )

-    @asyncio.coroutine
-    def _close(self):
+    async def _close(self):
         log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
         if self._write_watcher:
             self._write_watcher.cancel()

@@ -174,40 +172,39 @@ def push(self, data):
         # avoid races/hangs by just scheduling this, not using threadsafe
         self._loop.create_task(self._push_msg(chunks))

-    @asyncio.coroutine
-    def _push_msg(self, chunks):
+    async def _push_msg(self, chunks):
         # This lock ensures all chunks of a message are sequential in the Queue
-        with (yield from self._write_queue_lock):
+        with await self._write_queue_lock:
             for chunk in chunks:
                 self._write_queue.put_nowait(chunk)


-    @asyncio.coroutine
-    def handle_write(self):
+    async def handle_write(self):
         while True:
             try:
-                next_msg = yield from self._write_queue.get()
+                next_msg = await self._write_queue.get()
                 if next_msg:
-                    yield from self._loop.sock_sendall(self._socket, next_msg)
+                    await self._loop.sock_sendall(self._socket, next_msg)
             except socket.error as err:
                 log.debug("Exception in send for %s: %s", self, err)
                 self.defunct(err)
                 return
             except asyncio.CancelledError:
                 return

-    @asyncio.coroutine
-    def handle_read(self):
+    async def handle_read(self):
         while True:
             try:
-                buf = yield from self._loop.sock_recv(self._socket, self.in_buffer_size)
+                buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
                 self._iobuf.write(buf)
             # sock_recv expects EWOULDBLOCK if socket provides no data, but
             # nonblocking ssl sockets raise these instead, so we handle them
             # ourselves by yielding to the event loop, where the socket will
             # get the reading/writing it "wants" before retrying
             except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
-                yield
+                # Apparently the preferred way to yield to the event loop from within
+                # a native coroutine based on https://github.com/python/asyncio/issues/284
+                await asyncio.sleep(0)
                 continue
             except socket.error as err:
                 log.debug("Exception during socket recv for %s: %s",

cassandra/pool.py

Lines changed: 23 additions & 9 deletions
@@ -423,7 +423,7 @@ def __init__(self, host, host_distance, session):
             return

         log.debug("Initializing connection for host %s", self.host)
-        first_connection = session.cluster.connection_factory(self.host.endpoint, owning_pool=self)
+        first_connection = session.cluster.connection_factory(self.host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released)
         log.debug("First connection created to %s for shard_id=%i", self.host, first_connection.shard_id)
         self._connections[first_connection.shard_id] = first_connection
         self._keyspace = session.keyspace

@@ -517,7 +517,10 @@ def borrow_connection(self, timeout, routing_key=None):
                     last_retry = True
                     continue
                 with self._stream_available_condition:
-                    self._stream_available_condition.wait(remaining)
+                    if conn.orphaned_threshold_reached and conn.is_closed:
+                        conn = self._get_connection()
+                    else:
+                        self._stream_available_condition.wait(remaining)

         raise NoConnectionsAvailable("All request IDs are currently in use")

@@ -563,8 +566,8 @@ def return_connection(self, connection, stream_was_orphaned=False):
             with self._lock:
                 if connection in self._trash:
                     self._trash.remove(connection)
-            log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
-            connection.close()
+                    log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
+                    connection.close()
             return

     def on_orphaned_stream_released(self):

@@ -588,7 +591,8 @@ def _replace(self, connection):
             self._connecting.add(connection.shard_id)
             self._session.submit(self._open_connection_to_missing_shard, connection.shard_id)
         else:
-            connection = self._session.cluster.connection_factory(self.host.endpoint, owning_pool=self)
+            connection = self._session.cluster.connection_factory(self.host.endpoint,
+                                                                  on_orphaned_stream_released=self.on_orphaned_stream_released)
             if self._keyspace:
                 connection.set_keyspace_blocking(self._keyspace)
             self._connections[connection.shard_id] = connection

@@ -690,12 +694,12 @@ def _open_connection_to_missing_shard(self, shard_id):
         log.debug("shard_aware_endpoint=%r", shard_aware_endpoint)

         if shard_aware_endpoint:
-            conn = self._session.cluster.connection_factory(shard_aware_endpoint, owning_pool=self,
+            conn = self._session.cluster.connection_factory(shard_aware_endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released,
                                                             shard_id=shard_id,
                                                             total_shards=self.host.sharding_info.shards_count)
             conn.original_endpoint = self.host.endpoint
         else:
-            conn = self._session.cluster.connection_factory(self.host.endpoint, owning_pool=self)
+            conn = self._session.cluster.connection_factory(self.host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released)

         log.debug("Received a connection %s for shard_id=%i on host %s", id(conn), conn.shard_id, self.host)
         if self.is_shutdown:

@@ -827,6 +831,16 @@ def _open_connections_for_all_shards(self, skip_shard_id=None):
             self._connecting.add(shard_id)
             self._shard_connections_futures.append(future)

+        trash_conns = None
+        with self._lock:
+            if self._trash:
+                trash_conns = self._trash
+                self._trash = set()
+
+        if trash_conns is not None:
+            for conn in self._trash:
+                conn.close()
+
     def _set_keyspace_for_all_conns(self, keyspace, callback):
         """
         Asynchronously sets the keyspace for all connections. When all

@@ -905,7 +919,7 @@ def __init__(self, host, host_distance, session):

         log.debug("Initializing new connection pool for host %s", self.host)
         core_conns = session.cluster.get_core_connections_per_host(host_distance)
-        self._connections = [session.cluster.connection_factory(host.endpoint, owning_pool=self)
+        self._connections = [session.cluster.connection_factory(host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released)
                              for i in range(core_conns)]

         self._keyspace = session.keyspace

@@ -1009,7 +1023,7 @@ def _add_conn_if_under_max(self):

         log.debug("Going to open new connection to host %s", self.host)
         try:
-            conn = self._session.cluster.connection_factory(self.host.endpoint, owning_pool=self)
+            conn = self._session.cluster.connection_factory(self.host.endpoint, on_orphaned_stream_released=self.on_orphaned_stream_released)
             if self._keyspace:
                 conn.set_keyspace_blocking(self._session.keyspace)
             self._next_trash_allowed_at = time.time() + _MIN_TRASH_INTERVAL
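The new block at the end of `_open_connections_for_all_shards` empties the trash of connections. The general shape of that pattern is to swap the shared set out while holding the lock and then close the snapshot outside of it; note the closing loop should run over the snapshot, since the shared set has just been replaced. A standalone sketch with illustrative names:

    import threading

    class TrashedConnections:
        def __init__(self):
            self._lock = threading.Lock()
            self._trash = set()

        def drain(self):
            # Take ownership of the current trash set while holding the lock.
            trash_conns = None
            with self._lock:
                if self._trash:
                    trash_conns = self._trash
                    self._trash = set()

            # Close the snapshot outside the lock so close() cannot block
            # threads that are concurrently trashing or returning connections.
            if trash_conns is not None:
                for conn in trash_conns:
                    conn.close()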

cassandra/query.py

Lines changed: 2 additions & 2 deletions
@@ -996,7 +996,8 @@ def populate(self, max_wait=2.0, wait_for_complete=True, query_cl=None):
             SimpleStatement(self._SELECT_SESSIONS_FORMAT, consistency_level=query_cl), (self.trace_id,), time_spent, max_wait)

         # PYTHON-730: There is race condition that the duration mutation is written before started_at the for fast queries
-        is_complete = session_results and session_results[0].duration is not None and session_results[0].started_at is not None
+        session_row = session_results.one() if session_results else None
+        is_complete = session_row is not None and session_row.duration is not None and session_row.started_at is not None
         if not session_results or (wait_for_complete and not is_complete):
             time.sleep(self._BASE_RETRY_SLEEP * (2 ** attempt))
             attempt += 1

@@ -1006,7 +1007,6 @@ def populate(self, max_wait=2.0, wait_for_complete=True, query_cl=None):
         else:
             log.debug("Fetching parital trace info for trace ID: %s", self.trace_id)

-        session_row = session_results[0]
         self.request_type = session_row.request
         self.duration = timedelta(microseconds=session_row.duration) if is_complete else None
         self.started_at = session_row.started_at
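The tracing fix reads the first session row with `ResultSet.one()` instead of indexing, which avoids the indexing deprecation path and handles an empty result cleanly. A small sketch (assumes an already connected `session`; the query is illustrative):

    rows = session.execute(
        "SELECT duration, started_at FROM system_traces.sessions LIMIT 1")

    # one() returns the first row, or None when the result set is empty,
    # so there is no need to index into the ResultSet.
    row = rows.one()
    if row is not None and row.duration is not None and row.started_at is not None:
        print(row.duration, row.started_at)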

cassandra/util.py

Lines changed: 1 addition & 1 deletion
@@ -797,7 +797,7 @@ class OrderedMap(Mapping):
     '''
     An ordered map that accepts non-hashable types for keys. It also maintains the
     insertion order of items, behaving as OrderedDict in that regard. These maps
-    are constructed and read just as normal mapping types, exept that they may
+    are constructed and read just as normal mapping types, except that they may
     contain arbitrary collections and other non-hashable items as keys::

     >>> od = OrderedMap([({'one': 1, 'two': 2}, 'value'),

test-requirements.txt

Lines changed: 0 additions & 2 deletions
@@ -2,8 +2,6 @@
 scales
 nose
 mock>1.1
-#ccm>=2.1.2
-unittest2; python_version < '3.5'
 pytz
 sure
 pure-sasl

tests/__init__.py

Lines changed: 1 addition & 4 deletions
@@ -12,10 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-try:
-    import unittest2 as unittest
-except ImportError:
-    import unittest  # noqa
+import unittest
 import logging
 import sys
 import socket

tests/integration/__init__.py

Lines changed: 8 additions & 10 deletions
@@ -20,10 +20,7 @@
 if connection_class is not None:
     Cluster.connection_class = connection_class

-try:
-    import unittest2 as unittest
-except ImportError:
-    import unittest  # noqa
+import unittest

 from packaging.version import Version
 import logging

@@ -405,15 +402,15 @@ def get_node(node_id):
     return CCM_CLUSTER.nodes['node%s' % node_id]


-def use_multidc(dc_list, workloads=[]):
+def use_multidc(dc_list, workloads=None):
     use_cluster(MULTIDC_CLUSTER_NAME, dc_list, start=True, workloads=workloads)


-def use_singledc(start=True, workloads=[], use_single_interface=USE_SINGLE_INTERFACE):
+def use_singledc(start=True, workloads=None, use_single_interface=USE_SINGLE_INTERFACE):
     use_cluster(CLUSTER_NAME, [3], start=start, workloads=workloads, use_single_interface=use_single_interface)


-def use_single_node(start=True, workloads=[], configuration_options={}, dse_options={}):
+def use_single_node(start=True, workloads=None, configuration_options=None, dse_options=None):
     use_cluster(SINGLE_NODE_CLUSTER_NAME, [1], start=start, workloads=workloads,
                 configuration_options=configuration_options, dse_options=dse_options)


@@ -475,10 +472,11 @@ def start_cluster_wait_for_up(cluster):


 def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None, set_keyspace=True, ccm_options=None,
-                configuration_options={}, dse_options={}, use_single_interface=USE_SINGLE_INTERFACE):
+                configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE):
+    configuration_options = configuration_options or {}
+    dse_options = dse_options or {}
+    workloads = workloads or []
     dse_cluster = True if DSE_VERSION else False
-    if not workloads:
-        workloads = []

     if ccm_options is None and DSE_VERSION:
         ccm_options = {"version": CCM_VERSION}

tests/integration/advanced/__init__.py

Lines changed: 1 addition & 4 deletions
@@ -12,10 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-try:
-    import unittest2 as unittest
-except ImportError:
-    import unittest  # noqa
+import unittest

 from six.moves.urllib.request import build_opener, Request, HTTPHandler
 import re
