Skip to content

Fix connection getting timeout while idle #1167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
}

static _installIdleObserverOnConnection (conn, observer) {
conn._queueObserver(observer)
conn._setIdle(observer)
}

static _removeIdleObserverOnConnection (conn) {
conn._updateCurrentObserver()
conn._unsetIdle()
}

_handleSecurityError (error, address, connection) {
Expand Down
27 changes: 25 additions & 2 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export default class ChannelConnection extends Connection {
) {
super(errorHandler)
this._authToken = null
this._idle = false
this._reseting = false
this._resetObservers = []
this._id = idGenerator++
Expand Down Expand Up @@ -393,7 +394,26 @@ export default class ChannelConnection extends Connection {
}

/**
* This method still here because it's used by the {@link PooledConnectionProvider}
* This method is used by the {@link PooledConnectionProvider}
*
* @param {any} observer
*/
_setIdle (observer) {
this._idle = true
this._ch.stopReceiveTimeout()
this._protocol.queueObserverIfProtocolIsNotBroken(observer)
}

/**
* This method is used by the {@link PooledConnectionProvider}
*/
_unsetIdle () {
this._idle = false
this._updateCurrentObserver()
}

/**
* This method still here because of the connection-channel.tests.js
*
* @param {any} observer
*/
Expand All @@ -402,7 +422,7 @@ export default class ChannelConnection extends Connection {
}

hasOngoingObservableRequests () {
return this._protocol.hasOngoingObservableRequests()
return !this._idle && this._protocol.hasOngoingObservableRequests()
}

/**
Expand Down Expand Up @@ -500,6 +520,9 @@ export default class ChannelConnection extends Connection {
* @param {number} requestsNumber Ongoing requests number
*/
_handleOngoingRequestsNumberChange (requestsNumber) {
if (this._idle) {
return
}
if (requestsNumber === 0) {
this._ch.stopReceiveTimeout()
} else {
Expand Down
143 changes: 139 additions & 4 deletions packages/bolt-connection/test/connection/connection-channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ describe('ChannelConnection', () => {
})

describe('.__handleOngoingRequestsNumberChange()', () => {
it('should call channel.stopReceiveTimeout when requets number equals to 0', () => {
it('should call channel.stopReceiveTimeout when requests number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -571,7 +571,7 @@ describe('ChannelConnection', () => {
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should not call channel.startReceiveTimeout when requets number equals to 0', () => {
it('should not call channel.startReceiveTimeout when requests number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -585,7 +585,7 @@ describe('ChannelConnection', () => {

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requets number equals to %d', (requests) => {
])('should call channel.startReceiveTimeout when requests number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -599,7 +599,7 @@ describe('ChannelConnection', () => {

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout when requets number equals to %d', (requests) => {
])('should not call channel.stopReceiveTimeout when requests number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -610,6 +610,68 @@ describe('ChannelConnection', () => {

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[0], [1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout or startReceiveTimeout when requests number equals to %d and connection is idle', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requests number equals to %d and connection is not idle anymore', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
connection._unsetIdle()
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should call channel.stopReceiveTimeout when requests number equals to 0 and connection is not idle anymore', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
connection._unsetIdle()
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(0)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})
})

describe('.resetAndFlush()', () => {
Expand Down Expand Up @@ -1181,6 +1243,44 @@ describe('ChannelConnection', () => {
})

describe('.hasOngoingObservableRequests()', () => {
it('should return false if connection is idle', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true),
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
connection._setIdle({})

const result = connection.hasOngoingObservableRequests()

expect(result).toBe(false)
expect(protocol.hasOngoingObservableRequests).not.toBeCalledWith()
})

it('should redirect request to the protocol when connection is not idle anymore', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true),
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
connection._setIdle({})
connection._unsetIdle()

const result = connection.hasOngoingObservableRequests()

expect(result).toBe(true)
expect(protocol.hasOngoingObservableRequests).toBeCalledWith()
})

it('should call redirect request to the protocol', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true)
Expand All @@ -1195,6 +1295,41 @@ describe('ChannelConnection', () => {
})
})

describe('._setIdle()', () => {
it('should stop receive timeout and enqueue observer', () => {
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}
const observer = {
onComplete: () => {}
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })

connection._setIdle(observer)

expect(channel.stopReceiveTimeout).toBeCalledTimes(1)
expect(protocol.queueObserverIfProtocolIsNotBroken).toBeCalledWith(observer)
})
})

describe('._unsetIdle()', () => {
it('should update current observer', () => {
const protocol = {
updateCurrentObserver: jest.fn(() => {})
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol })

connection._unsetIdle()

expect(protocol.updateCurrentObserver).toBeCalledTimes(1)
})
})

