Skip to content

Commit 315077f

Browse files
committed
Don't send RESET on READY (clean) connections
1 parent 56f1606 commit 315077f

File tree

5 files changed

+117
-29
lines changed

5 files changed

+117
-29
lines changed

neo4j/io/__init__.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class Bolt(abc.ABC):
123123
PROTOCOL_VERSION = None
124124

125125
# flag if connection needs RESET to go back to READY state
126-
_is_reset = True
126+
is_reset = False
127127

128128
# The socket
129129
in_use = False
@@ -460,10 +460,6 @@ def rollback(self, **handlers):
460460
""" Appends a ROLLBACK message to the output queue."""
461461
pass
462462

463-
@property
464-
def is_reset(self):
465-
return self._is_reset
466-
467463
@abc.abstractmethod
468464
def reset(self):
469465
""" Appends a RESET message to the outgoing queue, sends it and consumes

neo4j/io/_bolt3.py

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
# See the License for the specific language governing permissions and
1919
# limitations under the License.
2020

21+
from enum import Enum
2122
from logging import getLogger
2223
from ssl import SSLSocket
2324

@@ -52,6 +53,38 @@
5253
log = getLogger("neo4j")
5354

5455

56+
class ServerStates(Enum):
57+
CONNECTED = "CONNECTED"
58+
READY = "READY"
59+
STREAMING = "STREAMING"
60+
TX_READY_OR_TX_STREAMING = "TX_READY||TX_STREAMING"
61+
FAILED = "FAILED"
62+
63+
64+
STATE_TRANSITIONS = {
65+
ServerStates.CONNECTED: {
66+
"hello": ServerStates.READY,
67+
},
68+
ServerStates.READY: {
69+
"run": ServerStates.STREAMING,
70+
"begin": ServerStates.TX_READY_OR_TX_STREAMING,
71+
},
72+
ServerStates.STREAMING: {
73+
"pull": ServerStates.READY,
74+
"discard": ServerStates.READY,
75+
"reset": ServerStates.READY,
76+
},
77+
ServerStates.TX_READY_OR_TX_STREAMING: {
78+
"commit": ServerStates.READY,
79+
"rollback": ServerStates.READY,
80+
"reset": ServerStates.READY,
81+
},
82+
ServerStates.FAILED: {
83+
"reset": ServerStates.READY,
84+
}
85+
}
86+
87+
5588
class Bolt3(Bolt):
5689
""" Protocol handler for Bolt 3.
5790
@@ -64,6 +97,16 @@ class Bolt3(Bolt):
6497

6598
supports_multiple_databases = False
6699

100+
_server_state = ServerStates.CONNECTED
101+
102+
@property
103+
def is_reset(self):
104+
if self.responses:
105+
# we can't be sure of the server's state as there are still pending
106+
# responses.
107+
return False
108+
return self._server_state == ServerStates.READY
109+
67110
@property
68111
def encrypted(self):
69112
return isinstance(self.socket, SSLSocket)
@@ -92,7 +135,8 @@ def hello(self):
92135
logged_headers["credentials"] = "*******"
93136
log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers)
94137
self._append(b"\x01", (headers,),
95-
response=InitResponse(self, on_success=self.server_info.update))
138+
response=InitResponse(self, "hello",
139+
on_success=self.server_info.update))
96140
self.send_all()
97141
self.fetch_all()
98142
check_supported_server_product(self.server_info.agent)
@@ -155,20 +199,20 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
155199
fields = (query, parameters, extra)
156200
log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields)))
157201
if query.upper() == u"COMMIT":
158-
self._append(b"\x10", fields, CommitResponse(self, **handlers))
202+
self._append(b"\x10", fields, CommitResponse(self, "run",
203+
**handlers))
159204
else:
160-
self._append(b"\x10", fields, Response(self, **handlers))
161-
self._is_reset = False
205+
self._append(b"\x10", fields, Response(self, "run", **handlers))
162206

163207
def discard(self, n=-1, qid=-1, **handlers):
164208
# Just ignore n and qid, it is not supported in the Bolt 3 Protocol.
165209
log.debug("[#%04X] C: DISCARD_ALL", self.local_port)
166-
self._append(b"\x2F", (), Response(self, **handlers))
210+
self._append(b"\x2F", (), Response(self, "discard", **handlers))
167211

168212
def pull(self, n=-1, qid=-1, **handlers):
169213
# Just ignore n and qid, it is not supported in the Bolt 3 Protocol.
170214
log.debug("[#%04X] C: PULL_ALL", self.local_port)
171-
self._append(b"\x3F", (), Response(self, **handlers))
215+
self._append(b"\x3F", (), Response(self, "pull", **handlers))
172216
self._is_reset = False
173217

174218
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers):
@@ -193,16 +237,16 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None,
193237
except TypeError:
194238
raise TypeError("Timeout must be specified as a number of seconds")
195239
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
196-
self._append(b"\x11", (extra,), Response(self, **handlers))
240+
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))
197241
self._is_reset = False
198242

