Skip to content

Commit 524d883

Browse files
authored
Fix verifyConnectivity/getServerInfo procedures (#888)
The previous implementation was checking if all the servers are available instead of check if at least one server is available. This should be done by iterate over the servers until it find one server available.
1 parent 28d874f commit 524d883

File tree

2 files changed

+156
-21
lines changed

2 files changed

+156
-21
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-routing.js

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,21 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
251251
})
252252

253253
const servers = accessMode === WRITE ? routingTable.writers : routingTable.readers
254+
255+
let error = newError(
256+
`No servers available for database '${context.database}' with access mode '${accessMode}'`,
257+
SERVICE_UNAVAILABLE
258+
)
254259

255-
return Promise.all(servers.map(address => this._verifyConnectivityAndGetServerVersion({ address })))
256-
.then(([serverInfo]) => serverInfo)
260+
for (const address of servers) {
261+
try {
262+
const serverInfo = await this._verifyConnectivityAndGetServerVersion({ address })
263+
return serverInfo
264+
} catch (e) {
265+
error = e
266+
}
267+
}
268+
throw error
257269
}
258270

259271
forget (address, database) {

packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js

Lines changed: 142 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2493,24 +2493,24 @@ describe('#unit RoutingConnectionProvider', () => {
24932493
expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion))
24942494
})
24952495

