Skip to content

Commit ecb7930

Browse files
authored
Send RESET when putting a connection back into the pool (#539)
* Send RESET when putting a connection back into the pool * Fix bubbling of unwanted exception when fetching RT * Update skipped tests to accommodate previous changes
1 parent bfa2542 commit ecb7930

File tree

5 files changed

+60
-80
lines changed

5 files changed

+60
-80
lines changed

neo4j/io/__init__.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
ConfigurationError,
8484
DriverError,
8585
IncompleteCommit,
86+
Neo4jError,
8687
ReadServiceUnavailable,
8788
ServiceUnavailable,
8889
SessionExpired,
@@ -121,6 +122,9 @@ class Bolt(abc.ABC):
121122

122123
PROTOCOL_VERSION = None
123124

125+
# flag if connection needs RESET to go back to READY state
126+
_is_reset = True
127+
124128
# The socket
125129
in_use = False
126130

@@ -144,7 +148,6 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
144148
self.responses = deque()
145149
self._max_connection_lifetime = max_connection_lifetime
146150
self._creation_timestamp = perf_counter()
147-
self._is_reset = True
148151
self.routing_context = routing_context
149152

150153
# Determine the user agent
@@ -447,6 +450,10 @@ def rollback(self, **handlers):
447450
""" Appends a ROLLBACK message to the output queue."""
448451
pass
449452

453+
@property
454+
def is_reset(self):
455+
return self._is_reset
456+
450457
@abc.abstractmethod
451458
def reset(self):
452459
""" Appends a RESET message to the outgoing queue, sends it and consumes
@@ -564,23 +571,26 @@ def _set_defunct(self, message, error=None, silent=False):
564571
def stale(self):
565572
return (self._stale
566573
or (0 <= self._max_connection_lifetime
567-
<= perf_counter()- self._creation_timestamp))
574+
<= perf_counter() - self._creation_timestamp))
568575

569576
_stale = False
570577

571578
def set_stale(self):
572579
self._stale = True
573580

581+
@abc.abstractmethod
574582
def close(self):
575583
""" Close the connection.
576584
"""
577-
raise NotImplementedError
585+
pass
578586

587+
@abc.abstractmethod
579588
def closed(self):
580-
raise NotImplementedError
589+
pass
581590

591+
@abc.abstractmethod
582592
def defunct(self):
583-
raise NotImplementedError
593+
pass
584594

585595

586596
class IOPool:
@@ -682,6 +692,13 @@ def release(self, *connections):
682692
"""
683693
with self.lock:
684694
for connection in connections:
695+
if not connection.is_reset:
696+
try:
697+
connection.reset()
698+
except (Neo4jError, DriverError, BoltError) as e:
699+
log.debug(
700+
"Failed to reset connection on release: %s", e
701+
)
685702
connection.in_use = False
686703
self.cond.notify_all()
687704

@@ -920,7 +937,7 @@ def fetch_routing_table(self, *, address, timeout, database, bookmarks):
920937
try:
921938
new_routing_info = self.fetch_routing_info(address, database,
922939
bookmarks, timeout)
923-
except ServiceUnavailable:
940+
except (ServiceUnavailable, SessionExpired):
924941
new_routing_info = None
925942
if not new_routing_info:
926943
log.debug("Failed to fetch routing info %s", address)

neo4j/io/_bolt4.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,6 @@ class Bolt4x0(Bolt):
6161

6262
PROTOCOL_VERSION = Version(4, 0)
6363

64-
# The socket
65-
in_use = False
66-
67-
# The socket
68-
_closed = False
69-
70-
# The socket
71-
_defunct = False
72-
73-
#: The pool of which this connection is a member
74-
pool = None
75-
7664
supports_multiple_results = True
7765

7866
supports_multiple_databases = True

neo4j/work/transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def rollback(self):
160160
raise TransactionError("Transaction closed")
161161
metadata = {}
162162
try:
163-
if not self._connection._is_reset:
163+
if not self._connection.is_reset:
164164
# DISCARD pending records then do a rollback.
165165
self._consume_results()
166166
self._connection.rollback(on_success=metadata.update)

testkitbackend/skipped_tests.json

Lines changed: 12 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,24 @@
11
{
2-
"stub.routing.Routing.test_should_use_resolver_during_rediscovery_when_existing_routers_fail":
3-
"DNS resolver not implemented",
4-
"stub.routing.RoutingV3.test_should_use_resolver_during_rediscovery_when_existing_routers_fail":
5-
"DNS resolver not implemented",
6-
"stub.routing.RoutingV4.test_should_use_resolver_during_rediscovery_when_existing_routers_fail":
7-
"DNS resolver not implemented",
8-
"stub.routing.NoRouting.test_should_use_resolver_during_rediscovery_when_existing_routers_fail":
9-
"DNS resolver not implemented",
10-
"stub.routing.Routing.test_should_request_rt_from_all_initial_routers_until_successful":
11-
"DNS resolver not implemented",
12-
"stub.routing.RoutingV3.test_should_request_rt_from_all_initial_routers_until_successful":
13-
"DNS resolver not implemented",
14-
"stub.routing.RoutingV4.test_should_request_rt_from_all_initial_routers_until_successful":
15-
"DNS resolver not implemented",
16-
"stub.routing.NoRouting.test_should_request_rt_from_all_initial_routers_until_successful":
17-
"DNS resolver not implemented",
18-
"stub.routing.Routing.test_should_successfully_acquire_rt_when_router_ip_changes":
19-
"DNS resolver not implemented",
20-
"stub.routing.RoutingV3.test_should_successfully_acquire_rt_when_router_ip_changes":
21-
"DNS resolver not implemented",
22-
"stub.routing.RoutingV4.test_should_successfully_acquire_rt_when_router_ip_changes":
23-
"DNS resolver not implemented",
24-
"stub.routing.NoRouting.test_should_successfully_acquire_rt_when_router_ip_changes":
25-
"DNS resolver not implemented",
26-
"stub.routing.Routing.test_should_read_successfully_on_empty_discovery_result_using_session_run":
27-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
28-
"stub.routing.RoutingV3.test_should_read_successfully_on_empty_discovery_result_using_session_run":
29-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
30-
"stub.routing.RoutingV4.test_should_read_successfully_on_empty_discovery_result_using_session_run":
31-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
32-
"stub.routing.NoRouting.test_should_read_successfully_on_empty_discovery_result_using_session_run":
33-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
34-
"stub.routing.Routing.test_should_retry_read_tx_and_rediscovery_until_success":
35-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
36-
"stub.routing.RoutingV3.test_should_retry_read_tx_and_rediscovery_until_success":
37-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
38-
"stub.routing.RoutingV4.test_should_retry_read_tx_and_rediscovery_until_success":
39-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
40-
"stub.routing.NoRouting.test_should_retry_read_tx_and_rediscovery_until_success":
41-
"Driver iterates over results of custom resolver and settles on first connection yielding a successful handshake",
422
"stub.routing.Routing.test_should_retry_write_until_success_with_leader_change_using_tx_function":
43-
"Driver opens a new connection each time to get a fresh routing table",
3+
"Driver closes connection to router if DNS resolved name not in routing table",
444
"stub.routing.RoutingV3.test_should_retry_write_until_success_with_leader_change_using_tx_function":
45-
"Driver opens a new connection each time to get a fresh routing table",
5+
"Driver closes connection to router if DNS resolved name not in routing table",
466
"stub.routing.RoutingV4.test_should_retry_write_until_success_with_leader_change_using_tx_function":
47-
"Driver opens a new connection each time to get a fresh routing table",
48-
"stub.routing.NoRouting.test_should_retry_write_until_success_with_leader_change_using_tx_function":
49-
"Driver opens a new connection each time to get a fresh routing table",
7+
"Driver closes connection to router if DNS resolved name not in routing table",
508
"stub.routing.Routing.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function":
51-
"Driver opens a new connection each time to get a fresh routing table",
9+
"Driver closes connection to router if DNS resolved name not in routing table",
5210
"stub.routing.RoutingV3.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function":
53-
"Driver opens a new connection each time to get a fresh routing table",
11+
"Driver closes connection to router if DNS resolved name not in routing table",
5412
"stub.routing.RoutingV4.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function":
55-
"Driver opens a new connection each time to get a fresh routing table",
56-
"stub.routing.NoRouting.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function":
57-
"Driver opens a new connection each time to get a fresh routing table",
58-
"stub.routing.Routing.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors":
59-
"Driver uses custom resolver for each connection, not only initial seeding",
60-
"stub.routing.RoutingV3.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors":
61-
"Driver uses custom resolver for each connection, not only initial seeding",
62-
"stub.routing.RoutingV4.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors":
63-
"Driver uses custom resolver for each connection, not only initial seeding",
64-
"stub.routing.NoRouting.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors":
65-
"Driver uses custom resolver for each connection, not only initial seeding",
13+
"Driver closes connection to router if DNS resolved name not in routing table",
14+
"stub.routing.Routing.test_should_successfully_acquire_rt_when_router_ip_changes":
15+
"Test makes assumptions about how verify_connectivity is implemented",
16+
"stub.routing.RoutingV3.test_should_successfully_acquire_rt_when_router_ip_changes":
17+
"Test makes assumptions about how verify_connectivity is implemented",
18+
"stub.routing.RoutingV4.test_should_successfully_acquire_rt_when_router_ip_changes":
19+
"Test makes assumptions about how verify_connectivity is implemented",
6620
"stub.retry.TestRetryClustering.test_retry_ForbiddenOnReadOnlyDatabase_ChangingWriter":
6721
"Test makes assumptions about how verify_connectivity is implemented",
68-
"stub.disconnected.SessionRunDisconnected.test_fail_on_reset":
69-
"It is not reseting connection when putting back to pool",
7022
"stub.authorization.AuthorizationTests.test_should_retry_on_auth_expired_on_begin_using_tx_function":
7123
"Flaky: test requires the driver to contact servers in a specific order",
7224
"stub.authorization.AuthorizationTestsV3.test_should_retry_on_auth_expired_on_begin_using_tx_function":

tests/unit/io/test_direct.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
# limitations under the License.
2020

2121

22-
from unittest import TestCase
22+
from unittest import (
23+
mock,
24+
TestCase,
25+
)
2326
import pytest
2427
from threading import (
2528
Condition,
@@ -277,6 +280,26 @@ def test_multithread(self):
277280
# The pool size is still 5, but all are free
278281
self.assert_pool_size(address, 0, 5, pool)
279282

283+
def test_reset_when_released(self):
284+
def test(is_reset):
285+
with mock.patch(__name__ + ".QuickConnection.is_reset",
286+
new_callable=mock.PropertyMock) as is_reset_mock:
287+
with mock.patch(__name__ + ".QuickConnection.reset",
288+
new_callable=mock.MagicMock) as reset_mock:
289+
is_reset_mock.return_value = is_reset
290+
connection = self.pool._acquire(address, timeout=3)
291+
self.assertIsInstance(connection, QuickConnection)
292+
self.assertEqual(is_reset_mock.call_count, 0)
293+
self.assertEqual(reset_mock.call_count, 0)
294+
self.pool.release(connection)
295+
self.assertEqual(is_reset_mock.call_count, 1)
296+
self.assertEqual(reset_mock.call_count, int(not is_reset))
297+
298+
address = ("127.0.0.1", 7687)
299+
for is_reset in (True, False):
300+
with self.subTest():
301+
test(is_reset)
302+
280303

281304
def acquire_release_conn(pool, address, acquired_counter, release_event):
282305
conn = pool._acquire(address, timeout=3)

0 commit comments

Comments
 (0)