199243
def commit(self, **handlers):
200244
log.debug("[#%04X] C: COMMIT", self.local_port)
201-
self._append(b"\x12", (), CommitResponse(self, **handlers))
245+
self._append(b"\x12", (), CommitResponse(self, "commit", **handlers))
202246

203247
def rollback(self, **handlers):
204248
log.debug("[#%04X] C: ROLLBACK", self.local_port)
205-
self._append(b"\x13", (), Response(self, **handlers))
249+
self._append(b"\x13", (), Response(self, "rollback", **handlers))
206250

207251
def reset(self):
208252
""" Add a RESET message to the outgoing queue, send
@@ -213,11 +257,22 @@ def fail(metadata):
213257
raise BoltProtocolError("RESET failed %r" % metadata, address=self.unresolved_address)
214258

215259
log.debug("[#%04X] C: RESET", self.local_port)
216-
self._append(b"\x0F", response=Response(self, on_failure=fail))
260+
self._append(b"\x0F", response=Response(self, "reset", on_failure=fail))
217261
self.send_all()
218262
self.fetch_all()
219263
self._is_reset = True
220264

265+
def _update_server_state_on_success(self, metadata, message):
266+
if metadata.get("has_more"):
267+
return
268+
state_before = self._server_state
269+
self._server_state = STATE_TRANSITIONS\
270+
.get(self._server_state, {})\
271+
.get(message, self._server_state)
272+
if state_before != self._server_state:
273+
log.debug("[#%04X] State: %s", self.local_port,
274+
self._server_state.name)
275+
221276
def fetch_message(self):
222277
""" Receive at most one message from the server, if available.
223278
@@ -249,12 +304,15 @@ def fetch_message(self):
249304
response.complete = True
250305
if summary_signature == b"\x70":
251306
log.debug("[#%04X] S: SUCCESS %r", self.local_port, summary_metadata)
307+
self._update_server_state_on_success(summary_metadata,
308+
response.message)
252309
response.on_success(summary_metadata or {})
253310
elif summary_signature == b"\x7E":
254311
log.debug("[#%04X] S: IGNORED", self.local_port)
255312
response.on_ignored(summary_metadata or {})
256313
elif summary_signature == b"\x7F":
257314
log.debug("[#%04X] S: FAILURE %r", self.local_port, summary_metadata)
315+
self._server_state = ServerStates.FAILED
258316
try:
259317
response.on_failure(summary_metadata or {})
260318
except (ServiceUnavailable, DatabaseUnavailable):

neo4j/io/_bolt4.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
# See the License for the specific language governing permissions and
1919
# limitations under the License.
2020

21+
from enum import Enum
2122
from logging import getLogger
2223
from ssl import SSLSocket
2324

@@ -37,7 +38,6 @@
3738
Neo4jError,
3839
NotALeader,
3940
ServiceUnavailable,
40-
SessionExpired,
4141
)
4242
from neo4j.io import (
4343
Bolt,
@@ -48,6 +48,10 @@
4848
InitResponse,
4949
Response,
5050
)
51+
from neo4j.io._bolt3 import (
52+
ServerStates,
53+
STATE_TRANSITIONS,
54+
)
5155

5256

5357
log = getLogger("neo4j")
@@ -65,6 +69,16 @@ class Bolt4x0(Bolt):
6569

6670
supports_multiple_databases = True
6771

72+
_server_state = ServerStates.CONNECTED
73+
74+
@property
75+
def is_reset(self):
76+
if self.responses:
77+
# we can't be sure of the server's state as there are still pending
78+
# responses.
79+
return False
80+
return self._server_state == ServerStates.READY
81+
6882
@property
6983
def encrypted(self):
7084
return isinstance(self.socket, SSLSocket)
@@ -93,7 +107,8 @@ def hello(self):
93107
logged_headers["credentials"] = "*******"
94108
log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers)
95109
self._append(b"\x01", (headers,),
96-
response=InitResponse(self, on_success=self.server_info.update))
110+
response=InitResponse(self, "hello",
111+
on_success=self.server_info.update))
97112
self.send_all()
98113
self.fetch_all()
99114
check_supported_server_product(self.server_info.agent)
@@ -162,24 +177,25 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
162177
fields = (query, parameters, extra)
163178
log.debug("[#%04X] C: RUN %s", self.local_port, " ".join(map(repr, fields)))
164179
if query.upper() == u"COMMIT":
165-
self._append(b"\x10", fields, CommitResponse(self, **handlers))
180+
self._append(b"\x10", fields, CommitResponse(self, "run",
181+
**handlers))
166182
else:
167-
self._append(b"\x10", fields, Response(self, **handlers))
183+
self._append(b"\x10", fields, Response(self, "run", **handlers))
168184
self._is_reset = False
169185

170186
def discard(self, n=-1, qid=-1, **handlers):
171187
extra = {"n": n}
172188
if qid != -1:
173189
extra["qid"] = qid
174190
log.debug("[#%04X] C: DISCARD %r", self.local_port, extra)
175-
self._append(b"\x2F", (extra,), Response(self, **handlers))
191+
self._append(b"\x2F", (extra,), Response(self, "discard", **handlers))
176192

