Skip to content

Commit 4b74096

Browse files
committed
Forget CC members on DatabaseUnavailable
This commit makes routing driver forget Causal Cluster members when they fail with `Neo.TransientError.General.DatabaseUnavailable` error code. It means database is either shutting down or doing store copy. Generally we should not hammer database in such state with new requests and give it some time to either recover (in case of store copy) or just stop using it. Also restructured a bit error handling code in `RoutingDriver` and `RoutingSession`.
1 parent e42c360 commit 4b74096

File tree

3 files changed

+122
-41
lines changed

3 files changed

+122
-41
lines changed

src/v1/routing-driver.js

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,29 @@ import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './inte
3131
class RoutingDriver extends Driver {
3232

3333
constructor(url, routingContext, userAgent, token = {}, config = {}) {
34-
super(url, userAgent, token, RoutingDriver._validateConfig(config));
34+
super(url, userAgent, token, validateConfig(config));
3535
this._routingContext = routingContext;
3636
}
3737

3838
_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
39-
const loadBalancingStrategy = RoutingDriver._createLoadBalancingStrategy(this._config, connectionPool);
39+
const loadBalancingStrategy = createLoadBalancingStrategy(this._config, connectionPool);
4040
return new LoadBalancer(address, this._routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback);
4141
}
4242

