diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 75b7ee77a..d91b7b05e 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -251,9 +251,21 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider }) const servers = accessMode === WRITE ? routingTable.writers : routingTable.readers + + let error = newError( + `No servers available for database '${context.database}' with access mode '${accessMode}'`, + SERVICE_UNAVAILABLE + ) - return Promise.all(servers.map(address => this._verifyConnectivityAndGetServerVersion({ address }))) - .then(([serverInfo]) => serverInfo) + for (const address of servers) { + try { + const serverInfo = await this._verifyConnectivityAndGetServerVersion({ address }) + return serverInfo + } catch (e) { + error = e + } + } + throw error } forget (address, database) { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index 6089ffbad..f6151a4d4 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2493,24 +2493,24 @@ describe('#unit RoutingConnectionProvider', () => { expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) }) - it('should acquire, resetAndFlush and release connections for sever with the selected access mode', async () => { + it('should acquire, resetAndFlush and release connections for sever first', async () => { const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup() const acquireSpy = jest.spyOn(pool, 'acquire') await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers - for (const address of targetServers) { - expect(acquireSpy).toHaveBeenCalledWith(address) + const address = targetServers[0] + expect(acquireSpy).toHaveBeenCalledWith(address) - const connections = seenConnectionsPerAddress.get(address) + const connections = seenConnectionsPerAddress.get(address) - expect(connections.length).toBe(1) - expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() - expect(connections[0]._release.mock.invocationCallOrder[0]) - .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) - } + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0]._release.mock.invocationCallOrder[0]) + .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) + }) it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => { @@ -2527,20 +2527,96 @@ describe('#unit RoutingConnectionProvider', () => { }) describe('when the reset and flush fails for at least one the address', () => { - it('should fails with the reset and flush error', async () => { + it('should succeed with the server info ', async () => { const error = newError('Error') let i = 0 const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) - const { connectionProvider } = setup({ resetAndFlush }) + const { connectionProvider, server, protocolVersion } = setup({ resetAndFlush }) + + const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) + }) + + it('should release the connection', async () => { + const error = newError('Error') + let i = 0 + const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const { connectionProvider, seenConnectionsPerAddress, routingTable } = setup({ resetAndFlush }) try { await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) - expect().toBe('Not reached') } catch (e) { + } finally { + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + for (const address of targetServers) { + const connections = seenConnectionsPerAddress.get(address) + + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + } + } + }) + + describe('and the release fails', () => { + it('should fails with the release error', async () => { + const error = newError('Error') + const releaseError = newError('Release error') + let i = 0 + const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const releaseMock = jest.fn(() => Promise.reject(releaseError)) + const { connectionProvider } = setup({ resetAndFlush, releaseMock }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(releaseError) + } + }) + }) + }) + + describe('when the reset and flush fails for all addresses', () => { + it('should succeed with the server info ', async () => { + const error = newError('Error') + const resetAndFlush = jest.fn(() => Promise.reject(error)) + const { connectionProvider } = setup({ resetAndFlush }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('Not reached') + } catch(e) { expect(e).toBe(error) } }) + it('should acquire, resetAndFlush and release connections for all servers', async () => { + const resetAndFlush = jest.fn(() => Promise.reject(error)) + const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ resetAndFlush }) + const acquireSpy = jest.spyOn(pool, 'acquire') + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + } catch (e) { + // nothing to do + } finally { + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + for (const address of targetServers) { + expect(acquireSpy).toHaveBeenCalledWith(address) + + const connections = seenConnectionsPerAddress.get(address) + + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0]._release.mock.invocationCallOrder[0]) + .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) + } + } + }) + it('should release the connection', async () => { const error = newError('Error') let i = 0 @@ -2583,16 +2659,28 @@ describe('#unit RoutingConnectionProvider', () => { }) describe('when the release for at least one the address', () => { - it('should fails with the reset and flush error', async () => { + it('should succeed with the server info', async () => { const error = newError('Error') let i = 0 const releaseMock = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const { connectionProvider, server, protocolVersion } = setup({ releaseMock }) + + const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) + }) + }) + + describe('when the release for all address', () => { + it('should fail with release error', async () => { + const error = newError('Error') + const releaseMock = jest.fn(() => Promise.reject(error)) const { connectionProvider } = setup({ releaseMock }) try { await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) expect().toBe('Not reached') - } catch (e) { + } catch(e) { expect(e).toBe(error) } }) @@ -2668,7 +2756,7 @@ describe('#unit RoutingConnectionProvider', () => { }) describe('when at least the one of the connections could not be created', () => { - it('should reject with acquistion timeout error', async () => { + it('should succeed with the server info', async () => { let i = 0 const error = new Error('Connection creation error') const routingTable = newRoutingTable( @@ -2694,11 +2782,46 @@ describe('#unit RoutingConnectionProvider', () => { pool ) + const serverInfo = connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect(serverInfo).toEqual(new ServerInfo({}, 4.4)) + }) + }) + + describe('when there is not server available in the routing table', () => { + it('should thown an discovery error', async () => { + const expectedError = newError( + `No servers available for database '${database || null}' with access mode '${accessMode || READ}'`, + SERVICE_UNAVAILABLE + ) + const routingTable = newRoutingTable( + database || null, + [server1, server2], + accessMode === READ ? [] : [server3, server4], + accessMode === WRITE ? [] : [server5, server6], + Integer.MAX_SAFE_VALUE + ) + + const routingTableToRouters = {} + const routingMap = {} + routingMap[server0.asKey()] = routingTable + routingMap[server1.asKey()] = routingTable + routingMap[server2.asKey()] = routingTable + routingTableToRouters[database || null] = routingMap + + const connectionProvider = newRoutingConnectionProviderWithSeedRouter( + server0, + [server0], // seed router address resolves just to itself + [ + routingTable + ], + routingTableToRouters + ) + try { - connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) expect().toBe('not reached') - } catch (e) { - expect(e).toBe(error) + } catch (error) { + expect(error).toEqual(expectedError) } }) })