Skip to content

Commit 402ab90

Browse files
committed
Hide protocol object behind the Connection
The connection in the driver is exposing the protocol object, which is not great. Improve this part of the code can be done by make the connection object have methods to do high level requests to the server. List of calls the core driver does using the protocol, by passing the connection: * `version` * `run` * `beginTransaction` * `commitTransaction` * `rollbackTransaction` Methods is present in the `Connection` interface (used by core in **bold**): * `id` * `database` * `server` * `authToken` * `address` * `version` * `supportsReAuth` * **`isOpen`** * **`protocol`** * `connect` * `write` * **`resetAndFlush`** * **`hasOngoingObservableRequests`** * **`_release`** So, `isOpen`, `resetAndFlush` and `hasOngoingObservableRequests` are the methods which will stay in the connection along with the new methods. The method `release` will move the a `Releasable` interface, which will be composed with `Connection` when returning the connection from the provider. The `Releasable` interface is also defined to enable the `ConnectionProvider` returns a connection which can be released back to the pool. Internally, `bolt-connection` can keep exposing the internal of the connection outside the connection in a first moment. The full encapsulation of the `protocol` should be done in the next phase of refactoring.
1 parent ee4baf7 commit 402ab90

18 files changed

+273
-312
lines changed

packages/core/src/connection-provider.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,18 @@ import { bookmarks } from './internal'
2323
import { ServerInfo } from './result-summary'
2424
import { AuthToken } from './types'
2525

26+
/**
27+
* Interface define a releasable resource shape
28+
*
29+
* @private
30+
* @interface
31+
*/
32+
class Releasable {
33+
release (): Promise<void> {
34+
throw new Error('Not implemented')
35+
}
36+
}
37+
2638
/**
2739
* Interface define a common way to acquire a connection
2840
*
@@ -53,7 +65,7 @@ class ConnectionProvider {
5365
impersonatedUser?: string
5466
onDatabaseNameResolved?: (databaseName?: string) => void
5567
auth?: AuthToken
56-
}): Promise<Connection> {
68+
}): Promise<Connection & Releasable> {
5769
throw Error('Not implemented')
5870
}
5971

@@ -150,3 +162,6 @@ class ConnectionProvider {
150162
}
151163

152164
export default ConnectionProvider
165+
export {
166+
Releasable
167+
}

packages/core/src/connection.ts

Lines changed: 62 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -18,121 +18,91 @@
1818
*/
1919
/* eslint-disable @typescript-eslint/promise-function-async */
2020

