Skip to content

Commit 1479707

Browse files
committed
Replace ACK_FAILURE with RESET
Both have equivalent semantics with regards to failure handing. ACK_FAILURE will also be removed in the next Bolt protocol version.
1 parent b63ff06 commit 1479707

File tree

5 files changed

+60
-65
lines changed

5 files changed

+60
-65
lines changed

src/v1/internal/connector.js

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ else {
4444
let
4545
// Signature bytes for each message type
4646
INIT = 0x01, // 0000 0001 // INIT <user_agent>
47-
ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE
47+
ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE - unused
4848
RESET = 0x0F, // 0000 1111 // RESET
4949
RUN = 0x10, // 0001 0000 // RUN <statement> <parameters>
5050
DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD *
@@ -106,7 +106,6 @@ class Connection {
106106
this._packer = packStreamUtil.createLatestPacker(this._chunker);
107107
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
108108

109-
this._ackFailureMuted = false;
110109
this._currentFailure = null;
111110

112111
this._state = new ConnectionState(this);
@@ -247,7 +246,7 @@ class Connection {
247246
} finally {
248247
this._updateCurrentObserver();
249248
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
250-
this._ackFailureIfNeeded();
249+
this._resetOnFailure();
251250
}
252251
break;
253252
case IGNORED:
@@ -326,13 +325,8 @@ class Connection {
326325
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
327326
*/
328327
resetAndFlush() {
329-
if (this._log.isDebugEnabled()) {
330-
this._log.debug(`${this} C: RESET`);
331-
}
332-
this._ackFailureMuted = true;
333-
334328
return new Promise((resolve, reject) => {
335-
const observer = {
329+
this._reset({
336330
onNext: record => {
337331
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
338332
reject(neo4jError);
@@ -347,48 +341,35 @@ class Connection {
347341
}
348342
},
349343
onCompleted: () => {
350-
this._ackFailureMuted = false;
351344
resolve();
352345
}
353-
};
354-
const queued = this._queueObserver(observer);
355-
if (queued) {
356-
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
357-
this._chunker.messageBoundary();
358-
this.sync();
359-
}
346+
});
360347
});
361348
}
362349

363-
_ackFailureIfNeeded() {
364-
if (this._ackFailureMuted) {
365-
return;
366-
}
367-
368-
if (this._log.isDebugEnabled()) {
369-
this._log.debug(`${this} C: ACK_FAILURE`);
370-
}
371-
372-
const observer = {
350+
_resetOnFailure() {
351+
this._reset({
373352
onNext: record => {
374-
this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record));
353+
this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
375354
},
376-
onError: error => {
377-
if (!this._isBroken && !this._ackFailureMuted) {
378-
// not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
379-
this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
380-
} else {
381-
this._currentFailure = null;
382-
}
355+
// clear the current failure when response for RESET is received
356+
onError: () => {
357+
this._currentFailure = null;
383358
},
384359
onCompleted: () => {
385360
this._currentFailure = null;
386361
}
387-
};
362+
});
363+
}
364+
365+
_reset(observer) {
366+
if (this._log.isDebugEnabled()) {
367+
this._log.debug(`${this} C: RESET`);
368+
}
388369

389370
const queued = this._queueObserver(observer);
390371
if (queued) {
391-
this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err));
372+
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
392373
this._chunker.messageBoundary();
393374
this.sync();
394375
}
@@ -477,7 +458,6 @@ class Connection {
477458
}
478459

479460
_handleProtocolError(message) {
480-
this._ackFailureMuted = false;
481461
this._currentFailure = null;
482462
this._updateCurrentObserver();
483463
const error = newError(message, PROTOCOL_ERROR);

test/internal/connector.test.js

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -270,10 +270,6 @@ describe('connector', () => {
270270
testQueueingOfObserversWithBrokenConnection(resetAction);
271271
});
272272

273-
it('should not queue ACK_FAILURE observer when broken', () => {
274-
testQueueingOfObserversWithBrokenConnection(connection => connection._ackFailureIfNeeded());
275-
});
276-
277273
it('should reset and flush when SUCCESS received', done => {
278274
connection = connect('bolt://localhost');
279275

@@ -315,36 +311,16 @@ describe('connector', () => {
315311
connection._handleMessage(RECORD_MESSAGE);
316312
});
317313

318-
it('should ACK_FAILURE when SUCCESS received', () => {
314+
it('should acknowledge failure with RESET when SUCCESS received', () => {
319315
connection = connect('bolt://localhost');
320316

321317
connection._currentFailure = newError('Hello');
322-
connection._ackFailureIfNeeded();
318+
connection._resetOnFailure();
323319

324320
connection._handleMessage(SUCCESS_MESSAGE);
325321
expect(connection._currentFailure).toBeNull();
326322
});
327323

328-
it('should fail the connection when ACK_FAILURE receives FAILURE', () => {
329-
connection = connect('bolt://localhost');
330-
331-
connection._ackFailureIfNeeded();
332-
333-
connection._handleMessage(FAILURE_MESSAGE);
334-
expect(connection._isBroken).toBeTruthy();
335-
expect(connection.isOpen()).toBeFalsy();
336-
});
337-
338-
it('should fail the connection when ACK_FAILURE receives RECORD', () => {
339-
connection = connect('bolt://localhost');
340-
341-
connection._ackFailureIfNeeded();
342-
343-
connection._handleMessage(RECORD_MESSAGE);
344-
expect(connection._isBroken).toBeTruthy();
345-
expect(connection.isOpen()).toBeFalsy();
346-
});
347-
348324
function packedHandshakeMessage() {
349325
const result = alloc(4);
350326
result.putInt32(0, 1);

test/resources/boltstub/address_unavailable_template.script.mst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
!: AUTO INIT
22
!: AUTO RESET
33
!: AUTO PULL_ALL
4-
!: AUTO ACK_FAILURE
54
!: AUTO RUN "ROLLBACK" {}
65
!: AUTO RUN "BEGIN" {}
76
!: AUTO RUN "COMMIT" {}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
!: AUTO INIT
2+
!: AUTO PULL_ALL
3+
4+
C: RUN "RETURN 10 / 0" {}
5+
C: PULL_ALL
6+
S: FAILURE {"code": "Neo.ClientError.Statement.ArithmeticError", "message": "/ by zero"}
7+
S: IGNORED
8+
C: RESET
9+
S: SUCCESS
10+
C: RESET
11+
S: SUCCESS

test/v1/direct.driver.boltkit.test.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,4 +304,33 @@ describe('direct driver with stub server', () => {
304304
});
305305
});
306306

307+
it('should send RESET on error', done => {
308+
if (!boltStub.supported) {
309+
done();
310+
return;
311+
}
312+
313+
const server = boltStub.start('./test/resources/boltstub/query_with_error.script', 9001);
314+
315+
boltStub.run(() => {
316+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001');
317+
const session = driver.session();
318+
319+
session.run('RETURN 10 / 0').then(result => {
320+
done.fail('Should fail but received a result: ' + JSON.stringify(result));
321+
}).catch(error => {
322+
expect(error.code).toEqual('Neo.ClientError.Statement.ArithmeticError');
323+
expect(error.message).toEqual('/ by zero');
324+
325+
session.close(() => {
326+
driver.close();
327+
server.exit(code => {
328+
expect(code).toEqual(0);
329+
done();
330+
});
331+
});
332+
});
333+
});
334+
});
335+
307336
});

0 commit comments

Comments
 (0)