diff --git a/src/driver.js b/src/driver.js index 657b8653c..360703319 100644 --- a/src/driver.js +++ b/src/driver.js @@ -30,7 +30,7 @@ import { } from './internal/pool-config' import Session from './session' import RxSession from './session-rx' -import { ALL } from './internal/request-message' +import { FETCH_ALL } from './internal/bolt' import { ENCRYPTION_ON, ENCRYPTION_OFF } from './internal/util' const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour @@ -182,7 +182,7 @@ class Driver { * @param {string|string[]} param.bookmarks - The initial reference or references to some previous * transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown. * @param {number} param.fetchSize - The record fetch size of each batch of this session. - * Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config. + * Use {@link FETCH_ALL} to always pull all records in one batch. This will override the config value set on driver config. * @param {string} param.database - The database this session will operate on. * @return {Session} new session. */ @@ -369,12 +369,11 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) { */ function validateFetchSizeValue (rawValue, defaultWhenAbsent) { const fetchSize = parseInt(rawValue, 10) - if (fetchSize > 0 || fetchSize === ALL) { + if (fetchSize > 0 || fetchSize === FETCH_ALL) { return fetchSize } else if (fetchSize === 0 || fetchSize < 0) { throw new Error( - 'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' + - fetchSize + `The fetch size can only be a positive value or ${FETCH_ALL} for ALL. However fetchSize = ${fetchSize}` ) } else { return defaultWhenAbsent diff --git a/src/internal/bolt-protocol-util.js b/src/internal/bolt/bolt-protocol-util.js similarity index 79% rename from src/internal/bolt-protocol-util.js rename to src/internal/bolt/bolt-protocol-util.js index 314ad483c..e1d152575 100644 --- a/src/internal/bolt-protocol-util.js +++ b/src/internal/bolt/bolt-protocol-util.js @@ -16,15 +16,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { newError } from '../error' +import { newError } from '../../error' import { ResultStreamObserver } from './stream-observers' /** * @param {TxConfig} txConfig the auto-commit transaction configuration. - * @param {Connection} connection the connection. + * @param {function(error: string)} onProtocolError called when the txConfig is not empty. * @param {ResultStreamObserver} observer the response observer. */ -function assertTxConfigIsEmpty (txConfig, connection, observer) { +function assertTxConfigIsEmpty (txConfig, onProtocolError = () => {}, observer) { if (txConfig && !txConfig.isEmpty()) { const error = newError( 'Driver is connected to the database that does not support transaction configuration. ' + @@ -32,7 +32,7 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) { ) // unsupported API was used, consider this a fatal error for the current connection - connection._handleFatalError(error) + onProtocolError(error.message) observer.onError(error) throw error } @@ -41,9 +41,9 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) { /** * Asserts that the passed-in database name is empty. * @param {string} database - * @param {Connection} connection + * @param {fuction(err: String)} onProtocolError Called when it doesn't have database set */ -function assertDatabaseIsEmpty (database, connection, observer) { +function assertDatabaseIsEmpty (database, onProtocolError = () => {}, observer) { if (database) { const error = newError( 'Driver is connected to the database that does not support multiple databases. ' + @@ -51,7 +51,7 @@ function assertDatabaseIsEmpty (database, connection, observer) { ) // unsupported API was used, consider this a fatal error for the current connection - connection._handleFatalError(error) + onProtocolError(error.message) observer.onError(error) throw error } diff --git a/src/internal/bolt-protocol-v1.js b/src/internal/bolt/bolt-protocol-v1.js similarity index 65% rename from src/internal/bolt-protocol-v1.js rename to src/internal/bolt/bolt-protocol-v1.js index 3a4206528..d345402be 100644 --- a/src/internal/bolt-protocol-v1.js +++ b/src/internal/bolt/bolt-protocol-v1.js @@ -20,12 +20,12 @@ import { assertDatabaseIsEmpty, assertTxConfigIsEmpty } from './bolt-protocol-util' -import Bookmark from './bookmark' -import { Chunker } from './chunking' -import Connection from './connection' -import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from './constants' -import * as v1 from './packstream-v1' -import { Packer } from './packstream-v1' +import Bookmark from '../bookmark' +import { Chunker } from '../chunking' +import { ACCESS_MODE_WRITE, BOLT_PROTOCOL_V1 } from '../constants' +import Logger from '../logger' +import * as v1 from '../packstream-v1' +import { Packer } from '../packstream-v1' import RequestMessage from './request-message' import { LoginObserver, @@ -33,19 +33,43 @@ import { ResultStreamObserver, StreamObserver } from './stream-observers' -import TxConfig from './tx-config' +import TxConfig from '../tx-config' export default class BoltProtocol { + /** + * @callback CreateResponseHandler Creates the response handler + * @param {BoltProtocol} protocol The bolt protocol + * @returns {ResponseHandler} The response handler + */ + /** + * @callback OnProtocolError Handles protocol error + * @param {string} error The description + */ /** * @constructor - * @param {Connection} connection the connection. + * @param {Object} server the server informatio. * @param {Chunker} chunker the chunker. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. + * @param {CreateResponseHandler} createResponseHandler Function which creates the response handler + * @param {Logger} log the logger + * @param {OnProtocolError} onProtocolError handles protocol errors */ - constructor (connection, chunker, disableLosslessIntegers) { - this._connection = connection + constructor ( + server, + chunker, + disableLosslessIntegers, + createResponseHandler = () => null, + log, + onProtocolError + ) { + this._server = server || {} + this._chunker = chunker this._packer = this._createPacker(chunker) this._unpacker = this._createUnpacker(disableLosslessIntegers) + this._responseHandler = createResponseHandler(this) + this._log = log + this._onProtocolError = onProtocolError + this._fatalError = null } /** @@ -91,16 +115,11 @@ export default class BoltProtocol { */ initialize ({ userAgent, authToken, onError, onComplete } = {}) { const observer = new LoginObserver({ - connection: this._connection, - afterError: onError, - afterComplete: onComplete + onError: error => this._onLoginError(error, onError), + onCompleted: metadata => this._onLoginCompleted(metadata, onComplete) }) - this._connection.write( - RequestMessage.init(userAgent, authToken), - observer, - true - ) + this.write(RequestMessage.init(userAgent, authToken), observer, true) return observer } @@ -252,7 +271,7 @@ export default class BoltProtocol { } = {} ) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeKeys, afterKeys, beforeError, @@ -262,16 +281,12 @@ export default class BoltProtocol { }) // bookmark and mode are ignored in this version of the protocol - assertTxConfigIsEmpty(txConfig, this._connection, observer) + assertTxConfigIsEmpty(txConfig, this._onProtocolError, observer) // passing in a database name on this protocol version throws an error - assertDatabaseIsEmpty(database, this._connection, observer) + assertDatabaseIsEmpty(database, this._onProtocolError, observer) - this._connection.write( - RequestMessage.run(query, parameters), - observer, - false - ) - this._connection.write(RequestMessage.pullAll(), observer, flush) + this.write(RequestMessage.run(query, parameters), observer, false) + this.write(RequestMessage.pullAll(), observer, flush) return observer } @@ -285,12 +300,12 @@ export default class BoltProtocol { */ reset ({ onError, onComplete } = {}) { const observer = new ResetObserver({ - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onComplete }) - this._connection.write(RequestMessage.reset(), observer, true) + this.write(RequestMessage.reset(), observer, true) return observer } @@ -302,4 +317,116 @@ export default class BoltProtocol { _createUnpacker (disableLosslessIntegers) { return new v1.Unpacker(disableLosslessIntegers) } + + /** + * Write a message to the network channel. + * @param {RequestMessage} message the message to write. + * @param {StreamObserver} observer the response observer. + * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. + */ + write (message, observer, flush) { + const queued = this.queueObserverIfProtocolIsNotBroken(observer) + + if (queued) { + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} C: ${message}`) + } + + this.packer().packStruct( + message.signature, + message.fields.map(field => this.packer().packable(field)) + ) + + this._chunker.messageBoundary() + + if (flush) { + this._chunker.flush() + } + } + } + + /** + * Notifies faltal erros to the observers and mark the protocol in the fatal error state. + * @param {Error} error The error + */ + notifyFatalError (error) { + this._fatalError = error + return this._responseHandler._notifyErrorToObservers(error) + } + + /** + * Updates the the current observer with the next one on the queue. + */ + updateCurrentObserver () { + return this._responseHandler._updateCurrentObserver() + } + + /** + * Checks if exist an ongoing observable requests + * @return {boolean} + */ + hasOngoingObservableRequests () { + return this._responseHandler.hasOngoingObservableRequests() + } + + /** + * Enqueue the observer if the protocol is not broken. + * In case it's broken, the observer will be notified about the error. + * + * @param {StreamObserver} observer The observer + * @returns {boolean} if it was queued + */ + queueObserverIfProtocolIsNotBroken (observer) { + if (this.isBroken()) { + this.notifyFatalErrorToObserver(observer) + return false + } + + return this._responseHandler._queueObserver(observer) + } + + /** + * Veritfy the protocol is not broken. + * @returns {boolean} + */ + isBroken () { + return !!this._fatalError + } + + /** + * Notifies the current fatal error to the observer + * + * @param {StreamObserver} observer The observer + */ + notifyFatalErrorToObserver (observer) { + if (observer && observer.onError) { + observer.onError(this._fatalError) + } + } + + /** + * Reset current failure on the observable response handler to null. + */ + resetFailure () { + this._responseHandler._resetFailure() + } + + _onLoginCompleted (metadata, onCompleted) { + if (metadata) { + const serverVersion = metadata.server + if (!this._server.version) { + this._server.version = serverVersion + } + } + if (onCompleted) { + onCompleted(metadata) + } + } + + _onLoginError (error, onError) { + this._onProtocolError(error.message) + if (onError) { + onError(error) + } + } } diff --git a/src/internal/bolt-protocol-v2.js b/src/internal/bolt/bolt-protocol-v2.js similarity index 92% rename from src/internal/bolt-protocol-v2.js rename to src/internal/bolt/bolt-protocol-v2.js index fabb602ad..533360702 100644 --- a/src/internal/bolt-protocol-v2.js +++ b/src/internal/bolt/bolt-protocol-v2.js @@ -17,8 +17,8 @@ * limitations under the License. */ import BoltProtocolV1 from './bolt-protocol-v1' -import * as v2 from './packstream-v2' -import { BOLT_PROTOCOL_V2 } from './constants' +import * as v2 from '../packstream-v2' +import { BOLT_PROTOCOL_V2 } from '../constants' export default class BoltProtocol extends BoltProtocolV1 { _createPacker (chunker) { diff --git a/src/internal/bolt-protocol-v3.js b/src/internal/bolt/bolt-protocol-v3.js similarity index 84% rename from src/internal/bolt-protocol-v3.js rename to src/internal/bolt/bolt-protocol-v3.js index 3a28346e2..2b4604001 100644 --- a/src/internal/bolt-protocol-v3.js +++ b/src/internal/bolt/bolt-protocol-v3.js @@ -25,9 +25,9 @@ import { ResultStreamObserver, ProcedureRouteObserver } from './stream-observers' -import { BOLT_PROTOCOL_V3 } from './constants' -import Bookmark from './bookmark' -import TxConfig from './tx-config' +import { BOLT_PROTOCOL_V3 } from '../constants' +import Bookmark from '../bookmark' +import TxConfig from '../tx-config' const CONTEXT = 'context' const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})` @@ -56,22 +56,17 @@ export default class BoltProtocol extends BoltProtocolV2 { initialize ({ userAgent, authToken, onError, onComplete } = {}) { const observer = new LoginObserver({ - connection: this._connection, - afterError: onError, - afterComplete: onComplete + onError: error => this._onLoginError(error, onError), + onCompleted: metadata => this._onLoginCompleted(metadata, onComplete) }) - this._connection.write( - RequestMessage.hello(userAgent, authToken), - observer, - true - ) + this.write(RequestMessage.hello(userAgent, authToken), observer, true) return observer } prepareToClose () { - this._connection.write(RequestMessage.goodbye(), noOpObserver, true) + this.write(RequestMessage.goodbye(), noOpObserver, true) } beginTransaction ({ @@ -85,7 +80,7 @@ export default class BoltProtocol extends BoltProtocolV2 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -94,9 +89,9 @@ export default class BoltProtocol extends BoltProtocolV2 { observer.prepareToHandleSingleResponse() // passing in a database name on this protocol version throws an error - assertDatabaseIsEmpty(database, this._connection, observer) + assertDatabaseIsEmpty(database, this._onProtocolError, observer) - this._connection.write( + this.write( RequestMessage.begin({ bookmark, txConfig, mode }), observer, true @@ -112,7 +107,7 @@ export default class BoltProtocol extends BoltProtocolV2 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -120,7 +115,7 @@ export default class BoltProtocol extends BoltProtocolV2 { }) observer.prepareToHandleSingleResponse() - this._connection.write(RequestMessage.commit(), observer, true) + this.write(RequestMessage.commit(), observer, true) return observer } @@ -132,7 +127,7 @@ export default class BoltProtocol extends BoltProtocolV2 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -140,7 +135,7 @@ export default class BoltProtocol extends BoltProtocolV2 { }) observer.prepareToHandleSingleResponse() - this._connection.write(RequestMessage.rollback(), observer, true) + this.write(RequestMessage.rollback(), observer, true) return observer } @@ -163,7 +158,7 @@ export default class BoltProtocol extends BoltProtocolV2 { } = {} ) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeKeys, afterKeys, beforeError, @@ -173,9 +168,9 @@ export default class BoltProtocol extends BoltProtocolV2 { }) // passing in a database name on this protocol version throws an error - assertDatabaseIsEmpty(database, this._connection, observer) + assertDatabaseIsEmpty(database, this._onProtocolError, observer) - this._connection.write( + this.write( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -184,7 +179,7 @@ export default class BoltProtocol extends BoltProtocolV2 { observer, false ) - this._connection.write(RequestMessage.pullAll(), observer, flush) + this.write(RequestMessage.pullAll(), observer, flush) return observer } @@ -218,7 +213,7 @@ export default class BoltProtocol extends BoltProtocolV2 { return new ProcedureRouteObserver({ resultObserver, - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onCompleted }) diff --git a/src/internal/bolt-protocol-v4x0.js b/src/internal/bolt/bolt-protocol-v4x0.js similarity index 83% rename from src/internal/bolt-protocol-v4x0.js rename to src/internal/bolt/bolt-protocol-v4x0.js index 31ebddd22..cad0642d3 100644 --- a/src/internal/bolt-protocol-v4x0.js +++ b/src/internal/bolt/bolt-protocol-v4x0.js @@ -22,9 +22,9 @@ import { ResultStreamObserver, ProcedureRouteObserver } from './stream-observers' -import { BOLT_PROTOCOL_V4_0 } from './constants' -import Bookmark from './bookmark' -import TxConfig from './tx-config' +import { BOLT_PROTOCOL_V4_0 } from '../constants' +import Bookmark from '../bookmark' +import TxConfig from '../tx-config' const CONTEXT = 'context' const DATABASE = 'database' @@ -46,7 +46,7 @@ export default class BoltProtocol extends BoltProtocolV3 { afterComplete } = {}) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, beforeError, afterError, beforeComplete, @@ -54,7 +54,7 @@ export default class BoltProtocol extends BoltProtocolV3 { }) observer.prepareToHandleSingleResponse() - this._connection.write( + this.write( RequestMessage.begin({ bookmark, txConfig, database, mode }), observer, true @@ -83,11 +83,11 @@ export default class BoltProtocol extends BoltProtocolV3 { } = {} ) { const observer = new ResultStreamObserver({ - connection: this._connection, + server: this._server, reactive: reactive, fetchSize: fetchSize, - moreFunction: this._requestMore, - discardFunction: this._requestDiscard, + moreFunction: this._requestMore.bind(this), + discardFunction: this._requestDiscard.bind(this), beforeKeys, afterKeys, beforeError, @@ -97,7 +97,7 @@ export default class BoltProtocol extends BoltProtocolV3 { }) const flushRun = reactive - this._connection.write( + this.write( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -109,22 +109,18 @@ export default class BoltProtocol extends BoltProtocolV3 { ) if (!reactive) { - this._connection.write( - RequestMessage.pull({ n: fetchSize }), - observer, - flush - ) + this.write(RequestMessage.pull({ n: fetchSize }), observer, flush) } return observer } - _requestMore (connection, stmtId, n, observer) { - connection.write(RequestMessage.pull({ stmtId, n }), observer, true) + _requestMore (stmtId, n, observer) { + this.write(RequestMessage.pull({ stmtId, n }), observer, true) } - _requestDiscard (connection, stmtId, observer) { - connection.write(RequestMessage.discard({ stmtId }), observer, true) + _requestDiscard (stmtId, observer) { + this.write(RequestMessage.discard({ stmtId }), observer, true) } _noOp () {} @@ -163,7 +159,7 @@ export default class BoltProtocol extends BoltProtocolV3 { return new ProcedureRouteObserver({ resultObserver, - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onCompleted }) diff --git a/src/internal/bolt-protocol-v4x1.js b/src/internal/bolt/bolt-protocol-v4x1.js similarity index 67% rename from src/internal/bolt-protocol-v4x1.js rename to src/internal/bolt/bolt-protocol-v4x1.js index 3092bf572..4ca7d322b 100644 --- a/src/internal/bolt-protocol-v4x1.js +++ b/src/internal/bolt/bolt-protocol-v4x1.js @@ -18,19 +18,37 @@ */ import BoltProtocolV4 from './bolt-protocol-v4x0' import RequestMessage, { ALL } from './request-message' -import { BOLT_PROTOCOL_V4_1 } from './constants' +import { BOLT_PROTOCOL_V4_1 } from '../constants' import { LoginObserver } from './stream-observers' export default class BoltProtocol extends BoltProtocolV4 { /** * @constructor - * @param {Connection} connection the connection. + * @param {Object} server the server informatio. * @param {Chunker} chunker the chunker. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. + * @param {CreateResponseHandler} createResponseHandler Function which creates the response handler + * @param {Logger} log the logger * @param {Object} serversideRouting + * */ - constructor (connection, chunker, disableLosslessIntegers, serversideRouting) { - super(connection, chunker, disableLosslessIntegers) + constructor ( + server, + chunker, + disableLosslessIntegers, + createResponseHandler = () => null, + log, + onProtocolError, + serversideRouting + ) { + super( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) this._serversideRouting = serversideRouting } @@ -40,12 +58,11 @@ export default class BoltProtocol extends BoltProtocolV4 { initialize ({ userAgent, authToken, onError, onComplete } = {}) { const observer = new LoginObserver({ - connection: this._connection, - afterError: onError, - afterComplete: onComplete + onError: error => this._onLoginError(error, onError), + onCompleted: metadata => this._onLoginCompleted(metadata, onComplete) }) - this._connection.write( + this.write( RequestMessage.hello(userAgent, authToken, this._serversideRouting), observer, true diff --git a/src/internal/bolt-protocol-v4x2.js b/src/internal/bolt/bolt-protocol-v4x2.js similarity index 94% rename from src/internal/bolt-protocol-v4x2.js rename to src/internal/bolt/bolt-protocol-v4x2.js index e24f3360e..5d4279ccc 100644 --- a/src/internal/bolt-protocol-v4x2.js +++ b/src/internal/bolt/bolt-protocol-v4x2.js @@ -17,7 +17,7 @@ * limitations under the License. */ import BoltProtocolV41 from './bolt-protocol-v4x1' -import { BOLT_PROTOCOL_V4_2 } from './constants' +import { BOLT_PROTOCOL_V4_2 } from '../constants' export default class BoltProtocol extends BoltProtocolV41 { get version () { diff --git a/src/internal/bolt-protocol-v4x3.js b/src/internal/bolt/bolt-protocol-v4x3.js similarity index 94% rename from src/internal/bolt-protocol-v4x3.js rename to src/internal/bolt/bolt-protocol-v4x3.js index 18538638a..4746e7864 100644 --- a/src/internal/bolt-protocol-v4x3.js +++ b/src/internal/bolt/bolt-protocol-v4x3.js @@ -17,7 +17,7 @@ * limitations under the License. */ import BoltProtocolV42 from './bolt-protocol-v4x2' -import { BOLT_PROTOCOL_V4_3 } from './constants' +import { BOLT_PROTOCOL_V4_3 } from '../constants' import RequestMessage from './request-message' import { RouteObserver } from './stream-observers' @@ -37,7 +37,6 @@ export default class BoltProtocol extends BoltProtocolV42 { * @param {function(RawRoutingTable)} param.onCompleted * @returns {RouteObserver} the route observer */ - requestRoutingInformation ({ routingContext = {}, databaseName = null, @@ -46,12 +45,12 @@ export default class BoltProtocol extends BoltProtocolV42 { onCompleted }) { const observer = new RouteObserver({ - connection: this._connection, + onProtocolError: this._onProtocolError, onError, onCompleted }) - this._connection.write( + this.write( RequestMessage.route( { ...routingContext, address: initialAddress }, databaseName diff --git a/src/internal/bolt/create.js b/src/internal/bolt/create.js new file mode 100644 index 000000000..d930ca249 --- /dev/null +++ b/src/internal/bolt/create.js @@ -0,0 +1,168 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { newError } from '../../error' +import BoltProtocolV1 from './bolt-protocol-v1' +import BoltProtocolV2 from './bolt-protocol-v2' +import BoltProtocolV3 from './bolt-protocol-v3' +import BoltProtocolV4x0 from './bolt-protocol-v4x0' +import BoltProtocolV4x1 from './bolt-protocol-v4x1' +import BoltProtocolV4x2 from './bolt-protocol-v4x2' +import BoltProtocolV4x3 from './bolt-protocol-v4x3' +import { Chunker, Dechunker } from '../chunking' +import ResponseHandler from './response-handler' + +/** + * Creates a protocol with a given version + * + * @param {object} config + * @param {number} config.version The version of the protocol + * @param {channel} config.channel The channel + * @param {Chunker} config.chunker The chunker + * @param {Dechunker} config.dechunker The dechunker + * @param {Logger} config.log The logger + * @param {ResponseHandler~Observer} config.observer Observer + * @param {boolean} config.disableLosslessIntegers Disable the lossless integers + * @param {boolean} config.serversideRouting It's using server side routing + */ +export default function create ({ + version, + chunker, + dechunker, + channel, + disableLosslessIntegers, + serversideRouting, + server, // server info + log, + observer +} = {}) { + const createResponseHandler = protocol => { + const responseHandler = new ResponseHandler({ + transformMetadata: protocol.transformMetadata.bind(protocol), + log, + observer + }) + + // reset the error handler to just handle errors and forget about the handshake promise + channel.onerror = observer.onError.bind(observer) + + // Ok, protocol running. Simply forward all messages to the dechunker + channel.onmessage = buf => dechunker.write(buf) + + // setup dechunker to dechunk messages and forward them to the message handler + dechunker.onmessage = buf => { + responseHandler.handleResponse(protocol.unpacker().unpack(buf)) + } + + return responseHandler + } + + return createProtocol( + version, + server, + chunker, + disableLosslessIntegers, + serversideRouting, + createResponseHandler, + observer.onProtocolError.bind(observer), + log + ) +} + +function createProtocol ( + version, + server, + chunker, + disableLosslessIntegers, + serversideRouting, + createResponseHandler, + onProtocolError, + log +) { + switch (version) { + case 1: + return new BoltProtocolV1( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 2: + return new BoltProtocolV2( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 3: + return new BoltProtocolV3( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 4.0: + return new BoltProtocolV4x0( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError + ) + case 4.1: + return new BoltProtocolV4x1( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError, + serversideRouting + ) + case 4.2: + return new BoltProtocolV4x2( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError, + serversideRouting + ) + case 4.3: + return new BoltProtocolV4x3( + server, + chunker, + disableLosslessIntegers, + createResponseHandler, + log, + onProtocolError, + serversideRouting + ) + default: + throw newError('Unknown Bolt protocol version: ' + version) + } +} diff --git a/src/internal/bolt/handshake.js b/src/internal/bolt/handshake.js new file mode 100644 index 000000000..61727d665 --- /dev/null +++ b/src/internal/bolt/handshake.js @@ -0,0 +1,133 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { alloc } from '../node' +import { newError } from '../../error' + +const BOLT_MAGIC_PREAMBLE = 0x6060b017 + +function version (major, minor) { + return { + major, + minor + } +} + +function createHandshakeMessage (versions) { + if (versions.length > 4) { + throw newError('It should not have more than 4 versions of the protocol') + } + const handshakeBuffer = alloc(5 * 4) + + handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE) + + versions.forEach(version => { + if (version instanceof Array) { + const { major, minor } = version[0] + const { minor: minMinor } = version[1] + const range = minor - minMinor + handshakeBuffer.writeInt32((range << 16) | (minor << 8) | major) + } else { + const { major, minor } = version + handshakeBuffer.writeInt32((minor << 8) | major) + } + }) + + handshakeBuffer.reset() + + return handshakeBuffer +} + +function parseNegotiatedResponse (buffer) { + const h = [ + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8(), + buffer.readUInt8() + ] + if (h[0] === 0x48 && h[1] === 0x54 && h[2] === 0x54 && h[3] === 0x50) { + throw newError( + 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' + ) + } + return Number(h[3] + '.' + h[2]) +} + +/** + * @return {BaseBuffer} + * @private + */ +function newHandshakeBuffer () { + return createHandshakeMessage([ + [version(4, 3), version(4, 2)], + version(4, 1), + version(4, 0), + version(3, 0) + ]) +} + +/** + * This callback is displayed as a global member. + * @callback BufferConsumerCallback + * @param {buffer} buffer the remaining buffer + */ +/** + * @typedef HandshakeResult + * @property {number} protocolVersion The protocol version negotiated in the handshake + * @property {function(BufferConsumerCallback)} consumeRemainingBuffer A function to consume the remaining buffer if it exists + */ +/** + * Shake hands using the channel and return the protocol version + * + * @param {Channel} channel the channel use to shake hands + * @returns {Promise} Promise of protocol version and consumeRemainingBuffer + */ +export default function handshake (channel) { + return new Promise((resolve, reject) => { + const handshakeErrorHandler = error => { + reject(error) + } + + channel.onerror = handshakeErrorHandler.bind(this) + if (channel._error) { + handshakeErrorHandler(channel._error) + } + + channel.onmessage = buffer => { + try { + // read the response buffer and initialize the protocol + const protocolVersion = parseNegotiatedResponse(buffer) + + resolve({ + protocolVersion, + consumeRemainingBuffer: consumer => { + if (buffer.hasRemaining()) { + consumer(buffer.readSlice(buffer.remaining())) + } + } + }) + } catch (e) { + reject(e) + } + } + + channel.write(newHandshakeBuffer()) + }) +} diff --git a/src/internal/bolt/index.js b/src/internal/bolt/index.js new file mode 100644 index 000000000..354a05099 --- /dev/null +++ b/src/internal/bolt/index.js @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import handshake from './handshake' +import create from './create' +import _BoltProtocol from './bolt-protocol-v4x3' +import _RawRoutingTable from './routing-table-raw' + +export * from './stream-observers' +export { ALL as FETCH_ALL } from './request-message' + +export const BoltProtocol = _BoltProtocol +export const RawRoutingTable = _RawRoutingTable + +export default { + handshake, + create +} diff --git a/src/internal/request-message.js b/src/internal/bolt/request-message.js similarity index 98% rename from src/internal/request-message.js rename to src/internal/bolt/request-message.js index 10098f32c..b36d3bddb 100644 --- a/src/internal/request-message.js +++ b/src/internal/bolt/request-message.js @@ -17,9 +17,9 @@ * limitations under the License. */ -import { ACCESS_MODE_READ } from './constants' -import { int } from '../integer' -import { assertString } from './util' +import { ACCESS_MODE_READ } from '../constants' +import { int } from '../../integer' +import { assertString } from '../util' /* eslint-disable no-unused-vars */ // Signature bytes for each request message type diff --git a/src/internal/bolt/response-handler.js b/src/internal/bolt/response-handler.js new file mode 100644 index 000000000..f02623b5b --- /dev/null +++ b/src/internal/bolt/response-handler.js @@ -0,0 +1,189 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { newError } from '../../error' + +// Signature bytes for each response message type +const SUCCESS = 0x70 // 0111 0000 // SUCCESS +const RECORD = 0x71 // 0111 0001 // RECORD +const IGNORED = 0x7e // 0111 1110 // IGNORED +const FAILURE = 0x7f // 0111 1111 // FAILURE + +function NO_OP () {} + +function NO_OP_IDENTITY (subject) { + return subject +} + +const NO_OP_OBSERVER = { + onNext: NO_OP, + onCompleted: NO_OP, + onError: NO_OP +} + +/** + * Treat the protocol responses and notify the observers + */ +export default class ResponseHandler { + /** + * Called when something went wrong with the connectio + * @callback ResponseHandler~Observer~OnErrorApplyTransformation + * @param {any} error The error + * @returns {any} The new error + */ + /** + * Called when something went wrong with the connectio + * @callback ResponseHandler~Observer~OnError + * @param {any} error The error + */ + /** + * Called when something went wrong with the connectio + * @callback ResponseHandler~MetadataTransformer + * @param {any} metadata The metadata got onSuccess + * @returns {any} The transformed metadata + */ + /** + * @typedef {Object} ResponseHandler~Observer + * @property {ResponseHandler~Observer~OnError} onError Invoke when a connection error occurs + * @property {ResponseHandler~Observer~OnError} onFailure Invoke when a protocol failure occurs + * @property {ResponseHandler~Observer~OnErrorApplyTransformation} onErrorApplyTransformation Invoke just after the failure occurs, + * before notify to respective observer. This method should transform the failure reason to the approprited one. + */ + /** + * Constructor + * @param {Object} param The params + * @param {ResponseHandler~MetadataTransformer} transformMetadata Transform metadata when the SUCCESS is received. + * @param {Channel} channel The channel used to exchange messages + * @param {Logger} log The logger + * @param {ResponseHandler~Observer} observer Object which will be notified about errors + */ + constructor ({ transformMetadata, log, observer } = {}) { + this._pendingObservers = [] + this._log = log + this._transformMetadata = transformMetadata || NO_OP_IDENTITY + this._observer = Object.assign( + { + onError: NO_OP, + onFailure: NO_OP, + onErrorApplyTransformation: NO_OP_IDENTITY + }, + observer + ) + } + + handleResponse (msg) { + const payload = msg.fields[0] + + switch (msg.signature) { + case RECORD: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: RECORD ${JSON.stringify(msg)}`) + } + this._currentObserver.onNext(payload) + break + case SUCCESS: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`) + } + try { + const metadata = this._transformMetadata(payload) + this._currentObserver.onCompleted(metadata) + } finally { + this._updateCurrentObserver() + } + break + case FAILURE: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`) + } + try { + const error = newError(payload.message, payload.code) + this._currentFailure = this._observer.onErrorApplyTransformation( + error + ) + this._currentObserver.onError(this._currentFailure) + } finally { + this._updateCurrentObserver() + // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure. + this._observer.onFailure(this._currentFailure) + } + break + case IGNORED: + if (this._log.isDebugEnabled()) { + this._log.debug(`${this} S: IGNORED ${JSON.stringify(msg)}`) + } + try { + if (this._currentFailure && this._currentObserver.onError) { + this._currentObserver.onError(this._currentFailure) + } else if (this._currentObserver.onError) { + this._currentObserver.onError( + newError('Ignored either because of an error or RESET') + ) + } + } finally { + this._updateCurrentObserver() + } + break + default: + this._observer.onError( + newError('Unknown Bolt protocol message: ' + msg) + ) + } + } + + /* + * Pop next pending observer form the list of observers and make it current observer. + * @protected + */ + _updateCurrentObserver () { + this._currentObserver = this._pendingObservers.shift() + } + + _queueObserver (observer) { + observer = observer || NO_OP_OBSERVER + observer.onCompleted = observer.onCompleted || NO_OP + observer.onError = observer.onError || NO_OP + observer.onNext = observer.onNext || NO_OP + if (this._currentObserver === undefined) { + this._currentObserver = observer + } else { + this._pendingObservers.push(observer) + } + return true + } + + _notifyErrorToObservers (error) { + if (this._currentObserver && this._currentObserver.onError) { + this._currentObserver.onError(error) + } + while (this._pendingObservers.length > 0) { + const observer = this._pendingObservers.shift() + if (observer && observer.onError) { + observer.onError(error) + } + } + } + + hasOngoingObservableRequests () { + return this._currentObserver != null || this._pendingObservers.length > 0 + } + + _resetFailure () { + this._currentFailure = null + } +} diff --git a/src/internal/routing-table-raw.js b/src/internal/bolt/routing-table-raw.js similarity index 75% rename from src/internal/routing-table-raw.js rename to src/internal/bolt/routing-table-raw.js index ea5854c42..498d192c3 100644 --- a/src/internal/routing-table-raw.js +++ b/src/internal/bolt/routing-table-raw.js @@ -1,4 +1,22 @@ -import Record from '../record' +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Record from '../../record' /** * Represente the raw version of the routing table @@ -6,7 +24,7 @@ import Record from '../record' export default class RawRoutingTable { /** * Constructs the raw routing table for Record based result - * @param {record} record The record which will be used get the raw routing table + * @param {Record} record The record which will be used get the raw routing table * @returns {RawRoutingTable} The raw routing table */ static ofRecord (record) { diff --git a/src/internal/stream-observers.js b/src/internal/bolt/stream-observers.js similarity index 83% rename from src/internal/stream-observers.js rename to src/internal/bolt/stream-observers.js index b54e318a8..956bc47ee 100644 --- a/src/internal/stream-observers.js +++ b/src/internal/bolt/stream-observers.js @@ -16,10 +16,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import Record from '../record' -import Connection from './connection' -import { newError, PROTOCOL_ERROR } from '../error' -import Integer from '../integer' +import Record from '../../record' +import { newError, PROTOCOL_ERROR } from '../../error' +import Integer from '../../integer' import { ALL } from './request-message' import RawRoutingTable from './routing-table-raw' @@ -45,10 +44,10 @@ class ResultStreamObserver extends StreamObserver { /** * * @param {Object} param - * @param {Connection} param.connection + * @param {Object} param.server * @param {boolean} param.reactive - * @param {function(connection: Connection, stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction - - * @param {function(connection: Connection, stmtId: number|Integer, observer: StreamObserver)} param.discardFunction - + * @param {function(stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction - + * @param {function(stmtId: number|Integer, observer: StreamObserver)} param.discardFunction - * @param {number|Integer} param.fetchSize - * @param {function(err: Error): Promise|void} param.beforeError - * @param {function(err: Error): Promise|void} param.afterError - @@ -58,7 +57,6 @@ class ResultStreamObserver extends StreamObserver { * @param {function(metadata: Object): Promise|void} param.afterComplete - */ constructor ({ - connection, reactive = false, moreFunction, discardFunction, @@ -68,12 +66,11 @@ class ResultStreamObserver extends StreamObserver { beforeKeys, afterKeys, beforeComplete, - afterComplete + afterComplete, + server } = {}) { super() - this._connection = connection - this._fieldKeys = null this._fieldLookup = null this._head = null @@ -82,6 +79,7 @@ class ResultStreamObserver extends StreamObserver { this._error = null this._observers = [] this._meta = {} + this._server = server this._beforeError = beforeError this._afterError = afterError @@ -217,7 +215,7 @@ class ResultStreamObserver extends StreamObserver { _handlePullSuccess (meta) { this._setState(_states.SUCCEEDED) const completionMetadata = Object.assign( - this._connection ? { server: this._connection.server } : {}, + this._server ? { server: this._server } : {}, this._meta, meta ) @@ -344,15 +342,10 @@ class ResultStreamObserver extends StreamObserver { _handleStreaming () { if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { if (this._discard) { - this._discardFunction(this._connection, this._queryId, this) + this._discardFunction(this._queryId, this) this._setState(_states.STREAMING) } else if (this._autoPull) { - this._moreFunction( - this._connection, - this._queryId, - this._fetchSize, - this - ) + this._moreFunction(this._queryId, this._fetchSize, this) this._setState(_states.STREAMING) } } @@ -389,26 +382,13 @@ class LoginObserver extends StreamObserver { /** * * @param {Object} param - - * @param {Connection} param.connection - * @param {function(err: Error)} param.beforeError - * @param {function(err: Error)} param.afterError - * @param {function(metadata)} param.beforeComplete - * @param {function(metadata)} param.afterComplete + * @param {function(err: Error)} param.onError + * @param {function(metadata)} param.onCompleted */ - constructor ({ - connection, - beforeError, - afterError, - beforeComplete, - afterComplete - } = {}) { + constructor ({ onError, onCompleted } = {}) { super() - - this._connection = connection - this._beforeError = beforeError - this._afterError = afterError - this._beforeComplete = beforeComplete - this._afterComplete = afterComplete + this._onError = onError + this._onCompleted = onCompleted } onNext (record) { @@ -418,39 +398,14 @@ class LoginObserver extends StreamObserver { } onError (error) { - if (this._beforeError) { - this._beforeError(error) - } - - this._connection._updateCurrentObserver() // make sure this exact observer will not be called again - this._connection._handleFatalError(error) // initialization errors are fatal - - if (this._afterError) { - this._afterError(error) + if (this._onError) { + this._onError(error) } } onCompleted (metadata) { - if (this._beforeComplete) { - this._beforeComplete(metadata) - } - - if (metadata) { - // read server version from the response metadata, if it is available - const serverVersion = metadata.server - if (!this._connection.version) { - this._connection.version = serverVersion - } - - // read database connection id from the response metadata, if it is available - const dbConnectionId = metadata.connection_id - if (!this._connection.databaseId) { - this._connection.databaseId = dbConnectionId - } - } - - if (this._afterComplete) { - this._afterComplete(metadata) + if (this._onCompleted) { + this._onCompleted(metadata) } } } @@ -459,14 +414,14 @@ class ResetObserver extends StreamObserver { /** * * @param {Object} param - - * @param {Connection} param.connection + * @param {function(err: String)} param.onProtocolError * @param {function(err: Error)} param.onError * @param {function(metadata)} param.onComplete */ - constructor ({ connection, onError, onComplete } = {}) { + constructor ({ onProtocolError, onError, onComplete } = {}) { super() - this._connection = connection + this._onProtocolError = onProtocolError this._onError = onError this._onComplete = onComplete } @@ -482,8 +437,8 @@ class ResetObserver extends StreamObserver { } onError (error) { - if (error.code === PROTOCOL_ERROR) { - this._connection._handleProtocolError(error.message) + if (error.code === PROTOCOL_ERROR && this._onProtocolError) { + this._onProtocolError(error.message) } if (this._onError) { @@ -514,14 +469,14 @@ class CompletedObserver extends ResultStreamObserver { } class ProcedureRouteObserver extends StreamObserver { - constructor ({ resultObserver, connection, onError, onCompleted }) { + constructor ({ resultObserver, onProtocolError, onError, onCompleted }) { super() this._resultObserver = resultObserver this._onError = onError this._onCompleted = onCompleted - this._connection = connection this._records = [] + this._onProtocolError = onProtocolError resultObserver.subscribe(this) } @@ -530,8 +485,8 @@ class ProcedureRouteObserver extends StreamObserver { } onError (error) { - if (error.code === PROTOCOL_ERROR) { - this._connection._handleProtocolError(error.message) + if (error.code === PROTOCOL_ERROR && this._onProtocolError) { + this._onProtocolError(error.message) } if (this._onError) { @@ -563,14 +518,14 @@ class RouteObserver extends StreamObserver { /** * * @param {Object} param - - * @param {Connection} param.connection + * @param {function(err: String)} param.onProtocolError * @param {function(err: Error)} param.onError * @param {function(RawRoutingTable)} param.onCompleted */ - constructor ({ connection, onError, onCompleted } = {}) { + constructor ({ onProtocolError, onError, onCompleted } = {}) { super() - this._connection = connection + this._onProtocolError = onProtocolError this._onError = onError this._onCompleted = onCompleted } @@ -586,8 +541,8 @@ class RouteObserver extends StreamObserver { } onError (error) { - if (error.code === PROTOCOL_ERROR) { - this._connection._handleProtocolError(error.message) + if (error.code === PROTOCOL_ERROR && this._onProtocolError) { + this._onProtocolError(error.message) } if (this._onError) { diff --git a/src/internal/connection-channel.js b/src/internal/connection-channel.js index 909c0666a..d3e8ffd62 100644 --- a/src/internal/connection-channel.js +++ b/src/internal/connection-channel.js @@ -21,26 +21,81 @@ import { Channel } from './node' import { Chunker, Dechunker } from './chunking' import { newError, PROTOCOL_ERROR } from '../error' import ChannelConfig from './channel-config' -import ProtocolHandshaker from './protocol-handshaker' import Connection from './connection' -import BoltProtocol from './bolt-protocol-v1' -import { ResultStreamObserver } from './stream-observers' +import Bolt from './bolt' -// Signature bytes for each response message type -const SUCCESS = 0x70 // 0111 0000 // SUCCESS -const RECORD = 0x71 // 0111 0001 // RECORD -const IGNORED = 0x7e // 0111 1110 // IGNORED -const FAILURE = 0x7f // 0111 1111 // FAILURE +let idGenerator = 0 -function NO_OP () {} +/** + * Crete new connection to the provided address. Returned connection is not connected. + * @param {ServerAddress} address - the Bolt endpoint to connect to. + * @param {Object} config - the driver configuration. + * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. + * @param {Logger} log - configured logger. + * @return {Connection} - new connection. + */ +export function createChannelConnection ( + address, + config, + errorHandler, + log, + serversideRouting = null, + createChannel = channelConfig => new Channel(channelConfig) +) { + const channelConfig = new ChannelConfig( + address, + config, + errorHandler.errorCode() + ) + + const channel = createChannel(channelConfig) + + return Bolt.handshake(channel) + .then(({ protocolVersion: version, consumeRemainingBuffer }) => { + const chunker = new Chunker(channel) + const dechunker = new Dechunker() + const createProtocol = conn => + Bolt.create({ + version, + connection: conn, + channel, + chunker, + dechunker, + disableLosslessIntegers: config.disableLosslessIntegers, + serversideRouting, + server: conn.server, + log, + observer: { + onError: conn._handleFatalError.bind(conn), + onFailure: conn._resetOnFailure.bind(conn), + onProtocolError: conn._handleProtocolError.bind(conn), + onErrorApplyTransformation: error => + conn.handleAndTransformError(error, conn._address) + } + }) + + const connection = new ChannelConnection( + channel, + errorHandler, + address, + log, + config.disableLosslessIntegers, + serversideRouting, + chunker, + createProtocol + ) -const NO_OP_OBSERVER = { - onNext: NO_OP, - onCompleted: NO_OP, - onError: NO_OP -} + // forward all pending bytes to the dechunker + consumeRemainingBuffer(buffer => dechunker.write(buffer)) -let idGenerator = 0 + return connection + }) + .catch(reason => + channel.close().then(() => { + throw reason + }) + ) +} export default class ChannelConnection extends Connection { /** @@ -50,6 +105,8 @@ export default class ChannelConnection extends Connection { * @param {ServerAddress} address - the server address to connect to. * @param {Logger} log - the configured logger. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. + * @param {Chunker} chunker the chunker + * @param protocolSupplier Bolt protocol supplier */ constructor ( channel, @@ -57,7 +114,9 @@ export default class ChannelConnection extends Connection { address, log, disableLosslessIntegers = false, - serversideRouting = null + serversideRouting = null, + chunker, // to be removed, + protocolSupplier ) { super(errorHandler) @@ -66,11 +125,8 @@ export default class ChannelConnection extends Connection { this._server = { address: address.asHostPort() } this.creationTimestamp = Date.now() this._disableLosslessIntegers = disableLosslessIntegers - this._pendingObservers = [] - this._currentObserver = undefined this._ch = channel - this._dechunker = new Dechunker() - this._chunker = new Chunker(channel) + this._chunker = chunker this._log = log this._serversideRouting = serversideRouting @@ -82,10 +138,7 @@ export default class ChannelConnection extends Connection { * @private * @type {BoltProtocol} */ - this._protocol = null - - // error extracted from a FAILURE message - this._currentFailure = null + this._protocol = protocolSupplier(this) // Set to true on fatal errors, to get this out of connection pool. this._isBroken = false @@ -95,30 +148,6 @@ export default class ChannelConnection extends Connection { } } - /** - * Crete new connection to the provided address. Returned connection is not connected. - * @param {ServerAddress} address - the Bolt endpoint to connect to. - * @param {Object} config - the driver configuration. - * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. - * @param {Logger} log - configured logger. - * @return {Connection} - new connection. - */ - static create (address, config, errorHandler, log, serversideRouting = null) { - const channelConfig = new ChannelConfig( - address, - config, - errorHandler.errorCode() - ) - return new ChannelConnection( - new Channel(channelConfig), - errorHandler, - address, - log, - config.disableLosslessIntegers, - serversideRouting - ) - } - get id () { return this._id } @@ -132,72 +161,13 @@ export default class ChannelConnection extends Connection { } /** - * Connect to the target address, negotiate Bolt protocol and send initialization message. + * Send initialization message. * @param {string} userAgent the user agent for this driver. * @param {Object} authToken the object containing auth information. * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. */ connect (userAgent, authToken) { - return this._negotiateProtocol().then(() => - this._initialize(userAgent, authToken) - ) - } - - /** - * Execute Bolt protocol handshake to initialize the protocol version. - * @return {Promise} promise resolved with the current connection if handshake is successful. Rejected promise otherwise. - */ - _negotiateProtocol () { - const protocolHandshaker = new ProtocolHandshaker( - this, - this._ch, - this._chunker, - this._disableLosslessIntegers, - this._log, - this._serversideRouting - ) - - return new Promise((resolve, reject) => { - const handshakeErrorHandler = error => { - this._handleFatalError(error) - reject(error) - } - - this._ch.onerror = handshakeErrorHandler.bind(this) - if (this._ch._error) { - // channel is already broken - handshakeErrorHandler(this._ch._error) - } - - this._ch.onmessage = buffer => { - try { - // read the response buffer and initialize the protocol - this._protocol = protocolHandshaker.createNegotiatedProtocol(buffer) - - // reset the error handler to just handle errors and forget about the handshake promise - this._ch.onerror = this._handleFatalError.bind(this) - - // Ok, protocol running. Simply forward all messages to the dechunker - this._ch.onmessage = buf => this._dechunker.write(buf) - - // setup dechunker to dechunk messages and forward them to the message handler - this._dechunker.onmessage = buf => { - this._handleMessage(this._protocol.unpacker().unpack(buf)) - } - // forward all pending bytes to the dechunker - if (buffer.hasRemaining()) { - this._dechunker.write(buffer.readSlice(buffer.remaining())) - } - - resolve(this) - } catch (e) { - this._handleFatalError(e) - reject(e) - } - } - - protocolHandshaker.writeHandshakeRequest() - }) + return this._initialize(userAgent, authToken) } /** @@ -213,7 +183,22 @@ export default class ChannelConnection extends Connection { userAgent, authToken, onError: err => reject(err), - onComplete: () => resolve(self) + onComplete: metadata => { + if (metadata) { + // read server version from the response metadata, if it is available + const serverVersion = metadata.server + if (!this.version) { + this.version = serverVersion + } + + // read database connection id from the response metadata, if it is available + const dbConnectionId = metadata.connection_id + if (!this.databaseId) { + this.databaseId = dbConnectionId + } + } + resolve(self) + } }) }) } @@ -248,35 +233,6 @@ export default class ChannelConnection extends Connection { return this._server } - /** - * Write a message to the network channel. - * @param {RequestMessage} message the message to write. - * @param {ResultStreamObserver} observer the response observer. - * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. - */ - write (message, observer, flush) { - const queued = this._queueObserver(observer) - - if (queued) { - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} C: ${message}`) - } - - this._protocol - .packer() - .packStruct( - message.signature, - message.fields.map(field => this._packable(field)) - ) - - this._chunker.messageBoundary() - - if (flush) { - this._chunker.flush() - } - } - } - /** * "Fatal" means the connection is dead. Only call this if something * happens that cannot be recovered from. This will lead to all subscribers @@ -294,82 +250,20 @@ export default class ChannelConnection extends Connection { ) } - if (this._currentObserver && this._currentObserver.onError) { - this._currentObserver.onError(this._error) - } - while (this._pendingObservers.length > 0) { - const observer = this._pendingObservers.shift() - if (observer && observer.onError) { - observer.onError(this._error) - } - } + this._protocol.notifyFatalError(this._error) } - _handleMessage (msg) { - if (this._isBroken) { - // ignore all incoming messages when this connection is broken. all previously pending observers failed - // with the fatal error. all future observers will fail with same fatal error. - return - } - - const payload = msg.fields[0] + /** + * This method still here because it's used by the {@link PooledConnectionProvider} + * + * @param {any} observer + */ + _queueObserver (observer) { + return this._protocol.queueObserverIfProtocolIsNotBroken(observer) + } - switch (msg.signature) { - case RECORD: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: RECORD ${JSON.stringify(msg)}`) - } - this._currentObserver.onNext(payload) - break - case SUCCESS: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`) - } - try { - const metadata = this._protocol.transformMetadata(payload) - this._currentObserver.onCompleted(metadata) - } finally { - this._updateCurrentObserver() - } - break - case FAILURE: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`) - } - try { - const error = newError(payload.message, payload.code) - this._currentFailure = this.handleAndTransformError( - error, - this._address - ) - this._currentObserver.onError(this._currentFailure) - } finally { - this._updateCurrentObserver() - // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure. - this._resetOnFailure() - } - break - case IGNORED: - if (this._log.isDebugEnabled()) { - this._log.debug(`${this} S: IGNORED ${JSON.stringify(msg)}`) - } - try { - if (this._currentFailure && this._currentObserver.onError) { - this._currentObserver.onError(this._currentFailure) - } else if (this._currentObserver.onError) { - this._currentObserver.onError( - newError('Ignored either because of an error or RESET') - ) - } - } finally { - this._updateCurrentObserver() - } - break - default: - this._handleFatalError( - newError('Unknown Bolt protocol message: ' + msg) - ) - } + hasOngoingObservableRequests () { + return this._protocol.hasOngoingObservableRequests() } /** @@ -400,39 +294,20 @@ export default class ChannelConnection extends Connection { _resetOnFailure () { this._protocol.reset({ onError: () => { - this._currentFailure = null + this._protocol.resetFailure() }, onComplete: () => { - this._currentFailure = null + this._protocol.resetFailure() } }) } - _queueObserver (observer) { - if (this._isBroken) { - if (observer && observer.onError) { - observer.onError(this._error) - } - return false - } - observer = observer || NO_OP_OBSERVER - observer.onCompleted = observer.onCompleted || NO_OP - observer.onError = observer.onError || NO_OP - observer.onNext = observer.onNext || NO_OP - if (this._currentObserver === undefined) { - this._currentObserver = observer - } else { - this._pendingObservers.push(observer) - } - return true - } - /* * Pop next pending observer form the list of observers and make it current observer. * @protected */ _updateCurrentObserver () { - this._currentObserver = this._pendingObservers.shift() + this._protocol.updateCurrentObserver() } /** Check if this connection is in working condition */ @@ -466,12 +341,8 @@ export default class ChannelConnection extends Connection { return `Connection [${this.id}][${this.databaseId || ''}]` } - _packable (value) { - return this._protocol.packer().packable(value) - } - _handleProtocolError (message) { - this._currentFailure = null + this._protocol.resetFailure() this._updateCurrentObserver() const error = newError(message, PROTOCOL_ERROR) this._handleFatalError(error) diff --git a/src/internal/connection-provider-direct.js b/src/internal/connection-provider-direct.js index 57518afef..48c0ca6ef 100644 --- a/src/internal/connection-provider-direct.js +++ b/src/internal/connection-provider-direct.js @@ -19,7 +19,7 @@ import PooledConnectionProvider from './connection-provider-pooled' import DelegateConnection from './connection-delegate' -import ChannelConnection from './connection-channel' +import { createChannelConnection } from './connection-channel' import { BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V3 } from './constants' export default class DirectConnectionProvider extends PooledConnectionProvider { @@ -40,25 +40,24 @@ export default class DirectConnectionProvider extends PooledConnectionProvider { } async _hasProtocolVersion (versionPredicate) { - const connection = ChannelConnection.create( + const connection = await createChannelConnection( this._address, this._config, this._createConnectionErrorHandler(), this._log ) - try { - await connection._negotiateProtocol() + const protocolVersion = connection.protocol() + ? connection.protocol().version + : null - const protocol = connection.protocol() - if (protocol) { - return versionPredicate(protocol.version) - } + await connection.close() - return false - } finally { - await connection.close() + if (protocolVersion) { + return versionPredicate(protocolVersion) } + + return false } async supportsMultiDb () { diff --git a/src/internal/connection-provider-pooled.js b/src/internal/connection-provider-pooled.js index 20b925d74..43d7dfc4c 100644 --- a/src/internal/connection-provider-pooled.js +++ b/src/internal/connection-provider-pooled.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import ChannelConnection from './connection-channel' +import { createChannelConnection } from './connection-channel' import Pool from './pool' import PoolConfig from './pool-config' import ConnectionErrorHandler from './connection-error-handler' @@ -39,7 +39,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { this._createChannelConnection = createChannelConnectionHook || (address => { - return ChannelConnection.create( + return createChannelConnection( address, this._config, this._createConnectionErrorHandler(), @@ -72,15 +72,17 @@ export default class PooledConnectionProvider extends ConnectionProvider { * @access private */ _createConnection (address, release) { - const connection = this._createChannelConnection(address) - connection._release = () => release(address, connection) - this._openConnections[connection.id] = connection - - return connection.connect(this._userAgent, this._authToken).catch(error => { - // let's destroy this connection - this._destroyConnection(connection) - // propagate the error because connection failed to connect / initialize - throw error + return this._createChannelConnection(address).then(connection => { + connection._release = () => release(address, connection) + this._openConnections[connection.id] = connection + return connection + .connect(this._userAgent, this._authToken) + .catch(error => { + // let's destroy this connection + this._destroyConnection(connection) + // propagate the error because connection failed to connect / initialize + throw error + }) }) } diff --git a/src/internal/connection-provider-routing.js b/src/internal/connection-provider-routing.js index e8741091a..d33b74100 100644 --- a/src/internal/connection-provider-routing.js +++ b/src/internal/connection-provider-routing.js @@ -29,7 +29,7 @@ import ConnectionErrorHandler from './connection-error-handler' import DelegateConnection from './connection-delegate' import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy' import Bookmark from './bookmark' -import ChannelConnection from './connection-channel' +import { createChannelConnection } from './connection-channel' import { int } from '../integer' import { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0 } from './constants' @@ -53,7 +53,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider routingTablePurgeDelay }) { super({ id, config, log, userAgent, authToken }, address => { - return ChannelConnection.create( + return createChannelConnection( address, this._config, this._createConnectionErrorHandler(), @@ -164,27 +164,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider let lastError for (let i = 0; i < addresses.length; i++) { - const connection = ChannelConnection.create( - addresses[i], - this._config, - this._createConnectionErrorHandler(), - this._log - ) - try { - await connection._negotiateProtocol() + const connection = await createChannelConnection( + this._address, + this._config, + this._createConnectionErrorHandler(), + this._log + ) - const protocol = connection.protocol() - if (protocol) { - return versionPredicate(protocol.version) + const protocolVersion = connection.protocol() + ? connection.protocol().version + : null + + await connection.close() + + if (protocolVersion) { + return versionPredicate(protocolVersion) } return false } catch (error) { lastError = error - } finally { - await connection.close() } + + return false } if (lastError) { diff --git a/src/internal/connection.js b/src/internal/connection.js index 709977a05..022bb9c44 100644 --- a/src/internal/connection.js +++ b/src/internal/connection.js @@ -17,8 +17,7 @@ * limitations under the License. */ -import { ResultStreamObserver } from './stream-observers' -import BoltProtocol from './bolt-protocol-v1' +import { ResultStreamObserver, BoltProtocol } from './bolt' export default class Connection { /** diff --git a/src/internal/connectivity-verifier.js b/src/internal/connectivity-verifier.js index 02884c9cc..911ebdaad 100644 --- a/src/internal/connectivity-verifier.js +++ b/src/internal/connectivity-verifier.js @@ -19,7 +19,7 @@ import ConnectionHolder from './connection-holder' import { READ } from '../driver' -import { ResultStreamObserver } from './stream-observers' +import { ResultStreamObserver } from './bolt' /** * Verifies connectivity using the given connection provider. diff --git a/src/internal/protocol-handshaker.js b/src/internal/protocol-handshaker.js deleted file mode 100644 index 3d6f5e1a8..000000000 --- a/src/internal/protocol-handshaker.js +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { alloc } from './node' -import { newError } from '../error' -import BoltProtocolV1 from './bolt-protocol-v1' -import BoltProtocolV2 from './bolt-protocol-v2' -import BoltProtocolV3 from './bolt-protocol-v3' -import BoltProtocolV4x0 from './bolt-protocol-v4x0' -import BoltProtocolV4x1 from './bolt-protocol-v4x1' -import BoltProtocolV4x2 from './bolt-protocol-v4x2' -import BoltProtocolV4x3 from './bolt-protocol-v4x3' -const BOLT_MAGIC_PREAMBLE = 0x6060b017 - -export default class ProtocolHandshaker { - /** - * @constructor - * @param {Connection} connection the connection owning this protocol. - * @param {Channel} channel the network channel. - * @param {Chunker} chunker the message chunker. - * @param {boolean} disableLosslessIntegers flag to use native JS numbers. - * @param {Logger} log the logger. - */ - constructor ( - connection, - channel, - chunker, - disableLosslessIntegers, - log, - serversideRouting = null - ) { - this._connection = connection - this._channel = channel - this._chunker = chunker - this._disableLosslessIntegers = disableLosslessIntegers - this._log = log - this._serversideRouting = serversideRouting - } - - /** - * Write a Bolt handshake into the underlying network channel. - */ - writeHandshakeRequest () { - this._channel.write(newHandshakeBuffer()) - } - - /** - * Read the given handshake response and create the negotiated bolt protocol. - * @param {BaseBuffer} buffer byte buffer containing the handshake response. - * @return {BoltProtocol} bolt protocol corresponding to the version suggested by the database. - * @throws {Neo4jError} when bolt protocol can't be instantiated. - */ - createNegotiatedProtocol (buffer) { - const h = [ - buffer.readUInt8(), - buffer.readUInt8(), - buffer.readUInt8(), - buffer.readUInt8() - ] - if (h[0] === 0x48 && h[1] === 0x54 && h[2] === 0x54 && h[3] === 0x50) { - throw newError( - 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + - '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' - ) - } - const negotiatedVersion = Number(h[3] + '.' + h[2]) - if (this._log.isDebugEnabled()) { - this._log.debug( - `${this._connection} negotiated protocol version ${negotiatedVersion}` - ) - } - return this._createProtocolWithVersion(negotiatedVersion) - } - - /** - * @return {BoltProtocol} - * @private - */ - _createProtocolWithVersion (version) { - switch (version) { - case 1: - return new BoltProtocolV1( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 2: - return new BoltProtocolV2( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 3: - return new BoltProtocolV3( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 4.0: - return new BoltProtocolV4x0( - this._connection, - this._chunker, - this._disableLosslessIntegers - ) - case 4.1: - return new BoltProtocolV4x1( - this._connection, - this._chunker, - this._disableLosslessIntegers, - this._serversideRouting - ) - case 4.2: - return new BoltProtocolV4x2( - this._connection, - this._chunker, - this._disableLosslessIntegers, - this._serversideRouting - ) - case 4.3: - return new BoltProtocolV4x3( - this._connection, - this._chunker, - this._disableLosslessIntegers, - this._serversideRouting - ) - default: - throw newError('Unknown Bolt protocol version: ' + version) - } - } -} - -/** - * @return {BaseBuffer} - * @private - */ -function newHandshakeBuffer () { - const handshakeBuffer = alloc(5 * 4) - - // magic preamble - handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE) - - // proposed versions - handshakeBuffer.writeInt32((1 << 16) | (3 << 8) | 4) - handshakeBuffer.writeInt32((1 << 8) | 4) - handshakeBuffer.writeInt32(4) - handshakeBuffer.writeInt32(3) - - // reset the reader position - handshakeBuffer.reset() - - return handshakeBuffer -} diff --git a/src/internal/rediscovery.js b/src/internal/rediscovery.js index 704633069..9ab50ac2e 100644 --- a/src/internal/rediscovery.js +++ b/src/internal/rediscovery.js @@ -17,7 +17,7 @@ * limitations under the License. */ import RoutingTable from './routing-table' -import RawRoutingTable from './routing-table-raw' +import { RawRoutingTable } from './bolt' import Session from '../session' import ServerAddress from './server-address' import { newError, SERVICE_UNAVAILABLE } from '../error' diff --git a/src/result.js b/src/result.js index 6dc77859f..74169f03e 100644 --- a/src/result.js +++ b/src/result.js @@ -19,7 +19,7 @@ import ResultSummary from './result-summary' import { EMPTY_CONNECTION_HOLDER } from './internal/connection-holder' -import { ResultStreamObserver } from './internal/stream-observers' +import { ResultStreamObserver } from './internal/bolt' const DEFAULT_ON_ERROR = error => { console.log('Uncaught error when processing result: ' + error) diff --git a/src/session.js b/src/session.js index 42cccfa8c..42ecdb01b 100644 --- a/src/session.js +++ b/src/session.js @@ -16,10 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { - ResultStreamObserver, - FailedObserver -} from './internal/stream-observers' +import { ResultStreamObserver, FailedObserver } from './internal/bolt' import Result from './result' import Transaction from './transaction' import { newError } from './error' diff --git a/src/transaction.js b/src/transaction.js index ccb02a8a6..4734ec54b 100644 --- a/src/transaction.js +++ b/src/transaction.js @@ -28,7 +28,7 @@ import { ResultStreamObserver, FailedObserver, CompletedObserver -} from './internal/stream-observers' +} from './internal/bolt' import { newError } from './error' /** diff --git a/test/internal/bolt-protocol-v1.test.js b/test/internal/bolt/bolt-protocol-v1.test.js similarity index 71% rename from test/internal/bolt-protocol-v1.test.js rename to test/internal/bolt/bolt-protocol-v1.test.js index a21046e06..a892d67b4 100644 --- a/test/internal/bolt-protocol-v1.test.js +++ b/test/internal/bolt/bolt-protocol-v1.test.js @@ -17,13 +17,13 @@ * limitations under the License. */ -import BoltProtocolV1 from '../../src/internal/bolt-protocol-v1' -import RequestMessage from '../../src/internal/request-message' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' -import utils from './test-utils' -import { LoginObserver } from '../../src/internal/stream-observers' +import BoltProtocolV1 from '../../../src/internal/bolt/bolt-protocol-v1' +import RequestMessage from '../../../src/internal/bolt/request-message' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' +import utils from '../test-utils' +import { LoginObserver } from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV1', () => { beforeEach(() => { @@ -55,7 +55,9 @@ describe('#unit BoltProtocolV1', () => { it('should initialize the connection', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const onError = _error => {} const onComplete = () => {} @@ -71,20 +73,20 @@ describe('#unit BoltProtocolV1', () => { expect(observer).toBeTruthy() expect(observer instanceof LoginObserver).toBeTruthy() - expect(observer._afterError).toBe(onError) - expect(observer._afterComplete).toBe(onComplete) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.init(clientName, authToken) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should run a query', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -94,31 +96,35 @@ describe('#unit BoltProtocolV1', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.run(query, parameters) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should reset the connection', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const observer = protocol.reset() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.reset()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.reset()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should begin a transaction', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const bookmark = new Bookmark('neo4j:bookmark:v1:tx42') @@ -128,42 +134,46 @@ describe('#unit BoltProtocolV1', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters()) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, false]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, false]) }) it('should commit a transaction', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const observer = protocol.commitTransaction() - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage(RequestMessage.run('COMMIT', {})) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[0]).toBeMessage(RequestMessage.run('COMMIT', {})) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should rollback a transaction', () => { const recorder = new utils.MessageRecordingConnection() - const protocol = new BoltProtocolV1(recorder, null, false) + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) const observer = protocol.rollbackTransaction() - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage(RequestMessage.run('ROLLBACK', {})) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[0]).toBeMessage(RequestMessage.run('ROLLBACK', {})) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should return correct bolt version number', () => { diff --git a/test/internal/bolt-protocol-v2.test.js b/test/internal/bolt/bolt-protocol-v2.test.js similarity index 89% rename from test/internal/bolt-protocol-v2.test.js rename to test/internal/bolt/bolt-protocol-v2.test.js index 502e7c728..06abc741a 100644 --- a/test/internal/bolt-protocol-v2.test.js +++ b/test/internal/bolt/bolt-protocol-v2.test.js @@ -17,8 +17,8 @@ * limitations under the License. */ -import BoltProtocolV2 from '../../src/internal/bolt-protocol-v2' -import utils from './test-utils' +import BoltProtocolV2 from '../../../src/internal/bolt/bolt-protocol-v2' +import utils from '../test-utils' describe('#unit BoltProtocolV2', () => { beforeEach(() => { diff --git a/test/internal/bolt-protocol-v3.test.js b/test/internal/bolt/bolt-protocol-v3.test.js similarity index 77% rename from test/internal/bolt-protocol-v3.test.js rename to test/internal/bolt/bolt-protocol-v3.test.js index a8d52067e..24e432ca2 100644 --- a/test/internal/bolt-protocol-v3.test.js +++ b/test/internal/bolt/bolt-protocol-v3.test.js @@ -17,16 +17,16 @@ * limitations under the License. */ -import BoltProtocolV3 from '../../src/internal/bolt-protocol-v3' -import RequestMessage from '../../src/internal/request-message' -import utils from './test-utils' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' +import BoltProtocolV3 from '../../../src/internal/bolt/bolt-protocol-v3' +import RequestMessage from '../../../src/internal/bolt/request-message' +import utils from '../test-utils' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' import { ProcedureRouteObserver, ResultStreamObserver -} from '../../src/internal/stream-observers' +} from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV3', () => { beforeEach(() => { @@ -50,18 +50,19 @@ describe('#unit BoltProtocolV3', () => { it('should initialize connection', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const clientName = 'js-driver/1.2.3' const authToken = { username: 'neo4j', password: 'secret' } const observer = protocol.initialize({ userAgent: clientName, authToken }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.hello(clientName, authToken) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should run a query', () => { @@ -75,6 +76,7 @@ describe('#unit BoltProtocolV3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -85,18 +87,18 @@ describe('#unit BoltProtocolV3', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, mode: WRITE }) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pullAll()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pullAll()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should begin a transaction', () => { @@ -110,6 +112,7 @@ describe('#unit BoltProtocolV3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -117,36 +120,38 @@ describe('#unit BoltProtocolV3', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should commit', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.commitTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.commit()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.commit()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should rollback', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.rollbackTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.rollback()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.rollback()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should return correct bolt version number', () => { @@ -158,6 +163,7 @@ describe('#unit BoltProtocolV3', () => { it('should request the routing table from the correct procedure', () => { const expectedResultObserver = new ResultStreamObserver() const protocol = new SpiedBoltProtocolV3(expectedResultObserver) + utils.spyProtocolWrite(protocol) const routingContext = { abc: 'context ' } const sessionContext = { bookmark: 'book' } const onError = () => {} @@ -194,6 +200,7 @@ describe('#unit BoltProtocolV3', () => { function verifyError (fn) { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV3(recorder, null, false) + utils.spyProtocolWrite(protocol) expect(() => fn(protocol)).toThrowError( 'Driver is connected to the database that does not support multiple databases. ' + diff --git a/test/internal/bolt-protocol-v4x0.test.js b/test/internal/bolt/bolt-protocol-v4x0.test.js similarity index 80% rename from test/internal/bolt-protocol-v4x0.test.js rename to test/internal/bolt/bolt-protocol-v4x0.test.js index 2e2f3c932..56ddbb1c3 100644 --- a/test/internal/bolt-protocol-v4x0.test.js +++ b/test/internal/bolt/bolt-protocol-v4x0.test.js @@ -17,16 +17,16 @@ * limitations under the License. */ -import BoltProtocolV4x0 from '../../src/internal/bolt-protocol-v4x0' -import RequestMessage from '../../src/internal/request-message' -import utils from './test-utils' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' +import BoltProtocolV4x0 from '../../../src/internal/bolt/bolt-protocol-v4x0' +import RequestMessage from '../../../src/internal/bolt/request-message' +import utils from '../test-utils' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' import { ProcedureRouteObserver, ResultStreamObserver -} from '../../src/internal/stream-observers' +} from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV4x0', () => { beforeEach(() => { @@ -45,6 +45,7 @@ describe('#unit BoltProtocolV4x0', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x0(recorder, null, false) + utils.spyProtocolWrite(protocol) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -56,9 +57,9 @@ describe('#unit BoltProtocolV4x0', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -66,9 +67,9 @@ describe('#unit BoltProtocolV4x0', () => { mode: WRITE }) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pull()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pull()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should begin a transaction', () => { @@ -83,6 +84,7 @@ describe('#unit BoltProtocolV4x0', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x0(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -91,12 +93,12 @@ describe('#unit BoltProtocolV4x0', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, database, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should return correct bolt version number', () => { @@ -108,6 +110,7 @@ describe('#unit BoltProtocolV4x0', () => { it('should request the routing table from the correct procedure', () => { const expectedResultObserver = new ResultStreamObserver() const protocol = new SpiedBoltProtocolV4x0(expectedResultObserver) + utils.spyProtocolWrite(protocol) const routingContext = { abc: 'context ' } const sessionContext = { bookmark: 'book' } const databaseName = 'the name' diff --git a/test/internal/bolt-protocol-v4x3.test.js b/test/internal/bolt/bolt-protocol-v4x3.test.js similarity index 69% rename from test/internal/bolt-protocol-v4x3.test.js rename to test/internal/bolt/bolt-protocol-v4x3.test.js index b8e2cdcbb..4b8134902 100644 --- a/test/internal/bolt-protocol-v4x3.test.js +++ b/test/internal/bolt/bolt-protocol-v4x3.test.js @@ -17,13 +17,13 @@ * limitations under the License. */ -import BoltProtocolV4x3 from '../../src/internal/bolt-protocol-v4x3' -import RequestMessage from '../../src/internal/request-message' -import utils from './test-utils' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { WRITE } from '../../src/driver' -import { RouteObserver } from '../../src/internal/stream-observers' +import BoltProtocolV4x3 from '../../../src/internal/bolt/bolt-protocol-v4x3' +import RequestMessage from '../../../src/internal/bolt/request-message' +import utils from '../test-utils' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { WRITE } from '../../../src/driver' +import { RouteObserver } from '../../../src/internal/bolt/stream-observers' describe('#unit BoltProtocolV4x3', () => { beforeEach(() => { @@ -33,6 +33,7 @@ describe('#unit BoltProtocolV4x3', () => { it('should request routing information', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const routingContext = { someContextParam: 'value' } const databaseName = 'name' @@ -41,13 +42,13 @@ describe('#unit BoltProtocolV4x3', () => { databaseName }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.route({ ...routingContext, address: null }, databaseName) ) - expect(recorder.observers).toEqual([observer]) + expect(protocol.observers).toEqual([observer]) expect(observer).toEqual(jasmine.any(RouteObserver)) - expect(recorder.flushes).toEqual([true]) + expect(protocol.flushes).toEqual([true]) }) it('should run a query', () => { @@ -62,6 +63,7 @@ describe('#unit BoltProtocolV4x3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const query = 'RETURN $x, $y' const parameters = { x: 'x', y: 'y' } @@ -73,9 +75,9 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) - recorder.verifyMessageCount(2) + protocol.verifyMessageCount(2) - expect(recorder.messages[0]).toBeMessage( + expect(protocol.messages[0]).toBeMessage( RequestMessage.runWithMetadata(query, parameters, { bookmark, txConfig, @@ -83,9 +85,9 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) ) - expect(recorder.messages[1]).toBeMessage(RequestMessage.pull()) - expect(recorder.observers).toEqual([observer, observer]) - expect(recorder.flushes).toEqual([false, true]) + expect(protocol.messages[1]).toBeMessage(RequestMessage.pull()) + expect(protocol.observers).toEqual([observer, observer]) + expect(protocol.flushes).toEqual([false, true]) }) it('should begin a transaction', () => { const database = 'testdb' @@ -99,6 +101,7 @@ describe('#unit BoltProtocolV4x3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -107,12 +110,12 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, database, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should return correct bolt version number', () => { @@ -138,18 +141,19 @@ describe('#unit BoltProtocolV4x3', () => { it('should initialize connection', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const clientName = 'js-driver/1.2.3' const authToken = { username: 'neo4j', password: 'secret' } const observer = protocol.initialize({ userAgent: clientName, authToken }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.hello(clientName, authToken) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should begin a transaction', () => { @@ -163,6 +167,7 @@ describe('#unit BoltProtocolV4x3', () => { }) const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.beginTransaction({ bookmark, @@ -170,35 +175,37 @@ describe('#unit BoltProtocolV4x3', () => { mode: WRITE }) - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage( + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage( RequestMessage.begin({ bookmark, txConfig, mode: WRITE }) ) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should commit', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.commitTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.commit()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.commit()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) it('should rollback', () => { const recorder = new utils.MessageRecordingConnection() const protocol = new BoltProtocolV4x3(recorder, null, false) + utils.spyProtocolWrite(protocol) const observer = protocol.rollbackTransaction() - recorder.verifyMessageCount(1) - expect(recorder.messages[0]).toBeMessage(RequestMessage.rollback()) - expect(recorder.observers).toEqual([observer]) - expect(recorder.flushes).toEqual([true]) + protocol.verifyMessageCount(1) + expect(protocol.messages[0]).toBeMessage(RequestMessage.rollback()) + expect(protocol.observers).toEqual([observer]) + expect(protocol.flushes).toEqual([true]) }) }) diff --git a/test/internal/bolt/index.test.js b/test/internal/bolt/index.test.js new file mode 100644 index 000000000..fcf50e206 --- /dev/null +++ b/test/internal/bolt/index.test.js @@ -0,0 +1,329 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Bolt from '../../../src/internal/bolt' +import DummyChannel from '../dummy-channel' +import { alloc } from '../../../src/internal/node' +import { newError } from '../../../src/error' +import { Chunker, Dechunker } from '../../../src/internal/chunking' +import Logger from '../../../src/internal/logger' + +import BoltProtocolV1 from '../../../src/internal/bolt/bolt-protocol-v1' +import BoltProtocolV2 from '../../../src/internal/bolt/bolt-protocol-v2' +import BoltProtocolV3 from '../../../src/internal/bolt/bolt-protocol-v3' +import BoltProtocolV4x0 from '../../../src/internal/bolt/bolt-protocol-v4x0' +import BoltProtocolV4x1 from '../../../src/internal/bolt/bolt-protocol-v4x1' +import BoltProtocolV4x2 from '../../../src/internal/bolt/bolt-protocol-v4x2' +import BoltProtocolV4x3 from '../../../src/internal/bolt/bolt-protocol-v4x3' + +describe('#unit Bolt', () => { + describe('handshake', () => { + it('should write the correct handshake message', () => { + const { channel } = subject() + expect(channel.written.length).toBe(1) + const writtenBuffer = channel.written[0] + + const boltMagicPreamble = '60 60 b0 17' + const protocolVersion4x3to4x2 = '00 01 03 04' + const protocolVersion4x1 = '00 00 01 04' + const protocolVersion4x0 = '00 00 00 04' + const protocolVersion3 = '00 00 00 03' + + expect(writtenBuffer.toHex()).toEqual( + `${boltMagicPreamble} ${protocolVersion4x3to4x2} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` + ) + }) + + it('should handle a successful handshake without reaining buffer', done => { + const { channel, handshakePromise } = subject() + const expectedProtocolVersion = 4.3 + + handshakePromise + .then(({ protocolVersion, consumeRemainingBuffer }) => { + expect(protocolVersion).toEqual(expectedProtocolVersion) + consumeRemainingBuffer(() => + done.fail('Should not have remaining buffer') + ) + done() + }) + .catch(done.fail.bind(done)) + + channel.onmessage(packedHandshakeMessage(expectedProtocolVersion)) + }) + + it('should handle a successful handshake with reaining buffer', done => { + const { channel, handshakePromise } = subject() + const expectedProtocolVersion = 4.3 + const expectedExtraBuffer = createExtraBuffer() + handshakePromise + .then(({ protocolVersion, consumeRemainingBuffer }) => { + expect(protocolVersion).toEqual(expectedProtocolVersion) + let consumeRemainingBufferCalled = false + consumeRemainingBuffer(buffer => { + consumeRemainingBufferCalled = true + expect(buffer.toHex()).toEqual(expectedExtraBuffer.toHex()) + }) + expect(consumeRemainingBufferCalled).toBeTruthy() + done() + }) + .catch(done.fail.bind(done)) + + channel.onmessage( + packedHandshakeMessage(expectedProtocolVersion, expectedExtraBuffer) + ) + }) + + it('should fail if the server responds with the http header', done => { + const { channel, handshakePromise } = subject() + const httpMagicNumber = 1213486160 + + handshakePromise + .then(() => done.fail('should not resolve an failure')) + .catch(error => { + expect(error).toEqual( + newError( + 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' + ) + ) + done() + }) + + channel.onmessage(packedHandshakeMessage(httpMagicNumber)) + }) + it('should handle a failed handshake', done => { + const { channel, handshakePromise } = subject() + const expectedError = new Error('Something got wrong') + + handshakePromise + .then(() => done.fail('should not resolve an failure')) + .catch(error => { + expect(error).toBe(expectedError) + done() + }) + + channel.onerror(expectedError) + }) + + it('should handle an already broken channel', done => { + const channel = new DummyChannel() + const expectedError = new Error('Something got wrong') + channel._error = expectedError + const { handshakePromise } = subject({ channel }) + + handshakePromise + .then(() => done.fail('should resolve an failure')) + .catch(error => { + expect(error).toBe(expectedError) + done() + }) + }) + + function subject ({ channel = new DummyChannel() } = {}) { + return { + channel, + handshakePromise: Bolt.handshake(channel) + } + } + + function packedHandshakeMessage (protocolVersion, extraBuffer) { + const major = Math.floor(protocolVersion) + const minor = protocolVersion * 10 - major * 10 + const bufferLength = 4 + (extraBuffer ? extraBuffer.length : 0) + const result = alloc(bufferLength) + result.putInt32(0, (minor << 8) | major) + if (extraBuffer) { + result.putBytes(4, extraBuffer) + } + result.reset() + return result + } + + function createExtraBuffer () { + const buffer = alloc(16) + buffer.putInt32(0, 1970) + buffer.putInt32(4, 1984) + buffer.putInt32(8, 2010) + buffer.putInt32(12, 2012) + buffer.reset() + return buffer + } + }) + + describe('create', () => { + forEachAvailableProtcol(({ version, protocolClass }) => { + it(`it should create protocol ${version}`, () => { + const params = createBoltCreateParams({ version }) + + const protocol = Bolt.create(params) + + expect(protocol.version).toEqual(version) + expect(protocol).toEqual(jasmine.any(protocolClass)) + expect(protocol._server).toBe(params.server) + expect(protocol._packer).toEqual(protocol._createPacker(params.chunker)) + expect(protocol._unpacker).toEqual( + protocol._createUnpacker(params.disableLosslessIntegers) + ) + expect(protocol._log).toEqual(params.log) + const expectedError = 'Some error' + protocol._onProtocolError(expectedError) + expect(params.observer.protocolErrors).toEqual([expectedError]) + }) + + it(`it should configure configure the correct ResponseHandler for version ${version}`, () => { + const expectedFailure = 'expected failure' + const expectedError = 'expected error' + const expectedErrorAppliedTransformation = + 'expected error applied transformation' + const params = createBoltCreateParams({ version }) + + const protocol = Bolt.create(params) + + expect(protocol._responseHandler).toBeDefined() + const responseHandler = protocol._responseHandler + expect(responseHandler._log).toBe(params.log) + + const observer = responseHandler._observer + observer.onError(expectedError) + observer.onFailure(expectedFailure) + observer.onErrorApplyTransformation(expectedErrorAppliedTransformation) + + expect(params.observer.failures).toEqual([expectedFailure]) + expect(params.observer.errors).toEqual([expectedError]) + expect(params.observer.errorsAppliedTransformation).toEqual([ + expectedErrorAppliedTransformation + ]) + }) + + it(`it should configure the channel.onerror to call the observer for version ${version}`, () => { + const expectedError = 'expected error' + const params = createBoltCreateParams({ version }) + + const protocol = Bolt.create(params) + + expect(protocol).toBeDefined() + + params.channel.onerror(expectedError) + + expect(params.observer.errors).toEqual([expectedError]) + }) + + it(`it should configure the channel.onmessage to dechunk and call the response handler ${version}`, () => { + const params = createBoltCreateParams({ version }) + let receivedMessage = null + const expectedMessage = { + signature: 0x10, + fields: [123] + } + const protocol = Bolt.create(params) + protocol._responseHandler.handleResponse = msg => { + receivedMessage = msg + } + + protocol.packer().packStruct( + expectedMessage.signature, + expectedMessage.fields.map(field => protocol.packer().packable(field)) + ) + params.chunker.messageBoundary() + params.chunker.flush() + params.channel.onmessage(params.channel.toBuffer()) + + expect(receivedMessage).not.toBeNull() + expect(receivedMessage.signature).toEqual(expectedMessage.signature) + expect(receivedMessage.fields).toEqual(expectedMessage.fields) + }) + }) + + forEachUnknownProtocolVersion(version => { + it(`it should not create unknown protocol ${version}`, () => { + try { + Bolt.create(createBoltCreateParams({ version })) + fail(`should not create protocol version ${version} with success`) + } catch (error) { + expect(error).toEqual( + newError('Unknown Bolt protocol version: ' + version) + ) + } + }) + }) + + function forEachAvailableProtcol (lambda) { + function v (version, protocolClass) { + return { version, protocolClass } + } + + const availableProtocols = [ + v(1, BoltProtocolV1), + v(2, BoltProtocolV2), + v(3, BoltProtocolV3), + v(4.0, BoltProtocolV4x0), + v(4.1, BoltProtocolV4x1), + v(4.2, BoltProtocolV4x2), + v(4.3, BoltProtocolV4x3) + ] + + availableProtocols.forEach(lambda) + } + + function forEachUnknownProtocolVersion (lambda) { + ;[0, -1, 'javascript', undefined, null, 1.1].forEach(lambda) + } + + function createBoltCreateParams ({ version } = {}) { + const server = {} + const channel = new DummyChannel() + const chunker = new Chunker(channel) + const dechunker = new Dechunker() + const disableLosslessIntegers = false + const serversideRouting = false + const log = Logger.noOp() + const observer = createObserver() + return { + version, + server, + channel, + chunker, + dechunker, + disableLosslessIntegers, + serversideRouting, + log, + observer + } + } + + function createObserver () { + const protocolErrors = [] + const errorsAppliedTransformation = [] + const failures = [] + const errors = [] + return { + protocolErrors, + failures, + errors, + errorsAppliedTransformation, + onError: error => errors.push(error), + onFailure: failure => failures.push(failure), + onErrorApplyTransformation: error => { + errorsAppliedTransformation.push(error) + return error + }, + onProtocolError: protocolError => protocolErrors.push(protocolError) + } + } + }) +}) diff --git a/test/internal/request-message.test.js b/test/internal/bolt/request-message.test.js similarity index 96% rename from test/internal/request-message.test.js rename to test/internal/bolt/request-message.test.js index fbd2ebf1f..abe3bbacd 100644 --- a/test/internal/request-message.test.js +++ b/test/internal/bolt/request-message.test.js @@ -17,11 +17,11 @@ * limitations under the License. */ -import RequestMessage from '../../src/internal/request-message' -import Bookmark from '../../src/internal/bookmark' -import TxConfig from '../../src/internal/tx-config' -import { int } from '../../src' -import { READ, WRITE } from '../../src/driver' +import RequestMessage from '../../../src/internal/bolt/request-message' +import Bookmark from '../../../src/internal/bookmark' +import TxConfig from '../../../src/internal/tx-config' +import { int } from '../../../src' +import { READ, WRITE } from '../../../src/driver' describe('#unit RequestMessage', () => { it('should create INIT message', () => { diff --git a/test/internal/routing-table-raw.test.js b/test/internal/bolt/routing-table-raw.test.js similarity index 86% rename from test/internal/routing-table-raw.test.js rename to test/internal/bolt/routing-table-raw.test.js index 29034ccd1..426db6dcf 100644 --- a/test/internal/routing-table-raw.test.js +++ b/test/internal/bolt/routing-table-raw.test.js @@ -1,5 +1,23 @@ -import RawRoutingTable from '../../src/internal/routing-table-raw' -import Record from '../../src/record' +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RawRoutingTable from '../../../src/internal/bolt/routing-table-raw' +import Record from '../../../src/record' describe('#unit RawRoutingTable', () => { describe('ofNull', () => { diff --git a/test/internal/stream-observer.test.js b/test/internal/bolt/stream-observer.test.js similarity index 83% rename from test/internal/stream-observer.test.js rename to test/internal/bolt/stream-observer.test.js index 0939dd6b0..4fb2b049c 100644 --- a/test/internal/stream-observer.test.js +++ b/test/internal/bolt/stream-observer.test.js @@ -17,24 +17,23 @@ * limitations under the License. */ -import FakeConnection from './fake-connection' import { ResultStreamObserver, RouteObserver, ProcedureRouteObserver -} from '../../src/internal/stream-observers' -import RawRoutingTable from '../../src/internal/routing-table-raw' -import { PROTOCOL_ERROR, newError } from '../../src/error' -import Record from '../../src/record' +} from '../../../src/internal/bolt/stream-observers' +import { RawRoutingTable } from '../../../src/internal/bolt' +import { PROTOCOL_ERROR, newError } from '../../../src/error' +import Record from '../../../src/record' const NO_OP = () => {} describe('#unit ResultStreamObserver', () => { - it('remembers resolved connection', () => { - const connection = new FakeConnection() - const streamObserver = newStreamObserver(connection) + it('remembers resolved server', () => { + const server = { address: '192.168.0.1' } + const streamObserver = newStreamObserver(server) - expect(streamObserver._connection).toBe(connection) + expect(streamObserver._server).toBe(server) }) it('remembers subscriber', () => { @@ -239,23 +238,27 @@ describe('#unit RouteObserver', () => { onError: metadata => { onErrorCalled = true expect(metadata).toBe(expectedError) - } + }, + onProtocolError: () => {} }).onError(expectedError) expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError when a protocol error occurs', () => { - const connection = new FakeConnection() + it('should call onProtocolError when a protocol error occurs', () => { + let onProtocolErrorCalled = false + const expectedError = newError('something wrong', PROTOCOL_ERROR) newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedError.message) + } }).onError(expectedError) - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedError.message]) + expect(onProtocolErrorCalled).toEqual(true) }) it('should call onError with a protocol error it receive a record', () => { @@ -271,14 +274,15 @@ describe('#unit RouteObserver', () => { onError: error => { onErrorCalled = true expect(error).toEqual(expectedError) - } + }, + onProtocolError: () => {} }).onNext(record) expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError with a protocol error it receive a record', () => { - const connection = new FakeConnection() + it('should call onProtocolError with a protocol error it receive a record', () => { + let onProtocolErrorCalled = false const record = new Record(['a'], ['b']) const expectedErrorMessage = 'Received RECORD when resetting: received record is: ' + @@ -286,19 +290,21 @@ describe('#unit RouteObserver', () => { newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedErrorMessage) + } }).onNext(record) - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + expect(onProtocolErrorCalled).toEqual(true) }) function newRouteObserver ({ onCompleted = shouldNotBeCalled('onComplete'), onError = shouldNotBeCalled('onError'), - connection = new FakeConnection() + onProtocolError = shouldNotBeCalled('onProtocolError') } = {}) { - return new RouteObserver({ connection, onCompleted, onError }) + return new RouteObserver({ onCompleted, onError, onProtocolError }) } function shouldNotBeCalled (methodName) { @@ -342,7 +348,8 @@ describe('#unit ProcedureRouteObserver', () => { onError: error => { onErrorCalled = true expect(error).toEqual(expectedError) - } + }, + onProtocolError: () => {} }) observer.onCompleted() @@ -350,19 +357,21 @@ describe('#unit ProcedureRouteObserver', () => { expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError with a protocol error it receive 0 records', () => { - const connection = new FakeConnection() + it('should call onProtocolError with a protocol error it receive 0 records', () => { + let onProtocolErrorCalled = false const expectedErrorMessage = 'Illegal response from router. Received 0 records but expected only one.\n' + JSON.stringify([]) newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedErrorMessage) + } }).onCompleted() - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + expect(onProtocolErrorCalled).toEqual(true) }) it('should call onError with a protocol error it receive more than one record', () => { @@ -377,7 +386,8 @@ describe('#unit ProcedureRouteObserver', () => { onError: error => { onErrorCalled = true expect(error).toEqual(expectedError) - } + }, + onProtocolError: () => {} }) observer.onNext(record) @@ -387,8 +397,8 @@ describe('#unit ProcedureRouteObserver', () => { expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError with a protocol error it receive 0 records', () => { - const connection = new FakeConnection() + it('should call onProtocolError with a protocol error it receive 0 records', () => { + let onProtocolErrorCalled = false const record = new Record(['a'], ['b']) const expectedErrorMessage = 'Illegal response from router. Received 2 records but expected only one.\n' + @@ -396,15 +406,17 @@ describe('#unit ProcedureRouteObserver', () => { const observer = newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedErrorMessage) + } }) observer.onNext(record) observer.onNext(record) observer.onCompleted() - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + expect(onProtocolErrorCalled).toEqual(true) }) it('should call onError with the error', () => { @@ -429,34 +441,37 @@ describe('#unit ProcedureRouteObserver', () => { onError: metadata => { onErrorCalled = true expect(metadata).toBe(expectedError) - } + }, + onProtocolError: null }).onError(expectedError) expect(onErrorCalled).toEqual(true) }) - it('should call connection._handleProtocolError when a protocol error occurs', () => { - const connection = new FakeConnection() + it('should call onProtocolError when a protocol error occurs', () => { + let onProtocolErrorCalled = false const expectedError = newError('something wrong', PROTOCOL_ERROR) newRouteObserver({ onError: null, - connection + onProtocolError: message => { + onProtocolErrorCalled = true + expect(message).toEqual(expectedError.message) + } }).onError(expectedError) - expect(connection.protocolErrorsHandled).toEqual(1) - expect(connection.seenProtocolErrors).toEqual([expectedError.message]) + expect(onProtocolErrorCalled).toEqual(true) }) function newRouteObserver ({ onCompleted = shouldNotBeCalled('onComplete'), onError = shouldNotBeCalled('onError'), - connection = new FakeConnection(), + onProtocolError = shouldNotBeCalled('onProtocolError'), resultObserver = new FakeResultStreamObserver() } = {}) { return new ProcedureRouteObserver({ resultObserver, - connection, + onProtocolError, onCompleted, onError }) @@ -477,9 +492,9 @@ describe('#unit ProcedureRouteObserver', () => { } }) -function newStreamObserver (connection) { +function newStreamObserver (server) { return new ResultStreamObserver({ - connection + server }) } diff --git a/test/internal/connection-channel.test.js b/test/internal/connection-channel.test.js index cf5f8957b..f14b48c6d 100644 --- a/test/internal/connection-channel.test.js +++ b/test/internal/connection-channel.test.js @@ -18,7 +18,9 @@ */ import DummyChannel from './dummy-channel' -import ChannelConnection from '../../src/internal/connection-channel' +import ChannelConnection, { + createChannelConnection +} from '../../src/internal/connection-channel' import { Packer } from '../../src/internal/packstream-v1' import { Chunker } from '../../src/internal/chunking' import { alloc } from '../../src/internal/node' @@ -33,7 +35,7 @@ import Bookmark from '../../src/internal/bookmark' import TxConfig from '../../src/internal/tx-config' import { WRITE } from '../../src/driver' import ServerAddress from '../../src/internal/server-address' -import { ResultStreamObserver } from '../../src/internal/stream-observers' +import { ResultStreamObserver } from '../../src/internal/bolt' const ILLEGAL_MESSAGE = { signature: 42, fields: [] } const SUCCESS_MESSAGE = { signature: 0x70, fields: [{}] } @@ -52,12 +54,12 @@ describe('#integration ChannelConnection', () => { } }) - it('should have correct creation timestamp', () => { + it('should have correct creation timestamp', async () => { const clock = lolex.install() try { clock.setSystemTime(424242) - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) expect(connection.creationTimestamp).toEqual(424242) } finally { @@ -66,23 +68,23 @@ describe('#integration ChannelConnection', () => { }) it('should read/write basic messages', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection._negotiateProtocol().then(() => { - connection.protocol().initialize({ - userAgent: 'mydriver/0.0.0', - authToken: basicAuthToken(), - onComplete: metadata => { - expect(metadata).not.toBeNull() - done() - }, - onError: console.log + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection.protocol().initialize({ + userAgent: 'mydriver/0.0.0', + authToken: basicAuthToken(), + onComplete: metadata => { + expect(metadata).not.toBeNull() + done() + }, + onError: done.fail.bind(done) + }) }) - }) + .catch(done.fail.bind(done)) }) - it('should retrieve stream', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should retrieve stream', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) const records = [] const pullAllObserver = { @@ -92,95 +94,82 @@ describe('#integration ChannelConnection', () => { onCompleted: () => { expect(records[0].get(0)).toBe(1) done() - } + }, + onError: done.fail.bind(done) } - connection.connect('mydriver/0.0.0', basicAuthToken()).then(() => { - connection - .protocol() - .run( - 'RETURN 1.0', - {}, - { - bookmark: Bookmark.empty(), - txConfig: TxConfig.empty(), - mode: WRITE - } - ) - .subscribe(pullAllObserver) - }) - }) - - it('should write protocol handshake', () => { - const channel = new DummyChannel() - connection = new ChannelConnection( - channel, - new ConnectionErrorHandler(SERVICE_UNAVAILABLE), - ServerAddress.fromUrl('localhost:7687'), - Logger.noOp() - ) - - connection._negotiateProtocol() - - const boltMagicPreamble = '60 60 b0 17' - const protocolVersion4x3 = '00 01 03 04' - const protocolVersion4x1 = '00 00 01 04' - const protocolVersion4x0 = '00 00 00 04' - const protocolVersion3 = '00 00 00 03' - expect(channel.toHex()).toBe( - `${boltMagicPreamble} ${protocolVersion4x3} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` - ) + connection + .connect('mydriver/0.0.0', basicAuthToken()) + .then(() => { + connection + .protocol() + .run( + 'RETURN 1.0', + {}, + { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + mode: WRITE + } + ) + .subscribe(pullAllObserver) + }) + .catch(done.fail.bind(done)) }) - it('should provide error message when connecting to http-port', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}:7474`, { + it('should provide error message when connecting to http-port', async done => { + await createConnection(`bolt://${sharedNeo4j.hostname}:7474`, { encrypted: false }) + .then(done.fail.bind(done)) + .catch(error => { + expect(error).toBeDefined() + expect(error).not.toBeNull() - connection.connect('mydriver/0.0.0', basicAuthToken()).catch(error => { - expect(error).toBeDefined() - expect(error).not.toBeNull() - - if (testUtils.isServer()) { - // only node gets the pretty error message - expect(error.message).toBe( - 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + - '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' - ) - } - - done() - }) + if (testUtils.isServer()) { + // only node gets the pretty error message + expect(error.message).toBe( + 'Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)' + ) + } + done() + }) }) it('should convert failure messages to errors', done => { const channel = new DummyChannel() - connection = new ChannelConnection( - channel, - new ConnectionErrorHandler(SERVICE_UNAVAILABLE), - ServerAddress.fromUrl('localhost:7687'), - Logger.noOp() - ) - - connection._negotiateProtocol() - const errorCode = 'Neo.ClientError.Schema.ConstraintValidationFailed' const errorMessage = 'Node 0 already exists with label User and property "email"=[john@doe.com]' - connection._queueObserver({ - onError: error => { - expectNeo4jError(error, errorCode, errorMessage) - done() - } - }) + createChannelConnection( + ServerAddress.fromUrl('localhost:7687'), + {}, + new ConnectionErrorHandler(SERVICE_UNAVAILABLE), + Logger.noOp(), + null, + () => channel + ) + .then(c => { + connection = c + connection._queueObserver({ + onCompleted: done.fail.bind(done), + onComplete: done.fail.bind(done), + onError: error => { + expectNeo4jError(error, errorCode, errorMessage) + done() + } + }) + channel.onmessage(packedFailureMessage(errorCode, errorMessage)) + }) + .catch(done.fail.bind(done)) channel.onmessage(packedHandshakeMessage()) - channel.onmessage(packedFailureMessage(errorCode, errorMessage)) }) - it('should notify when connection initialization completes', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should notify when connection initialization completes', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) connection .connect('mydriver/0.0.0', basicAuthToken()) @@ -188,13 +177,14 @@ describe('#integration ChannelConnection', () => { expect(initializedConnection).toBe(connection) done() }) + .catch(done.fail.bind(done)) }) - it('should notify when connection initialization fails', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}:7474`) // wrong port + it('should notify when connection initialization fails', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) // wrong port connection - .connect('mydriver/0.0.0', basicAuthToken()) + .connect('mydriver/0.0.0', basicWrongAuthToken()) .then(() => done.fail('Should not initialize')) .catch(error => { expect(error).toBeDefined() @@ -202,9 +192,8 @@ describe('#integration ChannelConnection', () => { }) }) - it('should have server version after connection initialization completed', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - + it('should have server version after connection initialization completed', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) connection .connect('mydriver/0.0.0', basicAuthToken()) .then(initializedConnection => { @@ -213,13 +202,14 @@ describe('#integration ChannelConnection', () => { expect(serverVersion).toBeDefined() done() }) + .catch(done.fail.bind(done)) }) - it('should fail all new observers after failure to connect', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}:7474`) // wrong port + it('should fail all new observers after failure to connect', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) connection - .connect('mydriver/0.0.0', basicAuthToken()) + .connect('mydriver/0.0.0', basicWrongAuthToken()) .then(() => done.fail('Should not connect')) .catch(initialError => { expect(initialError).toBeDefined() @@ -275,94 +265,113 @@ describe('#integration ChannelConnection', () => { testQueueingOfObserversWithBrokenConnection(resetAction, done) }) - it('should reset and flush when SUCCESS received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should reset and flush when SUCCESS received', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection - .resetAndFlush() - .then(() => { - expect(connection.isOpen()).toBeTruthy() - done() + connection + .connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection + .resetAndFlush() + .then(() => { + expect(connection.isOpen()).toBeTruthy() + done() + }) + .catch(error => done.fail(error)) + + // write a SUCCESS message for RESET before the actual response is received + connection.protocol()._responseHandler.handleResponse(SUCCESS_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} }) - .catch(error => done.fail(error)) - - // write a SUCCESS message for RESET before the actual response is received - connection._handleMessage(SUCCESS_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} }) - }) + .catch(done.fail.bind(done)) }) - it('should fail to reset and flush when FAILURE received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection - .resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual( - 'Received FAILURE as a response for RESET: Neo4jError: Hello' - ) - expect(connection._isBroken).toBeTruthy() - expect(connection.isOpen()).toBeFalsy() - done() + it('should fail to reset and flush when FAILURE received', async done => { + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { + connection + .resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual( + 'Received FAILURE as a response for RESET: Neo4jError: Hello' + ) + expect(connection._isBroken).toBeTruthy() + expect(connection.isOpen()).toBeFalsy() + done() + }) + + // write a FAILURE message for RESET before the actual response is received / white box test + connection.protocol()._responseHandler.handleResponse(FAILURE_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} + }) }) - - // write a FAILURE message for RESET before the actual response is received - connection._handleMessage(FAILURE_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} }) - }) + .catch(done.fail.bind(done)) }) - it('should fail to reset and flush when RECORD received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + it('should fail to reset and flush when RECORD received', async done => { + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection - .resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual( - 'Received RECORD when resetting: received record is: {"value":"Hello"}' - ) - expect(connection._isBroken).toBeTruthy() - expect(connection.isOpen()).toBeFalsy() - done() - }) + connection + .connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection + .resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual( + 'Received RECORD when resetting: received record is: {"value":"Hello"}' + ) + expect(connection._isBroken).toBeTruthy() + expect(connection.isOpen()).toBeFalsy() + done() + }) - // write a RECORD message for RESET before the actual response is received - connection._handleMessage(RECORD_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} + // write a RECORD message for RESET before the actual response is received + connection.protocol()._responseHandler.handleResponse(RECORD_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} + }) }) - }) + .catch(done.fail.bind(done)) }) - it('should acknowledge failure with RESET when SUCCESS received', done => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection.connect('my-driver/1.2.3', basicAuthToken()).then(() => { - connection._currentFailure = newError('Hello') - connection._resetOnFailure() - - // write a SUCCESS message for RESET before the actual response is received - connection._handleMessage(SUCCESS_MESSAGE) - // enqueue a dummy observer to handle the real SUCCESS message - connection._queueObserver({ - onCompleted: () => {} + it('should acknowledge failure with RESET when SUCCESS received', async done => { + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection + .connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.protocol()._responseHandler._currentFailure = newError( + 'Hello' + ) // white box test, not ideal + connection._resetOnFailure() + + // write a SUCCESS message for RESET before the actual response is received + connection + .protocol() + ._responseHandler.handleResponse(SUCCESS_MESSAGE) + // enqueue a dummy observer to handle the real SUCCESS message + connection.protocol()._responseHandler._queueObserver({ + onCompleted: () => {} + }) + + expect( + connection.protocol()._responseHandler._currentFailure + ).toBeNull() + done() + }) + .catch(done.fail.bind(done)) }) - - expect(connection._currentFailure).toBeNull() - done() - }) + .catch(done.fail.bind(done)) }) it('should handle and transform fatal errors', done => { @@ -378,30 +387,32 @@ describe('#integration ChannelConnection', () => { } ) - connection = ChannelConnection.create( + createChannelConnection( ServerAddress.fromUrl(`bolt://${sharedNeo4j.hostname}`), {}, errorHandler, Logger.noOp() ) - - connection._queueObserver({ - onError: error => { - expect(error).toEqual(transformedError) - expect(errors.length).toEqual(1) - expect(errors[0].code).toEqual(SERVICE_UNAVAILABLE) - expect(addresses).toEqual([connection.address]) - done() - } - }) - - connection._handleFatalError(newError('Hello', SERVICE_UNAVAILABLE)) + .then(c => { + connection = c + connection._queueObserver({ + onError: error => { + expect(error).toEqual(transformedError) + expect(errors.length).toEqual(1) + expect(errors[0].code).toEqual(SERVICE_UNAVAILABLE) + expect(addresses).toEqual([connection.address]) + done() + } + }) + connection._handleFatalError(newError('Hello', SERVICE_UNAVAILABLE)) + }) + .catch(done.fail.bind(done)) }) it('should send INIT/HELLO and GOODBYE messages', async () => { const messages = [] - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - recordWrittenMessages(connection, messages) + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) + recordWrittenMessages(connection._protocol, messages) await connection.connect('mydriver/0.0.0', basicAuthToken()) @@ -418,7 +429,7 @@ describe('#integration ChannelConnection', () => { }) it('should not prepare broken connection to close', async () => { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) + connection = await createConnection(`bolt://${sharedNeo4j.hostname}`) await connection.connect('my-connection/9.9.9', basicAuthToken()) expect(connection._protocol).toBeDefined() @@ -470,21 +481,26 @@ describe('#integration ChannelConnection', () => { } } + function basicWrongAuthToken () { + return { + scheme: 'basic', + principal: sharedNeo4j.username + 'a', + credentials: sharedNeo4j.password + 'b' + } + } + async function testConnectionTimeout (encrypted) { const clock = jasmine.clock() clock.install() try { const boltUri = 'bolt://10.0.0.0' // use non-routable IP address which never responds - connection = createConnection( + setImmediate(() => clock.tick(1001)) + connection = await createConnection( boltUri, { encrypted: encrypted, connectionTimeout: 1000 }, 'TestErrorCode' ) - - clock.tick(1001) - - await connection.connect('mydriver/0.0.0', basicAuthToken()) } catch (error) { expect(error.code).toEqual('TestErrorCode') @@ -505,30 +521,34 @@ describe('#integration ChannelConnection', () => { } function testQueueingOfObserversWithBrokenConnection (connectionAction, done) { - connection = createConnection(`bolt://${sharedNeo4j.hostname}`) - - connection._negotiateProtocol().then(() => { - connection._handleMessage(ILLEGAL_MESSAGE) - expect(connection.isOpen()).toBeFalsy() + createConnection(`bolt://${sharedNeo4j.hostname}`) + .then(connection => { + connection._handleProtocolError(ILLEGAL_MESSAGE) + expect(connection.isOpen()).toBeFalsy() - expect(connection._pendingObservers.length).toEqual(0) - connectionAction(connection) - expect(connection._pendingObservers.length).toEqual(0) + expect(connection.hasOngoingObservableRequests()).toBeFalsy() + connectionAction(connection) + expect(connection.hasOngoingObservableRequests()).toBeFalsy() - done() - }) + done() + }) + .catch(done.fail.bind(done)) } /** - * @return {Connection} + * @return {Promise} */ function createConnection (url, config, errorCode = null) { - return ChannelConnection.create( + const _config = config || {} + return createChannelConnection( ServerAddress.fromUrl(url), - config || {}, + _config, new ConnectionErrorHandler(errorCode || SERVICE_UNAVAILABLE), Logger.noOp() - ) + ).then(c => { + connection = c + return connection + }) } function recordWrittenMessages (connection, messages) { diff --git a/test/internal/connection-delegate.test.js b/test/internal/connection-delegate.test.js index 1dae77735..42bed041e 100644 --- a/test/internal/connection-delegate.test.js +++ b/test/internal/connection-delegate.test.js @@ -18,7 +18,7 @@ */ import DelegateConnection from '../../src/internal/connection-delegate' import Connection from '../../src/internal/connection' -import BoltProtocol from '../../src/internal/bolt-protocol-v1' +import { BoltProtocol } from '../../src/internal/bolt' import BoltAddress from '../../src/internal/server-address' import ConnectionErrorHandler from '../../src/internal/connection-error-handler' diff --git a/test/internal/protocol-handshaker.test.js b/test/internal/protocol-handshaker.test.js deleted file mode 100644 index 2d6e0f99c..000000000 --- a/test/internal/protocol-handshaker.test.js +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import ProtocolHandshaker from '../../src/internal/protocol-handshaker' -import Logger from '../../src/internal/logger' -import BoltProtocol from '../../src/internal/bolt-protocol-v1' -import BoltProtocolV4x3 from '../../src/internal/bolt-protocol-v4x3' - -import { alloc } from '../../src/internal/node' - -describe('#unit ProtocolHandshaker', () => { - it('should write handshake request', () => { - const writtenBuffers = [] - const fakeChannel = { - write: buffer => writtenBuffers.push(buffer) - } - - const handshaker = new ProtocolHandshaker( - null, - fakeChannel, - null, - false, - Logger.noOp() - ) - - handshaker.writeHandshakeRequest() - - expect(writtenBuffers.length).toEqual(1) - - const boltMagicPreamble = '60 60 b0 17' - const protocolVersion4x3 = '00 01 03 04' - const protocolVersion4x1 = '00 00 01 04' - const protocolVersion4x0 = '00 00 00 04' - const protocolVersion3 = '00 00 00 03' - - expect(writtenBuffers[0].toHex()).toEqual( - `${boltMagicPreamble} ${protocolVersion4x3} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` - ) - }) - - it('should create protocol with valid version', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with Bolt V1 - const buffer = handshakeResponse(1) - - const protocol = handshaker.createNegotiatedProtocol(buffer) - - expect(protocol).toBeDefined() - expect(protocol).not.toBeNull() - expect(protocol instanceof BoltProtocol).toBeTruthy() - }) - - it('should create protocol 4.3', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with Bolt V4.3 - const buffer = handshakeResponse(4, 3) - - const protocol = handshaker.createNegotiatedProtocol(buffer) - - expect(protocol).toBeDefined() - expect(protocol).not.toBeNull() - expect(protocol.version).toEqual(4.3) - expect(protocol instanceof BoltProtocolV4x3).toBeTruthy() - }) - - it('should fail to create protocol from invalid version', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with Bolt V42 which is invalid - const buffer = handshakeResponse(42) - - expect(() => handshaker.createNegotiatedProtocol(buffer)).toThrow() - }) - - it('should fail to create protocol from HTTP as invalid version', () => { - const handshaker = new ProtocolHandshaker( - null, - null, - null, - false, - Logger.noOp() - ) - - // buffer with HTTP magic int - const buffer = handshakeResponse(1213486160) - - expect(() => handshaker.createNegotiatedProtocol(buffer)).toThrow() - }) -}) - -/** - * @param {number} version - * @return {BaseBuffer} - */ -function handshakeResponse (version, minor = 0) { - const buffer = alloc(4) - buffer.writeInt32((minor << 8) | version) - buffer.reset() - return buffer -} diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index 6b3fbfa06..b78c98aad 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import RawRoutingTable from '../../src/internal/routing-table-raw' +import { RawRoutingTable } from '../../src/internal/bolt' import Rediscovery from '../../src/internal/rediscovery' import RoutingTable from '../../src/internal/routing-table' import ServerAddress from '../../src/internal/server-address' diff --git a/test/internal/routing-table.test.js b/test/internal/routing-table.test.js index 25803eb59..2cd6cca49 100644 --- a/test/internal/routing-table.test.js +++ b/test/internal/routing-table.test.js @@ -20,7 +20,7 @@ import RoutingTable from '../../src/internal/routing-table' import Integer, { int } from '../../src/integer' import { READ, WRITE } from '../../src/driver' import ServerAddress from '../../src/internal/server-address' -import RawRoutingTable from '../../src/internal/routing-table-raw' +import { RawRoutingTable } from '../../src/internal/bolt' import { PROTOCOL_ERROR } from '../../src/error' import lolex from 'lolex' diff --git a/test/internal/test-utils.js b/test/internal/test-utils.js index 542b48d70..a79b326bc 100644 --- a/test/internal/test-utils.js +++ b/test/internal/test-utils.js @@ -16,6 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import { Chunker } from '../../src/internal/chunking' import Connection from '../../src/internal/connection' function isClient () { @@ -115,10 +116,33 @@ class MessageRecordingConnection extends Connection { } } +function spyProtocolWrite (protocol, callRealMethod = false) { + protocol.messages = [] + protocol.observers = [] + protocol.flushes = [] + + const write = callRealMethod ? protocol.write.bind(protocol) : () => true + protocol.write = (message, observer, flush) => { + protocol.messages.push(message) + protocol.observers.push(observer) + protocol.flushes.push(flush) + return write(message, observer, flush) + } + + protocol.verifyMessageCount = expected => { + expect(protocol.messages.length).toEqual(expected) + expect(protocol.observers.length).toEqual(expected) + expect(protocol.flushes.length).toEqual(expected) + } + + return protocol +} + export default { isClient, isServer, fakeStandardDateWithOffset, matchers, - MessageRecordingConnection + MessageRecordingConnection, + spyProtocolWrite } diff --git a/test/session.test.js b/test/session.test.js index 52b2d2438..dc2f018a6 100644 --- a/test/session.test.js +++ b/test/session.test.js @@ -392,7 +392,7 @@ describe('#integration session', () => { // wait some time than close the session with a long running query setTimeout(() => { - session.close() + session.close().catch(done.fail.bind(done)) }, 1000) }, 70000)