diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index 3327796cd..9072fa9cd 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -64,7 +64,7 @@ export class LoadBalancer extends ConnectionProvider { constructor(address, routingContext, connectionPool, loadBalancingStrategy, hostNameResolver, driverOnErrorCallback, log) { super(); this._seedRouter = address; - this._routingTable = new RoutingTable([this._seedRouter]); + this._routingTable = new RoutingTable(); this._rediscovery = new Rediscovery(new RoutingUtil(routingContext)); this._connectionPool = connectionPool; this._driverOnErrorCallback = driverOnErrorCallback; @@ -211,7 +211,10 @@ export class LoadBalancer extends ConnectionProvider { // try next router return this._createSessionForRediscovery(currentRouter).then(session => { if (session) { - return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter); + return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter).catch(error => { + this._log.warn(`unable to fetch routing table because of an error ${error}`); + return null; + }); } else { // unable to acquire connection and create session towards the current router // return null to signal that the next router should be tried @@ -256,11 +259,8 @@ export class LoadBalancer extends ConnectionProvider { } _updateRoutingTable(newRoutingTable) { - const currentRoutingTable = this._routingTable; - // close old connections to servers not present in the new routing table - const staleServers = currentRoutingTable.serversDiff(newRoutingTable); - staleServers.forEach(server => this._connectionPool.purge(server)); + this._connectionPool.keepAll(newRoutingTable.allServers()); // make this driver instance aware of the new table this._routingTable = newRoutingTable; diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index e3855e112..ee9295542 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -115,6 +115,17 @@ class Pool { Object.keys(this._pools).forEach(key => this._purgeKey(key)); } + /** + * Keep the idle resources for the provided addresses and purge the rest. + */ + keepAll(addresses) { + const keysToKeep = addresses.map(a => a.asKey()); + const keysPresent = Object.keys(this._pools); + const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) == -1); + + keysToPurge.forEach(key => this._purgeKey(key)); + } + /** * Check if this pool contains resources for the given address. * @param {ServerAddress} address the address of the server to check. diff --git a/src/v1/internal/routing-table.js b/src/v1/internal/routing-table.js index 84971d9ec..51e93aeef 100644 --- a/src/v1/internal/routing-table.js +++ b/src/v1/internal/routing-table.js @@ -47,15 +47,6 @@ export default class RoutingTable { this.writers = removeFromArray(this.writers, address); } - serversDiff(otherRoutingTable) { - const oldServers = this._allServers(); - const newServers = otherRoutingTable._allServers(); - const diffTable = {}; - oldServers.forEach(oldServer => diffTable[oldServer.asKey()] = oldServer); - newServers.forEach(newServer => delete diffTable[newServer.asKey()]); - return Object.values(diffTable); - } - /** * Check if this routing table is fresh to perform the required operation. * @param {string} accessMode the type of operation. Allowed values are {@link READ} and {@link WRITE}. @@ -68,7 +59,7 @@ export default class RoutingTable { accessMode === WRITE && this.writers.length === 0; } - _allServers() { + allServers() { return [...this.routers, ...this.readers, ...this.writers]; } diff --git a/test/internal/bolt-stub.js b/test/internal/bolt-stub.js index e5916e60e..cf2145dc1 100644 --- a/test/internal/bolt-stub.js +++ b/test/internal/bolt-stub.js @@ -114,13 +114,11 @@ class StubServer { function newDriver(url, config = {}) { // left here for debugging purposes const logging = { - level: 'debug', + level: (process.env['NEOLOGLEVEL'] || 'error').toLowerCase(), logger: (level, msg) => console.log(`${level}: ${msg}`) }; // boltstub currently does not support encryption, create driver with encryption turned off - const newConfig = Object.assign({encrypted: 'ENCRYPTION_OFF'}, config); - // use for logging enabled - // const newConfig = Object.assign({encrypted: 'ENCRYPTION_OFF', logging}, config); + const newConfig = Object.assign({ encrypted: 'ENCRYPTION_OFF', logging }, config); return neo4j.driver(url, sharedNeo4j.authToken, newConfig); } diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index b6d1971fc..4c2c0a6df 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -174,7 +174,7 @@ describe('LoadBalancer', () => { NO_OP_DRIVER_CALLBACK, Logger.noOp()); expectRoutingTable(loadBalancer, - [serverABC], + [], [], [] ); @@ -1117,6 +1117,9 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved, loadBalancer._routingTable = new RoutingTable(routers, readers, writers, expirationTime); loadBalancer._rediscovery = new FakeRediscovery(routerToRoutingTable); loadBalancer._hostNameResolver = new FakeDnsResolver(seedRouterResolved); + if (expirationTime === Integer.ZERO) { + loadBalancer._useSeedRouter = false; + } return loadBalancer; } @@ -1170,7 +1173,7 @@ class FakeRediscovery { } lookupRoutingTableOnRouter(ignored, router) { - return this._routerToRoutingTable[router.asKey()]; + return Promise.resolve(this._routerToRoutingTable[router.asKey()]); } } diff --git a/test/internal/node/direct.driver.boltkit.test.js b/test/internal/node/direct.driver.boltkit.test.js index bcba692af..1ee02b0ef 100644 --- a/test/internal/node/direct.driver.boltkit.test.js +++ b/test/internal/node/direct.driver.boltkit.test.js @@ -20,6 +20,7 @@ import neo4j from '../../../src/v1'; import {READ, WRITE} from '../../../src/v1/driver'; import boltStub from '../bolt-stub'; +import { SERVICE_UNAVAILABLE } from '../../../src/v1/error'; describe('direct driver with stub server', () => { @@ -372,4 +373,46 @@ describe('direct driver with stub server', () => { }).catch(error => done.fail(error)); }); }); + + describe('should fail if commit fails due to broken connection', () => { + it('v1', done => { + verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted('v1', done); + }); + + it('v3', done => { + verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted('v3', done); + }); + }); + + function verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted(version, done) { + if (!boltStub.supported) { + done(); + return; + } + + const server = boltStub.start(`./test/resources/boltstub/connection_error_on_commit_${version}.script`, 9001); + + boltStub.run(() => { + const driver = boltStub.newDriver('bolt://127.0.0.1:9001'); + const session = driver.session(); + + const writeTx = session.beginTransaction(); + + writeTx.run('CREATE (n {name: \'Bob\'})').then(() => + writeTx.commit().then(result => fail('expected an error'), (error) => { + expect(error.code).toBe(SERVICE_UNAVAILABLE); + expect(error.message).toContain('Connection was closed by server'); + }) + ).finally(() => + session.close(() => { + driver.close(); + + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + })).catch(error => done.fail(error)); + } + ); + } }); diff --git a/test/internal/node/routing.driver.boltkit.test.js b/test/internal/node/routing.driver.boltkit.test.js index fc62e0b65..aa80b7064 100644 --- a/test/internal/node/routing.driver.boltkit.test.js +++ b/test/internal/node/routing.driver.boltkit.test.js @@ -198,7 +198,7 @@ describe('routing driver with stub server', () => { // When const session = driver.session(neo4j.READ); session.run("MATCH (n) RETURN n.name").catch(err => { - expect(err.code).toEqual(neo4j.error.PROTOCOL_ERROR); + expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); session.close(); driver.close(); @@ -563,7 +563,7 @@ describe('routing driver with stub server', () => { const session1 = driver.session(neo4j.session.READ); session1.run("MATCH (n) RETURN n.name").catch(() => { const session2 = driver.session(neo4j.session.READ); - session2.run("MATCH (n) RETURN n.name").then(() => { + session2.run('MATCH (n) RETURN n.name').then(() => { driver.close(); seedServer.exit(code1 => { readServer.exit(code2 => { @@ -592,8 +592,8 @@ describe('routing driver with stub server', () => { const session = driver.session(); session.run("MATCH (n) RETURN n.name").catch(err => { expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); - expect(err.message.indexOf('Make sure you are connecting to a causal cluster') > 0).toBeTruthy(); - assertHasRouters(driver, ['127.0.0.1:9001']); + expect(err.message).toContain('Could not perform discovery'); + assertHasRouters(driver, []); session.close(); driver.close(); server.exit(code => { @@ -960,31 +960,31 @@ describe('routing driver with stub server', () => { }); }); - it('should throw protocol error when no records', done => { + it('should throw error when no records', done => { testForProtocolError('./test/resources/boltstub/empty_get_servers_response.script', done); }); - it('should throw protocol error when no TTL entry', done => { + it('should throw error when no TTL entry', done => { testForProtocolError('./test/resources/boltstub/no_ttl_entry_get_servers.script', done); }); - it('should throw protocol error when no servers entry', done => { + it('should throw error when no servers entry', done => { testForProtocolError('./test/resources/boltstub/no_servers_entry_get_servers.script', done); }); - it('should throw protocol error when multiple records', done => { + it('should throw error when multiple records', done => { testForProtocolError('./test/resources/boltstub/unparsable_ttl_get_servers.script', done); }); - it('should throw protocol error on unparsable record', done => { + it('should throw error on unparsable record', done => { testForProtocolError('./test/resources/boltstub/unparsable_servers_get_servers.script', done); }); - it('should throw protocol error when no routers', done => { + it('should throw error when no routers', done => { testForProtocolError('./test/resources/boltstub/no_routers_get_servers.script', done); }); - it('should throw protocol error when no readers', done => { + it('should throw error when no readers', done => { testForProtocolError('./test/resources/boltstub/no_readers_get_servers.script', done); }); @@ -2125,6 +2125,49 @@ describe('routing driver with stub server', () => { }); }) + it('should revert to initial router if the only known router returns invalid routing table', done => { + if (!boltStub.supported) { + done(); + return; + } + + // the first seed to get the routing table + // the returned routing table includes a non-reachable read-server and points to only one router + // which will return an invalid routing table + const router1 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_point_to_empty_router_and_exit.script', 9001); + // returns an empty routing table + const router2 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_empty.script', 9004); + // returns a normal routing table + const router3 = boltStub.start('./test/resources/boltstub/acquire_endpoints_v3_three_servers_and_exit.script', 9003); + // ordinary read server + const reader = boltStub.start('./test/resources/boltstub/read_server_v3_read_tx.script', 9002); + + boltStub.run(() => { + const driver = boltStub.newDriver('bolt+routing://my.virtual.host:8080', { + resolver: address => ['127.0.0.1:9001', '127.0.0.1:9003'] + }); + + const session = driver.session(neo4j.session.READ); + session.readTransaction(tx => tx.run('MATCH (n) RETURN n.name')).then(res => { + session.close(); + driver.close(); + router1.exit(code1 => { + router2.exit(code2 => { + router3.exit(code3 => { + reader.exit(code4 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + expect(code4).toEqual(0); + done(); + }); + }); + }); + }); + }).catch(error => done.fail(error)); + }); + }); + function testAddressPurgeOnDatabaseError(query, accessMode, done) { if (!boltStub.supported) { done(); @@ -2254,7 +2297,7 @@ describe('routing driver with stub server', () => { const session = driver.session(); session.run('MATCH (n) RETURN n.name').catch(error => { - expect(error.code).toEqual(neo4j.error.PROTOCOL_ERROR); + expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); session.close(); driver.close(); diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 36963ab50..96e938cec 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -296,6 +296,83 @@ describe('Pool', () => { }); }); + it('purges keys other than the ones to keep', done => { + let counter = 0; + + const address1 = ServerAddress.fromUrl('bolt://localhost:7687'); + const address2 = ServerAddress.fromUrl('bolt://localhost:7688'); + const address3 = ServerAddress.fromUrl('bolt://localhost:7689'); + + const pool = new Pool((server, release) => Promise.resolve(new Resource(server, counter++, release)), + res => { + res.destroyed = true; + return true; + } + ); + + const acquiredResources = [ + pool.acquire(address1), + pool.acquire(address2), + pool.acquire(address3), + pool.acquire(address1), + pool.acquire(address2), + pool.acquire(address3) + ]; + + Promise.all(acquiredResources).then(values => { + expect(pool.has(address1)).toBeTruthy(); + expect(pool.has(address2)).toBeTruthy(); + expect(pool.has(address3)).toBeTruthy(); + + pool.keepAll([address1, address3]); + + expect(pool.has(address1)).toBeTruthy(); + expect(pool.has(address3)).toBeTruthy(); + expect(pool.has(address2)).toBeFalsy(); + + done(); + }); + }); + + it('purges all keys if addresses to keep is empty', done => { + let counter = 0; + + const address1 = ServerAddress.fromUrl('bolt://localhost:7687'); + const address2 = ServerAddress.fromUrl('bolt://localhost:7688'); + const address3 = ServerAddress.fromUrl('bolt://localhost:7689'); + + const pool = new Pool((server, release) => Promise.resolve(new Resource(server, counter++, release)), + res => { + res.destroyed = true; + return true; + } + ); + + const acquiredResources = [ + pool.acquire(address1), + pool.acquire(address2), + pool.acquire(address3), + pool.acquire(address1), + pool.acquire(address2), + pool.acquire(address3) + ]; + + Promise.all(acquiredResources).then(values => { + expect(pool.has(address1)).toBeTruthy(); + expect(pool.has(address2)).toBeTruthy(); + expect(pool.has(address3)).toBeTruthy(); + + pool.keepAll([]); + + expect(pool.has(address1)).toBeFalsy(); + expect(pool.has(address3)).toBeFalsy(); + expect(pool.has(address2)).toBeFalsy(); + + done(); + }); + }); + + it('skips broken connections during acquire', (done) => { let validated = false; let counter = 0; diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index 0c6ce74c9..928b88308 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -180,7 +180,7 @@ describe('rediscovery', () => { expect(routingTable.expirationTime).toEqual(expires); - const allServers = routingTable.serversDiff(new RoutingTable()).sort(); + const allServers = routingTable.allServers().sort(); const allExpectedServers = [...routerAddresses, ...readerAddresses, ...writerAddresses].sort(); expect(allServers.map(s => s.asHostPort())).toEqual(allExpectedServers); diff --git a/test/internal/routing-table.test.js b/test/internal/routing-table.test.js index ab6f8d344..68d360847 100644 --- a/test/internal/routing-table.test.js +++ b/test/internal/routing-table.test.js @@ -127,69 +127,6 @@ describe('routing-table', () => { expect(table.writers).toEqual([server5]); }); - it('should return all servers in diff when other table is empty', () => { - const oldTable = createTable([server1, server2], [server3, server4], [server5, server6], notExpired()); - const newTable = createTable([], [], [], notExpired()); - - const servers = oldTable.serversDiff(newTable); - - expect(servers).toEqual([server1, server2, server3, server4, server5, server6]); - }); - - it('should no servers in diff when this table is empty', () => { - const oldTable = createTable([], [], [], notExpired()); - const newTable = createTable([server1, server2], [server3, server4], [server5, server6], notExpired()); - - const servers = oldTable.serversDiff(newTable); - - expect(servers).toEqual([]); - }); - - it('should include different routers in servers diff', () => { - const oldTable = createTable([server1, server7, server2, server42], [server3, server4], [server5, server6], notExpired()); - const newTable = createTable([server1, server2], [server3, server4], [server5, server6], notExpired()); - - const servers = oldTable.serversDiff(newTable); - - expect(servers).toEqual([server7, server42]); - }); - - it('should include different readers in servers diff', () => { - const oldTable = createTable([server1, server2], [server3, server7, server4, server42], [server5, server6], notExpired()); - const newTable = createTable([server1, server2], [server3, server4], [server5, server6], notExpired()); - - const servers = oldTable.serversDiff(newTable); - - expect(servers).toEqual([server7, server42]); - }); - - it('should include different writers in servers diff', () => { - const oldTable = createTable([server1, server2], [server3, server4], [server5, server7, server6, server42], notExpired()); - const newTable = createTable([server1, server2], [server3, server4], [server5, server6], notExpired()); - - const servers = oldTable.serversDiff(newTable); - - expect(servers).toEqual([server7, server42]); - }); - - it('should include different servers in diff', () => { - const oldTable = createTable([server1, server2, server11], [server22, server3, server33, server4], [server5, server44, server6], notExpired()); - const newTable = createTable([server1], [server2, server3, server4, server6], [server5], notExpired()); - - const servers = oldTable.serversDiff(newTable); - - expect(servers).toEqual([server11, server22, server33, server44]); - }); - - it('should include different servers in diff with logical equality', () => { - const oldTable = createTable([server1, server11], [server2, server22], [server3, server33], notExpired()); - const newTable = createTable([ServerAddress.fromUrl(server1.asHostPort())], [ServerAddress.fromUrl(server2.asHostPort())], [ServerAddress.fromUrl(server3.asHostPort())], notExpired()); - - const servers = oldTable.serversDiff(newTable); - - expect(servers).toEqual([server11, server22, server33]); - }); - it('should have correct toString', () => { const originalDateNow = Date.now; try { diff --git a/test/resources/boltstub/acquire_endpoints_v3_empty.script b/test/resources/boltstub/acquire_endpoints_v3_empty.script new file mode 100644 index 000000000..fa079dcf7 --- /dev/null +++ b/test/resources/boltstub/acquire_endpoints_v3_empty.script @@ -0,0 +1,9 @@ +!: BOLT 3 +!: AUTO HELLO +!: AUTO RESET + +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, []] + SUCCESS {} diff --git a/test/resources/boltstub/acquire_endpoints_v3_point_to_empty_router_and_exit.script b/test/resources/boltstub/acquire_endpoints_v3_point_to_empty_router_and_exit.script new file mode 100644 index 000000000..07190d29d --- /dev/null +++ b/test/resources/boltstub/acquire_endpoints_v3_point_to_empty_router_and_exit.script @@ -0,0 +1,12 @@ +!: BOLT 3 +!: AUTO HELLO +!: AUTO RESET + +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9010"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9011"], "role": "READ"},{"addresses": ["127.0.0.1:9004"], "role": "ROUTE"}]] + SUCCESS {} +C: RESET +S: SUCCESS {} + diff --git a/test/resources/boltstub/acquire_endpoints_v3_three_servers_and_exit.script b/test/resources/boltstub/acquire_endpoints_v3_three_servers_and_exit.script new file mode 100644 index 000000000..2ae6e7780 --- /dev/null +++ b/test/resources/boltstub/acquire_endpoints_v3_three_servers_and_exit.script @@ -0,0 +1,11 @@ +!: BOLT 3 +!: AUTO HELLO + +C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} +C: RESET +S: SUCCESS {} + diff --git a/test/resources/boltstub/connection_error_on_commit_v1.script b/test/resources/boltstub/connection_error_on_commit_v1.script new file mode 100644 index 000000000..800efcdac --- /dev/null +++ b/test/resources/boltstub/connection_error_on_commit_v1.script @@ -0,0 +1,15 @@ +!: BOLT 1 +!: AUTO INIT +!: AUTO RESET + +C: RUN "BEGIN" {} + PULL_ALL + RUN "CREATE (n {name: 'Bob'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} + SUCCESS {} + SUCCESS {} +C: RUN "COMMIT" {} + PULL_ALL +S: diff --git a/test/resources/boltstub/connection_error_on_commit_v3.script b/test/resources/boltstub/connection_error_on_commit_v3.script new file mode 100644 index 000000000..98f00ca54 --- /dev/null +++ b/test/resources/boltstub/connection_error_on_commit_v3.script @@ -0,0 +1,12 @@ +!: BOLT 3 +!: AUTO HELLO +!: AUTO RESET + +C: BEGIN {} +S: SUCCESS {} +C: RUN "CREATE (n {name: 'Bob'})" {} {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: COMMIT +S: diff --git a/test/resources/boltstub/read_server_v3_read.script b/test/resources/boltstub/read_server_v3_read.script index 82b2aebe3..9507194e4 100644 --- a/test/resources/boltstub/read_server_v3_read.script +++ b/test/resources/boltstub/read_server_v3_read.script @@ -1,8 +1,7 @@ !: BOLT 3 +!: AUTO HELLO !: AUTO RESET -C: HELLO {"scheme": "basic", "principal": "neo4j", "credentials": "password", "user_agent": "neo4j-javascript/0.0.0-dev"} -S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} PULL_ALL S: SUCCESS {"fields": ["n.name"]} diff --git a/test/resources/boltstub/read_server_v3_read_tx.script b/test/resources/boltstub/read_server_v3_read_tx.script index af1b3f56d..2a5772494 100644 --- a/test/resources/boltstub/read_server_v3_read_tx.script +++ b/test/resources/boltstub/read_server_v3_read_tx.script @@ -1,8 +1,7 @@ !: BOLT 3 +!: AUTO HELLO !: AUTO RESET -C: HELLO {"scheme": "basic", "principal": "neo4j", "credentials": "password", "user_agent": "neo4j-javascript/0.0.0-dev"} -S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} C: BEGIN {"mode": "r"} S: SUCCESS {} C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index d15f4e7f0..c63db928b 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -211,7 +211,7 @@ describe('driver', () => { // Expect driver.onError = error => { - expect(error.message).toEqual(`Server at localhost:7687 can't perform routing. Make sure you are connecting to a causal cluster`); + expect(error.message).toContain('Could not perform discovery. No routing servers available.'); expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); done(); };