Skip to content

Commit deb6a32

Browse files
committed
ResponseHandler: parse response and control observers
1 parent bd7a203 commit deb6a32

File tree

3 files changed

+179
-125
lines changed

3 files changed

+179
-125
lines changed

src/internal/bolt/index.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import handshake from './handshake'
22
import create from './create'
3+
import ResponseHandler from './response-handler'
34

45
export default {
56
handshake,
6-
create
7+
create,
8+
ResponseHandler
79
}

src/internal/bolt/response-handler.js

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/**
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import { newError, PROTOCOL_ERROR } from '../../error'
20+
21+
// Signature bytes for each response message type
22+
const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
23+
const RECORD = 0x71 // 0111 0001 // RECORD <value>
24+
const IGNORED = 0x7e // 0111 1110 // IGNORED <metadata>
25+
const FAILURE = 0x7f // 0111 1111 // FAILURE <metadata>
26+
27+
function NO_OP () {}
28+
29+
const NO_OP_OBSERVER = {
30+
onNext: NO_OP,
31+
onCompleted: NO_OP,
32+
onError: NO_OP
33+
}
34+
35+
export default class ResponseHandler {
36+
constructor ({ protocol, channel, dechunker, log, connection } = {}) {
37+
this._pendingObservers = []
38+
this._isBroken = false
39+
this._log = log
40+
this._protocol = protocol // should gone from here
41+
this._connection = connection // should gone from here
42+
43+
// reset the error handler to just handle errors and forget about the handshake promise
44+
channel.onerror = connection._handleFatalError.bind(connection)
45+
46+
// Ok, protocol running. Simply forward all messages to the dechunker
47+
channel.onmessage = buf => dechunker.write(buf)
48+
49+
// setup dechunker to dechunk messages and forward them to the message handler
50+
dechunker.onmessage = buf => {
51+
this._handleMessage(protocol.unpacker().unpack(buf))
52+
}
53+
}
54+
55+
_handleMessage (msg) {
56+
if (this._connection._isBroken) {
57+
// ignore all incoming messages when this connection is broken. all previously pending observers failed
58+
// with the fatal error. all future observers will fail with same fatal error.
59+
return
60+
}
61+
62+
const payload = msg.fields[0]
63+
64+
switch (msg.signature) {
65+
case RECORD:
66+
if (this._log.isDebugEnabled()) {
67+
this._log.debug(`${this} S: RECORD ${JSON.stringify(msg)}`)
68+
}
69+
this._currentObserver.onNext(payload)
70+
break
71+
case SUCCESS:
72+
if (this._log.isDebugEnabled()) {
73+
this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`)
74+
}
75+
try {
76+
const metadata = this._protocol.transformMetadata(payload)
77+
this._currentObserver.onCompleted(metadata)
78+
} finally {
79+
this._updateCurrentObserver()
80+
}
81+
break
82+
case FAILURE:
83+
if (this._log.isDebugEnabled()) {
84+
this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`)
85+
}
86+
try {
87+
const error = newError(payload.message, payload.code)
88+
this._currentFailure = this._connection.handleAndTransformError(
89+
error,
90+
this._address
91+
)
92+
this._currentObserver.onError(this._currentFailure)
93+
} finally {
94+
this._updateCurrentObserver()
95+
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
96+
this._connection._resetOnFailure()
97+
}
98+
break
99+
case IGNORED:
100+
if (this._log.isDebugEnabled()) {
101+
this._log.debug(`${this} S: IGNORED ${JSON.stringify(msg)}`)
102+
}
103+
try {
104+
if (this._currentFailure && this._currentObserver.onError) {
105+
this._currentObserver.onError(this._currentFailure)
106+
} else if (this._currentObserver.onError) {
107+
this._currentObserver.onError(
108+
newError('Ignored either because of an error or RESET')
109+
)
110+
}
111+
} finally {
112+
this._updateCurrentObserver()
113+
}
114+
break
115+
default:
116+
this._connection._handleFatalError(
117+
newError('Unknown Bolt protocol message: ' + msg)
118+
)
119+
}
120+
}
121+
122+
/*
123+
* Pop next pending observer form the list of observers and make it current observer.
124+
* @protected
125+
*/
126+
_updateCurrentObserver () {
127+
this._currentObserver = this._pendingObservers.shift()
128+
}
129+
130+
_queueObserver (observer) {
131+
if (this._connection._isBroken) {
132+
if (observer && observer.onError) {
133+
observer.onError(this._error)
134+
}
135+
return false
136+
}
137+
observer = observer || NO_OP_OBSERVER
138+
observer.onCompleted = observer.onCompleted || NO_OP
139+
observer.onError = observer.onError || NO_OP
140+
observer.onNext = observer.onNext || NO_OP
141+
if (this._currentObserver === undefined) {
142+
this._currentObserver = observer
143+
} else {
144+
this._pendingObservers.push(observer)
145+
}
146+
return true
147+
}
148+
149+
_notifyErrorToObservers (error) {
150+
if (this._currentObserver && this._currentObserver.onError) {
151+
this._currentObserver.onError(error)
152+
}
153+
while (this._pendingObservers.length > 0) {
154+
const observer = this._pendingObservers.shift()
155+
if (observer && observer.onError) {
156+
observer.onError(error)
157+
}
158+
}
159+
}
160+
}

