Skip to content

Commit cfa7881

Browse files
committed
Pool: separate validation on acquire and on release and add the acquisition context to create and validate
1 parent 4869203 commit cfa7881

File tree

18 files changed

+420
-304
lines changed

18 files changed

+420
-304
lines changed

packages/bolt-connection/src/connection-provider/authentication-provider.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
* limitations under the License.
1818
*/
1919

20+
import { object } from '../lang'
21+
2022
/**
2123
* Class which provides Authorization for {@link Connection}
2224
*/
@@ -28,7 +30,14 @@ export default class AuthenticationProvider {
2830
this._refreshObserver = undefined
2931
}
3032

31-
async authenticate ({ connection }) {
33+
async authenticate ({ connection, auth }) {
34+
if (auth != null) {
35+
if (connection.authToken == null || (connection.supportsReAuth && !object.equals(connection.authToken, auth))) {
36+
return await connection.connect(this._userAgent, auth)
37+
}
38+
return connection
39+
}
40+
3241
if (!this._authToken || this._isTokenExpired) {
3342
await this._getFreshAuthToken()
3443
}

packages/bolt-connection/src/connection-provider/connection-provider-direct.js

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
ConnectionErrorHandler
2525
} from '../connection'
2626
import { internal, error } from 'neo4j-driver-core'
27+
import { object } from '../lang'
2728

2829
const {
2930
constants: { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V4_4 }
@@ -49,15 +50,11 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
4950
this._handleAuthorizationExpired(error, address, conn, database)
5051
})
5152

52-
const connection = await this._connectionPool.acquire(this._address)
53+
const connection = await this._connectionPool.acquire({ auth }, this._address)
5354

