Skip to content

Commit e28cd52

Browse files
committed
max_pool_size
1 parent 02f4dda commit e28cd52

File tree

2 files changed

+65
-14
lines changed

2 files changed

+65
-14
lines changed

neo4j/v1/session.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class which can be used to obtain `Driver` instances that are used for
3636
from .typesystem import hydrated
3737

3838

39+
DEFAULT_MAX_POOL_SIZE = 50
40+
3941
STATEMENT_TYPE_READ_ONLY = "r"
4042
STATEMENT_TYPE_READ_WRITE = "rw"
4143
STATEMENT_TYPE_WRITE_ONLY = "w"
@@ -91,7 +93,8 @@ def __init__(self, url, **config):
9193
else:
9294
raise ValueError("Unsupported URL scheme: %s" % parsed.scheme)
9395
self.config = config
94-
self.sessions = deque()
96+
self.max_pool_size = config.get("max_pool_size", DEFAULT_MAX_POOL_SIZE)
97+
self.session_pool = deque()
9598

9699
def session(self):
97100
""" Create a new session based on the graph database details
@@ -102,14 +105,33 @@ def session(self):
102105
>>> session = driver.session()
103106
104107
"""
105-
try:
106-
session = self.sessions.pop()
107-
except IndexError:
108-
session = Session(self)
109-
else:
110-
session.connection.reset()
108+
session = None
109+
done = False
110+
while not done:
111+
try:
112+
session = self.session_pool.pop()
113+
except IndexError:
114+
session = Session(self)
115+
done = True
116+
else:
117+
if session.healthy:
118+
session.connection.reset()
119+
done = session.healthy
111120
return session
112121

122+
def recycle(self, session):
123+
""" Pass a session back to the driver for recycling, if healthy.
124+
125+
:param session:
126+
:return:
127+
"""
128+
pool = self.session_pool
129+
for s in pool:
130+
if not s.healthy:
131+
pool.remove(s)
132+
if session.healthy and len(pool) < self.max_pool_size and session not in pool:
133+
pool.appendleft(session)
134+
113135

114136
class Result(list):
115137
""" A handler for the result of Cypher statement execution.
@@ -344,19 +366,25 @@ def __init__(self, driver):
344366
self.connection = connect(driver.host, driver.port, **driver.config)
345367
self.transaction = None
346368
self.bench_tests = []
347-
self.closed = False
348369

349370
def __del__(self):
350-
if not self.closed:
371+
if not self.connection.closed:
351372
self.connection.close()
352-
self.closed = True
353373

354374
def __enter__(self):
355375
return self
356376

357377
def __exit__(self, exc_type, exc_value, traceback):
358378
self.close()
359379

380+
@property
381+
def healthy(self):
382+
""" Return ``True`` if this session is healthy, ``False`` if
383+
unhealthy and ``None`` if closed.
384+
"""
385+
connection = self.connection
386+
return None if connection.closed else not connection.defunct
387+
360388
def run(self, statement, parameters=None):
361389
""" Run a parameterised Cypher statement.
362390
@@ -411,9 +439,7 @@ def run(self, statement, parameters=None):
411439
def close(self):
412440
""" If still usable, return this session to the driver pool it came from.
413441
"""
414-
if not self.connection.defunct:
415-
self.driver.sessions.appendleft(self)
416-
self.closed = True
442+
self.driver.recycle(self)
417443

418444
def begin_transaction(self):
419445
""" Create a new :class:`.Transaction` within this session.

test/test_session.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,35 @@
2424
from mock import patch
2525
from neo4j.v1.session import GraphDatabase, CypherError, Record, record
2626
from neo4j.v1.typesystem import Node, Relationship, Path
27-
from test.util import watch
27+
28+
29+
class DriverTestCase(TestCase):
30+
31+
def test_healthy_session_will_be_returned_to_the_pool_on_close(self):
32+
driver = GraphDatabase.driver("bolt://localhost")
33+
assert len(driver.session_pool) == 0
34+
driver.session().close()
35+
assert len(driver.session_pool) == 1
36+
37+
def test_unhealthy_session_will_not_be_returned_to_the_pool_on_close(self):
38+
driver = GraphDatabase.driver("bolt://localhost")
39+
assert len(driver.session_pool) == 0
40+
session = driver.session()
41+
session.connection.defunct = True
42+
session.close()
43+
assert len(driver.session_pool) == 0
44+
45+
def session_pool_cannot_exceed_max_size(self):
46+
driver = GraphDatabase.driver("bolt://localhost", max_pool_size=1)
47+
assert len(driver.session_pool) == 0
48+
driver.session().close()
49+
assert len(driver.session_pool) == 1
50+
driver.session().close()
51+
assert len(driver.session_pool) == 1
2852

2953

3054
class RunTestCase(TestCase):
55+
3156
def test_must_use_valid_url_scheme(self):
3257
with self.assertRaises(ValueError):
3358
GraphDatabase.driver("x://xxx")

0 commit comments

Comments
 (0)