src/internal/connection-channel.js

Lines changed: 16 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,6 @@ import { ResultStreamObserver } from './stream-observers'
2727
import buffer from 'vinyl-buffer'
2828
import Bolt from './bolt'
2929

30-
// Signature bytes for each response message type
31-
const SUCCESS = 0x70 // 0111 0000 // SUCCESS <metadata>
32-
const RECORD = 0x71 // 0111 0001 // RECORD <value>
33-
const IGNORED = 0x7e // 0111 1110 // IGNORED <metadata>
34-
const FAILURE = 0x7f // 0111 1111 // FAILURE <metadata>
35-
36-
function NO_OP () {}
37-
38-
const NO_OP_OBSERVER = {
39-
onNext: NO_OP,
40-
onCompleted: NO_OP,
41-
onError: NO_OP
42-
}
43-
4430
let idGenerator = 0
4531

4632
/**
@@ -79,8 +65,7 @@ export function createChannelConnection (
7965
config.disableLosslessIntegers,
8066
serversideRouting,
8167
{
82-
newChunker: () => chunker,
83-
newDechunker: () => dechunker
68+
newChunker: () => chunker
8469
}
8570
)
8671

@@ -94,16 +79,15 @@ export function createChannelConnection (
9479

9580
connection._protocol = protocol
9681

97-
// reset the error handler to just handle errors and forget about the handshake promise
98-
channel.onerror = connection._handleFatalError.bind(connection)
99-
100-
// Ok, protocol running. Simply forward all messages to the dechunker
101-
channel.onmessage = buf => dechunker.write(buf)
82+
const responseHandler = new Bolt.ResponseHandler({
83+
protocol,
84+
channel,
85+
dechunker,
86+
log,
87+
connection
88+
})
10289

103-
// setup dechunker to dechunk messages and forward them to the message handler
104-
dechunker.onmessage = buf => {
105-
connection._handleMessage(protocol.unpacker().unpack(buf))
106-
}
90+
connection._responseHandler = responseHandler
10791

10892
// forward all pending bytes to the dechunker
10993
consumeRemainingBuffer(buffer => dechunker.write(buffer))
@@ -133,10 +117,7 @@ export default class ChannelConnection extends Connection {
133117
log,
134118
disableLosslessIntegers = false,
135119
serversideRouting = null,
136-
{
137-
newChunker = channel => new Chunker(channel),
138-
newDechunker = () => new Dechunker()
139-
} = {}
120+
{ newChunker = channel => new Chunker(channel) } = {}
140121
) {
141122
super(errorHandler)
142123

@@ -148,7 +129,6 @@ export default class ChannelConnection extends Connection {
148129
this._pendingObservers = []
149130
this._currentObserver = undefined
150131
this._ch = channel
151-
this._dechunker = newDechunker()
152132
this._chunker = newChunker(channel)
153133
this._log = log
154134
this._serversideRouting = serversideRouting
@@ -163,6 +143,8 @@ export default class ChannelConnection extends Connection {
163143
*/
164144
this._protocol = null
165145

146+
this._responseHandler = null
147+
166148
// error extracted from a FAILURE message
167149
this._currentFailure = null
168150

