Skip to content

Commit c2cfedf

Browse files
committed
Close session and connection used for rediscovery
Previously session and the underlying connection were not returned to the pool after the rediscovery procedure. This happened because rediscovery session was not initalized with an appropriate callback. This commit fixes the problem by extracting the release callback code into a function that gets called for both "normal" and "rediscovery" sessions. Probably a cleaner way to fix this would be to move functionality into `Session#close()` and make connection release itself to the pool. However this resulted into lots of non-trivial changes and thus should go into a separate PR.
1 parent f8ebe97 commit c2cfedf

File tree

5 files changed

+110
-20
lines changed

5 files changed

+110
-20
lines changed

src/v1/driver.js

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -119,30 +119,33 @@ class Driver {
119119
//we don't need to tell the driver about this error
120120
}
121121
});
122-
return this._createSession(connectionPromise, (cb) => {
123-
// This gets called on Session#close(), and is where we return
124-
// the pooled 'connection' instance.
125-
126-
// We don't pool Session instances, to avoid users using the Session
127-
// after they've called close. The `Session` object is just a thin
128-
// wrapper around Connection anyway, so it makes little difference.
129-
130-
// Queue up a 'reset', to ensure the next user gets a clean
131-
// session to work with.
122+
return this._createSession(connectionPromise, this._releaseConnection(connectionPromise));
123+
}
132124

133-
connectionPromise.then( (conn) => {
125+
/**
126+
* The returned function gets called on Session#close(), and is where we return the pooled 'connection' instance.
127+
* We don't pool Session instances, to avoid users using the Session after they've called close.
128+
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
129+
* @param {Promise} connectionPromise - promise resolved with the connection.
130+
* @return {function(*=)} - function that releases the connection and then executes an optional callback.
131+
* @protected
132+
*/
133+
_releaseConnection(connectionPromise) {
134+
return userDefinedCallback => {
135+
connectionPromise.then(conn => {
136+
// Queue up a 'reset', to ensure the next user gets a clean session to work with.
134137
conn.reset();
135138
conn.sync();
136139

137140
// Return connection to the pool
138141
conn._release();
139-
}).catch( () => {/*ignore errors here*/});
142+
}).catch(ignoredError => {
143+
});
140144

141-
// Call user callback
142-
if (cb) {
143-
cb();
145+
if (userDefinedCallback) {
146+
userDefinedCallback();
144147
}
145-
});
148+
};
146149
}
147150

148151
static _validateSessionMode(rawMode) {

src/v1/internal/get-servers-util.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';
2727
export default class GetServersUtil {
2828

2929
callGetServers(session, routerAddress) {
30-
// todo: session should be closed here along with the underlying connection
3130
return session.run(PROCEDURE_CALL).then(result => {
31+
session.close();
3232
return result.records;
3333
}).catch(error => {
3434
if (error.code === PROCEDURE_NOT_FOUND_CODE) {

src/v1/routing-driver.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ class RoutingDriver extends Driver {
129129
this._forget(previousRouter);
130130
}
131131

132-
// todo: properly close this connection
133132
const connection = this._pool.acquire(currentRouter);
134-
const session = this._createSession(Promise.resolve(connection));
133+
const connectionPromise = Promise.resolve(connection);
134+
const session = this._createSession(connectionPromise, this._releaseConnection(connectionPromise));
135135

136136
// try next router
137137
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);

test/internal/get-servers-util.test.js

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,30 @@ describe('get-servers-util', () => {
4040
it('should return retrieved records when query succeeds', done => {
4141
const session = FakeSession.successful({records: ['foo', 'bar', 'baz']});
4242

43-
callGetServers(session, '').then(records => {
43+
callGetServers(session).then(records => {
4444
expect(records).toEqual(['foo', 'bar', 'baz']);
4545
done();
4646
}).catch(console.log);
4747
});
4848

49+
it('should close session when query succeeds', done => {
50+
const session = FakeSession.successful({records: ['foo', 'bar', 'baz']});
51+
52+
callGetServers(session).then(() => {
53+
expect(session.isClosed()).toBeTruthy();
54+
done();
55+
}).catch(console.log);
56+
});
57+
58+
it('should not close session when query fails', done => {
59+
const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED));
60+
61+
callGetServers(session).then(() => {
62+
expect(session.isClosed()).toBeFalsy();
63+
done();
64+
}).catch(console.log);
65+
});
66+
4967
it('should return null on connection error', done => {
5068
const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED));
5169

@@ -229,6 +247,7 @@ describe('get-servers-util', () => {
229247

230248
constructor(runResponse) {
231249
this._runResponse = runResponse;
250+
this._closed = false;
232251
}
233252

234253
static successful(result) {
@@ -242,5 +261,13 @@ describe('get-servers-util', () => {
242261
run() {
243262
return this._runResponse;
244263
}
264+
265+
close() {
266+
this._closed = true;
267+
}
268+
269+
isClosed() {
270+
return this._closed;
271+
}
245272
}
246273
});

test/v1/routing.driver.boltkit.it.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,66 @@ describe('routing driver', function () {
783783
});
784784
});
785785

786+
it('should close connection used for routing table refreshing', done => {
787+
if (!boltkit.BoltKitSupport) {
788+
done();
789+
return;
790+
}
791+
792+
const kit = new boltkit.BoltKit();
793+
// server is both router and writer
794+
const server = kit.start('./test/resources/boltkit/discover_new_servers.script', 9001);
795+
796+
kit.run(() => {
797+
const driver = newDriver('bolt+routing://127.0.0.1:9001');
798+
799+
const acquiredConnections = [];
800+
const releasedConnections = [];
801+
setUpPoolToMemorizeAllAcquiredAndReleasedConnections(driver, acquiredConnections, releasedConnections);
802+
803+
const session = driver.session();
804+
session.run('MATCH (n) RETURN n.name').then(() => {
805+
session.close(() => {
806+
driver.close();
807+
server.exit(code => {
808+
expect(code).toEqual(0);
809+
810+
// two connections should have been acquired: one for rediscovery and one for the query
811+
expect(acquiredConnections.length).toEqual(2);
812+
// same two connections should have been released
813+
expect(releasedConnections.length).toEqual(2);
814+
815+
// verify that acquired connections are those that we released
816+
for (let i = 0; i < acquiredConnections.length; i++) {
817+
expect(acquiredConnections[i]).toBe(releasedConnections[i]);
818+
}
819+
820+
done();
821+
});
822+
});
823+
});
824+
});
825+
});
826+
827+
function setUpPoolToMemorizeAllAcquiredAndReleasedConnections(driver, acquiredConnections, releasedConnections) {
828+
// make connection pool remember all acquired connections
829+
const originalAcquire = driver._pool.acquire.bind(driver._pool);
830+
const memorizingAcquire = (...args) => {
831+
const connection = originalAcquire(...args);
832+
acquiredConnections.push(connection);
833+
return connection;
834+
};
835+
driver._pool.acquire = memorizingAcquire;
836+
837+
// make connection pool remember all released connections
838+
const originalRelease = driver._pool._release;
839+
const rememberingRelease = (key, resource) => {
840+
originalRelease(key, resource);
841+
releasedConnections.push(resource);
842+
};
843+
driver._pool._release = rememberingRelease;
844+
}
845+
786846
function newDriver(url) {
787847
// BoltKit currently does not support encryption, create driver with encryption turned off
788848
return neo4j.driver(url, neo4j.auth.basic("neo4j", "neo4j"), {

0 commit comments

Comments
 (0)