Skip to content

Commit bfa2542

Browse files
authored
Fix flaky tests (#541)
* Fix flaky test * Handle SessionExpired while acquiring routing table
1 parent b66c14c commit bfa2542

File tree

2 files changed

+67
-9
lines changed

2 files changed

+67
-9
lines changed

neo4j/io/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -893,7 +893,7 @@ def fetch_routing_info(self, address, database, bookmarks, timeout):
893893
# routing is broken.
894894
log.debug("Routing is broken (%s)", error)
895895
raise ServiceUnavailable(*error.args)
896-
except ServiceUnavailable as error:
896+
except (ServiceUnavailable, SessionExpired) as error:
897897
# The routing table request suffered a connection
898898
# failure. This should return a null routing table,
899899
# signalling to the caller to retry the request

tests/unit/io/test_direct.py

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@
2222
from unittest import TestCase
2323
import pytest
2424
from threading import (
25-
Thread,
25+
Condition,
2626
Event,
27+
Lock,
28+
Thread,
2729
)
30+
import time
31+
2832
from neo4j import (
2933
Config,
3034
PoolConfig,
@@ -120,6 +124,52 @@ def test_ping_timeout(self):
120124
assert protocol_version is None
121125

122126

127+
class MultiEvent:
128+
# Adopted from threading.Event
129+
130+
def __init__(self):
131+
super().__init__()
132+
self._cond = Condition(Lock())
133+
self._counter = 0
134+
135+
def _reset_internal_locks(self):
136+
# private! called by Thread._reset_internal_locks by _after_fork()
137+
self._cond.__init__(Lock())
138+
139+
def counter(self):
140+
return self._counter
141+
142+
def increment(self):
143+
with self._cond:
144+
self._counter += 1
145+
self._cond.notify_all()
146+
147+
def decrement(self):
148+
with self._cond:
149+
self._counter -= 1
150+
self._cond.notify_all()
151+
152+
def clear(self):
153+
with self._cond:
154+
self._counter = 0
155+
self._cond.notify_all()
156+
157+
def wait(self, value=0, timeout=None):
158+
with self._cond:
159+
t_start = time.time()
160+
while True:
161+
if value == self._counter:
162+
return True
163+
if timeout is None:
164+
time_left = None
165+
else:
166+
time_left = timeout - (time.time() - t_start)
167+
if time_left <= 0:
168+
return False
169+
if not self._cond.wait(time_left):
170+
return False
171+
172+
123173
class ConnectionPoolTestCase(TestCase):
124174

125175
def setUp(self):
@@ -200,28 +250,36 @@ def test_max_conn_pool_size(self):
200250
def test_multithread(self):
201251
with FakeBoltPool((), max_connection_pool_size=5) as pool:
202252
address = ("127.0.0.1", 7687)
203-
releasing_event = Event()
253+
acquired_counter = MultiEvent()
254+
release_event = Event()
204255

205-
# We start 10 threads to compete connections from pool with size of 5
256+
# start 10 threads competing for connections from a pool of size 5
206257
threads = []
207258
for i in range(10):
208-
t = Thread(target=acquire_release_conn, args=(pool, address, releasing_event))
259+
t = Thread(
260+
target=acquire_release_conn,
261+
args=(pool, address, acquired_counter, release_event),
262+
daemon=True
263+
)
209264
t.start()
210265
threads.append(t)
211266

267+
if not acquired_counter.wait(5, timeout=1):
268+
raise RuntimeError("Acquire threads not fast enough")
212269
# The pool size should be 5, all are in-use
213270
self.assert_pool_size(address, 5, 0, pool)
214271
# Now we allow thread to release connections they obtained from pool
215-
releasing_event.set()
272+
release_event.set()
216273

217274
# wait for all threads to release connections back to pool
218275
for t in threads:
219-
t.join()
276+
t.join(timeout=1)
220277
# The pool size is still 5, but all are free
221278
self.assert_pool_size(address, 0, 5, pool)
222279

223280

224-
def acquire_release_conn(pool, address, releasing_event):
281+
def acquire_release_conn(pool, address, acquired_counter, release_event):
225282
conn = pool._acquire(address, timeout=3)
226-
releasing_event.wait()
283+
acquired_counter.increment()
284+
release_event.wait()
227285
pool.release(conn)

0 commit comments

Comments
 (0)