Skip to content

Commit 3c42269

Browse files
authored
Merge pull request #401 from lutovich/1.7-eager-conn-init
Improve ordering of connection initialization
2 parents b2e62e6 + 986b4b0 commit 3c42269

27 files changed

+918
-844
lines changed

src/v1/driver.js

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919

2020
import Session from './session';
2121
import Pool from './internal/pool';
22-
import {connect} from './internal/connector';
22+
import Connection from './internal/connection';
2323
import StreamObserver from './internal/stream-observer';
2424
import {newError, SERVICE_UNAVAILABLE} from './error';
2525
import {DirectConnectionProvider} from './internal/connection-providers';
2626
import Bookmark from './internal/bookmark';
2727
import ConnectivityVerifier from './internal/connectivity-verifier';
2828
import PoolConfig, {DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE} from './internal/pool-config';
2929
import Logger from './internal/logger';
30+
import ConnectionErrorHandler from './internal/connection-error-handler';
3031

3132
const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000; // 1 hour
3233

@@ -62,18 +63,18 @@ class Driver {
6263
* @constructor
6364
* @param {string} hostPort
6465
* @param {string} userAgent
65-
* @param {object} token
66+
* @param {object} authToken
6667
* @param {object} config
6768
* @protected
6869
*/
69-
constructor(hostPort, userAgent, token = {}, config = {}) {
70+
constructor(hostPort, userAgent, authToken = {}, config = {}) {
7071
sanitizeConfig(config);
7172

7273
this._id = idGenerator++;
7374
this._hostPort = hostPort;
7475
this._userAgent = userAgent;
7576
this._openConnections = {};
76-
this._token = token;
77+
this._authToken = authToken;
7778
this._config = config;
7879
this._log = Logger.create(config);
7980
this._pool = new Pool(
@@ -127,18 +128,24 @@ class Driver {
127128
}
128129

129130
/**
130-
* Create a new connection instance.
131-
* @return {Connection} new connector-api session instance, a low level session API.
131+
* Create a new connection and initialize it.
132+
* @return {Promise<Connection>} promise resolved with a new connection or rejected when failed to connect.
132133
* @access private
133134
*/
134135
_createConnection(hostPort, release) {
135-
const conn = connect(hostPort, this._config, this._connectionErrorCode(), this._log);
136-
const streamObserver = new _ConnectionStreamObserver(this, conn);
137-
conn.protocol().initialize(this._userAgent, this._token, streamObserver);
138-
conn._release = () => release(hostPort, conn);
139-
140-
this._openConnections[conn.id] = conn;
141-
return conn;
136+
const connection = Connection.create(hostPort, this._config, this._createConnectionErrorHandler(), this._log);
137+
connection._release = () => release(hostPort, connection);
138+
this._openConnections[connection.id] = connection;
139+
140+
return connection.connect(this._userAgent, this._authToken)
141+
.catch(error => {
142+
if (this.onError) {
143+
// notify Driver.onError callback about connection initialization errors
144+
this.onError(error);
145+
}
146+
// propagate the error because connection failed to connect / initialize
147+
throw error;
148+
});
142149
}
143150

144151
/**
@@ -186,7 +193,7 @@ class Driver {
186193
const sessionMode = Driver._validateSessionMode(mode);
187194
const connectionProvider = this._getOrCreateConnectionProvider();
188195
const bookmark = new Bookmark(bookmarkOrBookmarks);
189-
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
196+
return new Session(sessionMode, connectionProvider, bookmark, this._config);
190197
}
191198

192199
static _validateSessionMode(rawMode) {
@@ -203,14 +210,8 @@ class Driver {
203210
}
204211

205212
// Extension point
206-
_createSession(mode, connectionProvider, bookmark, config) {
207-
return new Session(mode, connectionProvider, bookmark, config);
208-
}
209-
210-
// Extension point
211-
_connectionErrorCode() {
212-
// connection errors might result in different error codes depending on the driver
213-
return SERVICE_UNAVAILABLE;
213+
_createConnectionErrorHandler() {
214+
return new ConnectionErrorHandler(SERVICE_UNAVAILABLE);
214215
}
215216

216217
_getOrCreateConnectionProvider() {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
21+
22+
export default class ConnectionErrorHandler {
23+
24+
constructor(errorCode, handleUnavailability, handleWriteFailure) {
25+
this._errorCode = errorCode;
26+
this._handleUnavailability = handleUnavailability || noOpHandler;
27+
this._handleWriteFailure = handleWriteFailure || noOpHandler;
28+
}
29+
30+
/**
31+
* Error code to use for network errors.
32+
* @return {string} the error code.
33+
*/
34+
errorCode() {
35+
return this._errorCode;
36+
}
37+
38+
/**
39+
* Handle and transform the error.
40+
* @param {Neo4jError} error the original error.
41+
* @param {string} hostPort the host and port of the connection where the error happened.
42+
* @return {Neo4jError} new error that should be propagated to the user.
43+
*/
44+
handleAndTransformError(error, hostPort) {
45+
if (isAvailabilityError(error)) {
46+
return this._handleUnavailability(error, hostPort);
47+
}
48+
if (isFailureToWrite(error)) {
49+
return this._handleWriteFailure(error, hostPort);
50+
}
51+
return error;
52+
}
53+
}
54+
55+
function isAvailabilityError(error) {
56+
if (error) {
57+
return error.code === SESSION_EXPIRED ||
58+
error.code === SERVICE_UNAVAILABLE ||
59+
error.code === 'Neo.TransientError.General.DatabaseUnavailable';
60+
}
61+
return false;
62+
}
63+
64+
function isFailureToWrite(error) {
65+
if (error) {
66+
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
67+
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
68+
}
69+
return false;
70+
}
71+
72+
function noOpHandler(error) {
73+
return error;
74+
}

src/v1/internal/connection-holder.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export default class ConnectionHolder {
5555
getConnection(streamObserver) {
5656
return this._connectionPromise.then(connection => {
5757
streamObserver.resolveConnection(connection);
58-
return connection.initializationCompleted();
58+
return connection;
5959
});
6060
}
6161

src/v1/internal/connection-providers.js

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import hasFeature from './features';
2626
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
2727
import RoutingUtil from './routing-util';
2828

29+
const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized';
30+
2931
class ConnectionProvider {
3032

3133
acquireConnection(mode) {
@@ -195,20 +197,32 @@ export class LoadBalancer extends ConnectionProvider {
195197

196198
// try next router
197199
return this._createSessionForRediscovery(currentRouter).then(session => {
198-
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter)
200+
if (session) {
201+
return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter);
202+
} else {
203+
// unable to acquire connection and create session towards the current router
204+
// return null to signal that the next router should be tried
205+
return null;
206+
}
199207
});
200208
});
201209
}, Promise.resolve(null));
202210
}
203211

