Skip to content

Commit 48f6af6

Browse files
authored
Hide protocol object behind the Connection (#1139)
The connection in the driver is exposing the protocol object, which is not great. Improve this part of the code can be done by make the connection object have methods to do high level requests to the server. List of calls the core driver does using the protocol, by passing the connection: * `version` * `run` * `beginTransaction` * `commitTransaction` * `rollbackTransaction` Methods is present in the `Connection` interface (used by core in **bold**): * `id` * `database` * `server` * `authToken` * `address` * `version` * `supportsReAuth` * **`isOpen`** * **`protocol`** * `connect` * `write` * **`resetAndFlush`** * **`hasOngoingObservableRequests`** * **`_release`** So, `isOpen`, `resetAndFlush` and `hasOngoingObservableRequests` are the methods which will stay in the connection along with the new methods. The method `release` will move the a `Releasable` interface, which will be composed with `Connection` when returning the connection from the provider. The `Releasable` interface is also defined to enable the `ConnectionProvider` returns a connection which can be released back to the pool. Internally, `bolt-connection` can keep exposing the internal of the connection outside the connection in a first moment. The full encapsulation of the `protocol` should be done in the next phase of refactoring.
1 parent cf1fde8 commit 48f6af6

38 files changed

+1118
-605
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-pooled.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
8282
*/
8383
_createConnection ({ auth }, address, release) {
8484
return this._createChannelConnection(address).then(connection => {
85-
connection._release = () => {
85+
connection.release = () => {
8686
return release(address, connection)
8787
}
8888
this._openConnections[connection.id] = connection
@@ -160,7 +160,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
160160
await connection.resetAndFlush()
161161
}
162162
} finally {
163-
await connection._release()
163+
await connection.release()
164164
}
165165
return serverInfo
166166
}
@@ -191,7 +191,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
191191
}
192192
throw error
193193
} finally {
194-
await Promise.all(connectionsToRelease.map(conn => conn._release()))
194+
await Promise.all(connectionsToRelease.map(conn => conn.release()))
195195
}
196196
}
197197

@@ -201,7 +201,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
201201
connection._sticky = connectionWithSameCredentials && !connection.supportsReAuth
202202

203203
if (shouldCreateStickyConnection || connection._sticky) {
204-
await connection._release()
204+
await connection.release()
205205
throw newError('Driver is connected to a database that does not support user switch.')
206206
}
207207
}

packages/bolt-connection/src/connection/connection-channel.js

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,26 @@ export default class ChannelConnection extends Connection {
156156
}
157157
}
158158

159+
beginTransaction (config) {
160+
return this._protocol.beginTransaction(config)
161+
}
162+
163+
run (query, parameters, config) {
164+
return this._protocol.run(query, parameters, config)
165+
}
166+
167+
commitTransaction (config) {
168+
return this._protocol.commitTransaction(config)
169+
}
170+
171+
rollbackTransaction (config) {
172+
return this._protocol.rollbackTransaction(config)
173+
}
174+
175+
getProtocolVersion () {
176+
return this._protocol.version
177+
}
178+
159179
get authToken () {
160180
return this._authToken
161181
}

packages/bolt-connection/src/connection/connection-delegate.js

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,26 @@ export default class DelegateConnection extends Connection {
3535
this._delegate = delegate
3636
}
3737

38+
beginTransaction (config) {
39+
return this._delegate.beginTransaction(config)
40+
}
41+
42+
run (query, param, config) {
43+
return this._delegate.run(query, param, config)
44+
}
45+
46+
commitTransaction (config) {
47+
return this._delegate.commitTransaction(config)
48+
}
49+
50+
rollbackTransaction (config) {
51+
return this._delegate.rollbackTransaction(config)
52+
}
53+
54+
getProtocolVersion () {
55+
return this._delegate.getProtocolVersion()
56+
}
57+
3858
get id () {
3959
return this._delegate.id
4060
}
@@ -103,11 +123,11 @@ export default class DelegateConnection extends Connection {
103123
return this._delegate.close()
104124
}
105125

