Skip to content

Hide protocol object behind the Connection #1139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
*/
_createConnection ({ auth }, address, release) {
return this._createChannelConnection(address).then(connection => {
connection._release = () => {
connection.release = () => {
return release(address, connection)
}
this._openConnections[connection.id] = connection
Expand Down Expand Up @@ -160,7 +160,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
await connection.resetAndFlush()
}
} finally {
await connection._release()
await connection.release()
}
return serverInfo
}
Expand Down Expand Up @@ -191,7 +191,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
}
throw error
} finally {
await Promise.all(connectionsToRelease.map(conn => conn._release()))
await Promise.all(connectionsToRelease.map(conn => conn.release()))
}
}

Expand All @@ -201,7 +201,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
connection._sticky = connectionWithSameCredentials && !connection.supportsReAuth

if (shouldCreateStickyConnection || connection._sticky) {
await connection._release()
await connection.release()
throw newError('Driver is connected to a database that does not support user switch.')
}
}
Expand Down
20 changes: 20 additions & 0 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,26 @@ export default class ChannelConnection extends Connection {
}
}

beginTransaction (config) {
return this._protocol.beginTransaction(config)
}

run (query, parameters, config) {
return this._protocol.run(query, parameters, config)
}

commitTransaction (config) {
return this._protocol.commitTransaction(config)
}

rollbackTransaction (config) {
return this._protocol.rollbackTransaction(config)
}

getProtocolVersion () {
return this._protocol.version
}

get authToken () {
return this._authToken
}
Expand Down
24 changes: 22 additions & 2 deletions packages/bolt-connection/src/connection/connection-delegate.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ export default class DelegateConnection extends Connection {
this._delegate = delegate
}

beginTransaction (config) {
return this._delegate.beginTransaction(config)
}

run (query, param, config) {
return this._delegate.run(query, param, config)
}

commitTransaction (config) {
return this._delegate.commitTransaction(config)
}

rollbackTransaction (config) {
return this._delegate.rollbackTransaction(config)
}

getProtocolVersion () {
return this._delegate.getProtocolVersion()
}

get id () {
return this._delegate.id
}
Expand Down Expand Up @@ -103,11 +123,11 @@ export default class DelegateConnection extends Connection {
return this._delegate.close()
}

_release () {
release () {
if (this._originalErrorHandler) {
this._delegate._errorHandler = this._originalErrorHandler
}

return this._delegate._release()
return this._delegate.release()
}
}
23 changes: 3 additions & 20 deletions packages/bolt-connection/src/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
// eslint-disable-next-line no-unused-vars
import { ResultStreamObserver, BoltProtocol } from '../bolt'
import { Connection as CoreConnection } from 'neo4j-driver-core'