2496-
it('should acquire, resetAndFlush and release connections for sever with the selected access mode', async () => {
2496+
it('should acquire, resetAndFlush and release connections for sever first', async () => {
24972497
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup()
24982498
const acquireSpy = jest.spyOn(pool, 'acquire')
24992499

25002500
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
25012501

25022502
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
2503-
for (const address of targetServers) {
2504-
expect(acquireSpy).toHaveBeenCalledWith(address)
2503+
const address = targetServers[0]
2504+
expect(acquireSpy).toHaveBeenCalledWith(address)
25052505

2506-
const connections = seenConnectionsPerAddress.get(address)
2506+
const connections = seenConnectionsPerAddress.get(address)
25072507

2508-
expect(connections.length).toBe(1)
2509-
expect(connections[0].resetAndFlush).toHaveBeenCalled()
2510-
expect(connections[0]._release).toHaveBeenCalled()
2511-
expect(connections[0]._release.mock.invocationCallOrder[0])
2512-
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
2513-
}
2508+
expect(connections.length).toBe(1)
2509+
expect(connections[0].resetAndFlush).toHaveBeenCalled()
2510+
expect(connections[0]._release).toHaveBeenCalled()
2511+
expect(connections[0]._release.mock.invocationCallOrder[0])
2512+
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
2513+
25142514
})
25152515

25162516
it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => {
@@ -2527,20 +2527,96 @@ describe('#unit RoutingConnectionProvider', () => {
25272527
})
25282528

25292529
describe('when the reset and flush fails for at least one the address', () => {
2530-
it('should fails with the reset and flush error', async () => {
2530+
it('should succeed with the server info ', async () => {
25312531
const error = newError('Error')
25322532
let i = 0
25332533
const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
2534-
const { connectionProvider } = setup({ resetAndFlush })
2534+
const { connectionProvider, server, protocolVersion } = setup({ resetAndFlush })
2535+
2536+
const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2537+
2538+
expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion))
2539+
})
2540+
2541+
it('should release the connection', async () => {
2542+
const error = newError('Error')
2543+
let i = 0
2544+
const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
2545+
const { connectionProvider, seenConnectionsPerAddress, routingTable } = setup({ resetAndFlush })
25352546

25362547
try {
25372548
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2538-
expect().toBe('Not reached')
25392549
} catch (e) {
2550+
} finally {
2551+
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
2552+
for (const address of targetServers) {
2553+
const connections = seenConnectionsPerAddress.get(address)
2554+
2555+
expect(connections.length).toBe(1)
2556+
expect(connections[0].resetAndFlush).toHaveBeenCalled()
2557+
expect(connections[0]._release).toHaveBeenCalled()
2558+
}
2559+
}
2560+
})
2561+
2562+
describe('and the release fails', () => {
2563+
it('should fails with the release error', async () => {
2564+
const error = newError('Error')
2565+
const releaseError = newError('Release error')
2566+
let i = 0
2567+
const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
2568+
const releaseMock = jest.fn(() => Promise.reject(releaseError))
2569+
const { connectionProvider } = setup({ resetAndFlush, releaseMock })
2570+
2571+
try {
2572+
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2573+
expect().toBe('Not reached')
2574+
} catch (e) {
2575+
expect(e).toBe(releaseError)
2576+
}
2577+
})
2578+
})
2579+
})
2580+
2581+
describe('when the reset and flush fails for all addresses', () => {
2582+
it('should succeed with the server info ', async () => {
2583+
const error = newError('Error')
2584+
const resetAndFlush = jest.fn(() => Promise.reject(error))
2585+
const { connectionProvider } = setup({ resetAndFlush })
2586+
2587+
try {
2588+
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2589+
expect().toBe('Not reached')
2590+
} catch(e) {
25402591
expect(e).toBe(error)
25412592
}
25422593
})
25432594

2595+
it('should acquire, resetAndFlush and release connections for all servers', async () => {
2596+
const resetAndFlush = jest.fn(() => Promise.reject(error))
2597+
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ resetAndFlush })
2598+
const acquireSpy = jest.spyOn(pool, 'acquire')
2599+
2600+
try {
2601+
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2602+
} catch (e) {
2603+
// nothing to do
2604+
} finally {
2605+
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
2606+
for (const address of targetServers) {
2607+
expect(acquireSpy).toHaveBeenCalledWith(address)
2608+
2609+
const connections = seenConnectionsPerAddress.get(address)
2610+
2611+
expect(connections.length).toBe(1)
2612+
expect(connections[0].resetAndFlush).toHaveBeenCalled()
2613+
expect(connections[0]._release).toHaveBeenCalled()
2614+
expect(connections[0]._release.mock.invocationCallOrder[0])
2615+
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
2616+
}
2617+
}
2618+
})
2619+
25442620
it('should release the connection', async () => {
25452621
const error = newError('Error')
25462622
let i = 0
@@ -2583,16 +2659,28 @@ describe('#unit RoutingConnectionProvider', () => {
25832659
})
25842660

25852661
describe('when the release for at least one the address', () => {
2586-
it('should fails with the reset and flush error', async () => {
2662+
it('should succeed with the server info', async () => {
25872663
const error = newError('Error')
25882664
let i = 0
25892665
const releaseMock = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve())
2666+
const { connectionProvider, server, protocolVersion } = setup({ releaseMock })
2667+
2668+
const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2669+
2670+
expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion))
2671+
})
2672+
})
2673+
2674+
describe('when the release for all address', () => {
2675+
it('should fail with release error', async () => {
2676+
const error = newError('Error')
2677+
const releaseMock = jest.fn(() => Promise.reject(error))
25902678
const { connectionProvider } = setup({ releaseMock })
25912679

25922680
try {
25932681
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
25942682
expect().toBe('Not reached')
2595-
} catch (e) {
2683+
} catch(e) {
25962684
expect(e).toBe(error)
25972685
}
25982686
})
@@ -2668,7 +2756,7 @@ describe('#unit RoutingConnectionProvider', () => {
26682756
})
26692757

26702758
describe('when at least the one of the connections could not be created', () => {
2671-
it('should reject with acquistion timeout error', async () => {
2759+
it('should succeed with the server info', async () => {
26722760
let i = 0
26732761
const error = new Error('Connection creation error')
26742762
const routingTable = newRoutingTable(
@@ -2694,11 +2782,46 @@ describe('#unit RoutingConnectionProvider', () => {
26942782
pool
26952783
)
26962784

2785+
const serverInfo = connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2786+
expect(serverInfo).toEqual(new ServerInfo({}, 4.4))
2787+
})
2788+
})
2789+
2790+
describe('when there is not server available in the routing table', () => {
2791+
it('should thown an discovery error', async () => {
2792+
const expectedError = newError(
2793+
`No servers available for database '${database || null}' with access mode '${accessMode || READ}'`,
2794+
SERVICE_UNAVAILABLE
2795+
)
2796+
const routingTable = newRoutingTable(
2797+
database || null,
2798+
[server1, server2],
2799+
accessMode === READ ? [] : [server3, server4],
2800+
accessMode === WRITE ? [] : [server5, server6],
2801+
Integer.MAX_SAFE_VALUE
2802+
)
2803+
2804+
const routingTableToRouters = {}
2805+
const routingMap = {}
2806+
routingMap[server0.asKey()] = routingTable
2807+
routingMap[server1.asKey()] = routingTable
2808+
routingMap[server2.asKey()] = routingTable
2809+
routingTableToRouters[database || null] = routingMap
2810+
2811+
const connectionProvider = newRoutingConnectionProviderWithSeedRouter(
2812+
server0,
2813+
[server0], // seed router address resolves just to itself
2814+
[
2815+
routingTable
2816+
],
2817+
routingTableToRouters
2818+
)
2819+
26972820
try {
2698-
connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2821+
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
26992822
expect().toBe('not reached')
2700-
} catch (e) {
2701-
expect(e).toBe(error)
2823+
} catch (error) {
2824+
expect(error).toEqual(expectedError)
27022825
}
27032826
})
27042827
})

0 commit comments

Comments
 (0)