106-
_release () {
126+
release () {
107127
if (this._originalErrorHandler) {
108128
this._delegate._errorHandler = this._originalErrorHandler
109129
}
110130

111-
return this._delegate._release()
131+
return this._delegate.release()
112132
}
113133
}

packages/bolt-connection/src/connection/connection.js

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
*/
1919
// eslint-disable-next-line no-unused-vars
2020
import { ResultStreamObserver, BoltProtocol } from '../bolt'
21+
import { Connection as CoreConnection } from 'neo4j-driver-core'
2122

22-
export default class Connection {
23+
export default class Connection extends CoreConnection {
2324
/**
2425
* @param {ConnectionErrorHandler} errorHandler the error handler
2526
*/
2627
constructor (errorHandler) {
28+
super()
2729
this._errorHandler = errorHandler
2830
}
2931

@@ -51,13 +53,6 @@ export default class Connection {
5153
throw new Error('not implemented')
5254
}
5355

54-
/**
55-
* @returns {boolean} whether this connection is in a working condition
56-
*/
57-
isOpen () {
58-
throw new Error('not implemented')
59-
}
60-
6156
/**
6257
* @returns {BoltProtocol} the underlying bolt protocol assigned to this connection
6358
*/
@@ -109,18 +104,6 @@ export default class Connection {
109104
throw new Error('not implemented')
110105
}
111106

112-
/**
113-
* Send a RESET-message to the database. Message is immediately flushed to the network.
114-
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
115-
*/
116-
resetAndFlush () {
117-
throw new Error('not implemented')
118-
}
119-
120-
hasOngoingObservableRequests () {
121-
throw new Error('not implemented')
122-
}
123-
124107
/**
125108
* Call close on the channel.
126109
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.

packages/bolt-connection/src/pool/pool.js

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -273,53 +273,56 @@ class Pool {
273273
async _release (address, resource, pool) {
274274
const key = address.asKey()
275275

276-
if (pool.isActive()) {
277-
// there exist idle connections for the given key
278-
if (!await this._validateOnRelease(resource)) {
276+
try {
277+
if (pool.isActive()) {
278+
// there exist idle connections for the given key
279+
if (!await this._validateOnRelease(resource)) {
280+
if (this._log.isDebugEnabled()) {
281+
this._log.debug(
282+
`${resource} destroyed and can't be released to the pool ${key} because it is not functional`
283+
)
284+
}
285+
pool.removeInUse(resource)
286+
await this._destroy(resource)
287+
} else {
288+
if (this._installIdleObserver) {
289+
this._installIdleObserver(resource, {
290+
onError: error => {
291+
this._log.debug(
292+
`Idle connection ${resource} destroyed because of error: ${error}`
293+
)
294+
const pool = this._pools[key]
295+
if (pool) {
296+
this._pools[key] = pool.filter(r => r !== resource)
297+
pool.removeInUse(resource)
298+
}
299+
// let's not care about background clean-ups due to errors but just trigger the destroy
300+
// process for the resource, we especially catch any errors and ignore them to avoid
301+
// unhandled promise rejection warnings
302+
this._destroy(resource).catch(() => {})
303+
}
304+
})
305+
}
306+
pool.push(resource)
307+
if (this._log.isDebugEnabled()) {
308+
this._log.debug(`${resource} released to the pool ${key}`)
309+
}
310+
}
311+
} else {
312+
// key has been purged, don't put it back, just destroy the resource
279313
if (this._log.isDebugEnabled()) {
280314
this._log.debug(
281-
`${resource} destroyed and can't be released to the pool ${key} because it is not functional`
315+
`${resource} destroyed and can't be released to the pool ${key} because pool has been purged`
282316
)
283317
}
284318
pool.removeInUse(resource)
285319
await this._destroy(resource)
286-
} else {
287-
if (this._installIdleObserver) {
288-
this._installIdleObserver(resource, {
289-
onError: error => {
290-
this._log.debug(
291-
`Idle connection ${resource} destroyed because of error: ${error}`
292-
)
293-
const pool = this._pools[key]
294-
if (pool) {
295-
this._pools[key] = pool.filter(r => r !== resource)
296-
pool.removeInUse(resource)
297-
}
298-
// let's not care about background clean-ups due to errors but just trigger the destroy
299-
// process for the resource, we especially catch any errors and ignore them to avoid
300-
// unhandled promise rejection warnings
301-
this._destroy(resource).catch(() => {})
302-
}
303-
})
304-
}
305-
pool.push(resource)
306-
if (this._log.isDebugEnabled()) {
307-
this._log.debug(`${resource} released to the pool ${key}`)
308-
}
309320
}
310-
} else {
311-
// key has been purged, don't put it back, just destroy the resource
312-
if (this._log.isDebugEnabled()) {
313-
this._log.debug(
314-
`${resource} destroyed and can't be released to the pool ${key} because pool has been purged`
315-
)
316-
}
317-
pool.removeInUse(resource)
318-
await this._destroy(resource)
319-
}
320-
resourceReleased(key, this._activeResourceCounts)
321+
} finally {
322+
resourceReleased(key, this._activeResourceCounts)
321323

322-
this._processPendingAcquireRequests(address)
324+
this._processPendingAcquireRequests(address)
325+
}
323326
}
324327

325328
async _purgeKey (key) {

packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ describe('constructor', () => {
289289

290290
const connection = await create({}, server0, release)
291291

292-
const released = connection._release()
292+
const released = connection.release()
293293

294294
expect(released).toBe(releaseResult)
295295
expect(release).toHaveBeenCalledWith(server0, connection)
@@ -546,7 +546,7 @@ describe('user-switching', () => {
546546

547547
expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.'))
548548
expect(poolAcquire).toHaveBeenCalledWith({ auth: acquireAuth }, address)
549-
expect(connection._release).toHaveBeenCalled()
549+
expect(connection.release).toHaveBeenCalled()
550550
expect(connection._sticky).toEqual(isStickyConn)
551551
})
552552
})
@@ -599,15 +599,15 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
599599

600600
await connectionProvider.verifyConnectivityAndGetServerInfo()
601601

602-
expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
602+
expect(seenConnections[0].release).toHaveBeenCalledTimes(1)
603603
})
604604

605605
it('should resetAndFlush and then release the connection', async () => {
606606
const { connectionProvider, seenConnections, resetAndFlush } = setup()
607607

608608
await connectionProvider.verifyConnectivityAndGetServerInfo()
609609

610-
expect(seenConnections[0]._release.mock.invocationCallOrder[0])
610+
expect(seenConnections[0].release.mock.invocationCallOrder[0])
611611
.toBeGreaterThan(resetAndFlush.mock.invocationCallOrder[0])
612612
})
613613

@@ -636,7 +636,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
636636
await connectionProvider.verifyConnectivityAndGetServerInfo()
637637
} catch (e) {
638638
} finally {
639-
expect(seenConnections[0]._release).toHaveBeenCalledTimes(1)
639+
expect(seenConnections[0].release).toHaveBeenCalledTimes(1)
640640
}
641641
})
642642

@@ -692,7 +692,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => {
692692
}
693693
connection.resetAndFlush = resetAndFlush
694694
if (releaseMock) {
695-
connection._release = releaseMock
695+
connection.release = releaseMock
696696
}
697697
seenConnections.push(connection)
698698
return connection
@@ -782,7 +782,7 @@ class FakeConnection extends Connection {
782782
super(null)
783783

784784
this._address = address
785-
this._release = jest.fn(() => release(address, this))
785+
this.release = jest.fn(() => release(address, this))
786786
this._server = server
787787
this._authToken = auth
788788
this._closed = false

0 commit comments

Comments
 (0)