17
17
* limitations under the License.
18
18
*/
19
19
import { newError , PROTOCOL_ERROR } from '../../error'
20
+ import BoltProtocol from '../bolt-protocol-v1'
20
21
21
22
// Signature bytes for each response message type
22
23
const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
@@ -32,16 +33,48 @@ const NO_OP_OBSERVER = {
32
33
onError : NO_OP
33
34
}
34
35
36
+ /**
37
+ * Treat the protocol responses and notify the observers
38
+ */
35
39
export default class ResponseHandler {
36
- constructor ( { protocol, channel, dechunker, log, connection } = { } ) {
40
+ /**
41
+ * Called when something went wrong with the connectio
42
+ * @callback ResponseHandler~Observer~OnError
43
+ * @param {any } error The error
44
+ */
45
+ /**
46
+ * @typedef {Object } ResponseHandler~Observer
47
+ * @property {ResponseHandler~Observer~OnError } onError Invoke when a connection error occurs
48
+ * @property {ResponseHandler~Observer~OnError } onFailure Invoke when a protocol failure occurs
49
+ */
50
+ /**
51
+ * Constructor
52
+ * @param {Object } param The params
53
+ * @param {BoltProtocol } protocol Bolt protocol used to parse message (to be removed)
54
+ * @param {Channel } channel The channel used to exchange messages
55
+ * @param {Logger } log The logger
56
+ * @param {ResponseHandler~Observer } observer Object which will be notified about errors
57
+ */
58
+ constructor ( {
59
+ protocol,
60
+ channel,
61
+ dechunker,
62
+ log,
63
+ observer,
64
+ connection
65
+ } = { } ) {
37
66
this . _pendingObservers = [ ]
38
67
this . _isBroken = false
39
68
this . _log = log
40
69
this . _protocol = protocol // should gone from here
41
70
this . _connection = connection // should gone from here
71
+ this . _observer = Object . assign (
72
+ { onError : NO_OP , onFailure : NO_OP } ,
73
+ observer
74
+ )
42
75
43
76
// reset the error handler to just handle errors and forget about the handshake promise
44
- channel . onerror = connection . _handleFatalError . bind ( connection )
77
+ channel . onerror = this . _observer . onError . bind ( this . _observer )
45
78
46
79
// Ok, protocol running. Simply forward all messages to the dechunker
47
80
channel . onmessage = buf => dechunker . write ( buf )
@@ -53,11 +86,11 @@ export default class ResponseHandler {
53
86
}
54
87
55
88
_handleMessage ( msg ) {
56
- if ( this . _connection . _isBroken ) {
57
- // ignore all incoming messages when this connection is broken. all previously pending observers failed
58
- // with the fatal error. all future observers will fail with same fatal error.
59
- return
60
- }
89
+ // if (this._connection._isBroken) {
90
+ // ignore all incoming messages when this connection is broken. all previously pending observers failed
91
+ // with the fatal error. all future observers will fail with same fatal error.
92
+ // return
93
+ // }
61
94
62
95
const payload = msg . fields [ 0 ]
63
96
@@ -93,7 +126,7 @@ export default class ResponseHandler {
93
126
} finally {
94
127
this . _updateCurrentObserver ( )
95
128
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
96
- this . _connection . _resetOnFailure ( )
129
+ this . _observer . onFailure ( this . _currentFailure )
97
130
}
98
131
break
99
132
case IGNORED :
@@ -113,7 +146,7 @@ export default class ResponseHandler {
113
146
}
114
147
break
115
148
default :
116
- this . _connection . _handleFatalError (
149
+ this . _observer . onError (
117
150
newError ( 'Unknown Bolt protocol message: ' + msg )
118
151
)
119
152
}
@@ -128,12 +161,6 @@ export default class ResponseHandler {
128
161
}
129
162
130
163
_queueObserver ( observer ) {
131
- if ( this . _connection . _isBroken ) {
132
- if ( observer && observer . onError ) {
133
- observer . onError ( this . _error )
134
- }
135
- return false
136
- }
137
164
observer = observer || NO_OP_OBSERVER
138
165
observer . onCompleted = observer . onCompleted || NO_OP
139
166
observer . onError = observer . onError || NO_OP
0 commit comments