export default class Connection {
export default class Connection extends CoreConnection {
/**
* @param {ConnectionErrorHandler} errorHandler the error handler
*/
constructor (errorHandler) {
super()
this._errorHandler = errorHandler
}

Expand Down Expand Up @@ -51,13 +53,6 @@ export default class Connection {
throw new Error('not implemented')
}

/**
* @returns {boolean} whether this connection is in a working condition
*/
isOpen () {
throw new Error('not implemented')
}

/**
* @returns {BoltProtocol} the underlying bolt protocol assigned to this connection
*/
Expand Down Expand Up @@ -109,18 +104,6 @@ export default class Connection {
throw new Error('not implemented')
}

/**
* Send a RESET-message to the database. Message is immediately flushed to the network.
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
*/
resetAndFlush () {
throw new Error('not implemented')
}

hasOngoingObservableRequests () {
throw new Error('not implemented')
}

/**
* Call close on the channel.
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.
Expand Down
81 changes: 42 additions & 39 deletions packages/bolt-connection/src/pool/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,53 +273,56 @@ class Pool {
async _release (address, resource, pool) {
const key = address.asKey()

if (pool.isActive()) {
// there exist idle connections for the given key
if (!await this._validateOnRelease(resource)) {
try {
if (pool.isActive()) {
// there exist idle connections for the given key
if (!await this._validateOnRelease(resource)) {
if (this._log.isDebugEnabled()) {
this._log.debug(
`${resource} destroyed and can't be released to the pool ${key} because it is not functional`
)
}
pool.removeInUse(resource)
await this._destroy(resource)
} else {
if (this._installIdleObserver) {
this._installIdleObserver(resource, {
onError: error => {
this._log.debug(
`Idle connection ${resource} destroyed because of error: ${error}`
)
const pool = this._pools[key]
if (pool) {
this._pools[key] = pool.filter(r => r !== resource)
pool.removeInUse(resource)
}
// let's not care about background clean-ups due to errors but just trigger the destroy
// process for the resource, we especially catch any errors and ignore them to avoid
// unhandled promise rejection warnings
this._destroy(resource).catch(() => {})
}
})
}
pool.push(resource)
if (this._log.isDebugEnabled()) {
this._log.debug(`${resource} released to the pool ${key}`)
}
}
} else {
// key has been purged, don't put it back, just destroy the resource
if (this._log.isDebugEnabled()) {
this._log.debug(
`${resource} destroyed and can't be released to the pool ${key} because it is not functional`
`${resource} destroyed and can't be released to the pool ${key} because pool has been purged`
)
}
pool.removeInUse(resource)
await this._destroy(resource)
} else {
if (this._installIdleObserver) {
this._installIdleObserver(resource, {
onError: error => {
this._log.debug(
`Idle connection ${resource} destroyed because of error: ${error}`
)
const pool = this._pools[key]
if (pool) {
this._pools[key] = pool.filter(r => r !== resource)
pool.removeInUse(resource)
}
// let's not care about background clean-ups due to errors but just trigger the destroy
// process for the resource, we especially catch any errors and ignore them to avoid
// unhandled promise rejection warnings
this._destroy(resource).catch(() => {})
}
})
}
pool.push(resource)
if (this._log.isDebugEnabled()) {
this._log.debug(`${resource} released to the pool ${key}`)
}
}
} else {
// key has been purged, don't put it back, just destroy the resource
if (this._log.isDebugEnabled()) {
this._log.debug(
`${resource} destroyed and can't be released to the pool ${key} because pool has been purged`
)
}
pool.removeInUse(resource)
await this._destroy(resource)
}
resourceReleased(key, this._activeResourceCounts)
} finally {
resourceReleased(key, this._activeResourceCounts)

this._processPendingAcquireRequests(address)
this._processPendingAcquireRequests(address)
}
}

async _purgeKey (key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ describe('constructor', () => {

const connection = await create({}, server0, release)

const released = connection._release()
const released = connection.release()

expect(released).toBe(releaseResult)
expect(release).toHaveBeenCalledWith(server0, connection)
Expand Down Expand Up @@ -546,7 +546,7 @@ describe('user-switching', () => {

expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.'))
expect(poolAcquire).toHaveBeenCalledWith({ auth: acquireAuth }, address)
expect(connection._release).toHaveBeenCalled()
expect(connection.release).toHaveBeenCalled()
expect(connection._sticky).toEqual(isStickyConn)
})
})
Expand Down Expand Up @@ -599,15 +599,15 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {

await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
expect(seenConnections[0].release).toHaveBeenCalledTimes(1)
})

it('should resetAndFlush and then release the connection', async () => {
const { connectionProvider, seenConnections, resetAndFlush } = setup()

await connectionProvider.verifyConnectivityAndGetServerInfo()

expect(seenConnections[0]._release.mock.invocationCallOrder[0])
expect(seenConnections[0].release.mock.invocationCallOrder[0])
.toBeGreaterThan(resetAndFlush.mock.invocationCallOrder[0])
})

Expand Down Expand Up @@ -636,7 +636,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
await connectionProvider.verifyConnectivityAndGetServerInfo()
} catch (e) {
} finally {
expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
expect(seenConnections[0].release).toHaveBeenCalledTimes(1)
}
})

Expand Down Expand Up @@ -692,7 +692,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
}
connection.resetAndFlush = resetAndFlush
if (releaseMock) {
connection._release = releaseMock
connection.release = releaseMock
}
seenConnections.push(connection)
return connection
Expand Down Expand Up @@ -782,7 +782,7 @@ class FakeConnection extends Connection {
super(null)

this._address = address
this._release = jest.fn(() => release(address, this))
this.release = jest.fn(() => release(address, this))
this._server = server
this._authToken = auth
this._closed = false
Expand Down
Loading