@@ -275,7 +257,7 @@ export default class ChannelConnection extends Connection {
275257
* @param {boolean} flush `true` if flush should happen after the message is written to the buffer.
276258
*/
277259
write (message, observer, flush) {
278-
const queued = this._queueObserver(observer)
260+
const queued = this._responseHandler._queueObserver(observer)
279261

280262
if (queued) {
281263
if (this._log.isDebugEnabled()) {
@@ -312,82 +294,7 @@ export default class ChannelConnection extends Connection {
312294
)
313295
}
314296

315-
if (this._currentObserver && this._currentObserver.onError) {
316-
this._currentObserver.onError(this._error)
317-
}
318-
while (this._pendingObservers.length > 0) {
319-
const observer = this._pendingObservers.shift()
320-
if (observer && observer.onError) {
321-
observer.onError(this._error)
322-
}
323-
}
324-
}
325-
326-
_handleMessage (msg) {
327-
if (this._isBroken) {
328-
// ignore all incoming messages when this connection is broken. all previously pending observers failed
329-
// with the fatal error. all future observers will fail with same fatal error.
330-
return
331-
}
332-
333-
const payload = msg.fields[0]
334-
335-
switch (msg.signature) {
336-
case RECORD:
337-
if (this._log.isDebugEnabled()) {
338-
this._log.debug(`${this} S: RECORD ${JSON.stringify(msg)}`)
339-
}
340-
this._currentObserver.onNext(payload)
341-
break
342-
case SUCCESS:
343-
if (this._log.isDebugEnabled()) {
344-
this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`)
345-
}
346-
try {
347-
const metadata = this._protocol.transformMetadata(payload)
348-
this._currentObserver.onCompleted(metadata)
349-
} finally {
350-
this._updateCurrentObserver()
351-
}
352-
break
353-
case FAILURE:
354-
if (this._log.isDebugEnabled()) {
355-
this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`)
356-
}
357-
try {
358-
const error = newError(payload.message, payload.code)
359-
this._currentFailure = this.handleAndTransformError(
360-
error,
361-
this._address
362-
)
363-
this._currentObserver.onError(this._currentFailure)
364-
} finally {
365-
this._updateCurrentObserver()
366-
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
367-
this._resetOnFailure()
368-
}
369-
break
370-
case IGNORED:
371-
if (this._log.isDebugEnabled()) {
372-
this._log.debug(`${this} S: IGNORED ${JSON.stringify(msg)}`)
373-
}
374-
try {
375-
if (this._currentFailure && this._currentObserver.onError) {
376-
this._currentObserver.onError(this._currentFailure)
377-
} else if (this._currentObserver.onError) {
378-
this._currentObserver.onError(
379-
newError('Ignored either because of an error or RESET')
380-
)
381-
}
382-
} finally {
383-
this._updateCurrentObserver()
384-
}
385-
break
386-
default:
387-
this._handleFatalError(
388-
newError('Unknown Bolt protocol message: ' + msg)
389-
)
390-
}
297+
this._responseHandler._notifyErrorToObservers(this._error)
391298
}
392299

393300
/**
@@ -427,30 +334,15 @@ export default class ChannelConnection extends Connection {
427334
}
428335

429336
_queueObserver (observer) {
430-
if (this._isBroken) {
431-
if (observer && observer.onError) {
432-
observer.onError(this._error)
433-
}
434-
return false
435-
}
436-
observer = observer || NO_OP_OBSERVER
437-
observer.onCompleted = observer.onCompleted || NO_OP
438-
observer.onError = observer.onError || NO_OP
439-
observer.onNext = observer.onNext || NO_OP
440-
if (this._currentObserver === undefined) {
441-
this._currentObserver = observer
442-
} else {
443-
this._pendingObservers.push(observer)
444-
}
445-
return true
337+
return this._responseHandler._queueObserver(observer)
446338
}
447339

448340
/*
449341
* Pop next pending observer form the list of observers and make it current observer.
450342
* @protected
451343
*/
452344
_updateCurrentObserver () {
453-
this._currentObserver = this._pendingObservers.shift()
345+
this._responseHandler._updateCurrentObserver()
454346
}
455347

456348
/** Check if this connection is in working condition */

0 commit comments

Comments
 (0)