Skip to content

Commit 140813c

Browse files
author
Jonathan Ellis
committed
add Session.execute_concurrent and execute_concurrent_with_args
1 parent 6e2ffd4 commit 140813c

File tree

2 files changed

+76
-64
lines changed

2 files changed

+76
-64
lines changed

cassandra/cluster.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
from cassandra.marshal import int64_pack
8282
from cassandra.timestamps import MonotonicTimestampGenerator
8383
from cassandra.util import _resolve_contact_points_to_string_map, Version
84+
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
8485

8586
from cassandra.datastax.insights.reporter import MonitorReporter
8687
from cassandra.datastax.insights.util import version_supports_insights
@@ -2725,6 +2726,79 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
27252726
future.send_request()
27262727
return future
27272728

2729+
def execute_concurrent(self, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
2730+
"""
2731+
Executes a sequence of (statement, parameters) tuples concurrently. Each
2732+
``parameters`` item must be a sequence or :const:`None`.
2733+
2734+
The `concurrency` parameter controls how many statements will be executed
2735+
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
2736+
it is recommended that this be kept below 100 times the number of
2737+
core connections per host times the number of connected hosts (see
2738+
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
2739+
the event loop thread may attempt to block on new connection creation,
2740+
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
2741+
is 3 or higher, you can safely experiment with higher levels of concurrency.
2742+
2743+
If `raise_on_first_error` is left as :const:`True`, execution will stop
2744+
after the first failed statement and the corresponding exception will be
2745+
raised.
2746+
2747+
`results_generator` controls how the results are returned.
2748+
2749+
* If :const:`False`, the results are returned only after all requests have completed.
2750+
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
2751+
memory footprint when the results set will be large -- results are yielded
2752+
as they return instead of materializing the entire list at once. The trade for lower memory
2753+
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
2754+
on-the-fly).
2755+
2756+
`execution_profile` argument is the execution profile to use for this
2757+
request, it is passed directly to :meth:`Session.execute_async`.
2758+
2759+
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
2760+
in the same order that the statements were passed in. If ``success`` is :const:`False`,
2761+
there was an error executing the statement, and ``result_or_exc``
2762+
will be an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
2763+
will be the query result.
2764+
2765+
Example usage::
2766+
2767+
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
2768+
2769+
statements_and_params = []
2770+
for user_id in user_ids:
2771+
params = (user_id, )
2772+
statements_and_params.append((select_statement, params))
2773+
2774+
results = session.execute_concurrent(statements_and_params, raise_on_first_error=False)
2775+
2776+
for (success, result) in results:
2777+
if not success:
2778+
handle_error(result) # result will be an Exception
2779+
else:
2780+
process_user(result[0]) # result will be a list of rows
2781+
2782+
Note: in the case that `generators` are used, it is important to ensure the consumers do not
2783+
block or attempt further synchronous requests, because no further IO will be processed until
2784+
the consumer returns. This may also produce a deadlock in the IO event thread.
2785+
"""
2786+
return execute_concurrent(self, statements_and_parameters, concurrency, raise_on_first_error, results_generator, execution_profile)
2787+
2788+
def execute_concurrent_with_args(self, statement, parameters, *args, **kwargs):
2789+
"""
2790+
Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
2791+
statement and a sequence of parameters. Each item in ``parameters``
2792+
should be a sequence or :const:`None`.
2793+
2794+
Example usage::
2795+
2796+
statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
2797+
parameters = [(x,) for x in range(1000)]
2798+
session.execute_concurrent_with_args(statement, parameters, concurrency=50)
2799+
"""
2800+
return execute_concurrent_with_args(self, statement, parameters, *args, **kwargs)
2801+
27282802
def execute_graph(self, query, parameters=None, trace=False, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT, execute_as=None):
27292803
"""
27302804
Executes a Gremlin query string or GraphStatement synchronously,

cassandra/concurrent.py

Lines changed: 2 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -29,61 +29,7 @@
2929

3030
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
3131
"""
32-
Executes a sequence of (statement, parameters) tuples concurrently. Each
33-
``parameters`` item must be a sequence or :const:`None`.
34-
35-
The `concurrency` parameter controls how many statements will be executed
36-
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
37-
it is recommended that this be kept below 100 times the number of
38-
core connections per host times the number of connected hosts (see
39-
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
40-
the event loop thread may attempt to block on new connection creation,
41-
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
42-
is 3 or higher, you can safely experiment with higher levels of concurrency.
43-
44-
If `raise_on_first_error` is left as :const:`True`, execution will stop
45-
after the first failed statement and the corresponding exception will be
46-
raised.
47-
48-
`results_generator` controls how the results are returned.
49-
50-
* If :const:`False`, the results are returned only after all requests have completed.
51-
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
52-
memory footprint when the results set will be large -- results are yielded
53-
as they return instead of materializing the entire list at once. The trade for lower memory
54-
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
55-
on-the-fly).
56-
57-
`execution_profile` argument is the execution profile to use for this
58-
request, it is passed directly to :meth:`Session.execute_async`.
59-
60-
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
61-
in the same order that the statements were passed in. If ``success`` is :const:`False`,
62-
there was an error executing the statement, and ``result_or_exc`` will be
63-
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
64-
will be the query result.
65-
66-
Example usage::
67-
68-
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
69-
70-
statements_and_params = []
71-
for user_id in user_ids:
72-
params = (user_id, )
73-
statements_and_params.append((select_statement, params))
74-
75-
results = execute_concurrent(
76-
session, statements_and_params, raise_on_first_error=False)
77-
78-
for (success, result) in results:
79-
if not success:
80-
handle_error(result) # result will be an Exception
81-
else:
82-
process_user(result[0]) # result will be a list of rows
83-
84-
Note: in the case that `generators` are used, it is important to ensure the consumers do not
85-
block or attempt further synchronous requests, because no further IO will be processed until
86-
the consumer returns. This may also produce a deadlock in the IO event thread.
32+
See :meth:`.Session.execute_concurrent`.
8733
"""
8834
if concurrency <= 0:
8935
raise ValueError("concurrency must be greater than 0")
@@ -216,14 +162,6 @@ def _results(self):
216162

217163
def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs):
218164
"""
219-
Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
220-
statement and a sequence of parameters. Each item in ``parameters``
221-
should be a sequence or :const:`None`.
222-
223-
Example usage::
224-
225-
statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
226-
parameters = [(x,) for x in range(1000)]
227-
execute_concurrent_with_args(session, statement, parameters, concurrency=50)
165+
See :meth:`.Session.execute_concurrent_with_args`.
228166
"""
229167
return execute_concurrent(session, zip(cycle((statement,)), parameters), *args, **kwargs)

0 commit comments

Comments
 (0)