58
58
)
59
59
from threading import (
60
60
Condition ,
61
- Lock ,
62
61
RLock ,
63
62
)
64
63
from time import perf_counter
@@ -880,7 +879,7 @@ def __init__(self, opener, pool_config, workspace_config, address):
880
879
log .debug ("[#0000] C: <NEO4J POOL> routing address %r" , address )
881
880
self .address = address
882
881
self .routing_tables = {workspace_config .database : RoutingTable (database = workspace_config .database , routers = [address ])}
883
- self .refresh_lock = Lock ()
882
+ self .refresh_lock = RLock ()
884
883
885
884
def __repr__ (self ):
886
885
""" The representation shows the initial routing addresses.
@@ -914,12 +913,13 @@ def get_routing_table_for_default_database(self):
914
913
return self .routing_tables [self .workspace_config .database ]
915
914
916
915
def get_or_create_routing_table (self , database ):
917
- if database not in self .routing_tables :
918
- self .routing_tables [database ] = RoutingTable (
919
- database = database ,
920
- routers = self .get_default_database_initial_router_addresses ()
921
- )
922
- return self .routing_tables [database ]
916
+ with self .refresh_lock :
917
+ if database not in self .routing_tables :
918
+ self .routing_tables [database ] = RoutingTable (
919
+ database = database ,
920
+ routers = self .get_default_database_initial_router_addresses ()
921
+ )
922
+ return self .routing_tables [database ]
923
923
924
924
def fetch_routing_info (self , address , database , imp_user , bookmarks ,
925
925
timeout ):
@@ -1024,8 +1024,8 @@ def fetch_routing_table(self, *, address, timeout, database, imp_user,
1024
1024
# At least one of each is fine, so return this table
1025
1025
return new_routing_table
1026
1026
1027
- def update_routing_table_from (self , * routers , database = None , imp_user = None ,
1028
- bookmarks = None , database_callback = None ):
1027
+ def _update_routing_table_from (self , * routers , database = None , imp_user = None ,
1028
+ bookmarks = None , database_callback = None ):
1029
1029
""" Try to update routing tables with the given routers.
1030
1030
1031
1031
:return: True if the routing table is successfully updated,
@@ -1071,42 +1071,43 @@ def update_routing_table(self, *, database, imp_user, bookmarks,
1071
1071
1072
1072
:raise neo4j.exceptions.ServiceUnavailable:
1073
1073
"""
1074
- # copied because it can be modified
1075
- existing_routers = set (
1076
- self .get_or_create_routing_table (database ).routers
1077
- )
1078
-
1079
- prefer_initial_routing_address = \
1080
- self .routing_tables [database ].missing_fresh_writer ()
1074
+ with self .refresh_lock :
1075
+ # copied because it can be modified
1076
+ existing_routers = set (
1077
+ self .get_or_create_routing_table (database ).routers
1078
+ )
1081
1079
1082
- if prefer_initial_routing_address :
1083
- # TODO: Test this state
1084
- if self .update_routing_table_from (
1085
- self .first_initial_routing_address , database = database ,
1086
- imp_user = imp_user , bookmarks = bookmarks ,
1080
+ prefer_initial_routing_address = \
1081
+ self .routing_tables [database ].missing_fresh_writer ()
1082
+
1083
+ if prefer_initial_routing_address :
1084
+ # TODO: Test this state
1085
+ if self ._update_routing_table_from (
1086
+ self .first_initial_routing_address , database = database ,
1087
+ imp_user = imp_user , bookmarks = bookmarks ,
1088
+ database_callback = database_callback
1089
+ ):
1090
+ # Why is only the first initial routing address used?
1091
+ return
1092
+ if self ._update_routing_table_from (
1093
+ * (existing_routers - {self .first_initial_routing_address }),
1094
+ database = database , imp_user = imp_user , bookmarks = bookmarks ,
1087
1095
database_callback = database_callback
1088
1096
):
1089
- # Why is only the first initial routing address used?
1090
1097
return
1091
- if self .update_routing_table_from (
1092
- * (existing_routers - {self .first_initial_routing_address }),
1093
- database = database , imp_user = imp_user , bookmarks = bookmarks ,
1094
- database_callback = database_callback
1095
- ):
1096
- return
1097
1098
1098
- if not prefer_initial_routing_address :
1099
- if self .update_routing_table_from (
1100
- self .first_initial_routing_address , database = database ,
1101
- imp_user = imp_user , bookmarks = bookmarks ,
1102
- database_callback = database_callback
1103
- ):
1104
- # Why is only the first initial routing address used?
1105
- return
1099
+ if not prefer_initial_routing_address :
1100
+ if self ._update_routing_table_from (
1101
+ self .first_initial_routing_address , database = database ,
1102
+ imp_user = imp_user , bookmarks = bookmarks ,
1103
+ database_callback = database_callback
1104
+ ):
1105
+ # Why is only the first initial routing address used?
1106
+ return
1106
1107
1107
- # None of the routers have been successful, so just fail
1108
- log .error ("Unable to retrieve routing information" )
1109
- raise ServiceUnavailable ("Unable to retrieve routing information" )
1108
+ # None of the routers have been successful, so just fail
1109
+ log .error ("Unable to retrieve routing information" )
1110
+ raise ServiceUnavailable ("Unable to retrieve routing information" )
1110
1111
1111
1112
def update_connection_pool (self , * , database ):
1112
1113
servers = self .get_or_create_routing_table (database ).servers ()
@@ -1129,11 +1130,11 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database, imp_user,
1129
1130
:return: `True` if an update was required, `False` otherwise.
1130
1131
"""
1131
1132
from neo4j .api import READ_ACCESS
1132
- if self .get_or_create_routing_table (database )\
1133
- .is_fresh (readonly = (access_mode == READ_ACCESS )):
1134
- # Readers are fresh.
1135
- return False
1136
1133
with self .refresh_lock :
1134
+ if self .get_or_create_routing_table (database )\
1135
+ .is_fresh (readonly = (access_mode == READ_ACCESS )):
1136
+ # Readers are fresh.
1137
+ return False
1137
1138
1138
1139
self .update_routing_table (
1139
1140
database = database , imp_user = imp_user , bookmarks = bookmarks ,
0 commit comments