Skip to content

Introduce Driver.getServerInfo method #878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
version => version >= BOLT_PROTOCOL_V4_4
)
}

async verifyConnectivityAndGetServerInfo () {
return await this._verifyConnectivityAndGetServerVersion({ address: this._address })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createChannelConnection, ConnectionErrorHandler } from '../connection'
import Pool, { PoolConfig } from '../pool'
import { error, ConnectionProvider } from 'neo4j-driver-core'
import { error, ConnectionProvider, ServerInfo } from 'neo4j-driver-core'

const { SERVICE_UNAVAILABLE } = error
export default class PooledConnectionProvider extends ConnectionProvider {
Expand Down Expand Up @@ -109,6 +109,23 @@ export default class PooledConnectionProvider extends ConnectionProvider {
return conn.close()
}

/**
* Acquire a connection from the pool and return it ServerInfo
* @param {object} param
* @param {string} param.address the server address
* @return {Promise<ServerInfo>} the server info
*/
async _verifyConnectivityAndGetServerVersion ({ address }) {
const connection = await this._connectionPool.acquire(address)
const serverInfo = new ServerInfo(connection.server, connection.protocol().version)
try {
await connection.resetAndFlush()
} finally {
await connection._release()
}
return serverInfo
}

async close () {
// purge all idle connections in the connection pool
await this._connectionPool.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,24 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return await this._hasProtocolVersion(
version => version >= BOLT_PROTOCOL_V4_4
)
}
}

async verifyConnectivityAndGetServerInfo ({ database, accessMode }) {
const context = { database: database || DEFAULT_DB_NAME }

const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
}
})

const servers = accessMode === WRITE ? routingTable.writers : routingTable.readers

return Promise.all(servers.map(address => this._verifyConnectivityAndGetServerVersion({ address })))
.then(([serverInfo]) => serverInfo)
}

forget (address, database) {
this._routingTableRegistry.apply(database, {
Expand Down
2 changes: 1 addition & 1 deletion packages/bolt-connection/src/pool/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class Pool {
/**
* Acquire and idle resource fom the pool or create a new one.
* @param {ServerAddress} address the address for which we're acquiring.
* @return {Object} resource that is ready to use.
* @return {Promise<Object>} resource that is ready to use.
*/
acquire (address) {
const key = address.asKey()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import DirectConnectionProvider from '../../src/connection-provider/connection-provider-direct'
import { Pool } from '../../src/pool'
import { Connection, DelegateConnection } from '../../src/connection'
import { internal, newError } from 'neo4j-driver-core'
import { internal, newError, ServerInfo } from 'neo4j-driver-core'

const {
serverAddress: { ServerAddress },
Expand Down Expand Up @@ -139,6 +139,180 @@ it('should not change error when TokenExpired happens', async () => {
expect(error).toBe(expectedError)
})

describe('.verifyConnectivityAndGetServerInfo()', () => {
describe('when connection is available in the pool', () => {
it('should return the server info', async () => {
const { connectionProvider, server, protocolVersion } = setup()

const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion))
})

it('should reset and flush the connection', async () => {
const { connectionProvider, resetAndFlush } = setup()

await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(resetAndFlush).toBeCalledTimes(1)
})

it('should release the connection', async () => {
const { connectionProvider, seenConnections } = setup()

await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
})

it('should resetAndFlush and then release the connection', async () => {
const { connectionProvider, seenConnections, resetAndFlush } = setup()

await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(seenConnections[0]._release.mock.invocationCallOrder[0])
.toBeGreaterThan(resetAndFlush.mock.invocationCallOrder[0])
})

describe('when reset and flush fails', () => {
it('should fails with the reset and flush error', async () => {
const error = newError('Error')
const { connectionProvider, resetAndFlush } = setup()

resetAndFlush.mockRejectedValue(error)

try {
await connectionProvider.verifyConnectivityAndGetServerInfo()
expect().toBe('Not reached')
} catch (e) {
expect(e).toBe(error)
}
})

it('should release the connection', async () => {
const error = newError('Error')
const { connectionProvider, resetAndFlush, seenConnections } = setup()

resetAndFlush.mockRejectedValue(error)

try {
await connectionProvider.verifyConnectivityAndGetServerInfo()
} catch (e) {
} finally {
expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
}
})

describe('and release fails', () => {
it('should fails with the release error', async () => {
const error = newError('Error')
const releaseError = newError('release errror')

const { connectionProvider, resetAndFlush } = setup(
{
releaseMock: () => Promise.reject(releaseError)
})

resetAndFlush.mockRejectedValue(error)

try {
await connectionProvider.verifyConnectivityAndGetServerInfo()
expect().toBe('Not reached')
} catch (e) {
expect(e).toBe(releaseError)
}
})
})
})

describe('when release fails', () => {
it('should fails with the release error', async () => {
const error = newError('Error')

const { connectionProvider } = setup(
{
releaseMock: () => Promise.reject(error)
})

try {
await connectionProvider.verifyConnectivityAndGetServerInfo()
expect().toBe('Not reached')
} catch (e) {
expect(e).toBe(error)
}
})
})

function setup({ releaseMock } = {}) {
const protocolVersion = 4.4
const resetAndFlush = jest.fn(() => Promise.resolve())
const server = { address: 'localhost:123', version: 'neo4j/1234' }
const seenConnections = []
const create = (address, release) => {
const connection = new FakeConnection(address, release, server)
connection.protocol = () => {
return { version: protocolVersion }
}
connection.resetAndFlush = resetAndFlush
if (releaseMock) {
connection._release = releaseMock
}
seenConnections.push(connection)
return connection
}
const address = ServerAddress.fromUrl('localhost:123')
const pool = newPool({ create })
const connectionProvider = newDirectConnectionProvider(address, pool)
return {
connectionProvider,
server,
protocolVersion,
resetAndFlush,
seenConnections
}
}
})

describe('when connection is not available in the pool', () => {
it('should reject with acquisition timeout error', async () => {
const address = ServerAddress.fromUrl('localhost:123')
const pool = newPool({
config: {
acquisitionTimeout: 0,
}
})

const connectionProvider = newDirectConnectionProvider(address, pool)

try {
connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo()
expect().toBe('not reached')
} catch (e) {
expect(e).toBeDefined()
}
})
})

describe('when connection it could not create the connection', () => {
it('should reject with connection creation error', async () => {
const error = new Error('Connection creation error')
const address = ServerAddress.fromUrl('localhost:123')
const pool = newPool({
create: () => { throw error }
})

const connectionProvider = newDirectConnectionProvider(address, pool)

try {
connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo()
expect().toBe('not reached')
} catch (e) {
expect(e).toBe(error)
}
})
})
})

function newDirectConnectionProvider (address, pool) {
const connectionProvider = new DirectConnectionProvider({
id: 0,
Expand All @@ -150,22 +324,34 @@ function newDirectConnectionProvider (address, pool) {
return connectionProvider
}

function newPool () {
function newPool({ create, config } = {}) {
const _create = (address, release) => {
if (create) {
return create(address, release)
}
return new FakeConnection(address, release)
}
return new Pool({
config,
create: (address, release) =>
Promise.resolve(new FakeConnection(address, release))
Promise.resolve(_create(address, release)),
})
}

class FakeConnection extends Connection {
constructor (address, release) {
constructor(address, release, server) {
super(null)

this._address = address
this.release = release
this._release = jest.fn(() => release(address, this))
this._server = server
}

get address () {
get address() {
return this._address
}

get server() {
return this._server
}
}
Loading