Skip to content

Commit 2277afb

Browse files
committed
Support for routing when running on transactions
1 parent d29d74e commit 2277afb

File tree

6 files changed

+57
-4
lines changed

6 files changed

+57
-4
lines changed

src/v1/internal/stream-observer.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class StreamObserver {
3434
* @constructor
3535
* @param errorCallback optional callback to be used for adding additional logic on error
3636
*/
37-
constructor(errorCallback = () => {}) {
37+
constructor(errorCallback = (err) => {return err}) {
3838
this._fieldKeys = null;
3939
this._fieldLookup = null;
4040
this._queuedRecords = [];

src/v1/session.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class Session {
8787
}
8888

8989
this._hasTx = true;
90-
return new Transaction(this._connectionPromise, () => {this._hasTx = false});
90+
return new Transaction(this._connectionPromise, () => {this._hasTx = false}, this._onRunFailure());
9191
}
9292

9393
/**

src/v1/transaction.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,18 @@ class Transaction {
3030
* @param {Promise} connectionPromise - A connection to use
3131
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
3232
*/
33-
constructor(connectionPromise, onClose) {
33+
constructor(connectionPromise, onClose, errorTransformer) {
3434
this._connectionPromise = connectionPromise;
3535
let streamObserver = new _TransactionStreamObserver(this);
3636
this._connectionPromise.then((conn) => {
37+
streamObserver.resolveConnection(conn);
3738
conn.run("BEGIN", {}, streamObserver);
3839
conn.discardAll(streamObserver);
3940
}).catch(streamObserver.onError);
4041

4142
this._state = _states.ACTIVE;
4243
this._onClose = onClose;
44+
this._errorTransformer = errorTransformer;
4345
}
4446

4547
/**
@@ -98,7 +100,7 @@ class Transaction {
98100
/** Internal stream observer used for transactional results*/
99101
class _TransactionStreamObserver extends StreamObserver {
100102
constructor(tx) {
101-
super();
103+
super(tx._errorTransformer || ((err) => {return err}));
102104
this._tx = tx;
103105
//this is to to avoid multiple calls to onError caused by IGNORED
104106
this._hasFailed = false;
@@ -126,6 +128,7 @@ let _states = {
126128
},
127129
run: (connectionPromise, observer, statement, parameters) => {
128130
connectionPromise.then((conn) => {
131+
observer.resolveConnection(conn);
129132
conn.run( statement, parameters || {}, observer );
130133
conn.pullAll( observer );
131134
conn.sync();
@@ -204,6 +207,7 @@ let _states = {
204207

205208
function _runDiscardAll(msg, connectionPromise, observer) {
206209
connectionPromise.then((conn) => {
210+
observer.resolveConnection(conn);
207211
conn.run(msg, {}, observer);
208212
conn.discardAll(observer);
209213
conn.sync();

test/resources/boltkit/not_able_to_write.script

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
!: AUTO INIT
22
!: AUTO RESET
33
!: AUTO PULL_ALL
4+
!: AUTO DISCARD_ALL
45
!: AUTO RUN "ROLLBACK" {}
56
!: AUTO RUN "BEGIN" {}
67
!: AUTO PULL_ALL
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
!: AUTO DISCARD_ALL
5+
!: AUTO RUN "ROLLBACK" {}
6+
!: AUTO RUN "BEGIN" {}
7+
!: AUTO PULL_ALL
8+
9+
C: RUN "CREATE ()" {}
10+
C: PULL_ALL
11+
S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"}
12+
S: IGNORED
13+
C: RUN "COMMIT" {}
14+
S: IGNORED

test/v1/routing.driver.test.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,5 +487,39 @@ describe('routing driver ', function() {
487487
});
488488
});
489489
});
490+
491+
it('should handle leader switch while writing on transaction', function (done) {
492+
if (!boltkit.BoltKitSupport) {
493+
done();
494+
return;
495+
}
496+
// Given
497+
var kit = new boltkit.BoltKit(true);
498+
var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
499+
var readServer = kit.start('./test/resources/boltkit/not_able_to_write_in_transaction.script', 9007);
500+
501+
kit.run(function () {
502+
var driver = neo4j.driver("bolt+routing://127.0.0.1:9001", neo4j.auth.basic("neo4j", "neo4j"));
503+
// When
504+
var session = driver.session();
505+
var tx = session.beginTransaction();
506+
tx.run("CREATE ()");
507+
508+
tx.commit().catch(function (err) {
509+
//the server at 9007 should have been removed
510+
expect(driver._clusterView.writers.toArray()).toEqual([ '127.0.0.1:9008']);
511+
expect(err.code).toEqual(neo4j.SESSION_EXPIRED);
512+
session.close();
513+
driver.close();
514+
seedServer.exit(function (code1) {
515+
readServer.exit(function (code2) {
516+
expect(code1).toEqual(0);
517+
expect(code2).toEqual(0);
518+
done();
519+
});
520+
});
521+
});
522+
});
523+
});
490524
});
491525

0 commit comments

Comments
 (0)