Skip to content

Commit e8be21b

Browse files
committed
Avoid to send reset when it is not needed
The `RESET` should be send when a failure occurs or whenever the connection is being sent back to the pool with a pending request running, i.e. the bolt server is not in the `READY` state. These changes also affect the `verifyConnectivity` and `getServerInfo` implementation. The `RESET` message is not sent in these methods if it is a newly created connection.
1 parent 4441857 commit e8be21b

File tree

13 files changed

+300
-7
lines changed

13 files changed

+300
-7
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-pooled.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
7171
*/
7272
_createConnection (address, release) {
7373
return this._createChannelConnection(address).then(connection => {
74-
connection._release = () => release(address, connection)
74+
connection._firstUsage = true
75+
connection._release = () => {
76+
connection._firstUsage = false
77+
return release(address, connection)
78+
}
7579
this._openConnections[connection.id] = connection
7680
return connection
7781
.connect(this._userAgent, this._authToken)
@@ -119,7 +123,9 @@ export default class PooledConnectionProvider extends ConnectionProvider {
119123
const connection = await this._connectionPool.acquire(address)
120124
const serverInfo = new ServerInfo(connection.server, connection.protocol().version)
121125
try {
122-
await connection.resetAndFlush()
126+
if (!connection._firstUsage) {
127+
await connection.resetAndFlush()
128+
}
123129
} finally {
124130
await connection._release()
125131
}

packages/bolt-connection/src/connection/connection-channel.js

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ export default class ChannelConnection extends Connection {
122122
) {
123123
super(errorHandler)
124124

125+
this._reseting = false
125126
this._id = idGenerator++
126127
this._address = address
127128
this._server = { address: address.asHostPort() }
@@ -303,7 +304,7 @@ export default class ChannelConnection extends Connection {
303304
*/
304305
resetAndFlush () {
305306
return new Promise((resolve, reject) => {
306-
this._protocol.reset({
307+
this._reset({
307308
onError: error => {
308309
if (this._isBroken) {
309310
// handling a fatal error, no need to raise a protocol violation
@@ -327,7 +328,7 @@ export default class ChannelConnection extends Connection {
327328
return
328329
}
329330

330-
this._protocol.reset({
331+
this._reset({
331332
onError: () => {
332333
this._protocol.resetFailure()
333334
},
@@ -337,6 +338,23 @@ export default class ChannelConnection extends Connection {
337338
})
338339
}
339340

341+
_reset(observer) {
342+
if (this._reseting) {
343+
observer.onComplete()
344+
return
345+
}
346+
this._reseting = true
347+
this._protocol.reset({
348+
onError: error => {
349+
this._reseting = false
350+
observer.onError(error)
351+
}, onComplete: () => {
352+
this._reseting = false
353+
observer.onComplete()
354+
}
355+
})
356+
}
357+
340358
/*
341359
* Pop next pending observer form the list of observers and make it current observer.
342360
* @protected

packages/bolt-connection/src/connection/connection-delegate.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ export default class DelegateConnection extends Connection {
8383
return this._delegate.resetAndFlush()
8484
}
8585

86+
hasOngoingObservableRequests () {
87+
return this._delegate.hasOngoingObservableRequests()
88+
}
89+
8690
close () {
8791
return this._delegate.close()
8892
}

packages/bolt-connection/src/connection/connection.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ export default class Connection {
103103
throw new Error('not implemented')
104104
}
105105

106+
hasOngoingObservableRequests () {
107+
throw new Error('not implemented')
108+
}
109+
106110
/**
107111
* Call close on the channel.
108112
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.

packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2590,7 +2590,26 @@ describe('#unit RoutingConnectionProvider', () => {
25902590
expect(connections[0]._release).toHaveBeenCalled()
25912591
expect(connections[0]._release.mock.invocationCallOrder[0])
25922592
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
2593-
2593+
})
2594+
2595+
it('should not call resetAndFlush for newly created connections', async () => {
2596+
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ newConnection: true })
2597+
const acquireSpy = jest.spyOn(pool, 'acquire')
2598+
2599+
await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })
2600+
2601+
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
2602+
const address = targetServers[0]
2603+
expect(acquireSpy).toHaveBeenCalledWith(address)
2604+
2605+
const connections = seenConnectionsPerAddress.get(address)
2606+
2607+
// verifying resetAndFlush was not called
2608+
expect(connections[0].resetAndFlush).not.toHaveBeenCalled()
2609+
2610+
// extra checks
2611+
expect(connections.length).toBe(1)
2612+
expect(connections[0]._release).toHaveBeenCalled()
25942613
})
25952614

25962615
it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => {
@@ -2766,7 +2785,7 @@ describe('#unit RoutingConnectionProvider', () => {
27662785
})
27672786
})
27682787

2769-
function setup({ resetAndFlush, releaseMock } = {}) {
2788+
function setup({ resetAndFlush, releaseMock, newConnection } = { }) {
27702789
const routingTable = newRoutingTable(
27712790
database || null,
27722791
[server1, server2],
@@ -2784,6 +2803,7 @@ describe('#unit RoutingConnectionProvider', () => {
27842803
seenConnectionsPerAddress.set(address, [])
27852804
}
27862805
const connection = new FakeConnection(address, release, 'version', protocolVersion, server)
2806+
connection._firstUsage = !!newConnection
27872807
if (resetAndFlush) {
27882808
connection.resetAndFlush = resetAndFlush
27892809
}

packages/bolt-connection/test/connection/connection-channel.test.js

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import ChannelConnection from '../../src/connection/connection-channel'
2121
import { int, internal, newError } from 'neo4j-driver-core'
22+
import { observer } from 'neo4j-driver-core/types/internal'
2223

2324
const {
2425
serverAddress: { ServerAddress },
@@ -260,6 +261,75 @@ describe('ChannelConnection', () => {
260261
expect(protocol.reset).toHaveBeenCalled()
261262
expect(protocol.resetFailure).toHaveBeenCalled()
262263
})
264+
265+
it('should not call protocol.reset() when there is an ongoing reset', () => {
266+
const channel = {
267+
_open: true
268+
}
269+
270+
const protocol = {
271+
reset: jest.fn(),
272+
resetFailure: jest.fn()
273+
}
274+
const protocolSupplier = () => protocol
275+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
276+
277+
connection._resetOnFailure()
278+
279+
expect(protocol.reset).toHaveBeenCalledTimes(1)
280+
expect(protocol.resetFailure).not.toHaveBeenCalled()
281+
282+
connection._resetOnFailure()
283+
284+
expect(protocol.reset).toHaveBeenCalledTimes(1)
285+
expect(protocol.resetFailure).toHaveBeenCalledTimes(1)
286+
})
287+
288+
it('should call protocol.reset() when after a previous reset completed', () => {
289+
const channel = {
290+
_open: true
291+
}
292+
293+
const protocol = {
294+
reset: jest.fn(observer => observer.onComplete()),
295+
resetFailure: jest.fn()
296+
}
297+
const protocolSupplier = () => protocol
298+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
299+
300+
connection._resetOnFailure()
301+
302+
expect(protocol.reset).toHaveBeenCalledTimes(1)
303+
expect(protocol.resetFailure).toHaveBeenCalledTimes(1)
304+
305+
connection._resetOnFailure()
306+
307+
expect(protocol.reset).toHaveBeenCalledTimes(2)
308+
expect(protocol.resetFailure).toHaveBeenCalledTimes(2)
309+
})
310+
311+
it('should call protocol.reset() when after a previous reset fail', () => {
312+
const channel = {
313+
_open: true
314+
}
315+
316+
const protocol = {
317+
reset: jest.fn(observer => observer.onError(new Error('some error'))),
318+
resetFailure: jest.fn()
319+
}
320+
const protocolSupplier = () => protocol
321+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
322+
323+
connection._resetOnFailure()
324+
325+
expect(protocol.reset).toHaveBeenCalledTimes(1)
326+
expect(protocol.resetFailure).toHaveBeenCalledTimes(1)
327+
328+
connection._resetOnFailure()
329+
330+
expect(protocol.reset).toHaveBeenCalledTimes(2)
331+
expect(protocol.resetFailure).toHaveBeenCalledTimes(2)
332+
})
263333
})
264334

265335
describe('when connection is not open', () => {
@@ -286,6 +356,106 @@ describe('ChannelConnection', () => {
286356
})
287357
})
288358

359+
describe('.resetAndFlush()', () => {
360+
it('should call protocol.reset() onComplete', async () => {
361+
const channel = {
362+
_open: true
363+
}
364+
365+
const protocol = {
366+
reset: jest.fn(observer => observer.onComplete()),
367+
resetFailure: jest.fn()
368+
}
369+
const protocolSupplier = () => protocol
370+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
371+
372+
await connection.resetAndFlush().catch(() => {})
373+
374+
expect(protocol.reset).toHaveBeenCalled()
375+
})
376+
377+
it('should call protocol.reset() onError', async () => {
378+
const channel = {
379+
_open: true
380+
}
381+
382+
const protocol = {
383+
reset: jest.fn(observer => observer.onError()),
384+
resetFailure: jest.fn()
385+
}
386+
const protocolSupplier = () => protocol
387+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
388+
389+
await connection.resetAndFlush().catch(() => {})
390+
391+
expect(protocol.reset).toHaveBeenCalled()
392+
})
393+
394+
it('should not call protocol.reset() when there is an ongoing reset', async () => {
395+
const channel = {
396+
_open: true
397+
}
398+
399+
const protocol = {
400+
reset: jest.fn(),
401+
resetFailure: jest.fn()
402+
}
403+
const protocolSupplier = () => protocol
404+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
405+
406+
// to not block since the reset will never complete
407+
connection.resetAndFlush()
408+
409+
expect(protocol.reset).toHaveBeenCalledTimes(1)
410+
411+
await connection.resetAndFlush()
412+
413+
expect(protocol.reset).toHaveBeenCalledTimes(1)
414+
})
415+
416+
it('should call protocol.reset() when after a previous reset completed', async () => {
417+
const channel = {
418+
_open: true
419+
}
420+
421+
const protocol = {
422+
reset: jest.fn(observer => observer.onComplete()),
423+
resetFailure: jest.fn()
424+
}
425+
const protocolSupplier = () => protocol
426+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
427+
428+
await connection.resetAndFlush()
429+
430+
expect(protocol.reset).toHaveBeenCalledTimes(1)
431+
432+
await connection.resetAndFlush()
433+
434+
expect(protocol.reset).toHaveBeenCalledTimes(2)
435+
})
436+
437+
it('should call protocol.reset() when after a previous reset fail', async () => {
438+
const channel = {
439+
_open: true
440+
}
441+
442+
const protocol = {
443+
reset: jest.fn(observer => observer.onError(new Error('some error'))),
444+
resetFailure: jest.fn()
445+
}
446+
const protocolSupplier = () => protocol
447+
const connection = spyOnConnectionChannel({ channel, protocolSupplier })
448+
449+
await connection.resetAndFlush().catch(() => {})
450+
451+
expect(protocol.reset).toHaveBeenCalledTimes(1)
452+
453+
await connection.resetAndFlush().catch(() => {})
454+
455+
expect(protocol.reset).toHaveBeenCalledTimes(2)
456+
})
457+
})
458+
289459
function spyOnConnectionChannel ({
290460
channel,
291461
errorHandler,

packages/core/src/connection.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ class Connection {
9393
throw Error('Not implemented')
9494
}
9595

96+
/**
97+
* Checks if there is an ongoing request being handled
98+
* @return {boolean} `true` if there is an ongoing request being handled
99+
*/
100+
hasOngoingObservableRequests (): boolean {
101+
throw Error('Not implemented')
102+
}
103+
96104
/**
97105
* Call close on the channel.
98106
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.

packages/core/src/internal/connection-holder.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
196196
this._connectionPromise = this._connectionPromise
197197
.then((connection?: Connection|void) => {
198198
if (connection) {
199-
if (connection.isOpen()) {
199+
if (connection.isOpen() && connection.hasOngoingObservableRequests()) {
200200
return connection
201201
.resetAndFlush()
202202
.catch(ignoreError)

packages/core/test/session.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,27 @@ describe('session', () => {
186186
})
187187
}, 70000)
188188

189+
it('close should reset connection if there is an ongoing request ', async () => {
190+
const connection = newFakeConnection()
191+
const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush')
192+
const session = newSessionWithConnection(connection)
193+
194+
await session.close()
195+
196+
expect(resetAndFlushSpy).toHaveBeenCalledTimes(1)
197+
}, 70000)
198+
199+
it('close should not reset connection if there is not an ongoing request ', async () => {
200+
const connection = newFakeConnection()
201+
connection.hasOngoingObservableRequests = () => false
202+
const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush')
203+
const session = newSessionWithConnection(connection)
204+
205+
await session.close()
206+
207+
expect(resetAndFlushSpy).not.toHaveBeenCalled()
208+
}, 70000)
209+
189210
it('should close transaction executor', done => {
190211
const session = newSessionWithConnection(newFakeConnection())
191212

packages/core/test/utils/connection.fake.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ export default class FakeConnection extends Connection {
181181
this._open = false
182182
return this
183183
}
184+
185+
hasOngoingObservableRequests(): boolean {
186+
return true
187+
}
184188
}
185189

186190
function mockResultStreamObserverWithError (query: string, parameters: any | undefined, error: Error) {

0 commit comments

Comments
 (0)