diff --git a/src/v1/driver.js b/src/v1/driver.js index 585cd3cc5..8b4be63f3 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -19,7 +19,7 @@ import Session from './session'; import Pool from './internal/pool'; -import {connect} from './internal/connector'; +import Connection from './internal/connection'; import StreamObserver from './internal/stream-observer'; import {newError, SERVICE_UNAVAILABLE} from './error'; import {DirectConnectionProvider} from './internal/connection-providers'; @@ -27,6 +27,7 @@ import Bookmark from './internal/bookmark'; import ConnectivityVerifier from './internal/connectivity-verifier'; import PoolConfig, {DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE} from './internal/pool-config'; import Logger from './internal/logger'; +import ConnectionErrorHandler from './internal/connection-error-handler'; const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000; // 1 hour @@ -62,18 +63,18 @@ class Driver { * @constructor * @param {string} hostPort * @param {string} userAgent - * @param {object} token + * @param {object} authToken * @param {object} config * @protected */ - constructor(hostPort, userAgent, token = {}, config = {}) { + constructor(hostPort, userAgent, authToken = {}, config = {}) { sanitizeConfig(config); this._id = idGenerator++; this._hostPort = hostPort; this._userAgent = userAgent; this._openConnections = {}; - this._token = token; + this._authToken = authToken; this._config = config; this._log = Logger.create(config); this._pool = new Pool( @@ -127,18 +128,24 @@ class Driver { } /** - * Create a new connection instance. - * @return {Connection} new connector-api session instance, a low level session API. + * Create a new connection and initialize it. + * @return {Promise} promise resolved with a new connection or rejected when failed to connect. * @access private */ _createConnection(hostPort, release) { - const conn = connect(hostPort, this._config, this._connectionErrorCode(), this._log); - const streamObserver = new _ConnectionStreamObserver(this, conn); - conn.protocol().initialize(this._userAgent, this._token, streamObserver); - conn._release = () => release(hostPort, conn); - - this._openConnections[conn.id] = conn; - return conn; + const connection = Connection.create(hostPort, this._config, this._createConnectionErrorHandler(), this._log); + connection._release = () => release(hostPort, connection); + this._openConnections[connection.id] = connection; + + return connection.connect(this._userAgent, this._authToken) + .catch(error => { + if (this.onError) { + // notify Driver.onError callback about connection initialization errors + this.onError(error); + } + // propagate the error because connection failed to connect / initialize + throw error; + }); } /** @@ -186,7 +193,7 @@ class Driver { const sessionMode = Driver._validateSessionMode(mode); const connectionProvider = this._getOrCreateConnectionProvider(); const bookmark = new Bookmark(bookmarkOrBookmarks); - return this._createSession(sessionMode, connectionProvider, bookmark, this._config); + return new Session(sessionMode, connectionProvider, bookmark, this._config); } static _validateSessionMode(rawMode) { @@ -203,14 +210,8 @@ class Driver { } // Extension point - _createSession(mode, connectionProvider, bookmark, config) { - return new Session(mode, connectionProvider, bookmark, config); - } - - // Extension point - _connectionErrorCode() { - // connection errors might result in different error codes depending on the driver - return SERVICE_UNAVAILABLE; + _createConnectionErrorHandler() { + return new ConnectionErrorHandler(SERVICE_UNAVAILABLE); } _getOrCreateConnectionProvider() { diff --git a/src/v1/internal/connection-error-handler.js b/src/v1/internal/connection-error-handler.js new file mode 100644 index 000000000..b8ee33507 --- /dev/null +++ b/src/v1/internal/connection-error-handler.js @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error'; + +export default class ConnectionErrorHandler { + + constructor(errorCode, handleUnavailability, handleWriteFailure) { + this._errorCode = errorCode; + this._handleUnavailability = handleUnavailability || noOpHandler; + this._handleWriteFailure = handleWriteFailure || noOpHandler; + } + + /** + * Error code to use for network errors. + * @return {string} the error code. + */ + errorCode() { + return this._errorCode; + } + + /** + * Handle and transform the error. + * @param {Neo4jError} error the original error. + * @param {string} hostPort the host and port of the connection where the error happened. + * @return {Neo4jError} new error that should be propagated to the user. + */ + handleAndTransformError(error, hostPort) { + if (isAvailabilityError(error)) { + return this._handleUnavailability(error, hostPort); + } + if (isFailureToWrite(error)) { + return this._handleWriteFailure(error, hostPort); + } + return error; + } +} + +function isAvailabilityError(error) { + if (error) { + return error.code === SESSION_EXPIRED || + error.code === SERVICE_UNAVAILABLE || + error.code === 'Neo.TransientError.General.DatabaseUnavailable'; + } + return false; +} + +function isFailureToWrite(error) { + if (error) { + return error.code === 'Neo.ClientError.Cluster.NotALeader' || + error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'; + } + return false; +} + +function noOpHandler(error) { + return error; +} diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js index 9ded035af..2c88ce4e8 100644 --- a/src/v1/internal/connection-holder.js +++ b/src/v1/internal/connection-holder.js @@ -55,7 +55,7 @@ export default class ConnectionHolder { getConnection(streamObserver) { return this._connectionPromise.then(connection => { streamObserver.resolveConnection(connection); - return connection.initializationCompleted(); + return connection; }); } diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index ca74b53c4..566ce9b2e 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -26,6 +26,8 @@ import hasFeature from './features'; import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers'; import RoutingUtil from './routing-util'; +const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized'; + class ConnectionProvider { acquireConnection(mode) { @@ -195,20 +197,32 @@ export class LoadBalancer extends ConnectionProvider { // try next router return this._createSessionForRediscovery(currentRouter).then(session => { - return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter) + if (session) { + return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter); + } else { + // unable to acquire connection and create session towards the current router + // return null to signal that the next router should be tried + return null; + } }); }); }, Promise.resolve(null)); } _createSessionForRediscovery(routerAddress) { - return this._connectionPool.acquire(routerAddress).then(connection => { - // initialized connection is required for routing procedure call - // server version needs to be known to decide which routing procedure to use - const initializedConnectionPromise = connection.initializationCompleted(); - const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise); - return new Session(READ, connectionProvider); - }); + return this._connectionPool.acquire(routerAddress) + .then(connection => { + const connectionProvider = new SingleConnectionProvider(connection); + return new Session(READ, connectionProvider); + }) + .catch(error => { + // unable to acquire connection towards the given router + if (error && error.code === UNAUTHORIZED_ERROR_CODE) { + // auth error is a sign of a configuration issue, rediscovery should not proceed + throw error; + } + return null; + }); } _applyRoutingTableIfPossible(newRoutingTable) { @@ -257,14 +271,14 @@ export class LoadBalancer extends ConnectionProvider { export class SingleConnectionProvider extends ConnectionProvider { - constructor(connectionPromise) { + constructor(connection) { super(); - this._connectionPromise = connectionPromise; + this._connection = connection; } acquireConnection(mode) { - const connectionPromise = this._connectionPromise; - this._connectionPromise = null; - return connectionPromise; + const connection = this._connection; + this._connection = null; + return Promise.resolve(connection); } } diff --git a/src/v1/internal/connector.js b/src/v1/internal/connection.js similarity index 56% rename from src/v1/internal/connector.js rename to src/v1/internal/connection.js index 5d7ee53c6..69880bde3 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connection.js @@ -49,7 +49,7 @@ const FAILURE = 0x7F; // 0111 1111 // FAILURE function NO_OP(){} -let NO_OP_OBSERVER = { +const NO_OP_OBSERVER = { onNext : NO_OP, onCompleted : NO_OP, onError : NO_OP @@ -57,33 +57,22 @@ let NO_OP_OBSERVER = { let idGenerator = 0; -/** - * A connection manages sending and receiving messages over a channel. A - * connector is very closely tied to the Bolt protocol, it implements the - * same message structure with very little frills. This means Connectors are - * naturally tied to a specific version of the protocol, and we expect - * another layer will be needed to support multiple versions. - * - * The connector tries to batch outbound messages by requiring its users - * to call 'sync' when messages need to be sent, and it routes response - * messages back to the originators of the requests that created those - * response messages. - * @access private - */ -class Connection { +export default class Connection { /** * @constructor * @param {NodeChannel|WebSocketChannel} channel - channel with a 'write' function and a 'onmessage' callback property. + * @param {ConnectionErrorHandler} errorHandler the error handler. * @param {string} hostPort - the hostname and port to connect to. * @param {Logger} log - the configured logger. * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. */ - constructor(channel, hostPort, log, disableLosslessIntegers = false) { + constructor(channel, errorHandler, hostPort, log, disableLosslessIntegers = false) { this.id = idGenerator++; this.hostPort = hostPort; this.server = {address: hostPort}; this.creationTimestamp = Date.now(); + this._errorHandler = errorHandler; this._disableLosslessIntegers = disableLosslessIntegers; this._pendingObservers = []; this._currentObserver = undefined; @@ -92,43 +81,107 @@ class Connection { this._chunker = new Chunker( channel ); this._log = log; - const protocolHandshaker = new ProtocolHandshaker(this, channel, this._chunker, this._disableLosslessIntegers, this._log); - // initially assume that database supports latest Bolt version, create latest packer and unpacker - this._protocol = protocolHandshaker.createLatestProtocol(); + // bolt protocol is initially not initialized + this._protocol = null; + // error extracted from a FAILURE message this._currentFailure = null; - this._state = new ConnectionState(this); - - // Set to true on fatal errors, to get this out of session pool. + // Set to true on fatal errors, to get this out of connection pool. this._isBroken = false; - // TODO: Using `onmessage` and `onerror` came from the WebSocket API, - // it reads poorly and has several annoying drawbacks. Swap to having - // Channel extend EventEmitter instead, then we can use `on('data',..)` - this._ch.onmessage = buffer => this._initializeProtocol(buffer, protocolHandshaker); - - // Listen to connection errors. Important note though; - // In some cases we will get a channel that is already broken (for instance, - // if the user passes invalid configuration options). In this case, onerror - // will have "already triggered" before we add out listener here. So the line - // below also checks that the channel is not already failed. This could be nicely - // encapsulated into Channel if we used `on('error', ..)` rather than `onerror=..` - // as outlined in the comment about `onmessage` further up in this file. - this._ch.onerror = this._handleFatalError.bind(this); - if( this._ch._error ) { - this._handleFatalError(this._ch._error); - } - - this._dechunker.onmessage = (buf) => { - this._handleMessage(this._protocol.unpacker().unpack(buf)); - }; - if (this._log.isDebugEnabled()) { this._log.debug(`${this} created towards ${hostPort}`); } + } + + /** + * Crete new connection to the provided address. Returned connection is not connected. + * @param {string} url - the Bolt endpoint to connect to. + * @param {object} config - this driver configuration. + * @param {ConnectionErrorHandler} errorHandler - the error handler for connection errors. + * @param {Logger} log - configured logger. + * @return {Connection} - new connection. + */ + static create(url, config, errorHandler, log) { + const Ch = config.channel || Channel; + const parsedAddress = urlUtil.parseDatabaseUrl(url); + const channelConfig = new ChannelConfig(parsedAddress, config, errorHandler.errorCode()); + return new Connection(new Ch(channelConfig), errorHandler, parsedAddress.hostAndPort, log, config.disableLosslessIntegers); + } + + /** + * Connect to the target address, negotiate Bolt protocol and send initialization message. + * @param {string} userAgent the user agent for this driver. + * @param {object} authToken the object containing auth information. + * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. + */ + connect(userAgent, authToken) { + return this._negotiateProtocol().then(() => this._initialize(userAgent, authToken)); + } + + /** + * Execute Bolt protocol handshake to initialize the protocol version. + * @return {Promise} promise resolved with the current connection if handshake is successful. Rejected promise otherwise. + */ + _negotiateProtocol() { + const protocolHandshaker = new ProtocolHandshaker(this, this._ch, this._chunker, this._disableLosslessIntegers, this._log); + + return new Promise((resolve, reject) => { + + const handshakeErrorHandler = error => { + this._handleFatalError(error); + reject(error); + }; + + this._ch.onerror = handshakeErrorHandler.bind(this); + if (this._ch._error) { + // channel is already broken + handshakeErrorHandler(this._ch._error); + } + + this._ch.onmessage = buffer => { + try { + // read the response buffer and initialize the protocol + this._protocol = protocolHandshaker.createNegotiatedProtocol(buffer); + + // reset the error handler to just handle errors and forget about the handshake promise + this._ch.onerror = this._handleFatalError.bind(this); + + // Ok, protocol running. Simply forward all messages to the dechunker + this._ch.onmessage = buf => this._dechunker.write(buf); + + // setup dechunker to dechunk messages and forward them to the message handler + this._dechunker.onmessage = (buf) => { + this._handleMessage(this._protocol.unpacker().unpack(buf)); + }; + // forward all pending bytes to the dechunker + if (buffer.hasRemaining()) { + this._dechunker.write(buffer.readSlice(buffer.remaining())); + } + + resolve(this); + } catch (e) { + this._handleFatalError(e); + reject(e); + } + }; - protocolHandshaker.writeHandshakeRequest(); + protocolHandshaker.writeHandshakeRequest(); + }); + } + + /** + * Perform protocol-specific initialization which includes authentication. + * @param {string} userAgent the user agent for this driver. + * @param {object} authToken the object containing auth information. + * @return {Promise} promise resolved with the current connection if initialization is successful. Rejected promise otherwise. + */ + _initialize(userAgent, authToken) { + return new Promise((resolve, reject) => { + const observer = new InitializationObserver(this, resolve, reject); + this._protocol.initialize(userAgent, authToken, observer); + }); } /** @@ -146,10 +199,6 @@ class Connection { * @param {boolean} flush true if flush should happen after the message is written to the buffer. */ write(message, observer, flush) { - if (message.isInitializationMessage) { - observer = this._state.wrap(observer); - } - const queued = this._queueObserver(observer); if (queued) { @@ -170,51 +219,29 @@ class Connection { } } - /** - * Complete protocol initialization. - * @param {BaseBuffer} buffer the handshake response buffer. - * @param {ProtocolHandshaker} protocolHandshaker the handshaker utility. - * @private - */ - _initializeProtocol(buffer, protocolHandshaker) { - try { - // re-assign the protocol because version might be lower than we initially assumed - this._protocol = protocolHandshaker.createNegotiatedProtocol(buffer); - - // Ok, protocol running. Simply forward all messages to the dechunker - this._ch.onmessage = buf => this._dechunker.write(buf); - - if (buffer.hasRemaining()) { - this._dechunker.write(buffer.readSlice(buffer.remaining())); - } - } catch (e) { - this._handleFatalError(e); - } - } - /** * "Fatal" means the connection is dead. Only call this if something * happens that cannot be recovered from. This will lead to all subscribers * failing, and the connection getting ejected from the session pool. * - * @param err an error object, forwarded to all current and future subscribers + * @param error an error object, forwarded to all current and future subscribers * @protected */ - _handleFatalError( err ) { + _handleFatalError(error) { this._isBroken = true; - this._error = err; + this._error = this._errorHandler.handleAndTransformError(error, this.hostPort); if (this._log.isErrorEnabled()) { - this._log.error(`${this} experienced a fatal error ${JSON.stringify(err)}`); + this._log.error(`${this} experienced a fatal error ${JSON.stringify(this._error)}`); } - if( this._currentObserver && this._currentObserver.onError ) { - this._currentObserver.onError(err); + if (this._currentObserver && this._currentObserver.onError) { + this._currentObserver.onError(this._error); } - while( this._pendingObservers.length > 0 ) { + while (this._pendingObservers.length > 0) { let observer = this._pendingObservers.shift(); - if( observer && observer.onError ) { - observer.onError(err); + if (observer && observer.onError) { + observer.onError(this._error); } } } @@ -250,7 +277,8 @@ class Connection { this._log.debug(`${this} S: FAILURE ${JSON.stringify(msg)}`); } try { - this._currentFailure = newError(payload.message, payload.code); + const error = newError(payload.message, payload.code); + this._currentFailure = this._errorHandler.handleAndTransformError(error, this.hostPort); this._currentObserver.onError( this._currentFailure ); } finally { this._updateCurrentObserver(); @@ -337,15 +365,6 @@ class Connection { return true; } - /** - * Get promise resolved when connection initialization succeed or rejected when it fails. - * Connection is initialized using {@link BoltProtocol#initialize()} function. - * @return {Promise} the result of connection initialization. - */ - initializationCompleted() { - return this._state.initializationCompleted(); - } - /* * Pop next pending observer form the list of observers and make it current observer. * @protected @@ -378,20 +397,6 @@ class Connection { return this._protocol.packer().packable(value, (err) => this._handleFatalError(err)); } - /** - * @protected - */ - _markInitialized(metadata) { - const serverVersion = metadata ? metadata.server : null; - if (!this.server.version) { - this.server.version = serverVersion; - const version = ServerVersion.fromString(serverVersion); - if (version.compareTo(VERSION_3_2_0) < 0) { - this._protocol.packer().disableByteArrays(); - } - } - } - _handleProtocolError(message) { this._currentFailure = null; this._updateCurrentObserver(); @@ -401,110 +406,36 @@ class Connection { } } -class ConnectionState { +class InitializationObserver { - /** - * @constructor - * @param {Connection} connection the connection to track state for. - */ - constructor(connection) { + constructor(connection, onSuccess, onError) { this._connection = connection; - - this._initRequested = false; - this._initError = null; - - this._resolveInitPromise = null; - this._rejectInitPromise = null; - this._initPromise = new Promise((resolve, reject) => { - this._resolveInitPromise = resolve; - this._rejectInitPromise = reject; - }); + this._onSuccess = onSuccess; + this._onError = onError; } - /** - * Wrap the given observer to track connection's initialization state. Connection is closed by the server if - * processing of INIT message fails so returned observer will handle initialization failure as a fatal error. - * @param {StreamObserver} observer the observer used for INIT message. - * @return {StreamObserver} updated observer. - */ - wrap(observer) { - return { - onNext: record => { - if (observer && observer.onNext) { - observer.onNext(record); - } - }, - onError: error => { - this._processFailure(error); - - this._connection._updateCurrentObserver(); // make sure this same observer will not be called again - try { - if (observer && observer.onError) { - observer.onError(error); - } - } finally { - this._connection._handleFatalError(error); - } - }, - onCompleted: metaData => { - this._connection._markInitialized(metaData); - this._resolveInitPromise(this._connection); - - if (observer && observer.onCompleted) { - observer.onCompleted(metaData); - } - } - }; + onNext(record) { + this.onError(newError('Received RECORD when initializing ' + JSON.stringify(record))); } - /** - * Get promise resolved when connection initialization succeed or rejected when it fails. - * @return {Promise} the result of connection initialization. - */ - initializationCompleted() { - this._initRequested = true; - - if (this._initError) { - const error = this._initError; - this._initError = null; // to reject initPromise only once - this._rejectInitPromise(error); - } + onError(error) { + this._connection._updateCurrentObserver(); // make sure this exact observer will not be called again + this._connection._handleFatalError(error); // initialization errors are fatal - return this._initPromise; + this._onError(error); } - /** - * @private - */ - _processFailure(error) { - if (this._initRequested) { - // someone is waiting for initialization to complete, reject the promise - this._rejectInitPromise(error); - } else { - // no one is waiting for initialization, memorize the error but do not reject the promise - // to avoid unnecessary unhandled promise rejection warnings - this._initError = error; + onCompleted(metadata) { + // read server version from the response metadata + const serverVersion = metadata ? metadata.server : null; + if (!this._connection.server.version) { + this._connection.server.version = serverVersion; + const version = ServerVersion.fromString(serverVersion); + if (version.compareTo(VERSION_3_2_0) < 0) { + this._connection.protocol().packer().disableByteArrays(); + } } - } -} -/** - * Crete new connection to the provided address. - * @access private - * @param {string} hostPort - the Bolt endpoint to connect to - * @param {object} config - this driver configuration - * @param {string=null} connectionErrorCode - error code for errors raised on connection errors - * @param {Logger} log - configured logger - * @return {Connection} - New connection - */ -function connect(hostPort, config = {}, connectionErrorCode = null, log = Logger.noOp()) { - const Ch = config.channel || Channel; - const parsedAddress = urlUtil.parseDatabaseUrl(hostPort); - const channelConfig = new ChannelConfig(parsedAddress, config, connectionErrorCode); - return new Connection(new Ch(channelConfig), parsedAddress.hostAndPort, log, config.disableLosslessIntegers); + this._onSuccess(this._connection); + } } - -export { - connect, - Connection -}; diff --git a/src/v1/internal/http/http-driver.js b/src/v1/internal/http/http-driver.js index 08151c646..c324e6a5a 100644 --- a/src/v1/internal/http/http-driver.js +++ b/src/v1/internal/http/http-driver.js @@ -29,7 +29,7 @@ export default class HttpDriver extends Driver { } session() { - return new HttpSession(this._hostPort, this._token, this._config, this._sessionTracker); + return new HttpSession(this._hostPort, this._authToken, this._config, this._sessionTracker); } close() { diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index aece79eec..75b0281c2 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -22,8 +22,9 @@ import {newError} from '../error'; import Logger from './logger'; class Pool { + /** - * @param {function} create an allocation function that creates a new resource. It's given + * @param {function(function): Promise} create an allocation function that creates a promise with a new resource. It's given * a single argument, a function that will return the resource to * the pool if invoked, which is meant to be called on .dispose * or .close or whatever mechanism the resource uses to finalize. @@ -54,33 +55,46 @@ class Pool { * @return {object} resource that is ready to use. */ acquire(key) { - const resource = this._acquire(key); + return this._acquire(key).then(resource => { + if (resource) { + resourceAcquired(key, this._activeResourceCounts); + if (this._log.isDebugEnabled()) { + this._log.debug(`${resource} acquired from the pool`); + } + return resource; + } - if (resource) { - resourceAcquired(key, this._activeResourceCounts); - if (this._log.isDebugEnabled()) { - this._log.debug(`${resource} acquired from the pool`); + // We're out of resources and will try to acquire later on when an existing resource is released. + const allRequests = this._acquireRequests; + const requests = allRequests[key]; + if (!requests) { + allRequests[key] = []; } - return Promise.resolve(resource); - } - // We're out of resources and will try to acquire later on when an existing resource is released. - const allRequests = this._acquireRequests; - const requests = allRequests[key]; - if (!requests) { - allRequests[key] = []; - } + return new Promise((resolve, reject) => { + let request; + + const timeoutId = setTimeout(() => { + // acquisition timeout fired - return new Promise((resolve, reject) => { - let request; + // remove request from the queue of pending requests, if it's still there + // request might've been taken out by the release operation + const pendingRequests = allRequests[key]; + if (pendingRequests) { + allRequests[key] = pendingRequests.filter(item => item !== request); + } - const timeoutId = setTimeout(() => { - allRequests[key] = allRequests[key].filter(item => item !== request); - reject(newError(`Connection acquisition timed out in ${this._acquisitionTimeout} ms.`)); - }, this._acquisitionTimeout); + if (request.isCompleted()) { + // request already resolved/rejected by the release operation; nothing to do + } else { + // request is still pending and needs to be failed + request.reject(newError(`Connection acquisition timed out in ${this._acquisitionTimeout} ms.`)); + } + }, this._acquisitionTimeout); - request = new PendingRequest(resolve, timeoutId, this._log); - allRequests[key].push(request); + request = new PendingRequest(resolve, reject, timeoutId, this._log); + allRequests[key].push(request); + }); }); } @@ -133,14 +147,14 @@ class Pool { if (this._validate(resource)) { // idle resource is valid and can be acquired - return resource; + return Promise.resolve(resource); } else { this._destroy(resource); } } if (this._maxSize && this.activeResourceCount(key) >= this._maxSize) { - return null; + return Promise.resolve(null); } // there exist no idle valid resources, create a new one for acquisition @@ -172,19 +186,37 @@ class Pool { } resourceReleased(key, this._activeResourceCounts); - // check if there are any pending requests + this._processPendingAcquireRequests(key); + } + + _processPendingAcquireRequests(key) { const requests = this._acquireRequests[key]; if (requests) { - const pending = requests[0]; - - if (pending) { - const resource = this._acquire(key); - if (resource) { - // managed to acquire a valid resource from the pool to satisfy the pending acquire request - resourceAcquired(key, this._activeResourceCounts); // increment the active counter - requests.shift(); // forget the pending request - pending.resolve(resource); // resolve the pending request with the acquired resource - } + const pendingRequest = requests.shift(); // pop a pending acquire request + + if (pendingRequest) { + this._acquire(key) + .catch(error => { + // failed to acquire/create a new connection to resolve the pending acquire request + // propagate the error by failing the pending request + pendingRequest.reject(error); + return null; + }) + .then(resource => { + if (resource) { + // managed to acquire a valid resource from the pool + + if (pendingRequest.isCompleted()) { + // request has been completed, most likely failed by a timeout + // return the acquired resource back to the pool + this._release(key, resource); + } else { + // request is still pending and can be resolved with the newly acquired resource + resourceAcquired(key, this._activeResourceCounts); // increment the active counter + pendingRequest.resolve(resource); // resolve the pending request with the acquired resource + } + } + }); } else { delete this._acquireRequests[key]; } @@ -220,13 +252,24 @@ function resourceReleased(key, activeResourceCounts) { class PendingRequest { - constructor(resolve, timeoutId, log) { + constructor(resolve, reject, timeoutId, log) { this._resolve = resolve; + this._reject = reject; this._timeoutId = timeoutId; this._log = log; + this._completed = false; + } + + isCompleted() { + return this._completed; } resolve(resource) { + if (this._completed) { + return; + } + this._completed = true; + clearTimeout(this._timeoutId); if (this._log.isDebugEnabled()) { this._log.debug(`${resource} acquired from the pool`); @@ -234,6 +277,15 @@ class PendingRequest { this._resolve(resource); } + reject(error) { + if (this._completed) { + return; + } + this._completed = true; + + clearTimeout(this._timeoutId); + this._reject(error); + } } export default Pool diff --git a/src/v1/internal/protocol-handshaker.js b/src/v1/internal/protocol-handshaker.js index ca7ad2b40..276c9d2db 100644 --- a/src/v1/internal/protocol-handshaker.js +++ b/src/v1/internal/protocol-handshaker.js @@ -25,8 +25,6 @@ import BoltProtocolV2 from './bolt-protocol-v2'; const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP" const BOLT_MAGIC_PREAMBLE = 0x6060B017; -const LATEST_PROTOCOL_VERSION = 2; - export default class ProtocolHandshaker { /** @@ -45,14 +43,6 @@ export default class ProtocolHandshaker { this._log = log; } - /** - * Create the newest bolt protocol. - * @return {BoltProtocol} the protocol. - */ - createLatestProtocol() { - return this._createProtocolWithVersion(LATEST_PROTOCOL_VERSION); - } - /** * Write a Bolt handshake into the underlying network channel. */ diff --git a/src/v1/internal/request-message.js b/src/v1/internal/request-message.js index 7dd0b1662..a9255a7b2 100644 --- a/src/v1/internal/request-message.js +++ b/src/v1/internal/request-message.js @@ -27,10 +27,9 @@ const PULL_ALL = 0x3F; // 0011 1111 // PULL * export default class RequestMessage { - constructor(signature, fields, isInitializationMessage, toString) { + constructor(signature, fields, toString) { this.signature = signature; this.fields = fields; - this.isInitializationMessage = isInitializationMessage; this.toString = toString; } @@ -41,7 +40,7 @@ export default class RequestMessage { * @return {RequestMessage} new INIT message. */ static init(clientName, authToken) { - return new RequestMessage(INIT, [clientName, authToken], true, () => `INIT ${clientName} {...}`); + return new RequestMessage(INIT, [clientName, authToken], () => `INIT ${clientName} {...}`); } /** @@ -51,7 +50,7 @@ export default class RequestMessage { * @return {RequestMessage} new RUN message. */ static run(statement, parameters) { - return new RequestMessage(RUN, [statement, parameters], false, () => `RUN ${statement} ${JSON.stringify(parameters)}`); + return new RequestMessage(RUN, [statement, parameters], () => `RUN ${statement} ${JSON.stringify(parameters)}`); } /** @@ -72,5 +71,5 @@ export default class RequestMessage { } // constants for messages that never change -const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], false, () => 'PULL_ALL'); -const RESET_MESSAGE = new RequestMessage(RESET, [], false, () => 'RESET'); +const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL'); +const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET'); diff --git a/src/v1/internal/routing-util.js b/src/v1/internal/routing-util.js index 56f2d2e6e..0cb9061b3 100644 --- a/src/v1/internal/routing-util.js +++ b/src/v1/internal/routing-util.js @@ -24,7 +24,6 @@ import {ServerVersion, VERSION_3_2_0} from './server-version'; const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers'; const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable($context)'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; -const UNAUTHORIZED_CODE = 'Neo.ClientError.Security.Unauthorized'; export default class RoutingUtil { @@ -49,9 +48,6 @@ export default class RoutingUtil { throw newError( `Server at ${routerAddress} can't perform routing. Make sure you are connecting to a causal cluster`, SERVICE_UNAVAILABLE); - } else if (error.code === UNAUTHORIZED_CODE) { - // auth error is a sign of a configuration issue, rediscovery should not proceed - throw error; } else { // return nothing when failed to connect because code higher in the callstack is still able to retry with a // different session towards a different router diff --git a/src/v1/internal/stream-observer.js b/src/v1/internal/stream-observer.js index f95e04141..398276d56 100644 --- a/src/v1/internal/stream-observer.js +++ b/src/v1/internal/stream-observer.js @@ -29,18 +29,14 @@ import Record from '../record'; * @access private */ class StreamObserver { - /** - * @constructor - * @param errorTransformer optional callback to be used for adding additional logic on error - */ - constructor(errorTransformer = (err) => {return err}) { + + constructor() { this._fieldKeys = null; this._fieldLookup = null; this._queuedRecords = []; this._tail = null; this._error = null; this._hasFailed = false; - this._errorTransformer = errorTransformer; this._observer = null; this._conn = null; this._meta = {}; @@ -110,21 +106,19 @@ class StreamObserver { * @param {Object} error - An error object */ onError(error) { - if(this._hasFailed) { + if (this._hasFailed) { return; } this._hasFailed = true; - const transformedError = this._errorTransformer(error, this._conn); - - if( this._observer ) { - if( this._observer.onError ) { - this._observer.onError( transformedError ); + if (this._observer) { + if (this._observer.onError) { + this._observer.onError(error); } else { - console.log( transformedError ); + console.log(error); } } else { - this._error = transformedError; + this._error = error; } } diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 159f05f4d..bf4c0ee56 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -17,12 +17,12 @@ * limitations under the License. */ -import Session from './session'; import {Driver} from './driver'; import {newError, SESSION_EXPIRED} from './error'; import {LoadBalancer} from './internal/connection-providers'; import LeastConnectedLoadBalancingStrategy, {LEAST_CONNECTED_STRATEGY_NAME} from './internal/least-connected-load-balancing-strategy'; import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './internal/round-robin-load-balancing-strategy'; +import ConnectionErrorHandler from './internal/connection-error-handler'; /** * A driver that supports routing in a causal cluster. @@ -44,33 +44,24 @@ class RoutingDriver extends Driver { return new LoadBalancer(hostPort, this._routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback, this._log); } - _createSession(mode, connectionProvider, bookmark, config) { - return new RoutingSession(mode, connectionProvider, bookmark, config, (error, conn) => { - if (!conn) { - // connection can be undefined if error happened before connection was acquired - return error; - } - - const hostPort = conn.hostPort; + _createConnectionErrorHandler() { + // connection errors mean SERVICE_UNAVAILABLE for direct driver but for routing driver they should only + // result in SESSION_EXPIRED because there might still exist other servers capable of serving the request + return new ConnectionErrorHandler(SESSION_EXPIRED, + (error, hostPort) => this._handleUnavailability(error, hostPort), + (error, hostPort) => this._handleWriteFailure(error, hostPort)); + } - if (error.code === SESSION_EXPIRED || isDatabaseUnavailable(error)) { - this._log.warn(`Routing driver ${this._id} will forget ${hostPort} because of an error ${error.code} '${error.message}'`); - this._connectionProvider.forget(hostPort); - return error; - } else if (isFailureToWrite(error)) { - this._log.warn(`Routing driver ${this._id} will forget writer ${hostPort} because of an error ${error.code} '${error.message}'`); - this._connectionProvider.forgetWriter(hostPort); - return newError('No longer possible to write to server at ' + hostPort, SESSION_EXPIRED); - } else { - return error; - } - }); + _handleUnavailability(error, hostPort) { + this._log.warn(`Routing driver ${this._id} will forget ${hostPort} because of an error ${error.code} '${error.message}'`); + this._connectionProvider.forget(hostPort); + return error; } - _connectionErrorCode() { - // connection errors mean SERVICE_UNAVAILABLE for direct driver but for routing driver they should only - // result in SESSION_EXPIRED because there might still exist other servers capable of serving the request - return SESSION_EXPIRED; + _handleWriteFailure(error, hostPort) { + this._log.warn(`Routing driver ${this._id} will forget writer ${hostPort} because of an error ${error.code} '${error.message}'`); + this._connectionProvider.forgetWriter(hostPort); + return newError('No longer possible to write to server at ' + hostPort, SESSION_EXPIRED); } /** @@ -102,33 +93,4 @@ function validateConfig(config) { return config; } -/** - * @private - */ -function isFailureToWrite(error) { - return error.code === 'Neo.ClientError.Cluster.NotALeader' || - error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'; -} - -/** - * @private - */ -function isDatabaseUnavailable(error) { - return error.code === 'Neo.TransientError.General.DatabaseUnavailable'; -} - -/** - * @private - */ -class RoutingSession extends Session { - constructor(mode, connectionProvider, bookmark, config, onFailedConnection) { - super(mode, connectionProvider, bookmark, config); - this._onFailedConnection = onFailedConnection; - } - - _onRunFailure() { - return this._onFailedConnection; - } -} - export default RoutingDriver diff --git a/src/v1/session.js b/src/v1/session.js index e04425efa..ae038585b 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -68,7 +68,7 @@ class Session { } _run(statement, parameters, statementRunner) { - const streamObserver = new StreamObserver(this._onRunFailure()); + const streamObserver = new StreamObserver(); const connectionHolder = this._connectionHolderWithMode(this._mode); if (!this._hasTx) { connectionHolder.initializeConnection(); @@ -110,10 +110,8 @@ class Session { connectionHolder.initializeConnection(); this._hasTx = true; - return new Transaction(connectionHolder, () => { - this._hasTx = false; - }, - this._onRunFailure(), this._lastBookmark, this._updateBookmark.bind(this)); + const onTxClose = () => this._hasTx = false; + return new Transaction(connectionHolder, onTxClose.bind(this), this._lastBookmark, this._updateBookmark.bind(this)); } /** @@ -196,11 +194,6 @@ class Session { } } - //Can be overridden to add error callback on RUN - _onRunFailure() { - return (err) => {return err}; - } - _connectionHolderWithMode(mode) { if (mode === READ) { return this._readConnectionHolder; diff --git a/src/v1/transaction.js b/src/v1/transaction.js index 9d4a2cdf8..d7a1efc5f 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -32,11 +32,10 @@ class Transaction { * @constructor * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. - * @param {function(error: Error): Error} errorTransformer callback use to transform error. * @param {Bookmark} bookmark bookmark for transaction begin. * @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced. */ - constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) { + constructor(connectionHolder, onClose, bookmark, onBookmark) { this._connectionHolder = connectionHolder; const streamObserver = new _TransactionStreamObserver(this); @@ -46,7 +45,6 @@ class Transaction { this._state = _states.ACTIVE; this._onClose = onClose; - this._errorTransformer = errorTransformer; this._onBookmark = onBookmark; } @@ -117,7 +115,7 @@ class Transaction { /** Internal stream observer used for transactional results*/ class _TransactionStreamObserver extends StreamObserver { constructor(tx) { - super(tx._errorTransformer || ((err) => {return err})); + super(); this._tx = tx; } diff --git a/test/internal/bolt-protocol-v1.test.js b/test/internal/bolt-protocol-v1.test.js index 9ea272bcb..b0bb65304 100644 --- a/test/internal/bolt-protocol-v1.test.js +++ b/test/internal/bolt-protocol-v1.test.js @@ -150,5 +150,4 @@ describe('BoltProtocolV1', () => { function verifyMessage(expected, actual) { expect(actual.signature).toEqual(expected.signature); expect(actual.fields).toEqual(expected.fields); - expect(actual.isInitializationMessage).toEqual(expected.isInitializationMessage); } diff --git a/test/internal/connection-error-handler.test.js b/test/internal/connection-error-handler.test.js new file mode 100644 index 000000000..695708e34 --- /dev/null +++ b/test/internal/connection-error-handler.test.js @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler'; +import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error'; + +describe('ConnectionErrorHandler', () => { + + it('should return error code', () => { + const code = 'Neo4j.Error.Hello'; + const handler = new ConnectionErrorHandler(code); + expect(code).toEqual(handler.errorCode()); + }); + + it('should handle and transform availability errors', () => { + const errors = []; + const hostPorts = []; + const transformedError = newError('Message', 'Code'); + const handler = new ConnectionErrorHandler(SERVICE_UNAVAILABLE, (error, hostPort) => { + errors.push(error); + hostPorts.push(hostPort); + return transformedError; + }); + + const error1 = newError('A', SERVICE_UNAVAILABLE); + const error2 = newError('B', SESSION_EXPIRED); + const error3 = newError('C', 'Neo.TransientError.General.DatabaseUnavailable'); + + [error1, error2, error3].forEach((error, idx) => { + const newTransformedError = handler.handleAndTransformError(error, 'localhost:' + idx); + expect(newTransformedError).toEqual(transformedError); + }); + + expect(errors).toEqual([error1, error2, error3]); + expect(hostPorts).toEqual(['localhost:0', 'localhost:1', 'localhost:2']); + }); + + it('should handle and transform failure to write errors', () => { + const errors = []; + const hostPorts = []; + const transformedError = newError('Message', 'Code'); + const handler = new ConnectionErrorHandler(SERVICE_UNAVAILABLE, null, (error, hostPort) => { + errors.push(error); + hostPorts.push(hostPort); + return transformedError; + }); + + const error1 = newError('A', 'Neo.ClientError.Cluster.NotALeader'); + const error2 = newError('B', 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'); + + [error1, error2].forEach((error, idx) => { + const newTransformedError = handler.handleAndTransformError(error, 'localhost:' + idx); + expect(newTransformedError).toEqual(transformedError); + }); + + expect(errors).toEqual([error1, error2]); + expect(hostPorts).toEqual(['localhost:0', 'localhost:1']); + }); + +}); diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js index 8a269f62e..bbed8823c 100644 --- a/test/internal/connection-holder.test.js +++ b/test/internal/connection-holder.test.js @@ -65,7 +65,6 @@ describe('ConnectionHolder', () => { connectionHolder.getConnection(new StreamObserver()).then(conn => { expect(conn).toBe(connection); - verifyConnectionInitialized(conn); done(); }); }); @@ -79,24 +78,22 @@ describe('ConnectionHolder', () => { connectionHolder.initializeConnection(); connectionHolder.getConnection(streamObserver).then(conn => { - verifyConnectionInitialized(conn); verifyConnection(streamObserver, 'Neo4j/9.9.9'); done(); }); }); - it('should make stream observer aware about connection when initialization fails', done => { - const connection = new FakeConnection().withServerVersion('Neo4j/7.7.7').withFailedInitialization(new Error('Oh!')); - const connectionProvider = newSingleConnectionProvider(connection); + it('should propagate connection acquisition failure', done => { + const errorMessage = 'Failed to acquire or initialize the connection'; + const connectionPromise = Promise.reject(new Error(errorMessage)); + const connectionProvider = newSingleConnectionProvider(connectionPromise); const connectionHolder = new ConnectionHolder(READ, connectionProvider); const streamObserver = new StreamObserver(); connectionHolder.initializeConnection(); connectionHolder.getConnection(streamObserver).catch(error => { - expect(error.message).toEqual('Oh!'); - verifyConnectionInitialized(connection); - verifyConnection(streamObserver, 'Neo4j/7.7.7'); + expect(error.message).toEqual(errorMessage); done(); }); }); @@ -229,10 +226,6 @@ function newSingleConnectionProvider(connection) { return new SingleConnectionProvider(Promise.resolve(connection)); } -function verifyConnectionInitialized(connection) { - expect(connection.initializationInvoked).toEqual(1); -} - function verifyConnection(streamObserver, expectedServerVersion) { expect(streamObserver._conn).toBeDefined(); expect(streamObserver._conn).not.toBeNull(); diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 32d42eb67..58d99d132 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -1095,7 +1095,7 @@ function setupLoadBalancerToRememberRouters(loadBalancer, routersArray) { } function newPool() { - return new Pool(FakeConnection.create); + return new Pool((address, release) => Promise.resolve(new FakeConnection(address, release))); } function expectRoutingTable(loadBalancer, routers, readers, writers) { @@ -1122,14 +1122,6 @@ class FakeConnection { this.address = address; this.release = release; } - - static create(address, release) { - return new FakeConnection(address, release); - } - - initializationCompleted() { - return Promise.resolve(this); - } } class FakeRediscovery { diff --git a/test/internal/connection.test.js b/test/internal/connection.test.js new file mode 100644 index 000000000..7b63e7b28 --- /dev/null +++ b/test/internal/connection.test.js @@ -0,0 +1,421 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as DummyChannel from '../../src/v1/internal/ch-dummy'; +import Connection from '../../src/v1/internal/connection'; +import {Packer} from '../../src/v1/internal/packstream-v1'; +import {Chunker} from '../../src/v1/internal/chunking'; +import {alloc} from '../../src/v1/internal/buf'; +import {Neo4jError, newError, SERVICE_UNAVAILABLE} from '../../src/v1/error'; +import sharedNeo4j from '../internal/shared-neo4j'; +import {ServerVersion} from '../../src/v1/internal/server-version'; +import lolex from 'lolex'; +import Logger from '../../src/v1/internal/logger'; +import StreamObserver from '../../src/v1/internal/stream-observer'; +import RequestMessage from '../../src/v1/internal/request-message'; +import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler'; +import testUtils from '../internal/test-utils'; + +const ILLEGAL_MESSAGE = {signature: 42, fields: []}; +const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]}; +const FAILURE_MESSAGE = {signature: 0x7F, fields: [newError('Hello')]}; +const RECORD_MESSAGE = {signature: 0x71, fields: [{value: 'Hello'}]}; + +describe('Connection', () => { + + let clock; + let connection; + + afterEach(done => { + if (clock) { + clock.uninstall(); + clock = null; + } + + const usedConnection = connection; + connection = null; + if (usedConnection) { + usedConnection.close(); + } + done(); + }); + + it('should have correct creation timestamp', () => { + clock = lolex.install(); + clock.setSystemTime(424242); + + connection = createConnection('bolt://localhost'); + + expect(connection.creationTimestamp).toEqual(424242); + }); + + it('should read/write basic messages', done => { + connection = createConnection('bolt://localhost'); + + connection._negotiateProtocol().then(() => { + connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { + onCompleted: msg => { + expect(msg).not.toBeNull(); + done(); + }, + onError: console.log + }); + }); + }); + + it('should retrieve stream', done => { + connection = createConnection('bolt://localhost'); + + const records = []; + const pullAllObserver = { + onNext: record => { + records.push(record); + }, + onCompleted: () => { + expect(records[0][0]).toBe(1); + done(); + } + }; + + connection._negotiateProtocol().then(() => { + connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); + connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false); + connection.write(RequestMessage.pullAll(), pullAllObserver, true); + }); + }); + + it('should write protocol handshake', () => { + const observer = DummyChannel.observer; + connection = createConnection('bolt://localhost', {channel: DummyChannel.channel}); + + connection._negotiateProtocol(); + + const boltMagicPreamble = '60 60 b0 17'; + const protocolVersion2 = '00 00 00 02'; + const protocolVersion1 = '00 00 00 01'; + const noProtocolVersion = '00 00 00 00'; + expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); + }); + + it('should provide error message when connecting to http-port', done => { + connection = createConnection('bolt://localhost:7474', {encrypted: false}); + + connection.connect('mydriver/0.0.0', basicAuthToken()).catch(error => { + expect(error).toBeDefined(); + expect(error).not.toBeNull(); + + if (testUtils.isServer()) { + //only node gets the pretty error message + expect(error.message).toBe('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'); + } + + done(); + }); + }); + + it('should convert failure messages to errors', done => { + const channel = new DummyChannel.channel; + connection = new Connection(channel, new ConnectionErrorHandler(SERVICE_UNAVAILABLE), 'localhost:7687', Logger.noOp()); + + connection._negotiateProtocol(); + + const errorCode = 'Neo.ClientError.Schema.ConstraintValidationFailed'; + const errorMessage = 'Node 0 already exists with label User and property "email"=[john@doe.com]'; + + connection._queueObserver({ + onError: error => { + expectNeo4jError(error, errorCode, errorMessage); + done(); + } + }); + + channel.onmessage(packedHandshakeMessage()); + channel.onmessage(packedFailureMessage(errorCode, errorMessage)); + }); + + it('should notify when connection initialization completes', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(initializedConnection => { + expect(initializedConnection).toBe(connection); + done(); + }); + }); + + it('should notify when connection initialization fails', done => { + connection = createConnection('bolt://localhost:7474'); // wrong port + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => done.fail('Should not initialize')) + .catch(error => { + expect(error).toBeDefined(); + done(); + }); + }); + + it('should have server version after connection initialization completed', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(initializedConnection => { + expect(initializedConnection).toBe(connection); + const serverVersion = ServerVersion.fromString(connection.server.version); + expect(serverVersion).toBeDefined(); + done(); + }); + }); + + it('should fail all new observers after failure to connect', done => { + connection = createConnection('bolt://localhost:7474'); // wrong port + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => done.fail('Should not connect')) + .catch(initialError => { + expect(initialError).toBeDefined(); + expect(initialError).not.toBeNull(); + + expect(connection.isOpen()).toBeFalsy(); + + const streamObserver = new StreamObserver(); + streamObserver.subscribe({ + onError: error => { + expect(error).toEqual(initialError); + done(); + } + }); + connection._queueObserver(streamObserver); + }); + }); + + it('should respect connection timeout', done => { + testConnectionTimeout(false, done); + }); + + it('should respect encrypted connection timeout', done => { + testConnectionTimeout(true, done); + }); + + it('should not queue INIT observer when broken', done => { + testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().initialize('Hello', {}, {}), done); + }); + + it('should not queue RUN observer when broken', done => { + testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().run('RETURN 1', {}, {}), done); + }); + + it('should not queue RESET observer when broken', done => { + const resetAction = connection => connection.resetAndFlush().catch(ignore => { + }); + + testQueueingOfObserversWithBrokenConnection(resetAction, done); + }); + + it('should reset and flush when SUCCESS received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.resetAndFlush().then(() => { + expect(connection.isOpen()).toBeTruthy(); + done(); + }).catch(error => done.fail(error)); + + // write a SUCCESS message for RESET before the actual response is received + connection._handleMessage(SUCCESS_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + }); + }); + + it('should fail to reset and flush when FAILURE received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received FAILURE as a response for RESET: Neo4jError: Hello'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + // write a FAILURE message for RESET before the actual response is received + connection._handleMessage(FAILURE_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + }); + }); + + it('should fail to reset and flush when RECORD received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received RECORD as a response for RESET: {"value":"Hello"}'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + // write a RECORD message for RESET before the actual response is received + connection._handleMessage(RECORD_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + }); + }); + + it('should acknowledge failure with RESET when SUCCESS received', done => { + connection = createConnection('bolt://localhost'); + + connection.connect('my-driver/1.2.3', basicAuthToken()) + .then(() => { + connection._currentFailure = newError('Hello'); + connection._resetOnFailure(); + + // write a SUCCESS message for RESET before the actual response is received + connection._handleMessage(SUCCESS_MESSAGE); + // enqueue a dummy observer to handle the real SUCCESS message + connection._queueObserver({ + onCompleted: () => { + } + }); + + expect(connection._currentFailure).toBeNull(); + done(); + }); + }); + + it('should handle and transform fatal errors', done => { + const errors = []; + const hostPorts = []; + const transformedError = newError('Message', 'Code'); + const errorHandler = new ConnectionErrorHandler(SERVICE_UNAVAILABLE, (error, hostPort) => { + errors.push(error); + hostPorts.push(hostPort); + return transformedError; + }); + + connection = Connection.create('bolt://localhost', {}, errorHandler, Logger.noOp()); + + connection._queueObserver({ + onError: error => { + expect(error).toEqual(transformedError); + expect(errors.length).toEqual(1); + expect(errors[0].code).toEqual(SERVICE_UNAVAILABLE); + expect(hostPorts).toEqual([connection.hostPort]); + done(); + } + }); + + connection._handleFatalError(newError('Hello', SERVICE_UNAVAILABLE)); + }); + + function packedHandshakeMessage() { + const result = alloc(4); + result.putInt32(0, 1); + result.reset(); + return result; + } + + function packedFailureMessage(code, message) { + const channel = new DummyChannel.channel; + const chunker = new Chunker(channel); + const packer = new Packer(chunker); + packer.packStruct(0x7F, [packer.packable({code: code, message: message})]); + chunker.messageBoundary(); + chunker.flush(); + const data = channel.toBuffer(); + const result = alloc(data.length); + result.putBytes(0, data); + return result; + } + + function expectNeo4jError(error, expectedCode, expectedMessage) { + expect(() => { + throw error; + }).toThrow(new Neo4jError(expectedMessage, expectedCode)); + expect(error.name).toBe('Neo4jError'); + } + + function basicAuthToken() { + return { + scheme: 'basic', + principal: sharedNeo4j.username, + credentials: sharedNeo4j.password + }; + } + + function testConnectionTimeout(encrypted, done) { + const boltUri = 'bolt://10.0.0.0'; // use non-routable IP address which never responds + connection = createConnection(boltUri, {encrypted: encrypted, connectionTimeout: 1000}, 'TestErrorCode'); + + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => done.fail('Should not be able to connect')) + .catch(error => { + expect(error.code).toEqual('TestErrorCode'); + + // in some environments non-routable address results in immediate 'connection refused' error and connect + // timeout is not fired; skip message assertion for such cases, it is important for connect attempt to not hang + if (error.message.indexOf('Failed to establish connection') === 0) { + expect(error.message).toEqual('Failed to establish connection in 1000ms'); + } + + done(); + }); + } + + function testQueueingOfObserversWithBrokenConnection(connectionAction, done) { + connection = createConnection('bolt://localhost'); + + connection._negotiateProtocol().then(() => { + connection._handleMessage(ILLEGAL_MESSAGE); + expect(connection.isOpen()).toBeFalsy(); + + expect(connection._pendingObservers.length).toEqual(0); + connectionAction(connection); + expect(connection._pendingObservers.length).toEqual(0); + + done(); + }); + } + + /** + * @return {Connection} + */ + function createConnection(url, config, errorCode = null) { + return Connection.create(url, config || {}, new ConnectionErrorHandler(errorCode || SERVICE_UNAVAILABLE), Logger.noOp()); + } + +}); diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js deleted file mode 100644 index 1ad0c9c9e..000000000 --- a/test/internal/connector.test.js +++ /dev/null @@ -1,388 +0,0 @@ -/** - * Copyright (c) 2002-2018 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import * as DummyChannel from '../../src/v1/internal/ch-dummy'; -import {connect, Connection} from '../../src/v1/internal/connector'; -import {Packer} from '../../src/v1/internal/packstream-v1'; -import {Chunker} from '../../src/v1/internal/chunking'; -import {alloc} from '../../src/v1/internal/buf'; -import {Neo4jError, newError} from '../../src/v1/error'; -import sharedNeo4j from '../internal/shared-neo4j'; -import {ServerVersion} from '../../src/v1/internal/server-version'; -import lolex from 'lolex'; -import Logger from '../../src/v1/internal/logger'; -import StreamObserver from '../../src/v1/internal/stream-observer'; -import RequestMessage from '../../src/v1/internal/request-message'; - -const ILLEGAL_MESSAGE = {signature: 42, fields: []}; -const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]}; -const FAILURE_MESSAGE = {signature: 0x7F, fields: [newError('Hello')]}; -const RECORD_MESSAGE = {signature: 0x71, fields: [{value: 'Hello'}]}; - -describe('connector', () => { - - let clock; - let connection; - - afterEach(done => { - if (clock) { - clock.uninstall(); - clock = null; - } - - const usedConnection = connection; - connection = null; - if (usedConnection) { - usedConnection.close(); - } - done(); - }); - - it('should have correct creation timestamp', () => { - clock = lolex.install(); - clock.setSystemTime(424242); - - connection = connect('bolt://localhost'); - - expect(connection.creationTimestamp).toEqual(424242); - }); - - it('should read/write basic messages', done => { - // Given - connection = connect("bolt://localhost"); - - // When - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onCompleted: msg => { - expect(msg).not.toBeNull(); - done(); - }, - onError: console.log - }); - }); - - it('should retrieve stream', done => { - // Given - connection = connect("bolt://localhost"); - - // When - const records = []; - const pullAllObserver = { - onNext: record => { - records.push(record); - }, - onCompleted: () => { - expect(records[0][0]).toBe(1); - done(); - } - }; - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false); - connection.write(RequestMessage.pullAll(), pullAllObserver, true); - }); - - it('should use DummyChannel to read what gets written', done => { - // Given - const observer = DummyChannel.observer; - connection = connect('bolt://localhost', {channel: DummyChannel.channel}); - - const boltMagicPreamble = '60 60 b0 17'; - const protocolVersion2 = '00 00 00 02'; - const protocolVersion1 = '00 00 00 01'; - const noProtocolVersion = '00 00 00 00'; - expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); - - observer.instance.clear(); - - // When - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - connection.write(RequestMessage.run('RETURN 1', {}), {}, true); - expect(observer.instance.toHex()).toBe('00 44 b2 01 8e 6d 79 64 72 69 76 65 72 2f 30 2e 30 2e 30 a3 86 73 63 68 65 6d 65 85 62 61 73 69 63 89 70 72 69 6e 63 69 70 61 6c 85 6e 65 6f 34 6a 8b 63 72 65 64 65 6e 74 69 61 6c 73 88 70 61 73 73 77 6f 72 64 00 00 00 0c b2 10 88 52 45 54 55 52 4e 20 31 a0 00 00 '); - done(); - }); - - it('should provide error message when connecting to http-port', done => { - // Given - connection = connect("bolt://localhost:7474", {encrypted: false}); - - // When - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onCompleted: msg => { - }, - onError: err => { - //only node gets the pretty error message - if (require('../../lib/v1/internal/ch-node.js').available) { - expect(err.message).toBe("Server responded HTTP. Make sure you are not trying to connect to the http endpoint " + - "(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)"); - } - done(); - } - }); - }); - - it('should convert failure messages to errors', done => { - const channel = new DummyChannel.channel; - connection = new Connection(channel, 'bolt://localhost', Logger.noOp()); - - const errorCode = 'Neo.ClientError.Schema.ConstraintValidationFailed'; - const errorMessage = 'Node 0 already exists with label User and property "email"=[john@doe.com]'; - - connection._queueObserver({ - onError: error => { - expectNeo4jError(error, errorCode, errorMessage); - done(); - } - }); - - channel.onmessage(packedHandshakeMessage()); - channel.onmessage(packedFailureMessage(errorCode, errorMessage)); - }); - - it('should notify when connection initialization completes', done => { - connection = connect('bolt://localhost'); - - connection.initializationCompleted().then(initializedConnection => { - expect(initializedConnection).toBe(connection); - done(); - }); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - }); - - it('should notify when connection initialization fails', done => { - connection = connect('bolt://localhost:7474'); // wrong port - - connection.initializationCompleted().catch(error => { - expect(error).toBeDefined(); - done(); - }); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - }); - - it('should notify provided observer when connection initialization completes', done => { - connection = connect('bolt://localhost'); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onCompleted: metaData => { - expect(connection.isOpen()).toBeTruthy(); - expect(metaData).toBeDefined(); - done(); - }, - }); - }); - - it('should notify provided observer when connection initialization fails', done => { - connection = connect('bolt://localhost:7474'); // wrong port - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onError: error => { - expect(connection.isOpen()).toBeFalsy(); - expect(error).toBeDefined(); - done(); - }, - }); - }); - - it('should have server version after connection initialization completed', done => { - connection = connect('bolt://localhost'); - - connection.initializationCompleted().then(initializedConnection => { - const serverVersion = ServerVersion.fromString(initializedConnection.server.version); - expect(serverVersion).toBeDefined(); - done(); - }); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - }); - - it('should fail all new observers after initialization error', done => { - connection = connect('bolt://localhost:7474'); // wrong port - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onError: initialError => { - expect(initialError).toBeDefined(); - - const streamObserver = new StreamObserver(); - streamObserver.subscribe({ - onError: error1 => { - expect(error1).toEqual(initialError); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onError: error2 => { - expect(error2).toEqual(initialError); - - done(); - } - }); - } - }); - - connection.protocol().run('RETURN 1', {}, streamObserver); - }, - }); - }); - - it('should respect connection timeout', done => { - testConnectionTimeout(false, done); - }); - - it('should respect encrypted connection timeout', done => { - testConnectionTimeout(true, done); - }); - - it('should not queue INIT observer when broken', () => { - testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().initialize('Hello', {}, {})); - }); - - it('should not queue RUN observer when broken', () => { - testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().run('RETURN 1', {}, {})); - }); - - it('should not queue RESET observer when broken', () => { - const resetAction = connection => connection.resetAndFlush().catch(ignore => { - }); - - testQueueingOfObserversWithBrokenConnection(resetAction); - }); - - it('should reset and flush when SUCCESS received', done => { - connection = connect('bolt://localhost'); - - connection.resetAndFlush().then(() => { - expect(connection.isOpen()).toBeTruthy(); - done(); - }).catch(error => done.fail(error)); - - connection._handleMessage(SUCCESS_MESSAGE); - }); - - it('should fail to reset and flush when FAILURE received', done => { - connection = connect('bolt://localhost'); - - connection.resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual('Received FAILURE as a response for RESET: Neo4jError: Hello'); - expect(connection._isBroken).toBeTruthy(); - expect(connection.isOpen()).toBeFalsy(); - done(); - }); - - connection._handleMessage(FAILURE_MESSAGE); - }); - - it('should fail to reset and flush when RECORD received', done => { - connection = connect('bolt://localhost'); - - connection.resetAndFlush() - .then(() => done.fail('Should fail')) - .catch(error => { - expect(error.message).toEqual('Received RECORD as a response for RESET: {"value":"Hello"}'); - expect(connection._isBroken).toBeTruthy(); - expect(connection.isOpen()).toBeFalsy(); - done(); - }); - - connection._handleMessage(RECORD_MESSAGE); - }); - - it('should acknowledge failure with RESET when SUCCESS received', () => { - connection = connect('bolt://localhost'); - - connection._currentFailure = newError('Hello'); - connection._resetOnFailure(); - - connection._handleMessage(SUCCESS_MESSAGE); - expect(connection._currentFailure).toBeNull(); - }); - - function packedHandshakeMessage() { - const result = alloc(4); - result.putInt32(0, 1); - result.reset(); - return result; - } - - function packedFailureMessage(code, message) { - const channel = new DummyChannel.channel; - const chunker = new Chunker(channel); - const packer = new Packer(chunker); - packer.packStruct(0x7F, [packer.packable({code: code, message: message})]); - chunker.messageBoundary(); - chunker.flush(); - const data = channel.toBuffer(); - const result = alloc(data.length); - result.putBytes(0, data); - return result; - } - - function expectNeo4jError(error, expectedCode, expectedMessage) { - expect(() => { - throw error; - }).toThrow(new Neo4jError(expectedMessage, expectedCode)); - expect(error.name).toBe("Neo4jError"); - } - - function basicAuthToken() { - return { - scheme: 'basic', - principal: sharedNeo4j.username, - credentials: sharedNeo4j.password - }; - } - - function testConnectionTimeout(encrypted, done) { - const boltUri = 'bolt://10.0.0.0'; // use non-routable IP address which never responds - connection = connect(boltUri, {encrypted: encrypted, connectionTimeout: 1000}, 'TestErrorCode'); - - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken(), { - onNext: record => { - done.fail('Should not receive records: ' + record); - }, - onCompleted: () => { - done.fail('Should not be able to INIT'); - }, - onError: error => { - expect(error.code).toEqual('TestErrorCode'); - - // in some environments non-routable address results in immediate 'connection refused' error and connect - // timeout is not fired; skip message assertion for such cases, it is important for connect attempt to not hang - if (error.message.indexOf('Failed to establish connection') === 0) { - expect(error.message).toEqual('Failed to establish connection in 1000ms'); - } - - done(); - } - }); - } - - function testQueueingOfObserversWithBrokenConnection(connectionAction) { - connection = connect('bolt://localhost'); - - connection._handleMessage(ILLEGAL_MESSAGE); - expect(connection.isOpen()).toBeFalsy(); - - expect(connection._pendingObservers.length).toEqual(0); - connectionAction(connection); - expect(connection._pendingObservers.length).toEqual(0); - } - -}); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 687b37055..f645ef7e8 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -32,12 +32,9 @@ export default class FakeConnection { this.resetInvoked = 0; this.releaseInvoked = 0; - this.initializationInvoked = 0; this.seenStatements = []; this.seenParameters = []; this.server = {}; - - this._initializationPromise = Promise.resolve(this); } protocol() { @@ -59,11 +56,6 @@ export default class FakeConnection { this.releaseInvoked++; } - initializationCompleted() { - this.initializationInvoked++; - return this._initializationPromise; - } - isOpen() { return this._open; } @@ -85,11 +77,6 @@ export default class FakeConnection { return this; } - withFailedInitialization(error) { - this._initializationPromise = Promise.reject(error); - return this; - } - withCreationTimestamp(value) { this.creationTimestamp = value; return this; diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 09256ab0e..258d763b6 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -26,7 +26,7 @@ describe('Pool', () => { // Given let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release))); // When const p0 = pool.acquire(key); @@ -49,7 +49,7 @@ describe('Pool', () => { // Given a pool that allocates let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release))); // When const p0 = pool.acquire(key).then(r0 => { @@ -76,7 +76,7 @@ describe('Pool', () => { let counter = 0; const key1 = 'bolt://localhost:7687'; const key2 = 'bolt://localhost:7688'; - const pool = new Pool((url, release) => new Resource(url, counter++, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release))); // When const p0 = pool.acquire(key1); @@ -110,7 +110,7 @@ describe('Pool', () => { let destroyed = []; const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { destroyed.push(resource); }, @@ -144,7 +144,7 @@ describe('Pool', () => { let counter = 0; const key1 = 'bolt://localhost:7687'; const key2 = 'bolt://localhost:7688'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -189,7 +189,7 @@ describe('Pool', () => { it('destroys resource when key was purged', (done) => { let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -220,7 +220,7 @@ describe('Pool', () => { const key2 = 'bolt://localhost:7688'; const key3 = 'bolt://localhost:7689'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -251,7 +251,7 @@ describe('Pool', () => { let validated = false; let counter = 0; const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, counter++, release), + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, counter++, release)), res => { res.destroyed = true; return true; @@ -286,7 +286,7 @@ describe('Pool', () => { const existingKey = 'bolt://localhost:7687'; const absentKey = 'bolt://localhost:7688'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); const p0 = pool.acquire(existingKey); const p1 = pool.acquire(existingKey); @@ -300,7 +300,7 @@ describe('Pool', () => { }); it('reports zero active resources when empty', () => { - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); expect(pool.activeResourceCount('bolt://localhost:1')).toEqual(0); expect(pool.activeResourceCount('bolt://localhost:2')).toEqual(0); @@ -309,7 +309,7 @@ describe('Pool', () => { it('reports active resources', (done) => { const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); const p0 = pool.acquire(key); const p1 = pool.acquire(key); @@ -326,7 +326,7 @@ describe('Pool', () => { it('reports active resources when they are acquired', (done) => { const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); // three new resources are created and returned to the pool const p0 = pool.acquire(key); @@ -359,7 +359,7 @@ describe('Pool', () => { it('does not report resources that are returned to the pool', (done) => { const key = 'bolt://localhost:7687'; - const pool = new Pool((url, release) => new Resource(url, 42, release)); + const pool = new Pool((url, release) => Promise.resolve(new Resource(url, 42, release))); const p0 = pool.acquire(key); const p1 = pool.acquire(key); @@ -398,7 +398,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => {}, resource => true, new PoolConfig(2, 5000) @@ -431,7 +431,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => {}, resource => true, new PoolConfig(2, 1000) @@ -459,7 +459,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => {}, resource => true ); @@ -487,7 +487,7 @@ describe('Pool', () => { const key = 'bolt://localhost:7687'; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { }, () => true, @@ -529,7 +529,7 @@ describe('Pool', () => { let counter = 0; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { }, resourceValidOnlyOnceValidationFunction, @@ -563,7 +563,7 @@ describe('Pool', () => { let counter = 0; const pool = new Pool( - (url, release) => new Resource(url, counter++, release), + (url, release) => Promise.resolve(new Resource(url, counter++, release)), resource => { }, resourceValidOnlyOnceValidationFunction, diff --git a/test/internal/protocol-handshaker.test.js b/test/internal/protocol-handshaker.test.js index 1b250cdec..eb238d856 100644 --- a/test/internal/protocol-handshaker.test.js +++ b/test/internal/protocol-handshaker.test.js @@ -24,16 +24,6 @@ import {alloc} from '../../src/v1/internal/buf'; describe('ProtocolHandshaker', () => { - it('should create latest protocol', () => { - const handshaker = new ProtocolHandshaker(null, null, null, false, Logger.noOp()); - - const protocol = handshaker.createLatestProtocol(); - - expect(protocol).toBeDefined(); - expect(protocol).not.toBeNull(); - expect(protocol instanceof BoltProtocol).toBeTruthy(); - }); - it('should write handshake request', () => { const writtenBuffers = []; const fakeChannel = { diff --git a/test/internal/request-message.test.js b/test/internal/request-message.test.js index 08eac5342..a21cce668 100644 --- a/test/internal/request-message.test.js +++ b/test/internal/request-message.test.js @@ -29,7 +29,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x01); expect(message.fields).toEqual([clientName, authToken]); - expect(message.isInitializationMessage).toBeTruthy(); expect(message.toString()).toEqual(`INIT ${clientName} {...}`); }); @@ -41,7 +40,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x10); expect(message.fields).toEqual([statement, parameters]); - expect(message.isInitializationMessage).toBeFalsy(); expect(message.toString()).toEqual(`RUN ${statement} ${JSON.stringify(parameters)}`); }); @@ -50,7 +48,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x3F); expect(message.fields).toEqual([]); - expect(message.isInitializationMessage).toBeFalsy(); expect(message.toString()).toEqual('PULL_ALL'); }); @@ -59,7 +56,6 @@ describe('RequestMessage', () => { expect(message.signature).toEqual(0x0F); expect(message.fields).toEqual([]); - expect(message.isInitializationMessage).toBeFalsy(); expect(message.toString()).toEqual('RESET'); }); }); diff --git a/test/internal/shared-neo4j.js b/test/internal/shared-neo4j.js index eeb4dcd6b..22d11922a 100644 --- a/test/internal/shared-neo4j.js +++ b/test/internal/shared-neo4j.js @@ -112,7 +112,7 @@ const additionalConfig = { }; const neoCtrlVersionParam = '-e'; -const defaultNeo4jVersion = '3.4.5'; +const defaultNeo4jVersion = '3.4.6'; const defaultNeoCtrlArgs = `${neoCtrlVersionParam} ${defaultNeo4jVersion}`; function neo4jCertPath(dir) { diff --git a/test/internal/stream-observer.test.js b/test/internal/stream-observer.test.js index 1b9cca79e..994c733c8 100644 --- a/test/internal/stream-observer.test.js +++ b/test/internal/stream-observer.test.js @@ -142,9 +142,12 @@ describe('StreamObserver', () => { expect(receivedMetaData).toEqual({metaDataField1: 'value1', metaDataField2: 'value2'}); }); - it('invokes error transformer only once on error', () => { + it('invokes subscribed observer only once of error', () => { const errors = []; - const streamObserver = new StreamObserver(error => errors.push(error)); + const streamObserver = new StreamObserver(); + streamObserver.subscribe({ + onError: error => errors.push(error) + }); const error1 = new Error('Hello'); const error2 = new Error('World'); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index fb2f24662..f5592efba 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -971,23 +971,24 @@ describe('session', () => { }); it('should acquire connection for transaction', done => { - expect(session.beginTransaction()).toBeDefined(); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(0); - const otherSession1 = driver.session(); - expect(otherSession1.beginTransaction()).toBeDefined(); + session.beginTransaction().run('RETURN 1.0').then(result => { + expect(result.records[0].get(0)).toEqual(1); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(1); - const otherSession2 = driver.session(); - expect(otherSession2.beginTransaction()).toBeDefined(); + driver.session().beginTransaction().run('RETURN 2.0').then(result => { + expect(result.records[0].get(0)).toEqual(2); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(2); - const otherSession3 = driver.session(); - expect(otherSession3.beginTransaction()).toBeDefined(); + driver.session().beginTransaction().run('RETURN 3.0').then(result => { + expect(result.records[0].get(0)).toEqual(3); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(3); - expect(numberOfAcquiredConnectionsFromPool()).toEqual(4); + driver.session().beginTransaction().run('RETURN 4.0').then(result => { + expect(result.records[0].get(0)).toEqual(4); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(4); - session.close(() => { - otherSession1.close(() => { - otherSession2.close(() => { - otherSession3.close(() => { done(); }); });