Skip to content

Avoid sending resets when it is not needed #902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 30, 2022
11 changes: 11 additions & 0 deletions packages/bolt-connection/src/bolt/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export default class BoltProtocol {
this._log = log
this._onProtocolError = onProtocolError
this._fatalError = null
this._lastMessageSignature = null
}

/**
Expand Down Expand Up @@ -356,6 +357,8 @@ export default class BoltProtocol {
this._log.debug(`C: ${message}`)
}

this._lastMessageSignature = message.signature

this.packer().packStruct(
message.signature,
message.fields.map(field => this.packer().packable(field))
Expand All @@ -369,6 +372,14 @@ export default class BoltProtocol {
}
}

isLastMessageLogin () {
return this._lastMessageSignature === 0x01
}

isLastMessageReset () {
return this._lastMessageSignature === 0x0f
}

/**
* Notifies faltal erros to the observers and mark the protocol in the fatal error state.
* @param {Error} error The error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ export default class PooledConnectionProvider extends ConnectionProvider {
*/
_createConnection (address, release) {
return this._createChannelConnection(address).then(connection => {
connection._release = () => release(address, connection)
connection._release = () => {
return release(address, connection)
}
this._openConnections[connection.id] = connection
return connection
.connect(this._userAgent, this._authToken)
Expand Down Expand Up @@ -119,7 +121,9 @@ export default class PooledConnectionProvider extends ConnectionProvider {
const connection = await this._connectionPool.acquire(address)
const serverInfo = new ServerInfo(connection.server, connection.protocol().version)
try {
await connection.resetAndFlush()
if (!connection.protocol().isLastMessageLogin()) {
await connection.resetAndFlush()
}
} finally {
await connection._release()
}
Expand Down
41 changes: 39 additions & 2 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ export default class ChannelConnection extends Connection {
) {
super(errorHandler)

this._reseting = false
this._resetObservers = []
this._id = idGenerator++
this._address = address
this._server = { address: address.asHostPort() }
Expand Down Expand Up @@ -304,7 +306,7 @@ export default class ChannelConnection extends Connection {
*/
resetAndFlush () {
return new Promise((resolve, reject) => {
this._protocol.reset({
this._reset({
onError: error => {
if (this._isBroken) {
// handling a fatal error, no need to raise a protocol violation
Expand All @@ -328,7 +330,7 @@ export default class ChannelConnection extends Connection {
return
}

this._protocol.reset({
this._reset({
onError: () => {
this._protocol.resetFailure()
},
Expand All @@ -338,6 +340,41 @@ export default class ChannelConnection extends Connection {
})
}

_reset(observer) {
if (this._reseting) {
if (!this._protocol.isLastMessageReset()) {
this._protocol.reset({
onError: error => {
observer.onError(error)
}, onComplete: () => {
observer.onComplete()
}
})
} else {
this._resetObservers.push(observer)
}
return
}

this._resetObservers.push(observer)
this._reseting = true

const notifyFinish = (notify) => {
this._reseting = false
const observers = this._resetObservers
this._resetObservers = []
observers.forEach(notify)
}

this._protocol.reset({
onError: error => {
notifyFinish(obs => obs.onError(error))
}, onComplete: () => {
notifyFinish(obs => obs.onComplete())
}
})
}

/*
* Pop next pending observer form the list of observers and make it current observer.
* @protected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ export default class DelegateConnection extends Connection {
return this._delegate.resetAndFlush()
}

hasOngoingObservableRequests () {
return this._delegate.hasOngoingObservableRequests()
}

close () {
return this._delegate.close()
}
Expand Down
4 changes: 4 additions & 0 deletions packages/bolt-connection/src/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ export default class Connection {
throw new Error('not implemented')
}

hasOngoingObservableRequests () {
throw new Error('not implemented')
}

/**
* Call close on the channel.
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
const create = (address, release) => {
const connection = new FakeConnection(address, release, server)
connection.protocol = () => {
return { version: protocolVersion }
return { version: protocolVersion, isLastMessageLogin() { return false } }
}
connection.resetAndFlush = resetAndFlush
if (releaseMock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2584,6 +2584,26 @@ describe('#unit RoutingConnectionProvider', () => {
.toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0])
})

it('should not call resetAndFlush for newly created connections', async () => {
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ newConnection: true })
const acquireSpy = jest.spyOn(pool, 'acquire')

await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode })

const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
const address = targetServers[0]
expect(acquireSpy).toHaveBeenCalledWith(address)

const connections = seenConnectionsPerAddress.get(address)

// verifying resetAndFlush was not called
expect(connections[0].resetAndFlush).not.toHaveBeenCalled()

// extra checks
expect(connections.length).toBe(1)
expect(connections[0]._release).toHaveBeenCalled()
})

it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => {
const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup()
const acquireSpy = jest.spyOn(pool, 'acquire')
Expand Down Expand Up @@ -2756,7 +2776,7 @@ describe('#unit RoutingConnectionProvider', () => {
})
})

function setup ({ resetAndFlush, releaseMock } = {}) {
function setup ({ resetAndFlush, releaseMock, newConnection } = { }) {
const routingTable = newRoutingTable(
database || null,
[server1, server2],
Expand All @@ -2774,6 +2794,7 @@ describe('#unit RoutingConnectionProvider', () => {
seenConnectionsPerAddress.set(address, [])
}
const connection = new FakeConnection(address, release, 'version', protocolVersion, server)
connection._firstUsage = !!newConnection
if (resetAndFlush) {
connection.resetAndFlush = resetAndFlush
}
Expand Down Expand Up @@ -3082,7 +3103,8 @@ class FakeConnection extends Connection {

protocol () {
return {
version: this._protocolVersion
version: this._protocolVersion,
isLastMessageLogin: () => this._firstUsage
}
}
}
Expand Down
174 changes: 174 additions & 0 deletions packages/bolt-connection/test/connection/connection-channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,76 @@ describe('ChannelConnection', () => {
expect(protocol.reset).toHaveBeenCalled()
expect(protocol.resetFailure).toHaveBeenCalled()
})

it('should not call protocol.reset() when there is an ongoing reset', () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(),
resetFailure: jest.fn(),
isLastMessageReset: jest.fn(() => true)
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

connection._resetOnFailure()

expect(protocol.reset).toHaveBeenCalledTimes(1)
expect(protocol.resetFailure).not.toHaveBeenCalled()

connection._resetOnFailure()

expect(protocol.reset).toHaveBeenCalledTimes(1)
expect(protocol.resetFailure).not.toHaveBeenCalled()
})

it('should call protocol.reset() when after a previous reset completed', () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(observer => observer.onComplete()),
resetFailure: jest.fn()
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

connection._resetOnFailure()

expect(protocol.reset).toHaveBeenCalledTimes(1)
expect(protocol.resetFailure).toHaveBeenCalledTimes(1)

connection._resetOnFailure()

expect(protocol.reset).toHaveBeenCalledTimes(2)
expect(protocol.resetFailure).toHaveBeenCalledTimes(2)
})

it('should call protocol.reset() when after a previous reset fail', () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(observer => observer.onError(new Error('some error'))),
resetFailure: jest.fn()
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

connection._resetOnFailure()

expect(protocol.reset).toHaveBeenCalledTimes(1)
expect(protocol.resetFailure).toHaveBeenCalledTimes(1)

connection._resetOnFailure()

expect(protocol.reset).toHaveBeenCalledTimes(2)
expect(protocol.resetFailure).toHaveBeenCalledTimes(2)
})
})

describe('when connection is not open', () => {
Expand Down Expand Up @@ -340,6 +410,110 @@ describe('ChannelConnection', () => {
})
})

describe('.resetAndFlush()', () => {
it('should call protocol.reset() onComplete', async () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(observer => observer.onComplete()),
resetFailure: jest.fn()
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

await connection.resetAndFlush().catch(() => {})

expect(protocol.reset).toHaveBeenCalled()
})

it('should call protocol.reset() onError', async () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(observer => observer.onError()),
resetFailure: jest.fn()
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

await connection.resetAndFlush().catch(() => {})

expect(protocol.reset).toHaveBeenCalled()
})

it('should not call protocol.reset() when there is an ongoing reset', async () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(observer => {
setTimeout(() => observer.onComplete(), 100)
}),
resetFailure: jest.fn(),
isLastMessageReset: jest.fn(() => true)
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

const completeFirstResetAndFlush = connection.resetAndFlush()

expect(protocol.reset).toHaveBeenCalledTimes(1)

await connection.resetAndFlush()

expect(protocol.reset).toHaveBeenCalledTimes(1)

await completeFirstResetAndFlush
})

it('should call protocol.reset() when after a previous reset completed', async () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(observer => observer.onComplete()),
resetFailure: jest.fn()
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

await connection.resetAndFlush()

expect(protocol.reset).toHaveBeenCalledTimes(1)

await connection.resetAndFlush()

expect(protocol.reset).toHaveBeenCalledTimes(2)
})

it('should call protocol.reset() when after a previous reset fail', async () => {
const channel = {
_open: true
}

const protocol = {
reset: jest.fn(observer => observer.onError(new Error('some error'))),
resetFailure: jest.fn()
}
const protocolSupplier = () => protocol
const connection = spyOnConnectionChannel({ channel, protocolSupplier })

await connection.resetAndFlush().catch(() => {})

expect(protocol.reset).toHaveBeenCalledTimes(1)

await connection.resetAndFlush().catch(() => {})

expect(protocol.reset).toHaveBeenCalledTimes(2)
})
})

function spyOnConnectionChannel ({
channel,
errorHandler,
Expand Down
Loading