@@ -1156,23 +1156,25 @@ def _select_address(self, *, access_mode, database, imp_user, bookmarks):
1156
1156
from neo4j .api import READ_ACCESS
1157
1157
""" Selects the address with the fewest in-use connections.
1158
1158
"""
1159
- self .ensure_routing_table_is_fresh (
1160
- access_mode = access_mode , database = database , imp_user = imp_user ,
1161
- bookmarks = bookmarks
1162
- )
1163
- log .debug ("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r" , self .routing_tables )
1164
- if access_mode == READ_ACCESS :
1165
- addresses = self .get_or_create_routing_table (database ).readers
1166
- else :
1167
- addresses = self .get_or_create_routing_table (database ).writers
1168
- addresses_by_usage = {}
1169
- for address in addresses :
1170
- addresses_by_usage .setdefault (self .in_use_connection_count (address ), []).append (address )
1159
+ with self .refresh_lock :
1160
+ if access_mode == READ_ACCESS :
1161
+ addresses = self .routing_tables [database ].readers
1162
+ else :
1163
+ addresses = self .routing_tables [database ].writers
1164
+ addresses_by_usage = {}
1165
+ for address in addresses :
1166
+ addresses_by_usage .setdefault (
1167
+ self .in_use_connection_count (address ), []
1168
+ ).append (address )
1171
1169
if not addresses_by_usage :
1172
1170
if access_mode == READ_ACCESS :
1173
- raise ReadServiceUnavailable ("No read service currently available" )
1171
+ raise ReadServiceUnavailable (
1172
+ "No read service currently available"
1173
+ )
1174
1174
else :
1175
- raise WriteServiceUnavailable ("No write service currently available" )
1175
+ raise WriteServiceUnavailable (
1176
+ "No write service currently available"
1177
+ )
1176
1178
return choice (addresses_by_usage [min (addresses_by_usage )])
1177
1179
1178
1180
def acquire (self , access_mode = None , timeout = None , database = None ,
@@ -1184,17 +1186,25 @@ def acquire(self, access_mode=None, timeout=None, database=None,
1184
1186
1185
1187
from neo4j .api import check_access_mode
1186
1188
access_mode = check_access_mode (access_mode )
1189
+ with self .refresh_lock :
1190
+ log .debug ("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r" ,
1191
+ self .routing_tables )
1192
+ self .ensure_routing_table_is_fresh (
1193
+ access_mode = access_mode , database = database , imp_user = imp_user ,
1194
+ bookmarks = bookmarks
1195
+ )
1196
+
1187
1197
while True :
1188
1198
try :
1189
1199
# Get an address for a connection that have the fewest in-use connections.
1190
1200
address = self ._select_address (
1191
1201
access_mode = access_mode , database = database ,
1192
1202
imp_user = imp_user , bookmarks = bookmarks
1193
1203
)
1194
- log .debug ("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r" , database , address )
1195
1204
except (ReadServiceUnavailable , WriteServiceUnavailable ) as err :
1196
1205
raise SessionExpired ("Failed to obtain connection towards '%s' server." % access_mode ) from err
1197
1206
try :
1207
+ log .debug ("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r" , database , address )
1198
1208
connection = self ._acquire (address , timeout = timeout ) # should always be a resolved address
1199
1209
except ServiceUnavailable :
1200
1210
self .deactivate (address = address )
0 commit comments