@@ -4220,15 +4220,14 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4220
4220
if self ._is_shutdown :
4221
4221
return
4222
4222
4223
- if not connection :
4224
- connection = self ._connection
4223
+ current_connection = connection or self ._connection
4225
4224
4226
4225
if preloaded_results :
4227
4226
log .debug ("[control connection] Attempting to use preloaded results for schema agreement" )
4228
4227
4229
4228
peers_result = preloaded_results [0 ]
4230
4229
local_result = preloaded_results [1 ]
4231
- schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , connection .endpoint )
4230
+ schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , current_connection .endpoint )
4232
4231
if schema_mismatches is None :
4233
4232
return True
4234
4233
@@ -4237,16 +4236,27 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4237
4236
elapsed = 0
4238
4237
cl = ConsistencyLevel .ONE
4239
4238
schema_mismatches = None
4240
- select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , connection )
4239
+ select_peers_query = self ._get_peers_query (self .PeersQueryType .PEERS_SCHEMA , current_connection )
4240
+ error_signaled = False
4241
4241
4242
4242
while elapsed < total_timeout :
4243
+ if current_connection != connection or self ._connection :
4244
+ current_connection = connection or self ._connection
4245
+ error_signaled = False
4246
+
4247
+ if current_connection .is_defunct or current_connection .is_closed :
4248
+ log .debug ("[control connection] connection is closed, wait and trying again" )
4249
+ self ._time .sleep (0.2 )
4250
+ elapsed = self ._time .time () - start
4251
+ continue
4252
+
4243
4253
peers_query = QueryMessage (query = maybe_add_timeout_to_query (select_peers_query , self ._metadata_request_timeout ),
4244
4254
consistency_level = cl )
4245
4255
local_query = QueryMessage (query = maybe_add_timeout_to_query (self ._SELECT_SCHEMA_LOCAL , self ._metadata_request_timeout ),
4246
4256
consistency_level = cl )
4247
4257
try :
4248
4258
timeout = min (self ._timeout , total_timeout - elapsed )
4249
- peers_result , local_result = connection .wait_for_responses (
4259
+ peers_result , local_result = current_connection .wait_for_responses (
4250
4260
peers_query , local_query , timeout = timeout )
4251
4261
except OperationTimedOut as timeout :
4252
4262
log .debug ("[control connection] Timed out waiting for "
@@ -4257,10 +4267,12 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4257
4267
if self ._is_shutdown :
4258
4268
log .debug ("[control connection] Aborting wait for schema match due to shutdown" )
4259
4269
return None
4260
- else :
4261
- raise
4270
+ elif not error_signaled :
4271
+ self ._signal_error ()
4272
+ error_signaled = True
4273
+ continue
4262
4274
4263
- schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , connection .endpoint )
4275
+ schema_mismatches = self ._get_schema_mismatches (peers_result , local_result , current_connection .endpoint )
4264
4276
if schema_mismatches is None :
4265
4277
return True
4266
4278
@@ -4269,7 +4281,7 @@ def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wai
4269
4281
elapsed = self ._time .time () - start
4270
4282
4271
4283
log .warning ("Node %s is reporting a schema disagreement: %s" ,
4272
- connection .endpoint , schema_mismatches )
4284
+ current_connection .endpoint , schema_mismatches )
4273
4285
return False
4274
4286
4275
4287
def _get_schema_mismatches (self , peers_result , local_result , local_address ):
0 commit comments