Skip to content

Commit dccc3b4

Browse files
committed
Improve connection/protocol layers
The implementation of the connection channel, the protocol and handshake are dependent in the other in cross mutual dependency situation. This cross dependency situation could leave to unexpected behaviours when something change on this layers. The solution for this issue was separate the bolt protocol with it handshake process, creation, observers and so on in a different package to hide the details and make easier to spot the unwanted dependencies. The following changes were done to achieve these goals: * [x] Extract a factory for the connection removing the create method * [x] Extract the handshake process and separate in two, the handshake message exchange and the protocol initialisation * [x] Remove the response parsing and response observers management from the connection and put this in the bolt package (`response-handler.js`) * [x] Move the message write responsibility to the protocol object * [x] Remove connection dependency from the stream-observers * [x] Fix the connection integration tests * [x] Improve the unit tests * [x] Do not touch or break test kit tests and stress tests
1 parent 4feab7c commit dccc3b4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1837
-1161
lines changed

src/driver.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import {
3030
} from './internal/pool-config'
3131
import Session from './session'
3232
import RxSession from './session-rx'
33-
import { ALL } from './internal/request-message'
33+
import { FETCH_ALL } from './internal/bolt'
3434
import { ENCRYPTION_ON, ENCRYPTION_OFF } from './internal/util'
3535