4343
_createSession(mode, connectionProvider, bookmark, config) {
4444
return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => {
45-
if (error.code === SESSION_EXPIRED) {
46-
this._forgetConnection(conn);
45+
if (!conn) {
46+
// connection can be undefined if error happened before connection was acquired
4747
return error;
48-
} else if (RoutingDriver._isFailureToWrite(error)) {
49-
let url = 'UNKNOWN';
50-
// connection is undefined if error happened before connection was acquired
51-
if (conn) {
52-
url = conn.url;
53-
this._connectionProvider.forgetWriter(conn.url);
54-
}
48+
}
49+
50+
const url = conn.url;
51+
52+
if (error.code === SESSION_EXPIRED || isDatabaseUnavailable(error)) {
53+
this._connectionProvider.forget(url);
54+
return error;
55+
} else if (isFailureToWrite(error)) {
56+
this._connectionProvider.forgetWriter(url);
5557
return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED);
5658
} else {
5759
return error;
@@ -64,44 +66,54 @@ class RoutingDriver extends Driver {
6466
// result in SESSION_EXPIRED because there might still exist other servers capable of serving the request
6567
return SESSION_EXPIRED;
6668
}
69+
}
6770

68-
_forgetConnection(connection) {
69-
// connection is undefined if error happened before connection was acquired
70-
if (connection) {
71-
this._connectionProvider.forget(connection.url);
72-
}
71+
/**
72+
* @private
73+
*/
74+
function validateConfig(config) {
75+
if (config.trust === 'TRUST_ON_FIRST_USE') {
76+
throw newError('The chosen trust mode is not compatible with a routing driver');
7377
}
78+
return config;
79+
}
7480

75-
static _validateConfig(config) {
76-
if(config.trust === 'TRUST_ON_FIRST_USE') {
77-
throw newError('The chosen trust mode is not compatible with a routing driver');
78-
}
79-
return config;
80-
}
81+
/**
82+
* @private
83+
*/
84+
function isFailureToWrite(error) {
85+
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
86+
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
87+
}
8188

82-
static _isFailureToWrite(error) {
83-
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
84-
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
85-
}
89+
/**
90+
* @private
91+
*/
92+
function isDatabaseUnavailable(error) {
93+
return error.code === 'Neo.TransientError.General.DatabaseUnavailable';
94+
}
8695

87-
/**
88-
* Create new load balancing strategy based on the config.
89-
* @param {object} config the user provided config.
90-
* @param {Pool} connectionPool the connection pool for this driver.
91-
* @return {LoadBalancingStrategy} new strategy.
92-
*/
93-
static _createLoadBalancingStrategy(config, connectionPool) {
94-
const configuredValue = config.loadBalancingStrategy;
95-
if (!configuredValue || configuredValue === LEAST_CONNECTED_STRATEGY_NAME) {
96-
return new LeastConnectedLoadBalancingStrategy(connectionPool);
97-
} else if (configuredValue === ROUND_ROBIN_STRATEGY_NAME) {
98-
return new RoundRobinLoadBalancingStrategy();
99-
} else {
100-
throw newError('Unknown load balancing strategy: ' + configuredValue);
101-
}
96+
/**
97+
* Create new load balancing strategy based on the config.
98+
* @param {object} config the user provided config.
99+
* @param {Pool} connectionPool the connection pool for this driver.
100+
* @return {LoadBalancingStrategy} new strategy.
101+
* @private
102+
*/
103+
function createLoadBalancingStrategy(config, connectionPool) {
104+
const configuredValue = config.loadBalancingStrategy;
105+
if (!configuredValue || configuredValue === LEAST_CONNECTED_STRATEGY_NAME) {
106+
return new LeastConnectedLoadBalancingStrategy(connectionPool);
107+
} else if (configuredValue === ROUND_ROBIN_STRATEGY_NAME) {
108+
return new RoundRobinLoadBalancingStrategy();
109+
} else {
110+
throw newError('Unknown load balancing strategy: ' + configuredValue);
102111
}
103112
}
104113

114+
/**
115+
* @private
116+
*/
105117
class RoutingSession extends Session {
106118
constructor(mode, connectionProvider, bookmark, config, onFailedConnection) {
107119
super(mode, connectionProvider, bookmark, config);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
!: AUTO ACK_FAILURE
5+
!: AUTO RUN "ROLLBACK" {}
6+
!: AUTO RUN "BEGIN" {}
7+
!: AUTO RUN "COMMIT" {}
8+
9+
C: RUN "{{{query}}}" {}
10+
PULL_ALL
11+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database is busy doing store copy"}

test/v1/routing.driver.boltkit.it.js

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,6 +1896,52 @@ describe('routing driver', () => {
18961896
});
18971897
});
18981898

1899+
it('should forget writer on database unavailable error', done => {
1900+
testAddressPurgeOnDatabaseError(`CREATE (n {name:'Bob'})`, WRITE, done);
1901+
});
1902+
1903+
it('should forget reader on database unavailable error', done => {
1904+
testAddressPurgeOnDatabaseError(`RETURN 1`, READ, done);
1905+
});
1906+
1907+
function testAddressPurgeOnDatabaseError(query, accessMode, done) {
1908+
const kit = new boltkit.BoltKit();
1909+
1910+
const router = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9010);
1911+
1912+
const serverPort = accessMode === READ ? 9005 : 9007;
1913+
const serverAddress = '127.0.0.1:' + serverPort;
1914+
const serverTemplateScript = './test/resources/boltkit/address_unavailable_template.script.mst';
1915+
const server = kit.startWithTemplate(serverTemplateScript, {query: query}, serverPort);
1916+
1917+
kit.run(() => {
1918+
const driver = newDriver('bolt+routing://127.0.0.1:9010');
1919+
1920+
const session = driver.session(accessMode);
1921+
session.run(query).catch(error => {
1922+
expect(error.message).toEqual('Database is busy doing store copy');
1923+
expect(error.code).toEqual('Neo.TransientError.General.DatabaseUnavailable');
1924+
1925+
expect(hasAddressInConnectionPool(driver, serverAddress)).toBeFalsy();
1926+
expect(hasRouterInRoutingTable(driver, serverAddress)).toBeFalsy();
1927+
expect(hasReaderInRoutingTable(driver, serverAddress)).toBeFalsy();
1928+
expect(hasWriterInRoutingTable(driver, serverAddress)).toBeFalsy();
1929+
1930+
session.close(() => {
1931+
driver.close();
1932+
1933+
router.exit(code1 => {
1934+
server.exit(code2 => {
1935+
expect(code1).toEqual(0);
1936+
expect(code2).toEqual(0);
1937+
done();
1938+
});
1939+
});
1940+
});
1941+
});
1942+
});
1943+
}
1944+
18991945
function moveNextDateNow30SecondsForward() {
19001946
const currentTime = Date.now();
19011947
hijackNextDateNowCall(currentTime + 30 * 1000 + 1);
@@ -2028,6 +2074,18 @@ describe('routing driver', () => {
20282074
return getConnectionPool(driver).has(address);
20292075
}
20302076

2077+
function hasRouterInRoutingTable(driver, expectedRouter) {
2078+
return getRoutingTable(driver).routers.indexOf(expectedRouter) > -1;
2079+
}
2080+
2081+
function hasReaderInRoutingTable(driver, expectedReader) {
2082+
return getRoutingTable(driver).readers.indexOf(expectedReader) > -1;
2083+
}
2084+
2085+
function hasWriterInRoutingTable(driver, expectedWriter) {
2086+
return getRoutingTable(driver).writers.indexOf(expectedWriter) > -1;
2087+
}
2088+
20312089
function assertHasRouters(driver, expectedRouters) {
20322090
expect(getRoutingTable(driver).routers).toEqual(expectedRouters);
20332091
}

0 commit comments

Comments
 (0)