204212
_createSessionForRediscovery(routerAddress) {
205-
return this._connectionPool.acquire(routerAddress).then(connection => {
206-
// initialized connection is required for routing procedure call
207-
// server version needs to be known to decide which routing procedure to use
208-
const initializedConnectionPromise = connection.initializationCompleted();
209-
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
210-
return new Session(READ, connectionProvider);
211-
});
213+
return this._connectionPool.acquire(routerAddress)
214+
.then(connection => {
215+
const connectionProvider = new SingleConnectionProvider(connection);
216+
return new Session(READ, connectionProvider);
217+
})
218+
.catch(error => {
219+
// unable to acquire connection towards the given router
220+
if (error && error.code === UNAUTHORIZED_ERROR_CODE) {
221+
// auth error is a sign of a configuration issue, rediscovery should not proceed
222+
throw error;
223+
}
224+
return null;
225+
});
212226
}
213227

214228
_applyRoutingTableIfPossible(newRoutingTable) {
@@ -257,14 +271,14 @@ export class LoadBalancer extends ConnectionProvider {
257271

258272
export class SingleConnectionProvider extends ConnectionProvider {
259273

260-
constructor(connectionPromise) {
274+
constructor(connection) {
261275
super();
262-
this._connectionPromise = connectionPromise;
276+
this._connection = connection;
263277
}
264278

265279
acquireConnection(mode) {
266-
const connectionPromise = this._connectionPromise;
267-
this._connectionPromise = null;
268-
return connectionPromise;
280+
const connection = this._connection;
281+
this._connection = null;
282+
return Promise.resolve(connection);
269283
}
270284
}

0 commit comments

Comments
 (0)