3636
const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour
@@ -182,7 +182,7 @@ class Driver {
182182
* @param {string|string[]} param.bookmarks - The initial reference or references to some previous
183183
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
184184
* @param {number} param.fetchSize - The record fetch size of each batch of this session.
185-
* Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config.
185+
* Use {@link FETCH_ALL} to always pull all records in one batch. This will override the config value set on driver config.
186186
* @param {string} param.database - The database this session will operate on.
187187
* @return {Session} new session.
188188
*/
@@ -369,12 +369,11 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) {
369369
*/
370370
function validateFetchSizeValue (rawValue, defaultWhenAbsent) {
371371
const fetchSize = parseInt(rawValue, 10)
372-
if (fetchSize > 0 || fetchSize === ALL) {
372+
if (fetchSize > 0 || fetchSize === FETCH_ALL) {
373373
return fetchSize
374374
} else if (fetchSize === 0 || fetchSize < 0) {
375375
throw new Error(
376-
'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' +
377-
fetchSize
376+
`The fetch size can only be a positive value or ${FETCH_ALL} for ALL. However fetchSize = ${fetchSize}`
378377
)
379378
} else {
380379
return defaultWhenAbsent

src/internal/bolt-protocol-util.js renamed to src/internal/bolt/bolt-protocol-util.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,23 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
import { newError } from '../error'
19+
import { newError } from '../../error'
2020
import { ResultStreamObserver } from './stream-observers'
2121

2222
/**
2323
* @param {TxConfig} txConfig the auto-commit transaction configuration.
24-
* @param {Connection} connection the connection.
24+
* @param {function(error: string)} onProtocolError called when the txConfig is not empty.
2525
* @param {ResultStreamObserver} observer the response observer.
2626
*/
27-
function assertTxConfigIsEmpty (txConfig, connection, observer) {
27+
function assertTxConfigIsEmpty (txConfig, onProtocolError = () => {}, observer) {
2828
if (txConfig && !txConfig.isEmpty()) {
2929
const error = newError(
3030
'Driver is connected to the database that does not support transaction configuration. ' +
3131
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality'
3232
)
3333

3434
// unsupported API was used, consider this a fatal error for the current connection
35-
connection._handleFatalError(error)
35+
onProtocolError(error.message)
3636
observer.onError(error)
3737
throw error
3838
}
@@ -41,17 +41,17 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) {
4141
/**
4242
* Asserts that the passed-in database name is empty.
4343
* @param {string} database
44-
* @param {Connection} connection
44+
* @param {fuction(err: String)} onProtocolError Called when it doesn't have database set
4545
*/
46-
function assertDatabaseIsEmpty (database, connection, observer) {
46+
function assertDatabaseIsEmpty (database, onProtocolError = () => {}, observer) {
4747
if (database) {
4848
const error = newError(
4949
'Driver is connected to the database that does not support multiple databases. ' +
5050
'Please upgrade to neo4j 4.0.0 or later in order to use this functionality'
5151
)
5252

5353
// unsupported API was used, consider this a fatal error for the current connection
54-
connection._handleFatalError(error)
54+
onProtocolError(error.message)
5555
observer.onError(error)
5656
throw error
5757
}

src/internal/bolt-protocol-v1.js renamed to src/internal/bolt/bolt-protocol-v1.js

Lines changed: 156 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,56 @@ import {
2020
assertDatabaseIsEmpty,
2121
assertTxConfigIsEmpty
2222
} from './bolt-protocol-util'
23-
import Bookmark from './bookmark'
24-
import { Chunker } from './chunking'
25-
import Connection from './connection'
26-
import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from './constants'
27-
import * as v1 from './packstream-v1'
28-
import { Packer } from './packstream-v1'
23+
import Bookmark from '../bookmark'
24+
import { Chunker } from '../chunking'
25+
import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from '../constants'
26+
import Logger from '../logger'
27+
import * as v1 from '../packstream-v1'
28+
import { Packer } from '../packstream-v1'
2929
import RequestMessage from './request-message'
3030
import {
3131
LoginObserver,
3232
ResetObserver,
3333
ResultStreamObserver,
3434
StreamObserver
3535
} from './stream-observers'
36-
import TxConfig from './tx-config'
36+
import TxConfig from '../tx-config'
3737

3838
export default class BoltProtocol {
39+
/**
40+
* @callback CreateResponseHandler Creates the response handler
41+
* @param {BoltProtocol} protocol The bolt protocol
42+
* @returns {ResponseHandler} The response handler
43+
*/
44+
/**
45+
* @callback OnProtocolError Handles protocol error
46+
* @param {string} error The description
47+
*/
3948
/**
4049
* @constructor
41-
* @param {Connection} connection the connection.
50+
* @param {Object} server the server informatio.
4251
* @param {Chunker} chunker the chunker.
4352
* @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
53+
* @param {CreateResponseHandler} createResponseHandler Function which creates the response handler
54+
* @param {Logger} log the logger
55+
* @param {OnProtocolError} onProtocolError handles protocol errors
4456
*/
45-
constructor (connection, chunker, disableLosslessIntegers) {
46-
this._connection = connection
57+
constructor (
58+
server,
59+
chunker,
60+
disableLosslessIntegers,
61+
createResponseHandler = () => null,
62+
log,
63+
onProtocolError
64+
) {
65+
this._server = server || {}
66+
this._chunker = chunker
4767
this._packer = this._createPacker(chunker)
4868
this._unpacker = this._createUnpacker(disableLosslessIntegers)
69+
this._responseHandler = createResponseHandler(this)
70+
this._log = log
71+
this._onProtocolError = onProtocolError
72+
this._fatalError = null
4973
}
5074

5175
/**
@@ -91,16 +115,11 @@ export default class BoltProtocol {
91115
*/
92116
initialize ({ userAgent, authToken, onError, onComplete } = {}) {
93117
const observer = new LoginObserver({
94-
connection: this._connection,
95-
afterError: onError,
96-
afterComplete: onComplete
118+
onError: error => this._onLoginError(error, onError),
119+
onCompleted: metadata => this._onLoginCompleted(metadata, onComplete)
97120
})
98121

99-
this._connection.write(
100-
RequestMessage.init(userAgent, authToken),
101-
observer,
102-
true
103-
)
122+
this.write(RequestMessage.init(userAgent, authToken), observer, true)
104123

105124
return observer
106125
}
@@ -252,7 +271,7 @@ export default class BoltProtocol {
252271
} = {}
253272
) {
254273
const observer = new ResultStreamObserver({
255-
connection: this._connection,
274+
server: this._server,
256275
beforeKeys,
257276
afterKeys,
258277
beforeError,
@@ -262,16 +281,12 @@ export default class BoltProtocol {
262281
})
263282

264283
// bookmark and mode are ignored in this version of the protocol
265-
assertTxConfigIsEmpty(txConfig, this._connection, observer)
284+
assertTxConfigIsEmpty(txConfig, this._onProtocolError, observer)
266285
// passing in a database name on this protocol version throws an error
267-
assertDatabaseIsEmpty(database, this._connection, observer)
286+
assertDatabaseIsEmpty(database, this._onProtocolError, observer)
268287

269-
this._connection.write(
270-
RequestMessage.run(query, parameters),
271-
observer,
272-
false
273-
)
274-
this._connection.write(RequestMessage.pullAll(), observer, flush)
288+
this.write(RequestMessage.run(query, parameters), observer, false)
289+
this.write(RequestMessage.pullAll(), observer, flush)
275290

276291
return observer
277292
}
@@ -285,12 +300,12 @@ export default class BoltProtocol {
285300
*/
286301
reset ({ onError, onComplete } = {}) {
287302
const observer = new ResetObserver({
288-
connection: this._connection,
303+
onProtocolError: this._onProtocolError,
289304
onError,
290305
onComplete
291306
})
292307

293-
this._connection.write(RequestMessage.reset(), observer, true)
308+
this.write(RequestMessage.reset(), observer, true)
294309

295310
return observer
296311
}
@@ -302,4 +317,116 @@ export default class BoltProtocol {
302317
_createUnpacker (disableLosslessIntegers) {
303318
return new v1.Unpacker(disableLosslessIntegers)
304319
}
320+
321+
/**
322+
* Write a message to the network channel.
323+
* @param {RequestMessage} message the message to write.
324+
* @param {StreamObserver} observer the response observer.
325+
* @param {boolean} flush `true` if flush should happen after the message is written to the buffer.
326+
*/
327+
write (message, observer, flush) {
328+
const queued = this.queueObserverIfProtocolIsNotBroken(observer)
329+
330+
if (queued) {
331+
if (this._log.isDebugEnabled()) {
332+
this._log.debug(`${this} C: ${message}`)
333+
}
334+
335+
this.packer().packStruct(
336+
message.signature,
337+
message.fields.map(field => this.packer().packable(field))
338+
)
339+
340+
this._chunker.messageBoundary()
341+
342+
if (flush) {
343+
this._chunker.flush()
344+
}
345+
}
346+
}
347+
348+
/**
349+
* Notifies faltal erros to the observers and mark the protocol in the fatal error state.
350+
* @param {Error} error The error
351+
*/
352+
notifyFatalError (error) {
353+
this._fatalError = error
354+
return this._responseHandler._notifyErrorToObservers(error)
355+
}
356+
357+
/**
358+
* Updates the the current observer with the next one on the queue.
359+
*/
360+
updateCurrentObserver () {
361+
return this._responseHandler._updateCurrentObserver()
362+
}
363+
364+
/**
365+
* Checks if exist an ongoing observable requests
366+
* @return {boolean}
367+
*/
368+
hasOngoingObservableRequests () {
369+
return this._responseHandler.hasOngoingObservableRequests()
370+
}
371+
372+
/**
373+
* Enqueue the observer if the protocol is not broken.
374+
* In case it's broken, the observer will be notified about the error.
375+
*
376+
* @param {StreamObserver} observer The observer
377+
* @returns {boolean} if it was queued
378+
*/
379+
queueObserverIfProtocolIsNotBroken (observer) {
380+
if (this.isBroken()) {
381+
this.notifyFatalErrorToObserver(observer)
382+
return false
383+
}
384+
385+
return this._responseHandler._queueObserver(observer)
386+
}
387+
388+
/**
389+
* Veritfy the protocol is not broken.
390+
* @returns {boolean}
391+
*/
392+
isBroken () {
393+
return !!this._fatalError
394+
}
395+
396+
/**
397+
* Notifies the current fatal error to the observer
398+
*
399+
* @param {StreamObserver} observer The observer
400+
*/
401+
notifyFatalErrorToObserver (observer) {
402+
if (observer && observer.onError) {
403+
observer.onError(this._fatalError)
404+
}
405+
}
406+
407+
/**
408+
* Reset current failure on the observable response handler to null.
409+
*/
410+
resetFailure () {
411+
this._responseHandler._resetFailure()
412+
}
413+
414+
_onLoginCompleted (metadata, onCompleted) {
415+
if (metadata) {
416+
const serverVersion = metadata.server
417+
if (!this._server.version) {
418+
this._server.version = serverVersion
419+
}
420+
}
421+
if (onCompleted) {
422+
onCompleted(metadata)
423+
}
424+
}
425+
426+
_onLoginError (error, onError) {
427+
this._onProtocolError(error.message)
428+
if (onError) {
429+
onError(error)
430+
}
431+
}
305432
}

src/internal/bolt-protocol-v2.js renamed to src/internal/bolt/bolt-protocol-v2.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
* limitations under the License.
1818
*/
1919
import BoltProtocolV1 from './bolt-protocol-v1'
20-
import * as v2 from './packstream-v2'
21-
import { BOLT_PROTOCOL_V2 } from './constants'
20+
import * as v2 from '../packstream-v2'
21+
import { BOLT_PROTOCOL_V2 } from '../constants'
2222

2323
export default class BoltProtocol extends BoltProtocolV1 {
2424
_createPacker (chunker) {

0 commit comments

Comments
 (0)