Skip to content

Commit 0abb9f8

Browse files
author
Zhen
committed
Do not purge inUse connections when connection error happens or new routing table is available
Instead we deactive the server (a.k.a. closing all idle connections) in connection pool. When an connection failure happens, the connection server will be removed from the routing table and all idle connections with this server will be closed in the connection pool. When a new routing table is available, all idle connections towards the servers that are not in the new routing table will be removed. If the server has no inUse connections, then this server will also be totally removed from the connection pool. Note: there is no extra protection to against `acquire` to create a new connection with an inactive server except that the server should not be in the routing table and therefore should not be used to `acquire` new connections. When error happens on a connection, we will deactivate the server but we will probably not remove all server's connections from connection pool as the failed connection is highly still inUse by the broken session.
1 parent 60966e7 commit 0abb9f8

File tree

6 files changed

+130
-16
lines changed

6 files changed

+130
-16
lines changed

neo4j/bolt/connection.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,25 @@ def in_use_connection_count(self, address):
492492
else:
493493
return sum(1 if connection.in_use else 0 for connection in connections)
494494

495+
def deactivate(self, address):
496+
""" Deactivate an address from the connection pool, if present, closing
497+
all idle connection to that address
498+
"""
499+
with self.lock:
500+
try:
501+
connections = self.connections[address]
502+
except KeyError: # already removed from the connection pool
503+
return
504+
for conn in list(connections):
505+
if not conn.in_use:
506+
connections.remove(conn)
507+
try:
508+
conn.close()
509+
except IOError:
510+
pass
511+
if len(connections) == 0:
512+
self.remove(address)
513+
495514
def remove(self, address):
496515
""" Remove an address from the connection pool, if present, closing
497516
all connections to that address.

neo4j/v1/routing.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ def replace(self, elements=()):
8181
e.clear()
8282
e.update(OrderedDict.fromkeys(elements))
8383

84+
def elements(self):
85+
return list(self._elements)
86+
8487

8588
class RoutingTable(object):
8689

@@ -140,6 +143,9 @@ def update(self, new_routing_table):
140143
self.last_updated_time = self.timer()
141144
self.ttl = new_routing_table.ttl
142145

146+
def servers(self):
147+
return set(self.routers.elements() + self.writers.elements() + self.readers.elements())
148+
143149

144150
class RoutingSession(BoltSession):
145151

@@ -249,9 +255,9 @@ class RoutingConnectionErrorHandler(ConnectionErrorHandler):
249255

250256
def __init__(self, pool):
251257
super(RoutingConnectionErrorHandler, self).__init__({
252-
SessionExpired: lambda address: pool.remove(address),
253-
ServiceUnavailable: lambda address: pool.remove(address),
254-
DatabaseUnavailableError: lambda address: pool.remove(address),
258+
SessionExpired: lambda address: pool.deactivate(address),
259+
ServiceUnavailable: lambda address: pool.deactivate(address),
260+
DatabaseUnavailableError: lambda address: pool.deactivate(address),
255261
NotALeaderError: lambda address: pool.remove_writer(address),
256262
ForbiddenOnReadOnlyDatabaseError: lambda address: pool.remove_writer(address)
257263
})
@@ -288,7 +294,7 @@ def fetch_routing_info(self, address):
288294
else:
289295
raise ServiceUnavailable("Routing support broken on server {!r}".format(address))
290296
except ServiceUnavailable:
291-
self.remove(address)
297+
self.deactivate(address)
292298
return None
293299

294300
def fetch_routing_table(self, address):
@@ -365,6 +371,12 @@ def update_routing_table(self):
365371
# None of the routers have been successful, so just fail
366372
raise ServiceUnavailable("Unable to retrieve routing information")
367373

374+
def update_connection_pool(self):
375+
servers = self.routing_table.servers()
376+
for address in list(self.connections):
377+
if address not in servers:
378+
super(RoutingConnectionPool, self).deactivate(address)
379+
368380
def ensure_routing_table_is_fresh(self, access_mode):
369381
""" Update the routing table if stale.
370382
@@ -387,6 +399,7 @@ def ensure_routing_table_is_fresh(self, access_mode):
387399
self.missing_writer = not self.routing_table.is_fresh(WRITE_ACCESS)
388400
return False
389401
self.update_routing_table()
402+
self.update_connection_pool()
390403
return True
391404

