Skip to content

Commit a077aa4

Browse files
committed
broken: extract response handler from the connection
1 parent 71304dd commit a077aa4

File tree

7 files changed

+275
-621
lines changed

7 files changed

+275
-621
lines changed

src/internal/bolt/index.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import handshake from './handshake'
22
import create from './create'
3+
import ResponseHandler from './response-handler'
34

45
export default {
56
handshake,
6-
create
7+
create,
8+
ResponseHandler
79
}

src/internal/bolt/response-handler.js

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import { newError, PROTOCOL_ERROR } from '../../error'
20+
21+
// Signature bytes for each response message type
22+
const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
23+
const RECORD = 0x71 // 0111 0001 // RECORD <value>
24+
const IGNORED = 0x7e // 0111 1110 // IGNORED <metadata>
25+
const FAILURE = 0x7f // 0111 1111 // FAILURE <metadata>
26+
27+
function NO_OP () {}
28+
29+
const NO_OP_OBSERVER = {
30+
onNext: NO_OP,
31+
onCompleted: NO_OP,
32+
onError: NO_OP
33+
}
34+
35+
export default class ResponseHandler {
36+
constructor ({ protocol, channel, dechunker, log, errorHandler } = {}) {
37+
this._pendingObservers = []
38+
this._currentObserver = undefined
39+
this._isBroken = false
40+
this._log = log
41+
this._protocol = protocol // should gone from here
42+
this._errorHandler = errorHandler
43+
44+
// reset the error handler to just handle errors and forget about the handshake promise
45+
channel.onerror = this._handleFatalError.bind(this)
46+
47+
// Ok, protocol running. Simply forward all messages to the dechunker
48+
channel.onmessage = buf => dechunker.write(buf)
49+
50+
// setup dechunker to dechunk messages and forward them to the message handler
51+
dechunker.onmessage = buf => {
52+
this._handleMessage(protocol.unpacker().unpack(buf))
53+
}
54+
}
55+
56+
queueObserver (observer) {
57+
if (this._isBroken) {
58+
if (observer && observer.onError) {
59+
observer.onError(this._error)
60+
}
61+
return false
62+
}
63+
observer = observer || NO_OP_OBSERVER
64+
observer.onCompleted = observer.onCompleted || NO_OP
65+
observer.onError = observer.onError || NO_OP
66+
observer.onNext = observer.onNext || NO_OP
67+
if (this._currentObserver === undefined) {
68+
this._currentObserver = observer
69+
} else {
70+
this._pendingObservers.push(observer)
71+
}
72+
return true
73+
}
74+
75+
handleProtocolError (message) {
76+
this._currentFailure = null
77+
this._updateCurrentObserver()
78+
const error = newError(message, PROTOCOL_ERROR)
79+
this._handleFatalError(error)
80+
return error
81+
}
82+
83+
get isBroken () {
84+
return this._isBroken
85+
}
86+
87+
/*
88+
* Pop next pending observer form the list of observers and make it current observer.
89+
* @protected
90+
*/
91+
_updateCurrentObserver () {
92+
this._currentObserver = this._pendingObservers.shift()
93+
}
94+
95+
_handleMessage (msg) {
96+
if (this._isBroken) {
97+
// ignore all incoming messages when this connection is broken. all previously pending observers failed
98+
// with the fatal error. all future observers will fail with same fatal error.
99+
return
100+
}
101+
102+
const payload = msg.fields[0]
103+
104+
switch (msg.signature) {
105+
case RECORD:
106+
if (this._log.isDebugEnabled()) {
107+
this._log.debug(`${this} S: RECORD ${JSON.stringify(msg)}`)
108+
}
109+
this._currentObserver.onNext(payload)
110+
break
111+
case SUCCESS:
112+
if (this._log.isDebugEnabled()) {
113+
this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`)
114+
}
115+
try {
116+
const metadata = this._protocol.transformMetadata(payload)
117+
this._currentObserver.onCompleted(metadata)
118+
} finally {
119+
this._updateCurrentObserver()
120+
}
121+
break
122+
case FAILURE:
123+
if (this._log.isDebugEnabled()) {
124+
this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`)
125+
}
126+
try {
127+
const error = newError(payload.message, payload.code)
128+
this._currentFailure = this.handleAndTransformError(
129+
error,
130+
this._address
131+
)
132+
this._currentObserver.onError(this._currentFailure)
133+
} finally {
134+
this._updateCurrentObserver()
135+
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
136+
this._resetOnFailure()
137+
}
138+
break
139+
case IGNORED:
140+
if (this._log.isDebugEnabled()) {
141+
this._log.debug(`${this} S: IGNORED ${JSON.stringify(msg)}`)
142+
}
143+
try {
144+
if (this._currentFailure && this._currentObserver.onError) {
145+
this._currentObserver.onError(this._currentFailure)
146+
} else if (this._currentObserver.onError) {
147+
this._currentObserver.onError(
148+
newError('Ignored either because of an error or RESET')
149+
)
150+
}
151+
} finally {
152+
this._updateCurrentObserver()
153+
}
154+
break
155+
default:
156+
this._handleFatalError(
157+
newError('Unknown Bolt protocol message: ' + msg)
158+
)
159+
}
160+
}
161+
162+
/**
163+
* "Fatal" means the connection is dead. Only call this if something
164+
* happens that cannot be recovered from. This will lead to all subscribers
165+
* failing, and the connection getting ejected from the session pool.
166+
*
167+
* @param error an error object, forwarded to all current and future subscribers
168+
*/
169+
_handleFatalError (error) {
170+
this._isBroken = true
171+
this._error = this.handleAndTransformError(error, this._address)
172+
173+
if (this._log.isErrorEnabled()) {
174+
this._log.error(
175+
`${this} experienced a fatal error ${JSON.stringify(this._error)}`
176+
)
177+
}
178+
179+
if (this._currentObserver && this._currentObserver.onError) {
180+
this._currentObserver.onError(this._error)
181+
}
182+
while (this._pendingObservers.length > 0) {
183+
const observer = this._pendingObservers.shift()
184+
if (observer && observer.onError) {
185+
observer.onError(this._error)
186+
}
187+
}
188+
}
189+
190+
_resetOnFailure () {
191+
this._protocol.reset({
192+
onError: () => {
193+
this._currentFailure = null
194+
},
195+
onComplete: () => {
196+
this._currentFailure = null
197+
}
198+
})
199+
}
200+
201+
/**
202+
*
203+
* @param error
204+
* @param address
205+
* @returns {Neo4jError|*}
206+
*/
207+
handleAndTransformError (error, address) {
208+
if (this._errorHandler) {
209+
return this._errorHandler.handleAndTransformError(error, address)
210+
}
211+
212+
return error
213+
}
214+
}

0 commit comments

Comments
 (0)