177193
def pull(self, n=-1, qid=-1, **handlers):
178194
extra = {"n": n}
179195
if qid != -1:
180196
extra["qid"] = qid
181197
log.debug("[#%04X] C: PULL %r", self.local_port, extra)
182-
self._append(b"\x3F", (extra,), Response(self, **handlers))
198+
self._append(b"\x3F", (extra,), Response(self, "pull", **handlers))
183199
self._is_reset = False
184200

185201
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
@@ -205,16 +221,16 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
205221
except TypeError:
206222
raise TypeError("Timeout must be specified as a number of seconds")
207223
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
208-
self._append(b"\x11", (extra,), Response(self, **handlers))
224+
self._append(b"\x11", (extra,), Response(self, "begin", **handlers))
209225
self._is_reset = False
210226

211227
def commit(self, **handlers):
212228
log.debug("[#%04X] C: COMMIT", self.local_port)
213-
self._append(b"\x12", (), CommitResponse(self, **handlers))
229+
self._append(b"\x12", (), CommitResponse(self, "commit", **handlers))
214230

215231
def rollback(self, **handlers):
216232
log.debug("[#%04X] C: ROLLBACK", self.local_port)
217-
self._append(b"\x13", (), Response(self, **handlers))
233+
self._append(b"\x13", (), Response(self, "rollback", **handlers))
218234

219235
def reset(self):
220236
""" Add a RESET message to the outgoing queue, send
@@ -225,11 +241,22 @@ def fail(metadata):
225241
raise BoltProtocolError("RESET failed %r" % metadata, self.unresolved_address)
226242

227243
log.debug("[#%04X] C: RESET", self.local_port)
228-
self._append(b"\x0F", response=Response(self, on_failure=fail))
244+
self._append(b"\x0F", response=Response(self, "reset", on_failure=fail))
229245
self.send_all()
230246
self.fetch_all()
231247
self._is_reset = True
232248

249+
def _update_server_state_on_success(self, metadata, message):
250+
if metadata.get("has_more"):
251+
return
252+
state_before = self._server_state
253+
self._server_state = STATE_TRANSITIONS\
254+
.get(self._server_state, {})\
255+
.get(message, self._server_state)
256+
if state_before != self._server_state:
257+
log.debug("[#%04X] [%s]", self.local_port,
258+
self._server_state.name)
259+
233260
def fetch_message(self):
234261
""" Receive at most one message from the server, if available.
235262
@@ -261,12 +288,15 @@ def fetch_message(self):
261288
response.complete = True
262289
if summary_signature == b"\x70":
263290
log.debug("[#%04X] S: SUCCESS %r", self.local_port, summary_metadata)
291+
self._update_server_state_on_success(summary_metadata,
292+
response.message)
264293
response.on_success(summary_metadata or {})
265294
elif summary_signature == b"\x7E":
266295
log.debug("[#%04X] S: IGNORED", self.local_port)
267296
response.on_ignored(summary_metadata or {})
268297
elif summary_signature == b"\x7F":
269298
log.debug("[#%04X] S: FAILURE %r", self.local_port, summary_metadata)
299+
self._server_state = ServerStates.FAILED
270300
try:
271301
response.on_failure(summary_metadata or {})
272302
except (ServiceUnavailable, DatabaseUnavailable):
@@ -372,7 +402,9 @@ def fail(md):
372402
else:
373403
bookmarks = list(bookmarks)
374404
self._append(b"\x66", (routing_context, bookmarks, database),
375-
response=Response(self, on_success=metadata.update, on_failure=fail))
405+
response=Response(self, "route",
406+
on_success=metadata.update,
407+
on_failure=fail))
376408
self.send_all()
377409
self.fetch_all()
378410
return [metadata.get("rt")]
@@ -400,7 +432,8 @@ def on_success(metadata):
400432
logged_headers["credentials"] = "*******"
401433
log.debug("[#%04X] C: HELLO %r", self.local_port, logged_headers)
402434
self._append(b"\x01", (headers,),
403-
response=InitResponse(self, on_success=on_success))
435+
response=InitResponse(self, "hello",
436+
on_success=on_success))
404437
self.send_all()
405438
self.fetch_all()
406439
check_supported_server_product(self.server_info.agent)

neo4j/io/_common.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,10 @@ class Response:
144144
more detail messages followed by one summary message).
145145
"""
146146

147-
def __init__(self, connection, **handlers):
147+
def __init__(self, connection, message, **handlers):
148148
self.connection = connection
149149
self.handlers = handlers
150+
self.message = message
150151
self.complete = False
151152

152153
def on_records(self, records):

testkitbackend/test_config.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
"features": {
3535
"AuthorizationExpiredTreatment": true,
3636
"Optimization:ImplicitDefaultArguments": true,
37-
"Optimization:MinimalResets": "Driver resets some clean connections when put back into pool",
37+
"Optimization:MinimalResets": true,
3838
"Optimization:ConnectionReuse": true,
3939
"Optimization:PullPipelining": true,
4040
"ConfHint:connection.recv_timeout_seconds": true,

0 commit comments

Comments
 (0)