From f10f94dd515892768e6e9046f6f549fc5a7fcd1f Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Thu, 16 Dec 2021 18:06:31 +0100 Subject: [PATCH 1/2] Don't close stale connections while in use The pool will close connections marked as stale when trying to pick them up from the pool. It should not do so while the connection is in use (i.e., already borrowed from the pool). In concurrent systems, this would lead to all sorts of errors caused by the connection being managed from different threads: * the thread trying to acquire a connection will close the stale connection on encounter * while the thread that borrowed it might still be using it, e.g., to fetch results or run a query --- neo4j/io/__init__.py | 11 ++++++++- tests/unit/io/test_neo4j_pool.py | 39 ++++++++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index 4241b4f7..da09d7c6 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -38,6 +38,7 @@ defaultdict, deque, ) +import logging from logging import getLogger from random import choice import selectors @@ -643,11 +644,19 @@ def time_remaining(): # try to find a free connection in pool for connection in list(self.connections.get(address, [])): if (connection.closed() or connection.defunct() - or connection.stale()): + or (connection.stale() and not connection.in_use)): # `close` is a noop on already closed connections. # This is to make sure that the connection is gracefully # closed, e.g. if it's just marked as `stale` but still # alive. + if log.isEnabledFor(logging.DEBUG): + log.debug( + "[#%04X] C: removing old connection " + "(closed=%s, defunct=%s, stale=%s, in_use=%s)", + connection.local_port, + connection.closed(), connection.defunct(), + connection.stale(), connection.in_use + ) connection.close() try: self.connections.get(address, []).remove(connection) diff --git a/tests/unit/io/test_neo4j_pool.py b/tests/unit/io/test_neo4j_pool.py index aadf6300..62088c58 100644 --- a/tests/unit/io/test_neo4j_pool.py +++ b/tests/unit/io/test_neo4j_pool.py @@ -18,7 +18,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import inspect + from unittest.mock import Mock import pytest @@ -73,7 +73,7 @@ def test_chooses_right_connection_type(opener, type_): cx1 = pool.acquire(READ_ACCESS if type_ == "r" else WRITE_ACCESS, 30, "test_db", None) pool.release(cx1) - if type_ == "r": + if type_ == "r": assert cx1.addr == READER_ADDRESS else: assert cx1.addr == WRITER_ADDRESS @@ -99,7 +99,7 @@ def break_connection(): cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None) pool.release(cx1) assert cx1 in pool.connections[cx1.addr] - # simulate connection going stale (e.g. exceeding) and than breaking when + # simulate connection going stale (e.g. exceeding) and then breaking when # the pool tries to close the connection cx1.stale.return_value = True cx_close_mock = cx1.close @@ -108,8 +108,39 @@ def break_connection(): cx_close_mock.side_effect = break_connection cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None) pool.release(cx2) - assert cx1.close.called_once() + if break_on_close: + cx1.close.assert_called() + else: + cx1.close.assert_called_once() assert cx2 is not cx1 assert cx2.addr == cx1.addr assert cx1 not in pool.connections[cx1.addr] assert cx2 in pool.connections[cx2.addr] + + +def test_does_not_close_stale_connections_in_use(opener): + pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS) + cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None) + assert cx1 in pool.connections[cx1.addr] + # simulate connection going stale (e.g. exceeding) while being in use + cx1.stale.return_value = True + cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None) + pool.release(cx2) + cx1.close.assert_not_called() + assert cx2 is not cx1 + assert cx2.addr == cx1.addr + assert cx1 in pool.connections[cx1.addr] + assert cx2 in pool.connections[cx2.addr] + + pool.release(cx1) + # now that cx1 is back in the pool and still stale, + # it should be closed when trying to acquire the next connection + cx1.close.assert_not_called() + + cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None) + pool.release(cx3) + cx1.close.assert_called_once() + assert cx2 is cx3 + assert cx3.addr == cx1.addr + assert cx1 not in pool.connections[cx1.addr] + assert cx3 in pool.connections[cx2.addr] From 0076a35a781b7a011c377c7ed7b9e73c8883a7a4 Mon Sep 17 00:00:00 2001 From: Rouven Bauer Date: Fri, 17 Dec 2021 13:05:42 +0100 Subject: [PATCH 2/2] Fix test compatibility with Python3.5 --- tests/unit/io/test_neo4j_pool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/io/test_neo4j_pool.py b/tests/unit/io/test_neo4j_pool.py index 62088c58..80c692ad 100644 --- a/tests/unit/io/test_neo4j_pool.py +++ b/tests/unit/io/test_neo4j_pool.py @@ -109,9 +109,9 @@ def break_connection(): cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None) pool.release(cx2) if break_on_close: - cx1.close.assert_called() + assert cx1.close.call_count >= 1 else: - cx1.close.assert_called_once() + assert cx1.close.call_count == 1 assert cx2 is not cx1 assert cx2.addr == cx1.addr assert cx1 not in pool.connections[cx1.addr] @@ -139,7 +139,7 @@ def test_does_not_close_stale_connections_in_use(opener): cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None) pool.release(cx3) - cx1.close.assert_called_once() + assert cx1.close.call_count == 1 assert cx2 is cx3 assert cx3.addr == cx1.addr assert cx1 not in pool.connections[cx1.addr]