392405
def acquire(self, access_mode=None):
@@ -410,21 +423,22 @@ def acquire(self, access_mode=None):
410423
connection = self.acquire_direct(address) # should always be a resolved address
411424
connection.Error = SessionExpired
412425
except ServiceUnavailable:
413-
self.remove(address)
426+
self.deactivate(address)
414427
else:
415428
return connection
416429
raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode)
417430

418-
def remove(self, address):
419-
""" Remove an address from the connection pool, if present, closing
420-
all connections to that address. Also remove from the routing table.
431+
def deactivate(self, address):
432+
""" Deactivate an address from the connection pool,
433+
if present, remove from the routing table and also closing
434+
all idle connections to that address.
421435
"""
422436
# We use `discard` instead of `remove` here since the former
423437
# will not fail if the address has already been removed.
424438
self.routing_table.routers.discard(address)
425439
self.routing_table.readers.discard(address)
426440
self.routing_table.writers.discard(address)
427-
super(RoutingConnectionPool, self).remove(address)
441+
super(RoutingConnectionPool, self).deactivate(address)
428442

429443
def remove_writer(self, address):
430444
""" Remove a writer address from the routing table, if present.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
4+
C: RUN "CALL dbms.cluster.routing.getServers" {}
5+
PULL_ALL
6+
S: SUCCESS {"fields": ["ttl", "servers"]}
7+
RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002"]},{"role":"READ","addresses":["127.0.0.1:9001","127.0.0.1:9003"]},{"role":"WRITE","addresses":["127.0.0.1:9004"]}]]
8+
SUCCESS {}

test/stub/test_routing.py

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,59 @@ def test_should_flag_reading_without_writer(self):
303303
pool.ensure_routing_table_is_fresh(READ_ACCESS)
304304
assert pool.missing_writer
305305

306+
def test_should_purge_idle_connections_from_connection_pool(self):
307+
with StubCluster({9006: "router.script", 9001: "router_with_multiple_servers.script"}):
308+
address = ("127.0.0.1", 9006)
309+
with RoutingPool(address) as pool:
310+
# close the acquired connection with init router and then set it to be idle
311+
conn = pool.acquire(WRITE_ACCESS)
312+
conn.close()
313+
conn.in_use = False
314+
315+
table = pool.routing_table
316+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
317+
("127.0.0.1", 9003)}
318+
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
319+
assert table.writers == {("127.0.0.1", 9006)}
320+
assert set(pool.connections.keys()) == {("127.0.0.1", 9006)}
321+
322+
# immediately expire the routing table to enforce update a new routing table
323+
pool.routing_table.ttl = 0
324+
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
325+
table = pool.routing_table
326+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002)}
327+
assert table.readers == {("127.0.0.1", 9001), ("127.0.0.1", 9003)}
328+
assert table.writers == {("127.0.0.1", 9004)}
329+
330+
assert set(pool.connections.keys()) == {("127.0.0.1", 9001)}
331+
332+
def test_should_not_purge_idle_connections_from_connection_pool(self):
333+
with StubCluster({9006: "router.script", 9001: "router_with_multiple_servers.script"}):
334+
address = ("127.0.0.1", 9006)
335+
with RoutingPool(address) as pool:
336+
# close the acquired connection with init router and then set it to be inUse
337+
conn = pool.acquire(WRITE_ACCESS)
338+
conn.close()
339+
conn.in_use = True
340+
341+
table = pool.routing_table
342+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002),
343+
("127.0.0.1", 9003)}
344+
assert table.readers == {("127.0.0.1", 9004), ("127.0.0.1", 9005)}
345+
assert table.writers == {("127.0.0.1", 9006)}
346+
assert set(pool.connections.keys()) == {("127.0.0.1", 9006)}
347+
348+
# immediately expire the routing table to enforce update a new routing table
349+
pool.routing_table.ttl = 0
350+
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
351+
table = pool.routing_table
352+
assert table.routers == {("127.0.0.1", 9001), ("127.0.0.1", 9002)}
353+
assert table.readers == {("127.0.0.1", 9001), ("127.0.0.1", 9003)}
354+
assert table.writers == {("127.0.0.1", 9004)}
355+
356+
assert set(pool.connections.keys()) == {("127.0.0.1", 9001), ("127.0.0.1", 9006)}
357+
358+
306359
# TODO: fix flaky test
307360
# def test_concurrent_refreshes_should_not_block_if_fresh(self):
308361
# address = ("127.0.0.1", 9001)
@@ -481,15 +534,15 @@ def test_should_error_to_writer_in_absent_of_reader(self):
481534
assert not pool.missing_writer
482535

