@@ -20,32 +20,56 @@ import {
20
20
assertDatabaseIsEmpty ,
21
21
assertTxConfigIsEmpty
22
22
} from './bolt-protocol-util'
23
- import Bookmark from './bookmark'
24
- import { Chunker } from './chunking'
25
- import Connection from './connection '
26
- import { ACCESS_MODE_WRITE , BOLT_PROTOCOL_V1 } from './constants '
27
- import * as v1 from './packstream-v1'
28
- import { Packer } from './packstream-v1'
23
+ import Bookmark from '.. /bookmark'
24
+ import { Chunker } from '.. /chunking'
25
+ import { ACCESS_MODE_WRITE , BOLT_PROTOCOL_V1 } from '../constants '
26
+ import Logger from '../logger '
27
+ import * as v1 from '.. /packstream-v1'
28
+ import { Packer } from '.. /packstream-v1'
29
29
import RequestMessage from './request-message'
30
30
import {
31
31
LoginObserver ,
32
32
ResetObserver ,
33
33
ResultStreamObserver ,
34
34
StreamObserver
35
35
} from './stream-observers'
36
- import TxConfig from './tx-config'
36
+ import TxConfig from '.. /tx-config'
37
37
38
38
export default class BoltProtocol {
39
+ /**
40
+ * @callback CreateResponseHandler Creates the response handler
41
+ * @param {BoltProtocol } protocol The bolt protocol
42
+ * @returns {ResponseHandler } The response handler
43
+ */
44
+ /**
45
+ * @callback OnProtocolError Handles protocol error
46
+ * @param {string } error The description
47
+ */
39
48
/**
40
49
* @constructor
41
- * @param {Connection } connection the connection .
50
+ * @param {Object } server the server informatio .
42
51
* @param {Chunker } chunker the chunker.
43
52
* @param {boolean } disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
53
+ * @param {CreateResponseHandler } createResponseHandler Function which creates the response handler
54
+ * @param {Logger } log the logger
55
+ * @param {OnProtocolError } onProtocolError handles protocol errors
44
56
*/
45
- constructor ( connection , chunker , disableLosslessIntegers ) {
46
- this . _connection = connection
57
+ constructor (
58
+ server ,
59
+ chunker ,
60
+ disableLosslessIntegers ,
61
+ createResponseHandler = ( ) => null ,
62
+ log ,
63
+ onProtocolError
64
+ ) {
65
+ this . _server = server || { }
66
+ this . _chunker = chunker
47
67
this . _packer = this . _createPacker ( chunker )
48
68
this . _unpacker = this . _createUnpacker ( disableLosslessIntegers )
69
+ this . _responseHandler = createResponseHandler ( this )
70
+ this . _log = log
71
+ this . _onProtocolError = onProtocolError
72
+ this . _fatalError = null
49
73
}
50
74
51
75
/**
@@ -91,16 +115,11 @@ export default class BoltProtocol {
91
115
*/
92
116
initialize ( { userAgent, authToken, onError, onComplete } = { } ) {
93
117
const observer = new LoginObserver ( {
94
- connection : this . _connection ,
95
- afterError : onError ,
96
- afterComplete : onComplete
118
+ onError : error => this . _onLoginError ( error , onError ) ,
119
+ onCompleted : metadata => this . _onLoginCompleted ( metadata , onComplete )
97
120
} )
98
121
99
- this . _connection . write (
100
- RequestMessage . init ( userAgent , authToken ) ,
101
- observer ,
102
- true
103
- )
122
+ this . write ( RequestMessage . init ( userAgent , authToken ) , observer , true )
104
123
105
124
return observer
106
125
}
@@ -252,7 +271,7 @@ export default class BoltProtocol {
252
271
} = { }
253
272
) {
254
273
const observer = new ResultStreamObserver ( {
255
- connection : this . _connection ,
274
+ server : this . _server ,
256
275
beforeKeys,
257
276
afterKeys,
258
277
beforeError,
@@ -262,16 +281,12 @@ export default class BoltProtocol {
262
281
} )
263
282
264
283
// bookmark and mode are ignored in this version of the protocol
265
- assertTxConfigIsEmpty ( txConfig , this . _connection , observer )
284
+ assertTxConfigIsEmpty ( txConfig , this . _onProtocolError , observer )
266
285
// passing in a database name on this protocol version throws an error
267
- assertDatabaseIsEmpty ( database , this . _connection , observer )
286
+ assertDatabaseIsEmpty ( database , this . _onProtocolError , observer )
268
287
269
- this . _connection . write (
270
- RequestMessage . run ( query , parameters ) ,
271
- observer ,
272
- false
273
- )
274
- this . _connection . write ( RequestMessage . pullAll ( ) , observer , flush )
288
+ this . write ( RequestMessage . run ( query , parameters ) , observer , false )
289
+ this . write ( RequestMessage . pullAll ( ) , observer , flush )
275
290
276
291
return observer
277
292
}
@@ -285,12 +300,12 @@ export default class BoltProtocol {
285
300
*/
286
301
reset ( { onError, onComplete } = { } ) {
287
302
const observer = new ResetObserver ( {
288
- connection : this . _connection ,
303
+ onProtocolError : this . _onProtocolError ,
289
304
onError,
290
305
onComplete
291
306
} )
292
307
293
- this . _connection . write ( RequestMessage . reset ( ) , observer , true )
308
+ this . write ( RequestMessage . reset ( ) , observer , true )
294
309
295
310
return observer
296
311
}
@@ -302,4 +317,116 @@ export default class BoltProtocol {
302
317
_createUnpacker ( disableLosslessIntegers ) {
303
318
return new v1 . Unpacker ( disableLosslessIntegers )
304
319
}
320
+
321
+ /**
322
+ * Write a message to the network channel.
323
+ * @param {RequestMessage } message the message to write.
324
+ * @param {StreamObserver } observer the response observer.
325
+ * @param {boolean } flush `true` if flush should happen after the message is written to the buffer.
326
+ */
327
+ write ( message , observer , flush ) {
328
+ const queued = this . queueObserverIfProtocolIsNotBroken ( observer )
329
+
330
+ if ( queued ) {
331
+ if ( this . _log . isDebugEnabled ( ) ) {
332
+ this . _log . debug ( `${ this } C: ${ message } ` )
333
+ }
334
+
335
+ this . packer ( ) . packStruct (
336
+ message . signature ,
337
+ message . fields . map ( field => this . packer ( ) . packable ( field ) )
338
+ )
339
+
340
+ this . _chunker . messageBoundary ( )
341
+
342
+ if ( flush ) {
343
+ this . _chunker . flush ( )
344
+ }
345
+ }
346
+ }
347
+
348
+ /**
349
+ * Notifies faltal erros to the observers and mark the protocol in the fatal error state.
350
+ * @param {Error } error The error
351
+ */
352
+ notifyFatalError ( error ) {
353
+ this . _fatalError = error
354
+ return this . _responseHandler . _notifyErrorToObservers ( error )
355
+ }
356
+
357
+ /**
358
+ * Updates the the current observer with the next one on the queue.
359
+ */
360
+ updateCurrentObserver ( ) {
361
+ return this . _responseHandler . _updateCurrentObserver ( )
362
+ }
363
+
364
+ /**
365
+ * Checks if exist an ongoing observable requests
366
+ * @return {boolean }
367
+ */
368
+ hasOngoingObservableRequests ( ) {
369
+ return this . _responseHandler . hasOngoingObservableRequests ( )
370
+ }
371
+
372
+ /**
373
+ * Enqueue the observer if the protocol is not broken.
374
+ * In case it's broken, the observer will be notified about the error.
375
+ *
376
+ * @param {StreamObserver } observer The observer
377
+ * @returns {boolean } if it was queued
378
+ */
379
+ queueObserverIfProtocolIsNotBroken ( observer ) {
380
+ if ( this . isBroken ( ) ) {
381
+ this . notifyFatalErrorToObserver ( observer )
382
+ return false
383
+ }
384
+
385
+ return this . _responseHandler . _queueObserver ( observer )
386
+ }
387
+
388
+ /**
389
+ * Veritfy the protocol is not broken.
390
+ * @returns {boolean }
391
+ */
392
+ isBroken ( ) {
393
+ return ! ! this . _fatalError
394
+ }
395
+
396
+ /**
397
+ * Notifies the current fatal error to the observer
398
+ *
399
+ * @param {StreamObserver } observer The observer
400
+ */
401
+ notifyFatalErrorToObserver ( observer ) {
402
+ if ( observer && observer . onError ) {
403
+ observer . onError ( this . _fatalError )
404
+ }
405
+ }
406
+
407
+ /**
408
+ * Reset current failure on the observable response handler to null.
409
+ */
410
+ resetFailure ( ) {
411
+ this . _responseHandler . _resetFailure ( )
412
+ }
413
+
414
+ _onLoginCompleted ( metadata , onCompleted ) {
415
+ if ( metadata ) {
416
+ const serverVersion = metadata . server
417
+ if ( ! this . _server . version ) {
418
+ this . _server . version = serverVersion
419
+ }
420
+ }
421
+ if ( onCompleted ) {
422
+ onCompleted ( metadata )
423
+ }
424
+ }
425
+
426
+ _onLoginError ( error , onError ) {
427
+ this . _onProtocolError ( error . message )
428
+ if ( onError ) {
429
+ onError ( error )
430
+ }
431
+ }
305
432
}
0 commit comments