Skip to content

Commit d618b7b

Browse files
committed
Add tests for pool when connection opener fails
1 parent 1ed96f9 commit d618b7b

File tree

4 files changed

+44
-26
lines changed

4 files changed

+44
-26
lines changed

neo4j/_async/io/_pool.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -248,24 +248,14 @@ async def deactivate(self, address):
248248
await conn.close()
249249
except OSError:
250250
pass
251-
if not connections:
252-
await self.remove(address)
251+
if not self.connections[address]:
252+
del self.connections[address]
253253

254254
def on_write_failure(self, address):
255255
raise WriteServiceUnavailable(
256256
"No write service available for pool {}".format(self)
257257
)
258258

259-
async def remove(self, address):
260-
""" Remove an address from the connection pool, if present, closing
261-
all connections to that address.
262-
"""
263-
async with self.lock:
264-
for connection in self.connections.pop(address, ()):
265-
try:
266-
await connection.close()
267-
except OSError:
268-
pass
269259

270260
async def close(self):
271261
""" Close all connections and empty the pool.
@@ -274,7 +264,8 @@ async def close(self):
274264
try:
275265
async with self.lock:
276266
for address in list(self.connections):
277-
await self.remove(address)
267+
for connection in self.connections.pop(address, ()):
268+
await connection.close()
278269
except TypeError:
279270
pass
280271

neo4j/_sync/io/_pool.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -248,24 +248,14 @@ def deactivate(self, address):
248248
conn.close()
249249
except OSError:
250250
pass
251-
if not connections:
252-
self.remove(address)
251+
if not self.connections[address]:
252+
del self.connections[address]
253253

254254
def on_write_failure(self, address):
255255
raise WriteServiceUnavailable(
256256
"No write service available for pool {}".format(self)
257257
)
258258

259-
def remove(self, address):
260-
""" Remove an address from the connection pool, if present, closing
261-
all connections to that address.
262-
"""
263-
with self.lock:
264-
for connection in self.connections.pop(address, ()):
265-
try:
266-
connection.close()
267-
except OSError:
268-
pass
269259

270260
def close(self):
271261
""" Close all connections and empty the pool.
@@ -274,7 +264,8 @@ def close(self):
274264
try:
275265
with self.lock:
276266
for address in list(self.connections):
277-
self.remove(address)
267+
for connection in self.connections.pop(address, ()):
268+
connection.close()
278269
except TypeError:
279270
pass
280271

tests/unit/async_/io/test_neo4j_pool.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
)
3636

3737
from ...._async_compat import mark_async_test
38+
from ..._async_compat import (
39+
AsyncMock,
40+
mark_async_test,
41+
)
3842
from ..work import async_fake_connection_generator
3943

4044

@@ -384,3 +388,17 @@ def liveness_side_effect(*args, **kwargs):
384388
cx3.reset.assert_awaited_once()
385389
assert cx1 not in pool.connections[cx1.addr]
386390
assert cx3 in pool.connections[cx1.addr]
391+
392+
393+
394+
@mark_async_test
395+
async def test_failing_opener_leaves_connections_in_use_alone(opener):
396+
pool = AsyncNeo4jPool(
397+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
398+
)
399+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
400+
401+
opener.side_effect = ServiceUnavailable("Server overloaded")
402+
with pytest.raises((ServiceUnavailable, SessionExpired)):
403+
await pool.acquire(READ_ACCESS, 30, "test_db", None)
404+
assert not cx1.closed()

tests/unit/sync/io/test_neo4j_pool.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
)
3636

3737
from ...._async_compat import mark_sync_test
38+
from ..._async_compat import (
39+
mark_sync_test,
40+
Mock,
41+
)
3842
from ..work import fake_connection_generator
3943

4044

@@ -384,3 +388,17 @@ def liveness_side_effect(*args, **kwargs):
384388
cx3.reset.assert_called_once()
385389
assert cx1 not in pool.connections[cx1.addr]
386390
assert cx3 in pool.connections[cx1.addr]
391+
392+
393+
394+
@mark_sync_test
395+
def test_failing_opener_leaves_connections_in_use_alone(opener):
396+
pool = Neo4jPool(
397+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
398+
)
399+
cx1 = pool.acquire(READ_ACCESS, 30, "test_db", None)
400+
401+
opener.side_effect = ServiceUnavailable("Server overloaded")
402+
with pytest.raises((ServiceUnavailable, SessionExpired)):
403+
pool.acquire(READ_ACCESS, 30, "test_db", None)
404+
assert not cx1.closed()

0 commit comments

Comments
 (0)