20
20
from cassandra .cluster import ExecutionProfile , EXEC_PROFILE_DEFAULT
21
21
from cassandra .concurrent import execute_concurrent , execute_concurrent_with_args , ExecutionResult
22
22
from cassandra .policies import HostDistance
23
- from cassandra .query import tuple_factory , SimpleStatement
23
+ from cassandra .query import dict_factory , tuple_factory , SimpleStatement
24
24
25
25
from tests .integration import use_singledc , PROTOCOL_VERSION , TestCluster
26
26
@@ -35,13 +35,16 @@ def setup_module():
35
35
use_singledc ()
36
36
37
37
38
+ EXEC_PROFILE_DICT = "dict"
39
+
38
40
class ClusterTests (unittest .TestCase ):
39
41
40
42
@classmethod
41
43
def setUpClass (cls ):
42
44
cls .cluster = TestCluster (
43
45
execution_profiles = {
44
- EXEC_PROFILE_DEFAULT : ExecutionProfile (row_factory = tuple_factory )
46
+ EXEC_PROFILE_DEFAULT : ExecutionProfile (row_factory = tuple_factory ),
47
+ EXEC_PROFILE_DICT : ExecutionProfile (row_factory = dict_factory )
45
48
}
46
49
)
47
50
if PROTOCOL_VERSION < 3 :
@@ -52,11 +55,11 @@ def setUpClass(cls):
52
55
def tearDownClass (cls ):
53
56
cls .cluster .shutdown ()
54
57
55
- def execute_concurrent_helper (self , session , query , results_generator = False ):
58
+ def execute_concurrent_helper (self , session , query , ** kwargs ):
56
59
count = 0
57
60
while count < 100 :
58
61
try :
59
- return execute_concurrent (session , query , results_generator = False )
62
+ return execute_concurrent (session , query , results_generator = False , ** kwargs )
60
63
except (ReadTimeout , WriteTimeout , OperationTimedOut , ReadFailure , WriteFailure ):
61
64
ex_type , ex , tb = sys .exc_info ()
62
65
log .warning ("{0}: {1} Backtrace: {2}" .format (ex_type .__name__ , ex , traceback .extract_tb (tb )))
@@ -65,19 +68,19 @@ def execute_concurrent_helper(self, session, query, results_generator=False):
65
68
66
69
raise RuntimeError ("Failed to execute query after 100 attempts: {0}" .format (query ))
67
70
68
- def execute_concurrent_args_helper (self , session , query , params , results_generator = False ):
71
+ def execute_concurrent_args_helper (self , session , query , params , results_generator = False , ** kwargs ):
69
72
count = 0
70
73
while count < 100 :
71
74
try :
72
- return execute_concurrent_with_args (session , query , params , results_generator = results_generator )
75
+ return execute_concurrent_with_args (session , query , params , results_generator = results_generator , ** kwargs )
73
76
except (ReadTimeout , WriteTimeout , OperationTimedOut , ReadFailure , WriteFailure ):
74
77
ex_type , ex , tb = sys .exc_info ()
75
78
log .warning ("{0}: {1} Backtrace: {2}" .format (ex_type .__name__ , ex , traceback .extract_tb (tb )))
76
79
del tb
77
80
78
81
raise RuntimeError ("Failed to execute query after 100 attempts: {0}" .format (query ))
79
82
80
- def test_execute_concurrent (self ):
83
+ def execute_concurrent_base (self , test_fn , validate_fn , zip_args = True ):
81
84
for num_statements in (0 , 1 , 2 , 7 , 10 , 99 , 100 , 101 , 199 , 200 , 201 ):
82
85
# write
83
86
statement = SimpleStatement (
@@ -86,7 +89,9 @@ def test_execute_concurrent(self):
86
89
statements = cycle ((statement , ))
87
90
parameters = [(i , i ) for i in range (num_statements )]
88
91
89
- results = self .execute_concurrent_helper (self .session , list (zip (statements , parameters )))
92
+ results = \
93
+ test_fn (self .session , list (zip (statements , parameters ))) if zip_args else \
94
+ test_fn (self .session , statement , parameters )
90
95
self .assertEqual (num_statements , len (results ))
91
96
for success , result in results :
92
97
self .assertTrue (success )
@@ -99,32 +104,37 @@ def test_execute_concurrent(self):
99
104
statements = cycle ((statement , ))
100
105
parameters = [(i , ) for i in range (num_statements )]
101
106
102
- results = self .execute_concurrent_helper (self .session , list (zip (statements , parameters )))
107
+ results = \
108
+ test_fn (self .session , list (zip (statements , parameters ))) if zip_args else \
109
+ test_fn (self .session , statement , parameters )
110
+ validate_fn (num_statements , results )
111
+
112
+ def execute_concurrent_valiate_tuple (self , num_statements , results ):
103
113
self .assertEqual (num_statements , len (results ))
104
114
self .assertEqual ([(True , [(i ,)]) for i in range (num_statements )], results )
105
115
106
- def test_execute_concurrent_with_args (self ):
107
- for num_statements in (0 , 1 , 2 , 7 , 10 , 99 , 100 , 101 , 199 , 200 , 201 ):
108
- statement = SimpleStatement (
109
- "INSERT INTO test3rf.test (k, v) VALUES (%s, %s)" ,
110
- consistency_level = ConsistencyLevel .QUORUM )
111
- parameters = [(i , i ) for i in range (num_statements )]
112
-
113
- results = self .execute_concurrent_args_helper (self .session , statement , parameters )
116
+ def execute_concurrent_valiate_dict (self , num_statements , results ):
114
117
self .assertEqual (num_statements , len (results ))
115
- for success , result in results :
116
- self .assertTrue (success )
117
- self .assertFalse (result )
118
+ self .assertEqual ([(True , [{"v" :i }]) for i in range (num_statements )], results )
118
119
119
- # read
120
- statement = SimpleStatement (
121
- "SELECT v FROM test3rf.test WHERE k=%s" ,
122
- consistency_level = ConsistencyLevel .QUORUM )
123
- parameters = [(i , ) for i in range (num_statements )]
120
+ def test_execute_concurrent (self ):
121
+ self .execute_concurrent_base (self .execute_concurrent_helper , \
122
+ self .execute_concurrent_valiate_tuple )
124
123
125
- results = self .execute_concurrent_args_helper (self .session , statement , parameters )
126
- self .assertEqual (num_statements , len (results ))
127
- self .assertEqual ([(True , [(i ,)]) for i in range (num_statements )], results )
124
+ def test_execute_concurrent_with_args (self ):
125
+ self .execute_concurrent_base (self .execute_concurrent_args_helper , \
126
+ self .execute_concurrent_valiate_tuple , \
127
+ zip_args = False )
128
+
129
+ def test_execute_concurrent_with_execution_profile (self ):
130
+ def run_fn (* args , ** kwargs ):
131
+ return self .execute_concurrent_helper (* args , execution_profile = EXEC_PROFILE_DICT , ** kwargs )
132
+ self .execute_concurrent_base (run_fn , self .execute_concurrent_valiate_dict )
133
+
134
+ def test_execute_concurrent_with_args_and_execution_profile (self ):
135
+ def run_fn (* args , ** kwargs ):
136
+ return self .execute_concurrent_args_helper (* args , execution_profile = EXEC_PROFILE_DICT , ** kwargs )
137
+ self .execute_concurrent_base (run_fn , self .execute_concurrent_valiate_dict , zip_args = False )
128
138
129
139
def test_execute_concurrent_with_args_generator (self ):
130
140
"""
0 commit comments