Skip to content

Introduce connectionLivenessCheckTimeout configuration #1162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Dec 4, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Pool, { PoolConfig } from '../pool'
import { error, ConnectionProvider, ServerInfo, newError } from 'neo4j-driver-core'
import AuthenticationProvider from './authentication-provider'
import { object } from '../lang'
import LivenessCheckProvider from './liveness-check-provider'

const { SERVICE_UNAVAILABLE } = error
const AUTHENTICATION_ERRORS = [
Expand All @@ -40,6 +41,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
this._config = config
this._log = log
this._authenticationProvider = new AuthenticationProvider({ authTokenManager, userAgent, boltAgent })
this._livenessCheckProvider = new LivenessCheckProvider({ connectionLivenessCheckTimeout: config.connectionLivenessCheckTimeout })
this._userAgent = userAgent
this._boltAgent = boltAgent
this._createChannelConnection =
Expand Down Expand Up @@ -81,6 +83,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
_createConnection ({ auth }, address, release) {
return this._createChannelConnection(address).then(connection => {
connection.release = () => {
connection.idleTimestamp = Date.now()
return release(address, connection)
}
this._openConnections[connection.id] = connection
Expand All @@ -100,6 +103,15 @@ export default class PooledConnectionProvider extends ConnectionProvider {
return false
}

try {
await this._livenessCheckProvider.check(conn)
} catch (error) {
this._log.debug(
`The connection ${conn.id} is not alive because of an error ${error.code} '${error.message}'`
)
return false
}

try {
await this._authenticationProvider.authenticate({ connection: conn, auth, skipReAuth })
return true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export default class LivenessCheckProvider {
constructor ({ connectionLivenessCheckTimeout }) {
this._connectionLivenessCheckTimeout = connectionLivenessCheckTimeout
}

/**
* Checks connection liveness with configured params.
*
* @param {Connection} connection
* @returns {Promise<true>} If liveness checks succeed, throws otherwise
*/
async check (connection) {
if (this._isCheckDisabled || this._isNewlyCreatedConnection(connection)) {
return true
}

const idleFor = Date.now() - connection.idleTimestamp

if (this._connectionLivenessCheckTimeout === 0 ||
idleFor > this._connectionLivenessCheckTimeout) {
return await connection.resetAndFlush()
.then(() => true)
}

return true
}

get _isCheckDisabled () {
return this._connectionLivenessCheckTimeout == null || this._connectionLivenessCheckTimeout < 0
}

_isNewlyCreatedConnection (connection) {
return connection.authToken == null
}
}
14 changes: 13 additions & 1 deletion packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export default class ChannelConnection extends Connection {
this._id = idGenerator++
this._address = address
this._server = { address: address.asHostPort() }
this.creationTimestamp = Date.now()
this._creationTimestamp = Date.now()
this._disableLosslessIntegers = disableLosslessIntegers
this._ch = channel
this._chunker = chunker
Expand Down Expand Up @@ -220,6 +220,18 @@ export default class ChannelConnection extends Connection {
this._dbConnectionId = value
}

set idleTimestamp (value) {
this._idleTimestamp = value
}

get idleTimestamp () {
return this._idleTimestamp
}

get creationTimestamp () {
return this._creationTimestamp
}

/**
* Send initialization message.
* @param {string} userAgent the user agent for this driver.
Expand Down
12 changes: 12 additions & 0 deletions packages/bolt-connection/src/connection/connection-delegate.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ export default class DelegateConnection extends Connection {
this._delegate.version = value
}

get creationTimestamp () {
return this._delegate.creationTimestamp
}

set idleTimestamp (value) {
this._delegate.idleTimestamp = value
}

get idleTimestamp () {
return this._delegate.idleTimestamp
}

isOpen () {
return this._delegate.isOpen()
}
Expand Down
12 changes: 12 additions & 0 deletions packages/bolt-connection/src/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ export default class Connection extends CoreConnection {
throw new Error('not implemented')
}

get creationTimestamp () {
throw new Error('not implemented')
}

set idleTimestamp (value) {
throw new Error('not implemented')
}

get idleTimestamp () {
throw new Error('not implemented')
}

/**
* @returns {BoltProtocol} the underlying bolt protocol assigned to this connection
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Connection, DelegateConnection } from '../../src/connection'
import { authTokenManagers, internal, newError, ServerInfo, staticAuthTokenManager } from 'neo4j-driver-core'
import AuthenticationProvider from '../../src/connection-provider/authentication-provider'
import { functional } from '../../src/lang'
import LivenessCheckProvider from '../../src/connection-provider/liveness-check-provider'

const {
serverAddress: { ServerAddress },
Expand Down Expand Up @@ -281,16 +282,23 @@ describe('constructor', () => {
})

it('should register the release function into the connection', async () => {
const { create } = setup()
const releaseResult = { property: 'some property' }
const release = jest.fn(() => releaseResult)
jest.useFakeTimers()
try {
const { create } = setup()
const releaseResult = { property: 'some property' }
const release = jest.fn(() => releaseResult)

const connection = await create({}, server0, release)
const connection = await create({}, server0, release)
connection.idleTimestamp = -1234

const released = connection.release()
const released = connection.release()

expect(released).toBe(releaseResult)
expect(release).toHaveBeenCalledWith(server0, connection)
expect(released).toBe(releaseResult)
expect(release).toHaveBeenCalledWith(server0, connection)
expect(connection.idleTimestamp).toBeCloseTo(Date.now())
} finally {
jest.useRealTimers()
}
})

it.each([
Expand Down Expand Up @@ -361,26 +369,27 @@ describe('constructor', () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now()

const { validateOnAcquire, authenticationProviderHook } = setup()
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup()

await expect(validateOnAcquire({ auth }, connection)).resolves.toBe(true)

expect(authenticationProviderHook.authenticate).toHaveBeenCalledWith({
connection, auth
})
expect(livenessCheckProviderHook.check).toHaveBeenCalledWith(connection)
})

it.each([
null,
undefined,
{ scheme: 'bearer', credentials: 'token01' }
])('should return true when connection is open and within the lifetime and authentication fails (auth=%o)', async (auth) => {
])('should return false when connection is open and within the lifetime and authentication fails (auth=%o)', async (auth) => {
const connection = new FakeConnection(server0)
const error = newError('failed')
const authenticationProvider = jest.fn(() => Promise.reject(error))
connection.creationTimestamp = Date.now()

const { validateOnAcquire, authenticationProviderHook, log } = setup({ authenticationProvider })
const { validateOnAcquire, authenticationProviderHook, log, livenessCheckProviderHook } = setup({ authenticationProvider })

await expect(validateOnAcquire({ auth }, connection)).resolves.toBe(false)

Expand All @@ -391,6 +400,7 @@ describe('constructor', () => {
expect(log.debug).toHaveBeenCalledWith(
`The connection ${connection.id} is not valid because of an error ${error.code} '${error.message}'`
)
expect(livenessCheckProviderHook.check).toHaveBeenCalledWith(connection)
})

it.each([
Expand All @@ -401,45 +411,67 @@ describe('constructor', () => {
const auth = {}
connection.creationTimestamp = Date.now()

const { validateOnAcquire, authenticationProviderHook } = setup()
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup()

await expect(validateOnAcquire({ auth, skipReAuth }, connection)).resolves.toBe(true)

expect(authenticationProviderHook.authenticate).toHaveBeenCalledWith({
connection, auth, skipReAuth
})
expect(livenessCheckProviderHook.check).toHaveBeenCalledWith(connection)
})

it('should return false when liveness checks fails', async () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now()
const error = newError('#themessage', '#thecode')

const { validateOnAcquire, authenticationProviderHook, log, livenessCheckProviderHook } = setup({
livenessCheckProvider: () => Promise.reject(error)
})

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)

expect(livenessCheckProviderHook.check).toBeCalledWith(connection)
expect(log.debug).toBeCalledWith(
`The connection ${connection.id} is not alive because of an error ${error.code} '${error.message}'`
)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
})

it('should return false when connection is closed and within the lifetime', async () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now()
await connection.close()

const { validateOnAcquire, authenticationProviderHook } = setup()
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup()

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
expect(livenessCheckProviderHook.check).not.toHaveBeenCalled()
})

it('should return false when connection is open and out of the lifetime', async () => {
const connection = new FakeConnection(server0)
connection.creationTimestamp = Date.now() - 4000

const { validateOnAcquire, authenticationProviderHook } = setup({ maxConnectionLifetime: 3000 })
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup({ maxConnectionLifetime: 3000 })

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
expect(livenessCheckProviderHook.check).not.toHaveBeenCalled()
})

it('should return false when connection is closed and out of the lifetime', async () => {
const connection = new FakeConnection(server0)
await connection.close()
connection.creationTimestamp = Date.now() - 4000

const { validateOnAcquire, authenticationProviderHook } = setup({ maxConnectionLifetime: 3000 })
const { validateOnAcquire, authenticationProviderHook, livenessCheckProviderHook } = setup({ maxConnectionLifetime: 3000 })

await expect(validateOnAcquire({}, connection)).resolves.toBe(false)
expect(authenticationProviderHook.authenticate).not.toHaveBeenCalled()
expect(livenessCheckProviderHook.check).not.toHaveBeenCalled()
})
})

Expand Down Expand Up @@ -492,14 +524,17 @@ describe('constructor', () => {
})
})

function setup ({ createConnection, authenticationProvider, maxConnectionLifetime } = {}) {
function setup ({ createConnection, authenticationProvider, maxConnectionLifetime, livenessCheckProvider } = {}) {
const newPool = jest.fn((...args) => new Pool(...args))
const log = new Logger('debug', () => undefined)
jest.spyOn(log, 'debug')
const createChannelConnectionHook = createConnection || jest.fn(async (address) => new FakeConnection(address))
const authenticationProviderHook = new AuthenticationProvider({ })
const livenessCheckProviderHook = new LivenessCheckProvider({})
jest.spyOn(authenticationProviderHook, 'authenticate')
.mockImplementation(authenticationProvider || jest.fn(({ connection }) => Promise.resolve(connection)))
jest.spyOn(livenessCheckProviderHook, 'check')
.mockImplementation(livenessCheckProvider || jest.fn(() => Promise.resolve(true)))
const provider = new DirectConnectionProvider({
newPool,
config: {
Expand All @@ -510,11 +545,13 @@ describe('constructor', () => {
})
provider._createChannelConnection = createChannelConnectionHook
provider._authenticationProvider = authenticationProviderHook
provider._livenessCheckProvider = livenessCheckProviderHook
return {
provider,
...newPool.mock.calls[0][0],
createChannelConnectionHook,
authenticationProviderHook,
livenessCheckProviderHook,
log
}
}
Expand Down Expand Up @@ -812,6 +849,22 @@ class FakeConnection extends Connection {
return this._supportsReAuth
}

set creationTimestamp (value) {
this._creationTimestamp = value
}

get creationTimestamp () {
return this._creationTimestamp
}

set idleTimestamp (value) {
this._idleTimestamp = value
}

get idleTimestamp () {
return this._idleTimestamp
}

async close () {
this._closed = true
}
Expand Down
Loading