function spyOnConnectionChannel ({
channel,
errorHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
}

static _installIdleObserverOnConnection (conn, observer) {
conn._queueObserver(observer)
conn._setIdle(observer)
}

static _removeIdleObserverOnConnection (conn) {
conn._updateCurrentObserver()
conn._unsetIdle()
}

_handleSecurityError (error, address, connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export default class ChannelConnection extends Connection {
) {
super(errorHandler)
this._authToken = null
this._idle = false
this._reseting = false
this._resetObservers = []
this._id = idGenerator++
Expand Down Expand Up @@ -393,7 +394,26 @@ export default class ChannelConnection extends Connection {
}

/**
* This method still here because it's used by the {@link PooledConnectionProvider}
* This method is used by the {@link PooledConnectionProvider}
*
* @param {any} observer
*/
_setIdle (observer) {
this._idle = true
this._ch.stopReceiveTimeout()
this._protocol.queueObserverIfProtocolIsNotBroken(observer)
}

/**
* This method is used by the {@link PooledConnectionProvider}
*/
_unsetIdle () {
this._idle = false
this._updateCurrentObserver()
}

/**
* This method still here because of the connection-channel.tests.js
*
* @param {any} observer
*/
Expand All @@ -402,7 +422,7 @@ export default class ChannelConnection extends Connection {
}

hasOngoingObservableRequests () {
return this._protocol.hasOngoingObservableRequests()
return !this._idle && this._protocol.hasOngoingObservableRequests()
}

/**
Expand Down Expand Up @@ -500,6 +520,9 @@ export default class ChannelConnection extends Connection {
* @param {number} requestsNumber Ongoing requests number
*/
_handleOngoingRequestsNumberChange (requestsNumber) {
if (this._idle) {
return
}
if (requestsNumber === 0) {
this._ch.stopReceiveTimeout()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,29 @@ describe('#unit PooledConnectionProvider', () => {
clock.uninstall()
}
})

it('_installIdleObserverOnConnection should set connection as idle', () => {
const connection = new FakeConnection()
const observer = { onCompleted: () => {} }

PooledConnectionProvider._installIdleObserverOnConnection(connection, observer)

expect(connection._idle).toBe(true)
expect(connection._idleObserver).toBe(observer)
})

it('_removeIdleObserverOnConnection should unset connection as idle', () => {
const connection = new FakeConnection()
const observer = { onCompleted: () => {} }

PooledConnectionProvider._installIdleObserverOnConnection(connection, observer)

expect(connection._idle).toBe(true)
expect(connection._idleObserver).toBe(observer)

PooledConnectionProvider._removeIdleObserverOnConnection(connection)

expect(connection._idle).toBe(false)
expect(connection._idleObserver).toBe(null)
})
})
13 changes: 12 additions & 1 deletion packages/neo4j-driver/test/internal/fake-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ export default class FakeConnection extends Connection {
this._databaseId = null
this._requestRoutingInformationMock = null
this._creationTimestamp = Date.now()

this._idle = false
this._idleObserver = null
this.resetInvoked = 0
this.releaseInvoked = 0
this.seenQueries = []
Expand Down Expand Up @@ -101,6 +102,16 @@ export default class FakeConnection extends Connection {
return this._idleTimestamp
}

_setIdle (observer) {
this._idle = true
this._idleObserver = observer
}

_unsetIdle () {
this._idle = false
this._idleObserver = null
}

protocol () {
// return fake protocol object that simply records seen queries and parameters
return {
Expand Down
Loading