483536

484-
class RoutingConnectionPoolRemoveTestCase(StubTestCase):
537+
class RoutingConnectionPoolDeactivateTestCase(StubTestCase):
485538
def test_should_remove_router_from_routing_table_if_present(self):
486539
with StubCluster({9001: "router.script"}):
487540
address = ("127.0.0.1", 9001)
488541
with RoutingPool(address) as pool:
489542
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
490543
target = ("127.0.0.1", 9001)
491544
assert target in pool.routing_table.routers
492-
pool.remove(target)
545+
pool.deactivate(target)
493546
assert target not in pool.routing_table.routers
494547

495548
def test_should_remove_reader_from_routing_table_if_present(self):
@@ -499,7 +552,7 @@ def test_should_remove_reader_from_routing_table_if_present(self):
499552
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
500553
target = ("127.0.0.1", 9004)
501554
assert target in pool.routing_table.readers
502-
pool.remove(target)
555+
pool.deactivate(target)
503556
assert target not in pool.routing_table.readers
504557

505558
def test_should_remove_writer_from_routing_table_if_present(self):
@@ -509,7 +562,7 @@ def test_should_remove_writer_from_routing_table_if_present(self):
509562
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
510563
target = ("127.0.0.1", 9006)
511564
assert target in pool.routing_table.writers
512-
pool.remove(target)
565+
pool.deactivate(target)
513566
assert target not in pool.routing_table.writers
514567

515568
def test_should_not_fail_if_absent(self):
@@ -518,4 +571,4 @@ def test_should_not_fail_if_absent(self):
518571
with RoutingPool(address) as pool:
519572
pool.ensure_routing_table_is_fresh(WRITE_ACCESS)
520573
target = ("127.0.0.1", 9007)
521-
pool.remove(target)
574+
pool.deactivate(target)

test/stub/test_routingdriver.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,13 +289,19 @@ def test_forgets_address_on_service_unavailable_error(self):
289289
pool = driver._pool
290290
table = pool.routing_table
291291

292-
# address should not have connections in the pool, it has failed
293-
assert ('127.0.0.1', 9004) not in pool.connections
292+
# address should have connections in the pool but be inactive, it has failed
293+
assert ('127.0.0.1', 9004) in pool.connections
294+
conns = pool.connections[('127.0.0.1', 9004)]
295+
conn = conns[0]
296+
assert conn._closed == True
297+
assert conn.in_use == True
294298
assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)}
295299
# reader 127.0.0.1:9004 should've been forgotten because of an error
296300
assert table.readers == {('127.0.0.1', 9005)}
297301
assert table.writers == {('127.0.0.1', 9006)}
298302

303+
assert conn.in_use == False
304+
299305
def test_forgets_address_on_database_unavailable_error(self):
300306
with StubCluster({9001: "router.script", 9004: "database_unavailable.script"}):
301307
uri = "bolt+routing://127.0.0.1:9001"

test/unit/test_routing.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,20 @@ def test_should_fail_on_multiple_records(self):
169169
_ = RoutingTable.parse_routing_info([VALID_ROUTING_RECORD, VALID_ROUTING_RECORD])
170170

171171

172+
class RoutingTableServersTestCase(TestCase):
173+
def test_should_return_all_distinct_servers_in_routing_table(self):
174+
routing_table = {
175+
"ttl": 300,
176+
"servers": [
177+
{"role": "ROUTE", "addresses": ["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]},
178+
{"role": "READ", "addresses": ["127.0.0.1:9001", "127.0.0.1:9005"]},
179+
{"role": "WRITE", "addresses": ["127.0.0.1:9002"]},
180+
],
181+
}
182+
table = RoutingTable.parse_routing_info([routing_table])
183+
assert table.servers() == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003), ('127.0.0.1', 9005)}
184+
185+
172186
class RoutingTableFreshnessTestCase(TestCase):
173187
def test_should_be_fresh_after_update(self):
174188
table = RoutingTable.parse_routing_info([VALID_ROUTING_RECORD])

0 commit comments

Comments
 (0)