54-
if (auth && auth !== connection.authToken) {
55-
if (connection.supportsReAuth) {
56-
await connection.connect(this._userAgent, auth)
57-
} else {
58-
await connection._release()
59-
return await this._createStickyConnection({ address: this._address, auth })
60-
}
55+
if (auth && !object.equals(auth, connection.authToken)) {
56+
await connection._release()
57+
return await this._createStickyConnection({ address: this._address, auth })
6158
}
6259

6360
return new DelegateConnection(connection, databaseSpecificErrorHandler)

packages/bolt-connection/src/connection-provider/connection-provider-pooled.js

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ export default class PooledConnectionProvider extends ConnectionProvider {
4747
this._connectionPool = new Pool({
4848
create: this._createConnection.bind(this),
4949
destroy: this._destroyConnection.bind(this),
50-
validate: this._validateConnection.bind(this),
50+
validateOnAcquire: this._validateConnectionOnAcquire.bind(this),
51+
validateOnRelease: this._validateConnectionOnRelease.bind(this),
5152
installIdleObserver: PooledConnectionProvider._installIdleObserverOnConnection.bind(
5253
this
5354
),
@@ -69,13 +70,13 @@ export default class PooledConnectionProvider extends ConnectionProvider {
6970
* @return {Promise<Connection>} promise resolved with a new connection or rejected when failed to connect.
7071
* @access private
7172
*/
72-
_createConnection (address, release) {
73+
_createConnection ({ auth }, address, release) {
7374
return this._createChannelConnection(address).then(connection => {
7475
connection._release = () => {
7576
return release(address, connection)
7677
}
7778
this._openConnections[connection.id] = connection
78-
return this._authenticationProvider.authenticate({ connection })
79+
return this._authenticationProvider.authenticate({ connection, auth })
7980
.catch(error => {
8081
// let's destroy this connection
8182
this._destroyConnection(connection)
@@ -85,12 +86,32 @@ export default class PooledConnectionProvider extends ConnectionProvider {
8586
})
8687
}
8788

89+
async _validateConnectionOnAcquire ({ auth }, conn) {
90+
if (!this._validateConnection(conn)) {
91+
return false
92+
}
93+
94+
try {
95+
await this._authenticationProvider.authenticate({ connection: conn, auth })
96+
return true
97+
} catch (error) {
98+
this._log.info(
99+
`The connection ${conn.id} is not valid because of an error ${error.code} '${error.message}'`
100+
)
101+
return false
102+
}
103+
}
104+
105+
_validateConnectionOnRelease (conn) {
106+
return this._validateConnection(conn)
107+
}
108+
88109
/**
89110
* Check that a connection is usable
90111
* @return {boolean} true if the connection is open
91112
* @access private
92113
**/
93-
async _validateConnection (conn) {
114+
_validateConnection (conn) {
94115
if (!conn.isOpen()) {
95116
return false
96117
}
@@ -101,16 +122,20 @@ export default class PooledConnectionProvider extends ConnectionProvider {
101122
return false
102123
}
103124

125+
return true
126+
}
127+
128+
async _createStickyConnection ({ address, auth }) {
129+
const connection = this._createChannelConnection(address)
130+
connection._release = () => this._destroyConnection(connection)
131+
this._openConnections[connection.id] = connection
132+
104133
try {
105-
await this._authenticationProvider.authenticate({ connection: conn })
134+
return await connection.connect(this._userAgent, auth)
106135
} catch (error) {
107-
this._log.info(
108-
`The connection ${conn.id} is not valid because of an error ${error.code} '${error.message}'`
109-
)
110-
return false
136+
await this._destroyConnection()
137+
throw error
111138
}
112-
113-
return true
114139
}
115140

116141
/**
@@ -130,7 +155,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
130155
* @return {Promise<ServerInfo>} the server info
131156
*/
132157
async _verifyConnectivityAndGetServerVersion ({ address }) {
133-
const connection = await this._connectionPool.acquire(address)
158+
const connection = await this._connectionPool.acquire({}, address)
134159
const serverInfo = new ServerInfo(connection.server, connection.protocol().version)
135160
try {
136161
if (!connection.protocol().isLastMessageLogin()) {

packages/bolt-connection/src/connection-provider/connection-provider-routing.js

Lines changed: 11 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
ConnectionErrorHandler,
2929
DelegateConnection
3030
} from '../connection'
31+
import { object } from '../lang'
3132

3233
const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error
3334
const {
@@ -186,19 +187,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
186187
}
187188

188189
try {
189-
const connection = await this._acquireConnectionToServer(
190-
address,
191-
name,
192-
routingTable
193-
)
190+
const connection = await this._connectionPool.acquire({ auth }, address)
194191

195192
if (auth && auth !== connection.authToken) {
196-
if (connection.supportsReAuth) {
197-
await connection.connect(this._userAgent, auth)
198-
} else {
199-
await connection._release()
200-
return await this._createStickyConnection({ address, auth })
201-
}
193+
await connection._release()
194+
return await this._createStickyConnection({ address, auth })
202195
}
203196

204197
return new DelegateConnection(connection, databaseSpecificErrorHandler)
@@ -316,10 +309,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
316309
})
317310
}
318311

319-
_acquireConnectionToServer (address, serverName, routingTable) {
320-
return this._connectionPool.acquire(address)
321-
}
322-
323312
_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth } = {}) {
324313
const currentRoutingTable = this._routingTableRegistry.get(
325314
database,
@@ -544,33 +533,16 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
544533
)
545534
}
546535

547-
async _createStickyConnection ({ address, auth }) {
548-
const connection = await this._createChannelConnection(address)
549-
connection._release = () => this._destroyConnection(connection)
550-
this._openConnections[connection.id] = connection
551-
552-
try {
553-
return await connection.connect(this._userAgent, auth)
554-
} catch (error) {
555-
await this._destroyConnection()
556-
throw error
557-
}
558-
}
559-
560536
async _createSessionForRediscovery (routerAddress, bookmarks, impersonatedUser, auth) {
561537
try {
562-
let connection = await this._connectionPool.acquire(routerAddress)
538+
let connection = await this._connectionPool.acquire({ auth }, routerAddress)
563539

564-
if (auth && connection.authToken !== auth) {
565-
if (connection.supportsReAuth) {
566-
await await connection.connect(this._userAgent, auth)
567-
} else {
568-
await connection._release()
569-
connection = await this._createStickyConnection({
570-
address: routerAddress,
571-
auth
572-
})
573-
}
540+
if (auth && object.equals(auth, connection.authToken)) {
541+
await connection._release()
542+
connection = await this._createStickyConnection({
543+
address: routerAddress,
544+
auth
545+
})
574546
}
575547

576548
const databaseSpecificErrorHandler = ConnectionErrorHandler.create({
@@ -767,7 +739,6 @@ function _isFailFastError (error) {
767739
}
768740

769741
function _isFailFastSecurityError (error) {
770-
console.error(error)
771742
return error.code.startsWith('Neo.ClientError.Security.') &&
772743
![
773744
AUTHORIZATION_EXPIRED_CODE

packages/bolt-connection/src/lang/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@
1818
*/
1919

2020
export * as functional from './functional'
21+
export * as object from './object'
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
export function equals (a, b) {
21+
if (a === b) {
22+
return true
23+
}
24+
25+
if (typeof a === 'object' && typeof b === 'object') {
26+
const keysA = Object.keys(a)
27+
const keysB = Object.keys(b)
28+
29+
if (keysA.length !== keysB.length) {
30+
return false
31+
}
32+
33+
for (const key of keysA) {
34+
if (a[key] !== b[key]) {
35+
return false
36+
}
37+
}
38+
39+
return true
40+
}
41+
42+
return false
43+
}

0 commit comments

Comments
 (0)