From 56b1079199c622f8004401b5000deeb1d546b29f Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 20 Jul 2017 15:02:18 +0200 Subject: [PATCH] Forget CC members on DatabaseUnavailable This commit makes routing driver forget Causal Cluster members when they fail with `Neo.TransientError.General.DatabaseUnavailable` error code. It means database is either shutting down or doing store copy. Generally we should not hammer database in such state with new requests and give it some time to either recover (in case of store copy) or just stop using it. Also restructured a bit error handling code in `RoutingDriver` and `RoutingSession`. --- src/v1/routing-driver.js | 70 +++++++++++-------- .../address_unavailable_template.script.mst | 12 ++++ test/v1/routing.driver.boltkit.it.js | 58 +++++++++++++++ 3 files changed, 111 insertions(+), 29 deletions(-) create mode 100644 test/resources/boltkit/address_unavailable_template.script.mst diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 859f6bea9..4310067fc 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -31,7 +31,7 @@ import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './inte class RoutingDriver extends Driver { constructor(url, routingContext, userAgent, token = {}, config = {}) { - super(url, userAgent, token, RoutingDriver._validateConfig(config)); + super(url, userAgent, token, validateConfig(config)); this._routingContext = routingContext; } @@ -42,16 +42,18 @@ class RoutingDriver extends Driver { _createSession(mode, connectionProvider, bookmark, config) { return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => { - if (error.code === SESSION_EXPIRED) { - this._forgetConnection(conn); + if (!conn) { + // connection can be undefined if error happened before connection was acquired return error; - } else if (RoutingDriver._isFailureToWrite(error)) { - let url = 'UNKNOWN'; - // connection is undefined if error happened before connection was acquired - if (conn) { - url = conn.url; - this._connectionProvider.forgetWriter(conn.url); - } + } + + const url = conn.url; + + if (error.code === SESSION_EXPIRED || isDatabaseUnavailable(error)) { + this._connectionProvider.forget(url); + return error; + } else if (isFailureToWrite(error)) { + this._connectionProvider.forgetWriter(url); return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED); } else { return error; @@ -65,30 +67,12 @@ class RoutingDriver extends Driver { return SESSION_EXPIRED; } - _forgetConnection(connection) { - // connection is undefined if error happened before connection was acquired - if (connection) { - this._connectionProvider.forget(connection.url); - } - } - - static _validateConfig(config) { - if(config.trust === 'TRUST_ON_FIRST_USE') { - throw newError('The chosen trust mode is not compatible with a routing driver'); - } - return config; - } - - static _isFailureToWrite(error) { - return error.code === 'Neo.ClientError.Cluster.NotALeader' || - error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'; - } - /** * Create new load balancing strategy based on the config. * @param {object} config the user provided config. * @param {Pool} connectionPool the connection pool for this driver. * @return {LoadBalancingStrategy} new strategy. + * @private */ static _createLoadBalancingStrategy(config, connectionPool) { const configuredValue = config.loadBalancingStrategy; @@ -102,6 +86,34 @@ class RoutingDriver extends Driver { } } +/** + * @private + */ +function validateConfig(config) { + if (config.trust === 'TRUST_ON_FIRST_USE') { + throw newError('The chosen trust mode is not compatible with a routing driver'); + } + return config; +} + +/** + * @private + */ +function isFailureToWrite(error) { + return error.code === 'Neo.ClientError.Cluster.NotALeader' || + error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'; +} + +/** + * @private + */ +function isDatabaseUnavailable(error) { + return error.code === 'Neo.TransientError.General.DatabaseUnavailable'; +} + +/** + * @private + */ class RoutingSession extends Session { constructor(mode, connectionProvider, bookmark, config, onFailedConnection) { super(mode, connectionProvider, bookmark, config); diff --git a/test/resources/boltkit/address_unavailable_template.script.mst b/test/resources/boltkit/address_unavailable_template.script.mst new file mode 100644 index 000000000..e102105c5 --- /dev/null +++ b/test/resources/boltkit/address_unavailable_template.script.mst @@ -0,0 +1,12 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL +!: AUTO ACK_FAILURE +!: AUTO RUN "ROLLBACK" {} +!: AUTO RUN "BEGIN" {} +!: AUTO RUN "COMMIT" {} + +C: RUN "{{{query}}}" {} +C: PULL_ALL +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database is busy doing store copy"} +S: IGNORED diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index d9bd4666e..ffb8647f0 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -1896,6 +1896,52 @@ describe('routing driver', () => { }); }); + it('should forget writer on database unavailable error', done => { + testAddressPurgeOnDatabaseError(`CREATE (n {name:'Bob'})`, WRITE, done); + }); + + it('should forget reader on database unavailable error', done => { + testAddressPurgeOnDatabaseError(`RETURN 1`, READ, done); + }); + + function testAddressPurgeOnDatabaseError(query, accessMode, done) { + const kit = new boltkit.BoltKit(); + + const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9010); + + const serverPort = accessMode === READ ? 9005 : 9007; + const serverAddress = '127.0.0.1:' + serverPort; + const serverTemplateScript = './test/resources/boltkit/address_unavailable_template.script.mst'; + const server = kit.startWithTemplate(serverTemplateScript, {query: query}, serverPort); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9010'); + + const session = driver.session(accessMode); + session.run(query).catch(error => { + expect(error.message).toEqual('Database is busy doing store copy'); + expect(error.code).toEqual('Neo.TransientError.General.DatabaseUnavailable'); + + expect(hasAddressInConnectionPool(driver, serverAddress)).toBeFalsy(); + expect(hasRouterInRoutingTable(driver, serverAddress)).toBeFalsy(); + expect(hasReaderInRoutingTable(driver, serverAddress)).toBeFalsy(); + expect(hasWriterInRoutingTable(driver, serverAddress)).toBeFalsy(); + + session.close(() => { + driver.close(); + + router.exit(code1 => { + server.exit(code2 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + done(); + }); + }); + }); + }); + }); + } + function moveNextDateNow30SecondsForward() { const currentTime = Date.now(); hijackNextDateNowCall(currentTime + 30 * 1000 + 1); @@ -2028,6 +2074,18 @@ describe('routing driver', () => { return getConnectionPool(driver).has(address); } + function hasRouterInRoutingTable(driver, expectedRouter) { + return getRoutingTable(driver).routers.indexOf(expectedRouter) > -1; + } + + function hasReaderInRoutingTable(driver, expectedReader) { + return getRoutingTable(driver).readers.indexOf(expectedReader) > -1; + } + + function hasWriterInRoutingTable(driver, expectedWriter) { + return getRoutingTable(driver).writers.indexOf(expectedWriter) > -1; + } + function assertHasRouters(driver, expectedRouters) { expect(getRoutingTable(driver).routers).toEqual(expectedRouters); }