@@ -1921,9 +1921,10 @@ def export_as_string(self):
1921
1921
1922
1922
class _SchemaParser (object ):
1923
1923
1924
- def __init__ (self , connection , timeout ):
1924
+ def __init__ (self , connection , timeout , fetch_size ):
1925
1925
self .connection = connection
1926
1926
self .timeout = timeout
1927
+ self .fetch_size = fetch_size
1927
1928
1928
1929
def _handle_results (self , success , result , expected_failures = tuple (), query_msg = None , timeout = None ):
1929
1930
"""
@@ -1975,17 +1976,13 @@ def _query_build_row(self, query_string, build_func):
1975
1976
return result [0 ] if result else None
1976
1977
1977
1978
def _query_build_rows (self , query_string , build_func ):
1978
- query = QueryMessage (query = query_string , consistency_level = ConsistencyLevel .ONE )
1979
+ query = QueryMessage (query = query_string , consistency_level = ConsistencyLevel .ONE , fetch_size = self . fetch_size )
1979
1980
responses = self .connection .wait_for_responses ((query ), timeout = self .timeout , fail_on_error = False )
1980
1981
(success , response ) = responses [0 ]
1981
- if success :
1982
- result = dict_factory (response .column_names , response .parsed_rows )
1983
- return [build_func (row ) for row in result ]
1984
- elif isinstance (response , InvalidRequest ):
1982
+ results = self ._handle_results (success , response , expected_failures = (InvalidRequest ), query_msg = query )
1983
+ if not results :
1985
1984
log .debug ("user types table not found" )
1986
- return []
1987
- else :
1988
- raise response
1985
+ return [build_func (row ) for row in results ]
1989
1986
1990
1987
1991
1988
class SchemaParserV22 (_SchemaParser ):
@@ -2029,8 +2026,8 @@ class SchemaParserV22(_SchemaParser):
2029
2026
"compression" ,
2030
2027
"default_time_to_live" )
2031
2028
2032
- def __init__ (self , connection , timeout ):
2033
- super (SchemaParserV22 , self ).__init__ (connection , timeout )
2029
+ def __init__ (self , connection , timeout , fetch_size ):
2030
+ super (SchemaParserV22 , self ).__init__ (connection , timeout , fetch_size )
2034
2031
self .keyspaces_result = []
2035
2032
self .tables_result = []
2036
2033
self .columns_result = []
@@ -2551,8 +2548,7 @@ class SchemaParserV3(SchemaParserV22):
2551
2548
'speculative_retry' )
2552
2549
2553
2550
def __init__ (self , connection , timeout , fetch_size ):
2554
- super (SchemaParserV3 , self ).__init__ (connection , timeout )
2555
- self .fetch_size = fetch_size
2551
+ super (SchemaParserV3 , self ).__init__ (connection , timeout , fetch_size )
2556
2552
self .indexes_result = []
2557
2553
self .keyspace_table_index_rows = defaultdict (lambda : defaultdict (list ))
2558
2554
self .keyspace_view_rows = defaultdict (list )
@@ -2566,17 +2562,18 @@ def get_all_keyspaces(self):
2566
2562
2567
2563
def get_table (self , keyspaces , keyspace , table ):
2568
2564
cl = ConsistencyLevel .ONE
2565
+ fetch_size = self .fetch_size
2569
2566
where_clause = bind_params (" WHERE keyspace_name = %%s AND %s = %%s" % (self ._table_name_col ), (keyspace , table ), _encoder )
2570
- cf_query = QueryMessage (query = self ._SELECT_TABLES + where_clause , consistency_level = cl )
2571
- col_query = QueryMessage (query = self ._SELECT_COLUMNS + where_clause , consistency_level = cl )
2572
- indexes_query = QueryMessage (query = self ._SELECT_INDEXES + where_clause , consistency_level = cl )
2573
- triggers_query = QueryMessage (query = self ._SELECT_TRIGGERS + where_clause , consistency_level = cl )
2574
- scylla_query = QueryMessage (query = self ._SELECT_SCYLLA + where_clause , consistency_level = cl )
2567
+ cf_query = QueryMessage (query = self ._SELECT_TABLES + where_clause , consistency_level = cl , fetch_size = fetch_size )
2568
+ col_query = QueryMessage (query = self ._SELECT_COLUMNS + where_clause , consistency_level = cl , fetch_size = fetch_size )
2569
+ indexes_query = QueryMessage (query = self ._SELECT_INDEXES + where_clause , consistency_level = cl , fetch_size = fetch_size )
2570
+ triggers_query = QueryMessage (query = self ._SELECT_TRIGGERS + where_clause , consistency_level = cl , fetch_size = fetch_size )
2571
+ scylla_query = QueryMessage (query = self ._SELECT_SCYLLA + where_clause , consistency_level = cl , fetch_size = fetch_size )
2575
2572
2576
2573
# in protocol v4 we don't know if this event is a view or a table, so we look for both
2577
2574
where_clause = bind_params (" WHERE keyspace_name = %s AND view_name = %s" , (keyspace , table ), _encoder )
2578
2575
view_query = QueryMessage (query = self ._SELECT_VIEWS + where_clause ,
2579
- consistency_level = cl )
2576
+ consistency_level = cl , fetch_size = fetch_size )
2580
2577
((cf_success , cf_result ), (col_success , col_result ),
2581
2578
(indexes_sucess , indexes_result ), (triggers_success , triggers_result ),
2582
2579
(view_success , view_result ),
@@ -2585,22 +2582,23 @@ def get_table(self, keyspaces, keyspace, table):
2585
2582
cf_query , col_query , indexes_query , triggers_query ,
2586
2583
view_query , scylla_query , timeout = self .timeout , fail_on_error = False )
2587
2584
)
2588
- table_result = self ._handle_results (cf_success , cf_result )
2589
- col_result = self ._handle_results (col_success , col_result )
2585
+ table_result = self ._handle_results (cf_success , cf_result , query_msg = cf_query )
2586
+ col_result = self ._handle_results (col_success , col_result , query_msg = col_query )
2590
2587
if table_result :
2591
- indexes_result = self ._handle_results (indexes_sucess , indexes_result )
2592
- triggers_result = self ._handle_results (triggers_success , triggers_result )
2588
+ indexes_result = self ._handle_results (indexes_sucess , indexes_result , query_msg = indexes_query )
2589
+ triggers_result = self ._handle_results (triggers_success , triggers_result , query_msg = triggers_query )
2593
2590
# in_memory property is stored in scylla private table
2594
2591
# add it to table properties if enabled
2595
- scylla_result = self ._handle_results (scylla_success , scylla_result , expected_failures = (InvalidRequest ,))
2592
+ scylla_result = self ._handle_results (scylla_success , scylla_result , expected_failures = (InvalidRequest ,),
2593
+ query_msg = scylla_query )
2596
2594
try :
2597
2595
if scylla_result [0 ]["in_memory" ] == True :
2598
2596
table_result [0 ]["in_memory" ] = True
2599
2597
except (IndexError , KeyError ):
2600
2598
pass
2601
2599
return self ._build_table_metadata (table_result [0 ], col_result , triggers_result , indexes_result )
2602
2600
2603
- view_result = self ._handle_results (view_success , view_result )
2601
+ view_result = self ._handle_results (view_success , view_result , query_msg = view_query )
2604
2602
if view_result :
2605
2603
return self ._build_view_metadata (view_result [0 ], col_result )
2606
2604
@@ -3353,7 +3351,7 @@ def get_schema_parser(connection, server_version, dse_version, timeout, fetch_si
3353
3351
else :
3354
3352
# we could further specialize by version. Right now just refactoring the
3355
3353
# multi-version parser we have as of C* 2.2.0rc1.
3356
- return SchemaParserV22 (connection , timeout )
3354
+ return SchemaParserV22 (connection , timeout , fetch_size )
3357
3355
3358
3356
3359
3357
def _cql_from_cass_type (cass_type ):
0 commit comments