21-
import { ServerAddress } from './internal/server-address'
22-
23-
/**
24-
* Interface which defines the raw connection with the database
25-
* @private
26-
*/
27-
class Connection {
28-
get id (): string {
29-
return ''
30-
}
21+
import { Bookmarks } from './internal/bookmarks'
22+
import { AccessMode } from './internal/constants'
23+
import { ResultStreamObserver } from './internal/observers'
24+
import { TxConfig } from './internal/tx-config'
25+
import NotificationFilter from './notification-filter'
26+
27+
interface HasBeforeErrorAndAfterComplete {
28+
beforeError?: (error: Error) => void
29+
afterComplete?: (metadata: unknown) => void
30+
}
3131

32-
get databaseId (): string {
33-
return ''
34-
}
32+
interface BeginTransactionConfig extends HasBeforeErrorAndAfterComplete {
33+
bookmarks: Bookmarks
34+
txConfig: TxConfig
35+
mode?: AccessMode
36+
database?: string
37+
impersonatedUser?: string
38+
notificationFilter?: NotificationFilter
39+
}
3540

36-
get server (): any {
37-
return {}
38-
}
41+
interface CommitTransactionConfig extends HasBeforeErrorAndAfterComplete {
3942

40-
/**
41-
* @property {object} authToken The auth registered in the connection
42-
*/
43-
get authToken (): any {
44-
return {}
45-
}
43+
}
4644

47-
/**
48-
* @property {ServerAddress} the server address this connection is opened against
49-
*/
50-
get address (): ServerAddress | undefined {
51-
return undefined
52-
}
45+
interface RollbackConnectionConfig extends HasBeforeErrorAndAfterComplete {
5346

54-
/**
55-
* @property {ServerVersion} the version of the server this connection is connected to
56-
*/
57-
get version (): any {
58-
return undefined
59-
}
47+
}
6048

61-
/**
62-
* @property {boolean} supportsReAuth Indicates the connection supports re-auth
63-
*/
64-
get supportsReAuth (): boolean {
65-
return false
66-
}
49+
interface RunQueryConfig extends BeginTransactionConfig {
50+
fetchSize: number
51+
highRecordWatermark: number
52+
lowRecordWatermark: number
53+
reactive: boolean
54+
}
6755

68-
/**
69-
* @returns {boolean} whether this connection is in a working condition
70-
*/
71-
isOpen (): boolean {
72-
return false
56+
/**
57+
* Interface which defines a connection for the core driver object.
58+
*
59+
*
60+
* This connection exposes only methods used by the code module.
61+
* Methods with connection implementation details can be defined and used
62+
* by the implementation layer.
63+
*
64+
* @private
65+
* @interface
66+
*/
67+
class Connection {
68+
beginTransaction (config: BeginTransactionConfig): ResultStreamObserver {
69+
throw new Error('Not implemented')
7370
}
7471

75-
/**
76-
* @todo be removed and internalize the methods
77-
* @returns {any} the underlying bolt protocol assigned to this connection
78-
*/
79-
protocol (): any {
80-
throw Error('Not implemented')
72+
run (query: string, parameters?: Record<string, unknown>, config?: RunQueryConfig): ResultStreamObserver {
73+
throw new Error('Not implemented')
8174
}
8275

83-
/**
84-
* Connect to the target address, negotiate Bolt protocol and send initialization message.
85-
* @param {string} userAgent the user agent for this driver.
86-
* @param {string} boltAgent the bolt agent for this driver.
87-
* @param {Object} authToken the object containing auth information.
88-
* @param {Object} waitReAuth whether to connect method should wait until re-Authorised
89-
* @return {Promise<Connection>} promise resolved with the current connection if connection is successful. Rejected promise otherwise.
90-
*/
91-
connect (userAgent: string, boltAgent: string, authToken: any, waitReAuth: false): Promise<Connection> {
92-
throw Error('Not implemented')
76+
commitTransaction (config: CommitTransactionConfig): ResultStreamObserver {
77+
throw new Error('Not implemented')
9378
}
9479

95-
/**
96-
* Write a message to the network channel.
97-
* @param {RequestMessage} message the message to write.
98-
* @param {ResultStreamObserver} observer the response observer.
99-
* @param {boolean} flush `true` if flush should happen after the message is written to the buffer.
100-
*/
101-
write (message: any, observer: any, flush: boolean): void {
102-
throw Error('Not implemented')
80+
rollbackTransaction (config: RollbackConnectionConfig): ResultStreamObserver {
81+
throw new Error('Not implemented')
10382
}
10483

105-
/**
106-
* Send a RESET-message to the database. Message is immediately flushed to the network.
107-
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
108-
*/
10984
resetAndFlush (): Promise<void> {
110-
throw Error('Not implemented')
85+
throw new Error('Not implemented')
11186
}
11287

113-
/**
114-
* Checks if there is an ongoing request being handled
115-
* @return {boolean} `true` if there is an ongoing request being handled
116-
*/
117-
hasOngoingObservableRequests (): boolean {
118-
throw Error('Not implemented')
88+
isOpen (): boolean {
89+
throw new Error('Not implemented')
11990
}
12091

121-
/**
122-
* Call close on the channel.
123-
* @returns {Promise<void>} - A promise that will be resolved when the connection is closed.
124-
*
125-
*/
126-
close (): Promise<void> {
127-
throw Error('Not implemented')
92+
getProtocolVersion (): number {
93+
throw new Error('Not implemented')
12894
}
12995

130-
/**
131-
* Called to release the connection
132-
*/
133-
_release (): Promise<void> {
134-
return Promise.resolve()
96+
hasOngoingObservableRequests (): boolean {
97+
throw new Error('Not implemented')
13598
}
13699
}
137100

138101
export default Connection
102+
103+
export type {
104+
BeginTransactionConfig,
105+
CommitTransactionConfig,
106+
RollbackConnectionConfig,
107+
RunQueryConfig
108+
}

packages/core/src/internal/connection-holder.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import { newError } from '../error'
2222
import { assertString } from './util'
2323
import Connection from '../connection'
24-
import { ACCESS_MODE_WRITE } from './constants'
24+
import { ACCESS_MODE_WRITE, AccessMode } from './constants'
2525
import { Bookmarks } from './bookmarks'
26-
import ConnectionProvider from '../connection-provider'
26+
import ConnectionProvider, { Releasable } from '../connection-provider'
2727
import { AuthToken } from '../types'
2828

2929
/**
@@ -77,12 +77,12 @@ interface ConnectionHolderInterface {
7777
* @private
7878
*/
7979
class ConnectionHolder implements ConnectionHolderInterface {
80-
private readonly _mode: string
80+
private readonly _mode: AccessMode
8181
private _database?: string
8282
private readonly _bookmarks: Bookmarks
8383
private readonly _connectionProvider?: ConnectionProvider
8484
private _referenceCount: number
85-
private _connectionPromise: Promise<Connection | null>
85+
private _connectionPromise: Promise<Connection & Releasable | null>
8686
private readonly _impersonatedUser?: string
8787
private readonly _getConnectionAcquistionBookmarks: () => Promise<Bookmarks>
8888
private readonly _onDatabaseNameResolved?: (databaseName?: string) => void
@@ -111,7 +111,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
111111
getConnectionAcquistionBookmarks,
112112
auth
113113
}: {
114-
mode?: string
114+
mode?: AccessMode
115115
database?: string
116116
bookmarks?: Bookmarks
117117
connectionProvider?: ConnectionProvider
@@ -133,7 +133,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
133133
this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty()))
134134
}
135135

136-
mode (): string | undefined {
136+
mode (): AccessMode | undefined {
137137
return this._mode
138138
}
139139

@@ -168,7 +168,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
168168
return true
169169
}
170170

171-
private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise<Connection | null> {
171+
private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise<Connection & Releasable | null> {
172172
return await connectionProvider.acquireConnection({
173173
accessMode: this._mode,
174174
database: this._database,
@@ -218,15 +218,15 @@ class ConnectionHolder implements ConnectionHolderInterface {
218218
*/
219219
private _releaseConnection (hasTx?: boolean): Promise<Connection | null> {
220220
this._connectionPromise = this._connectionPromise
221-
.then((connection?: Connection | null) => {
221+
.then((connection?: Connection & Releasable | null) => {
222222
if (connection != null) {
223223
if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx === true)) {
224224
return connection
225225
.resetAndFlush()
226226
.catch(ignoreError)
227-
.then(() => connection._release().then(() => null))
227+
.then(() => connection.release().then(() => null))
228228
}
229-
return connection._release().then(() => null)
229+
return connection.release().then(() => null)
230230
} else {
231231
return Promise.resolve(null)
232232
}

packages/core/src/internal/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ const BOLT_PROTOCOL_V5_1: number = 5.1
3838
const BOLT_PROTOCOL_V5_2: number = 5.2
3939
const BOLT_PROTOCOL_V5_3: number = 5.3
4040

41+
export type AccessMode = typeof ACCESS_MODE_READ | typeof ACCESS_MODE_WRITE
42+
4143
export {
4244
FETCH_ALL,
4345
ACCESS_MODE_READ,

packages/core/src/result.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ class Result<R extends RecordShape = RecordShape> implements Promise<QueryResult
513513
connectionHolder
514514
.releaseConnection()
515515
.then(() =>
516-
connection?.protocol()?.version
516+
connection?.getProtocolVersion()
517517
),
518518
// onRejected:
519519
_ => undefined

packages/core/src/session.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ class Session {
187187
const result = this._run(validatedQuery, params, async connection => {
188188
const bookmarks = await this._bookmarks()
189189
this._assertSessionIsOpen()
190-
return (connection as Connection).protocol().run(validatedQuery, params, {
190+
return (connection as Connection).run(validatedQuery, params, {
191191
bookmarks,
192192
txConfig: autoCommitTxConfig,
193193
mode: this._mode,

packages/core/src/transaction.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class Transaction {
139139
this._onConnection()
140140
if (connection != null) {
141141
this._bookmarks = await getBookmarks()
142-
return connection.protocol().beginTransaction({
142+
return connection.beginTransaction({
143143
bookmarks: this._bookmarks,
144144
txConfig,
145145
mode: this._connectionHolder.mode(),
@@ -150,13 +150,13 @@ class Transaction {
150150
if (events != null) {
151151
events.onError(error)
152152
}
153-
return this._onError(error)
153+
this._onError(error).catch(() => {})
154154
},
155155
afterComplete: (metadata: any) => {
156156
if (events != null) {
157157
events.onComplete(metadata)
158158
}
159-
return this._onComplete(metadata)
159+
this._onComplete(metadata)
160160
}
161161
})
162162
} else {
@@ -364,7 +364,7 @@ const _states = {
364364
}
365365
},
366366
run: (
367-
query: Query,
367+
query: string,
368368
parameters: any,
369369
{
370370
connectionHolder,
@@ -388,7 +388,7 @@ const _states = {
388388
.then(conn => {
389389
onConnection()
390390
if (conn != null) {
391-
return conn.protocol().run(query, parameters, {
391+
return conn.run(query, parameters, {
392392
bookmarks: Bookmarks.empty(),
393393
txConfig: TxConfig.empty(),
394394
beforeError: onError,
@@ -643,12 +643,12 @@ function finishTransaction (
643643
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
644644
if (connection != null) {
645645
if (commit) {
646-
return connection.protocol().commitTransaction({
646+
return connection.commitTransaction({
647647
beforeError: onError,
648648
afterComplete: onComplete
649649
})
650650
} else {
651-
return connection.protocol().rollbackTransaction({
651+
return connection.rollbackTransaction({
652652
beforeError: onError,
653653
afterComplete: onComplete
654654
})

0 commit comments

Comments
 (0)