Skip to content

Commit 6aa15c2

Browse files
authored
Don't close stale connections while in use (#632)
The pool will close connections marked as stale when trying to pick them up from the pool. It should not do so while the connection is in use (i.e., already borrowed from the pool). In concurrent systems, this would lead to all sorts of errors caused by the connection being managed from different threads: * the thread trying to acquire a connection will close the stale connection on encounter * while the thread that borrowed it might still be using it, e.g., to fetch results or run a query In fact, in corner cases it is also possible to hit this issue without concurrency by having a workload that relies on multiple sessions to perform work simultaneously.
1 parent db66e09 commit 6aa15c2

File tree

2 files changed

+45
-5
lines changed

2 files changed

+45
-5
lines changed

neo4j/io/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
defaultdict,
3939
deque,
4040
)
41+
import logging
4142
from logging import getLogger
4243
from random import choice
4344
import selectors
@@ -643,11 +644,19 @@ def time_remaining():
643644
# try to find a free connection in pool
644645
for connection in list(self.connections.get(address, [])):
645646
if (connection.closed() or connection.defunct()
646-
or connection.stale()):
647+
or (connection.stale() and not connection.in_use)):
647648
# `close` is a noop on already closed connections.
648649
# This is to make sure that the connection is gracefully
649650
# closed, e.g. if it's just marked as `stale` but still
650651
# alive.
652+
if log.isEnabledFor(logging.DEBUG):
653+
log.debug(
654+
"[#%04X] C: <POOL> removing old connection "
655+
"(closed=%s, defunct=%s, stale=%s, in_use=%s)",
656+
connection.local_port,
657+
connection.closed(), connection.defunct(),
658+
connection.stale(), connection.in_use
659+
)
651660
connection.close()
652661
try:
653662
self.connections.get(address, []).remove(connection)

tests/unit/io/test_neo4j_pool.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
# See the License for the specific language governing permissions and
1919
# limitations under the License.
2020

21-
import inspect
21+
2222
from unittest.mock import Mock
2323

2424
import pytest
@@ -73,7 +73,7 @@ def test_chooses_right_connection_type(opener, type_):
7373
cx1 = pool.acquire(READ_ACCESS if type_ == "r" else WRITE_ACCESS,
7474
30, "test_db", None)
7575
pool.release(cx1)
76-
if type_ == "r":
76+
if type_ == "r":
7777
assert cx1.addr == READER_ADDRESS
7878
else:
7979
assert cx1.addr == WRITER_ADDRESS
@@ -99,7 +99,7 @@ def break_connection():
9999
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
100100
pool.release(cx1)
101101
assert cx1 in pool.connections[cx1.addr]
102-
# simulate connection going stale (e.g. exceeding) and than breaking when
102+
# simulate connection going stale (e.g. exceeding) and then breaking when
103103
# the pool tries to close the connection
104104
cx1.stale.return_value = True
105105
cx_close_mock = cx1.close
@@ -108,8 +108,39 @@ def break_connection():
108108
cx_close_mock.side_effect = break_connection
109109
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
110110
pool.release(cx2)
111-
assert cx1.close.called_once()
111+
if break_on_close:
112+
assert cx1.close.call_count >= 1
113+
else:
114+
assert cx1.close.call_count == 1
112115
assert cx2 is not cx1
113116
assert cx2.addr == cx1.addr
114117
assert cx1 not in pool.connections[cx1.addr]
115118
assert cx2 in pool.connections[cx2.addr]
119+
120+
121+
def test_does_not_close_stale_connections_in_use(opener):
122+
pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS)
123+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
124+
assert cx1 in pool.connections[cx1.addr]
125+
# simulate connection going stale (e.g. exceeding) while being in use
126+
cx1.stale.return_value = True
127+
cx2 = pool.acquire(READ_ACCESS, 30, "test_db", None)
128+
pool.release(cx2)
129+
cx1.close.assert_not_called()
130+
assert cx2 is not cx1
131+
assert cx2.addr == cx1.addr
132+
assert cx1 in pool.connections[cx1.addr]
133+
assert cx2 in pool.connections[cx2.addr]
134+
135+
pool.release(cx1)
136+
# now that cx1 is back in the pool and still stale,
137+
# it should be closed when trying to acquire the next connection
138+
cx1.close.assert_not_called()
139+
140+
cx3 = pool.acquire(READ_ACCESS, 30, "test_db", None)
141+
pool.release(cx3)
142+
assert cx1.close.call_count == 1
143+
assert cx2 is cx3
144+
assert cx3.addr == cx1.addr
145+
assert cx1 not in pool.connections[cx1.addr]
146+
assert cx3 in pool.connections[cx2.addr]

0 commit comments

Comments
 (0)