diff --git a/src/v1/driver.js b/src/v1/driver.js index 8b4be63f3..257dd1fa0 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -192,7 +192,7 @@ class Driver { session(mode, bookmarkOrBookmarks) { const sessionMode = Driver._validateSessionMode(mode); const connectionProvider = this._getOrCreateConnectionProvider(); - const bookmark = new Bookmark(bookmarkOrBookmarks); + const bookmark = bookmarkOrBookmarks ? new Bookmark(bookmarkOrBookmarks) : Bookmark.empty(); return new Session(sessionMode, connectionProvider, bookmark, this._config); } diff --git a/src/v1/internal/bolt-protocol-v1.js b/src/v1/internal/bolt-protocol-v1.js index 001c676bd..d97daf183 100644 --- a/src/v1/internal/bolt-protocol-v1.js +++ b/src/v1/internal/bolt-protocol-v1.js @@ -18,6 +18,9 @@ */ import RequestMessage from './request-message'; import * as v1 from './packstream-v1'; +import {newError} from '../error'; +import Bookmark from './bookmark'; +import TxConfig from './tx-config'; export default class BoltProtocol { @@ -49,6 +52,15 @@ export default class BoltProtocol { return this._unpacker; } + /** + * Transform metadata received in SUCCESS message before it is passed to the handler. + * @param {object} metadata the received metadata. + * @return {object} transformed metadata. + */ + transformMetadata(metadata) { + return metadata; + } + /** * Perform initialization and authentication of the underlying connection. * @param {string} clientName the client name. @@ -63,9 +75,12 @@ export default class BoltProtocol { /** * Begin an explicit transaction. * @param {Bookmark} bookmark the bookmark. + * @param {TxConfig} txConfig the configuration. * @param {StreamObserver} observer the response observer. */ - beginTransaction(bookmark, observer) { + beginTransaction(bookmark, txConfig, observer) { + assertTxConfigIsEmpty(txConfig, this._connection, observer); + const runMessage = RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters()); const pullAllMessage = RequestMessage.pullAll(); @@ -78,7 +93,7 @@ export default class BoltProtocol { * @param {StreamObserver} observer the response observer. */ commitTransaction(observer) { - this.run('COMMIT', {}, observer); + this.run('COMMIT', {}, Bookmark.empty(), TxConfig.empty(), observer); } /** @@ -86,16 +101,21 @@ export default class BoltProtocol { * @param {StreamObserver} observer the response observer. */ rollbackTransaction(observer) { - this.run('ROLLBACK', {}, observer); + this.run('ROLLBACK', {}, Bookmark.empty(), TxConfig.empty(), observer); } /** * Send a Cypher statement through the underlying connection. * @param {string} statement the cypher statement. * @param {object} parameters the statement parameters. + * @param {Bookmark} bookmark the bookmark. + * @param {TxConfig} txConfig the auto-commit transaction configuration. * @param {StreamObserver} observer the response observer. */ - run(statement, parameters, observer) { + run(statement, parameters, bookmark, txConfig, observer) { + // bookmark is ignored in this version of the protocol + assertTxConfigIsEmpty(txConfig, this._connection, observer); + const runMessage = RequestMessage.run(statement, parameters); const pullAllMessage = RequestMessage.pullAll(); @@ -120,3 +140,20 @@ export default class BoltProtocol { return new v1.Unpacker(disableLosslessIntegers); } } + +/** + * @param {TxConfig} txConfig the auto-commit transaction configuration. + * @param {Connection} connection the connection. + * @param {StreamObserver} observer the response observer. + */ +function assertTxConfigIsEmpty(txConfig, connection, observer) { + if (!txConfig.isEmpty()) { + const error = newError('Driver is connected to the database that does not support transaction configuration. ' + + 'Please upgrade to neo4j 3.5.0 or later in order to use this functionality'); + + // unsupported API was used, consider this a fatal error for the current connection + connection._handleFatalError(error); + observer.onError(error); + throw error; + } +} diff --git a/src/v1/internal/bolt-protocol-v3.js b/src/v1/internal/bolt-protocol-v3.js new file mode 100644 index 000000000..beacfbe75 --- /dev/null +++ b/src/v1/internal/bolt-protocol-v3.js @@ -0,0 +1,81 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import BoltProtocolV2 from './bolt-protocol-v2'; +import RequestMessage from './request-message'; + +export default class BoltProtocol extends BoltProtocolV2 { + + constructor(connection, chunker, disableLosslessIntegers) { + super(connection, chunker, disableLosslessIntegers); + } + + transformMetadata(metadata) { + if (metadata.t_first) { + // Bolt V3 uses shorter key 't_first' to represent 'result_available_after' + // adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value + metadata.result_available_after = metadata.t_first; + delete metadata.t_first; + } + if (metadata.t_last) { + // Bolt V3 uses shorter key 't_last' to represent 'result_consumed_after' + // adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value + metadata.result_consumed_after = metadata.t_last; + delete metadata.t_last; + } + return metadata; + } + + initialize(userAgent, authToken, observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.hello(userAgent, authToken); + this._connection.write(message, observer, true); + } + + beginTransaction(bookmark, txConfig, observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.begin(bookmark, txConfig); + this._connection.write(message, observer, true); + } + + commitTransaction(observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.commit(); + this._connection.write(message, observer, true); + } + + rollbackTransaction(observer) { + prepareToHandleSingleResponse(observer); + const message = RequestMessage.rollback(); + this._connection.write(message, observer, true); + } + + run(statement, parameters, bookmark, txConfig, observer) { + const runMessage = RequestMessage.runWithMetadata(statement, parameters, bookmark, txConfig); + const pullAllMessage = RequestMessage.pullAll(); + + this._connection.write(runMessage, observer, false); + this._connection.write(pullAllMessage, observer, true); + } +} + +function prepareToHandleSingleResponse(observer) { + if (observer && typeof observer.prepareToHandleSingleResponse === 'function') { + observer.prepareToHandleSingleResponse(); + } +} diff --git a/src/v1/internal/bookmark.js b/src/v1/internal/bookmark.js index c517c26f9..11d582d5c 100644 --- a/src/v1/internal/bookmark.js +++ b/src/v1/internal/bookmark.js @@ -36,6 +36,10 @@ export default class Bookmark { this._maxValue = maxBookmark(this._values); } + static empty() { + return EMPTY_BOOKMARK; + } + /** * Check if the given bookmark is meaningful and can be send to the database. * @return {boolean} returns true bookmark has a value, false otherwise. @@ -52,6 +56,14 @@ export default class Bookmark { return this._maxValue; } + /** + * Get all bookmark values as an array. + * @return {string[]} all values. + */ + values() { + return this._values; + } + /** * Get this bookmark as an object for begin transaction call. * @return {object} the value of this bookmark as object. @@ -72,6 +84,8 @@ export default class Bookmark { } } +const EMPTY_BOOKMARK = new Bookmark(null); + /** * Converts given value to an array. * @param {string|string[]} [value=undefined] argument to convert. diff --git a/src/v1/internal/connection.js b/src/v1/internal/connection.js index 69880bde3..4e90fac36 100644 --- a/src/v1/internal/connection.js +++ b/src/v1/internal/connection.js @@ -225,7 +225,6 @@ export default class Connection { * failing, and the connection getting ejected from the session pool. * * @param error an error object, forwarded to all current and future subscribers - * @protected */ _handleFatalError(error) { this._isBroken = true; @@ -267,7 +266,8 @@ export default class Connection { this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`); } try { - this._currentObserver.onCompleted( payload ); + const metadata = this._protocol.transformMetadata(payload); + this._currentObserver.onCompleted(metadata); } finally { this._updateCurrentObserver(); } diff --git a/src/v1/internal/protocol-handshaker.js b/src/v1/internal/protocol-handshaker.js index 276c9d2db..d940373a7 100644 --- a/src/v1/internal/protocol-handshaker.js +++ b/src/v1/internal/protocol-handshaker.js @@ -21,6 +21,7 @@ import {alloc} from './buf'; import {newError} from '../error'; import BoltProtocolV1 from './bolt-protocol-v1'; import BoltProtocolV2 from './bolt-protocol-v2'; +import BoltProtocolV3 from './bolt-protocol-v3'; const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP" const BOLT_MAGIC_PREAMBLE = 0x6060B017; @@ -69,15 +70,18 @@ export default class ProtocolHandshaker { * @private */ _createProtocolWithVersion(version) { - if (version === 1) { - return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers); - } else if (version === 2) { - return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers); - } else if (version === HTTP_MAGIC_PREAMBLE) { - throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + - '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'); - } else { - throw newError('Unknown Bolt protocol version: ' + version); + switch (version) { + case 1: + return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers); + case 2: + return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers); + case 3: + return new BoltProtocolV3(this._connection, this._chunker, this._disableLosslessIntegers); + case HTTP_MAGIC_PREAMBLE: + throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' + + '(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'); + default: + throw newError('Unknown Bolt protocol version: ' + version); } } } @@ -93,10 +97,10 @@ function newHandshakeBuffer() { handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE); //proposed versions + handshakeBuffer.writeInt32(3); handshakeBuffer.writeInt32(2); handshakeBuffer.writeInt32(1); handshakeBuffer.writeInt32(0); - handshakeBuffer.writeInt32(0); // reset the reader position handshakeBuffer.reset(); diff --git a/src/v1/internal/request-message.js b/src/v1/internal/request-message.js index a9255a7b2..22ff9d124 100644 --- a/src/v1/internal/request-message.js +++ b/src/v1/internal/request-message.js @@ -18,12 +18,17 @@ */ // Signature bytes for each request message type -const INIT = 0x01; // 0000 0001 // INIT +const INIT = 0x01; // 0000 0001 // INIT const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused const RESET = 0x0F; // 0000 1111 // RESET const RUN = 0x10; // 0001 0000 // RUN -const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD * - unused -const PULL_ALL = 0x3F; // 0011 1111 // PULL * +const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD_ALL - unused +const PULL_ALL = 0x3F; // 0011 1111 // PULL_ALL + +const HELLO = 0x01; // 0000 0001 // HELLO +const BEGIN = 0x11; // 0001 0001 // BEGIN +const COMMIT = 0x12; // 0001 0010 // COMMIT +const ROLLBACK = 0x13; // 0001 0011 // ROLLBACK export default class RequestMessage { @@ -68,8 +73,82 @@ export default class RequestMessage { static reset() { return RESET_MESSAGE; } + + /** + * Create a new HELLO message. + * @param {string} userAgent the user agent. + * @param {object} authToken the authentication token. + * @return {RequestMessage} new HELLO message. + */ + static hello(userAgent, authToken) { + const metadata = Object.assign({user_agent: userAgent}, authToken); + return new RequestMessage(HELLO, [metadata], () => `HELLO {user_agent: '${userAgent}', ...}`); + } + + /** + * Create a new BEGIN message. + * @param {Bookmark} bookmark the bookmark. + * @param {TxConfig} txConfig the configuration. + * @return {RequestMessage} new BEGIN message. + */ + static begin(bookmark, txConfig) { + const metadata = buildTxMetadata(bookmark, txConfig); + return new RequestMessage(BEGIN, [metadata], () => `BEGIN ${JSON.stringify(metadata)}`); + } + + /** + * Get a COMMIT message. + * @return {RequestMessage} the COMMIT message. + */ + static commit() { + return COMMIT_MESSAGE; + } + + /** + * Get a ROLLBACK message. + * @return {RequestMessage} the ROLLBACK message. + */ + static rollback() { + return ROLLBACK_MESSAGE; + } + + /** + * Create a new RUN message with additional metadata. + * @param {string} statement the cypher statement. + * @param {object} parameters the statement parameters. + * @param {Bookmark} bookmark the bookmark. + * @param {TxConfig} txConfig the configuration. + * @return {RequestMessage} new RUN message with additional metadata. + */ + static runWithMetadata(statement, parameters, bookmark, txConfig) { + const metadata = buildTxMetadata(bookmark, txConfig); + return new RequestMessage(RUN, [statement, parameters, metadata], + () => `RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(metadata)}`); + } +} + +/** + * Create an object that represent transaction metadata. + * @param {Bookmark} bookmark the bookmark. + * @param {TxConfig} txConfig the configuration. + * @return {object} a metadata object. + */ +function buildTxMetadata(bookmark, txConfig) { + const metadata = {}; + if (!bookmark.isEmpty()) { + metadata['bookmarks'] = bookmark.values(); + } + if (txConfig.timeout) { + metadata['tx_timeout'] = txConfig.timeout; + } + if (txConfig.metadata) { + metadata['tx_metadata'] = txConfig.metadata; + } + return metadata; } // constants for messages that never change const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL'); const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET'); +const COMMIT_MESSAGE = new RequestMessage(COMMIT, [], () => 'COMMIT'); +const ROLLBACK_MESSAGE = new RequestMessage(ROLLBACK, [], () => 'ROLLBACK'); diff --git a/src/v1/internal/routing-util.js b/src/v1/internal/routing-util.js index 0cb9061b3..98b23de82 100644 --- a/src/v1/internal/routing-util.js +++ b/src/v1/internal/routing-util.js @@ -20,6 +20,8 @@ import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error'; import Integer, {int} from '../integer'; import {ServerVersion, VERSION_3_2_0} from './server-version'; +import Bookmark from './bookmark'; +import TxConfig from './tx-config'; const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers'; const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable($context)'; @@ -123,7 +125,7 @@ export default class RoutingUtil { params = {}; } - connection.protocol().run(query, params, streamObserver); + connection.protocol().run(query, params, Bookmark.empty(), TxConfig.empty(), streamObserver); }); } } diff --git a/src/v1/internal/server-version.js b/src/v1/internal/server-version.js index c7ffcbb16..a345f2ae1 100644 --- a/src/v1/internal/server-version.js +++ b/src/v1/internal/server-version.js @@ -112,6 +112,7 @@ function compareInts(x, y) { const VERSION_3_1_0 = new ServerVersion(3, 1, 0); const VERSION_3_2_0 = new ServerVersion(3, 2, 0); const VERSION_3_4_0 = new ServerVersion(3, 4, 0); +const VERSION_3_5_0 = new ServerVersion(3, 5, 0); const maxVer = Number.MAX_SAFE_INTEGER; const VERSION_IN_DEV = new ServerVersion(maxVer, maxVer, maxVer); @@ -120,6 +121,7 @@ export { VERSION_3_1_0, VERSION_3_2_0, VERSION_3_4_0, + VERSION_3_5_0, VERSION_IN_DEV }; diff --git a/src/v1/internal/stream-observer.js b/src/v1/internal/stream-observer.js index 398276d56..c6794b7e2 100644 --- a/src/v1/internal/stream-observer.js +++ b/src/v1/internal/stream-observer.js @@ -99,6 +99,20 @@ class StreamObserver { this._conn = conn; } + /** + * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL. + * Response for RUN initializes statement keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream. + * + * However, some operations can be represented as a single message which receives full metadata in a single response. + * For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3. + * Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3. + * + * This function prepares the observer to only handle a single response message. + */ + prepareToHandleSingleResponse() { + this._fieldKeys = []; + } + /** * Will be called on errors. * If user-provided observer is present, pass the error diff --git a/src/v1/internal/tx-config.js b/src/v1/internal/tx-config.js new file mode 100644 index 000000000..29f5fab84 --- /dev/null +++ b/src/v1/internal/tx-config.js @@ -0,0 +1,98 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as util from './util'; +import {int} from '../integer'; +import {newError} from '../error'; + +/** + * Internal holder of the transaction configuration. + * It performs input validation and value conversion for further serialization by the Bolt protocol layer. + * Users of the driver provide transaction configuration as regular objects `{timeout: 10, metadata: {key: 'value'}}`. + * Driver converts such objects to {@link TxConfig} immediately and uses converted values everywhere. + */ +export default class TxConfig { + + /** + * @constructor + * @param {object} config the raw configuration object. + */ + constructor(config) { + assertValidConfig(config); + this.timeout = extractTimeout(config); + this.metadata = extractMetadata(config); + } + + /** + * Get an empty config object. + * @return {TxConfig} an empty config. + */ + static empty() { + return EMPTY_CONFIG; + } + + /** + * Check if this config object is empty. I.e. has no configuration values specified. + * @return {boolean} `true` if this object is empty, `false` otherwise. + */ + isEmpty() { + return Object.values(this).every(value => value == null); + } +} + +const EMPTY_CONFIG = new TxConfig({}); + +/** + * @return {Integer|null} + */ +function extractTimeout(config) { + if (util.isObject(config) && (config.timeout || config.timeout === 0)) { + util.assertNumberOrInteger(config.timeout, 'Transaction timeout'); + const timeout = int(config.timeout); + if (timeout.isZero()) { + throw newError('Transaction timeout should not be zero'); + } + if (timeout.isNegative()) { + throw newError('Transaction timeout should not be negative'); + } + return timeout; + } + return null; +} + +/** + * @return {object|null} + */ +function extractMetadata(config) { + if (util.isObject(config) && config.metadata) { + const metadata = config.metadata; + util.assertObject(metadata); + if (Object.keys(metadata).length !== 0) { + // not an empty object + return metadata; + } + } + return null; +} + +function assertValidConfig(config) { + if (config) { + util.assertObject(config, 'Transaction config'); + } +} diff --git a/src/v1/internal/util.js b/src/v1/internal/util.js index aa858f87c..1703252ee 100644 --- a/src/v1/internal/util.js +++ b/src/v1/internal/util.js @@ -66,6 +66,13 @@ function validateStatementAndParameters(statement, parameters) { return {query, params}; } +function assertObject(obj, objName) { + if (!isObject(obj)) { + throw new TypeError(objName + ' expected to be an object but was: ' + JSON.stringify(obj)); + } + return obj; +} + function assertString(obj, objName) { if (!isString(obj)) { throw new TypeError(objName + ' expected to be string but was: ' + JSON.stringify(obj)); @@ -118,7 +125,9 @@ function isString(str) { export { isEmptyObjectOrNull, + isObject, isString, + assertObject, assertString, assertNumber, assertNumberOrInteger, diff --git a/src/v1/session.js b/src/v1/session.js index ae038585b..53e853082 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -25,13 +25,32 @@ import ConnectionHolder from './internal/connection-holder'; import Driver, {READ, WRITE} from './driver'; import TransactionExecutor from './internal/transaction-executor'; import Bookmark from './internal/bookmark'; +import TxConfig from './internal/tx-config'; +// Typedef for JSDoc. Declares TransactionConfig type and makes it possible to use in in method-level docs. /** - * A Session instance is used for handling the connection and - * sending statements through the connection. - * @access public - */ + * Configuration object containing settings for explicit and auto-commit transactions. + *

+ * Configuration is supported for: + *

    + *
  • queries executed in auto-commit transactions using {@link Session#run}
  • + *
  • transactions started by transaction functions using {@link Session#readTransaction} and {@link Session#writeTransaction}
  • + *
  • explicit transactions using {@link Session#beginTransaction}
  • + *
+ * @typedef {object} TransactionConfig + * @property {number} timeout - the transaction timeout in **milliseconds**. Transactions that execute longer than the configured timeout will + * be terminated by the database. This functionality allows to limit query/transaction execution time. Specified timeout overrides the default timeout + * configured in the database using `dbms.transaction.timeout` setting. Value should not represent a duration of zero or negative duration. + * @property {object} metadata - the transaction metadata. Specified metadata will be attached to the executing transaction and visible in the output of + * `dbms.listQueries` and `dbms.listTransactions` procedures. It will also get logged to the `query.log`. This functionality makes it easier to tag + * transactions and is equivalent to `dbms.setTXMetaData` procedure. + */ +/** + * A Session instance is used for handling the connection and + * sending statements through the connection. + * @access public + */ class Session { /** @@ -57,13 +76,15 @@ class Session { * or with the statement and parameters as separate arguments. * @param {mixed} statement - Cypher statement to execute * @param {Object} parameters - Map with parameters to use in statement + * @param {TransactionConfig} [transactionConfig] - configuration for the new auto-commit transaction. * @return {Result} - New Result */ - run(statement, parameters = {}) { + run(statement, parameters, transactionConfig) { const {query, params} = validateStatementAndParameters(statement, parameters); + const autoCommitTxConfig = transactionConfig ? new TxConfig(transactionConfig) : TxConfig.empty(); return this._run(query, params, (connection, streamObserver) => - connection.protocol().run(query, params, streamObserver) + connection.protocol().run(query, params, this._lastBookmark, autoCommitTxConfig, streamObserver) ); } @@ -89,17 +110,29 @@ class Session { * * While a transaction is open the session cannot be used to run statements outside the transaction. * - * @param {string|string[]} [bookmarkOrBookmarks=null] - reference or references to some previous transactions. - * DEPRECATED: This parameter is deprecated in favour of {@link Driver#session} that accepts an initial bookmark. - * Session will ensure that all nested transactions are chained with bookmarks to guarantee causal consistency. + * @param {TransactionConfig} [transactionConfig] - configuration for the new auto-commit transaction. * @returns {Transaction} - New Transaction */ - beginTransaction(bookmarkOrBookmarks) { - this._updateBookmark(new Bookmark(bookmarkOrBookmarks)); - return this._beginTransaction(this._mode); + beginTransaction(transactionConfig) { + // this function needs to support bookmarks parameter for backwards compatibility + // parameter was of type {string|string[]} and represented either a single or multiple bookmarks + // that's why we need to check parameter type and decide how to interpret the value + const arg = transactionConfig; + + let txConfig = TxConfig.empty(); + if (typeof arg === 'string' || arg instanceof String || Array.isArray(arg)) { + // argument looks like a single or multiple bookmarks + // bookmarks in this function are deprecated but need to be supported for backwards compatibility + this._updateBookmark(new Bookmark(arg)); + } else if (arg) { + // argument is probably a transaction configuration + txConfig = new TxConfig(arg); + } + + return this._beginTransaction(this._mode, txConfig); } - _beginTransaction(accessMode) { + _beginTransaction(accessMode, txConfig) { if (this._hasTx) { throw newError('You cannot begin a transaction on a session with an open transaction; ' + 'either run from within the transaction or use a different session.'); @@ -110,8 +143,13 @@ class Session { connectionHolder.initializeConnection(); this._hasTx = true; - const onTxClose = () => this._hasTx = false; - return new Transaction(connectionHolder, onTxClose.bind(this), this._lastBookmark, this._updateBookmark.bind(this)); + const tx = new Transaction(connectionHolder, this._transactionClosed.bind(this), this._updateBookmark.bind(this)); + tx._begin(this._lastBookmark, txConfig); + return tx; + } + + _transactionClosed() { + this._hasTx = false; } /** @@ -133,11 +171,13 @@ class Session { * * @param {function(tx: Transaction): Promise} transactionWork - callback that executes operations against * a given {@link Transaction}. + * @param {TransactionConfig} [transactionConfig] - configuration for all transactions started to execute the unit of work. * @return {Promise} resolved promise as returned by the given function or rejected promise when given * function or commit fails. */ - readTransaction(transactionWork) { - return this._runTransaction(READ, transactionWork); + readTransaction(transactionWork, transactionConfig) { + const config = new TxConfig(transactionConfig); + return this._runTransaction(READ, config, transactionWork); } /** @@ -150,16 +190,18 @@ class Session { * * @param {function(tx: Transaction): Promise} transactionWork - callback that executes operations against * a given {@link Transaction}. + * @param {TransactionConfig} [transactionConfig] - configuration for all transactions started to execute the unit of work. * @return {Promise} resolved promise as returned by the given function or rejected promise when given * function or commit fails. */ - writeTransaction(transactionWork) { - return this._runTransaction(WRITE, transactionWork); + writeTransaction(transactionWork, transactionConfig) { + const config = new TxConfig(transactionConfig); + return this._runTransaction(WRITE, config, transactionWork); } - _runTransaction(accessMode, transactionWork) { + _runTransaction(accessMode, transactionConfig, transactionWork) { return this._transactionExecutor.execute( - () => this._beginTransaction(accessMode), + () => this._beginTransaction(accessMode, transactionConfig), transactionWork ); } diff --git a/src/v1/transaction.js b/src/v1/transaction.js index d7a1efc5f..83f7320ed 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -21,6 +21,7 @@ import Result from './result'; import {validateStatementAndParameters} from './internal/util'; import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder'; import Bookmark from './internal/bookmark'; +import TxConfig from './internal/tx-config'; /** * Represents a transaction in the Neo4j database. @@ -32,20 +33,21 @@ class Transaction { * @constructor * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. - * @param {Bookmark} bookmark bookmark for transaction begin. * @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced. */ - constructor(connectionHolder, onClose, bookmark, onBookmark) { + constructor(connectionHolder, onClose, onBookmark) { this._connectionHolder = connectionHolder; + this._state = _states.ACTIVE; + this._onClose = onClose; + this._onBookmark = onBookmark; + } + + _begin(bookmark, txConfig) { const streamObserver = new _TransactionStreamObserver(this); this._connectionHolder.getConnection(streamObserver) - .then(conn => conn.protocol().beginTransaction(bookmark, streamObserver)) + .then(conn => conn.protocol().beginTransaction(bookmark, txConfig, streamObserver)) .catch(error => streamObserver.onError(error)); - - this._state = _states.ACTIVE; - this._onClose = onClose; - this._onBookmark = onBookmark; } /** @@ -151,8 +153,12 @@ let _states = { }; }, run: (connectionHolder, observer, statement, parameters) => { + // RUN in explicit transaction can't contain bookmarks and transaction configuration + const bookmark = Bookmark.empty(); + const txConfig = TxConfig.empty(); + connectionHolder.getConnection(observer) - .then(conn => conn.protocol().run(statement, parameters || {}, observer)) + .then(conn => conn.protocol().run(statement, parameters, bookmark, txConfig, observer)) .catch(error => observer.onError(error)); return _newRunResult(observer, statement, parameters, () => observer.serverMetadata()); diff --git a/test/internal/bolt-protocol-v1.test.js b/test/internal/bolt-protocol-v1.test.js index b0bb65304..ff68d2215 100644 --- a/test/internal/bolt-protocol-v1.test.js +++ b/test/internal/bolt-protocol-v1.test.js @@ -20,6 +20,7 @@ import BoltProtocolV1 from '../../src/v1/internal/bolt-protocol-v1'; import RequestMessage from '../../src/v1/internal/request-message'; import Bookmark from '../../src/v1/internal/bookmark'; +import TxConfig from '../../src/v1/internal/tx-config'; class MessageRecorder { @@ -44,6 +45,15 @@ class MessageRecorder { describe('BoltProtocolV1', () => { + it('should not change metadata', () => { + const metadata = {result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4}; + const protocol = new BoltProtocolV1(new MessageRecorder(), null, false); + + const transformedMetadata = protocol.transformMetadata(metadata); + + expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4}); + }); + it('should initialize the connection', () => { const recorder = new MessageRecorder(); const protocol = new BoltProtocolV1(recorder, null, false); @@ -68,7 +78,7 @@ describe('BoltProtocolV1', () => { const parameters = {x: 'x', y: 'y'}; const observer = {}; - protocol.run(statement, parameters, observer); + protocol.run(statement, parameters, Bookmark.empty(), TxConfig.empty(), observer); recorder.verifyMessageCount(2); @@ -100,7 +110,7 @@ describe('BoltProtocolV1', () => { const bookmark = new Bookmark('neo4j:bookmark:v1:tx42'); const observer = {}; - protocol.beginTransaction(bookmark, observer); + protocol.beginTransaction(bookmark, TxConfig.empty(), observer); recorder.verifyMessageCount(2); diff --git a/test/internal/bolt-protocol-v3.test.js b/test/internal/bolt-protocol-v3.test.js new file mode 100644 index 000000000..837deee14 --- /dev/null +++ b/test/internal/bolt-protocol-v3.test.js @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import BoltProtocolV3 from '../../src/v1/internal/bolt-protocol-v3'; + +describe('BoltProtocolV3', () => { + + it('should update metadata', () => { + const metadata = {t_first: 1, t_last: 2, db_hits: 3, some_other_key: 4}; + const protocol = new BoltProtocolV3(null, null, false); + + const transformedMetadata = protocol.transformMetadata(metadata); + + expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, db_hits: 3, some_other_key: 4}); + }); + +}); diff --git a/test/internal/bookmark.test.js b/test/internal/bookmark.test.js index e2b10fcb8..cfbf31212 100644 --- a/test/internal/bookmark.test.js +++ b/test/internal/bookmark.test.js @@ -118,4 +118,21 @@ describe('Bookmark', () => { }); }); + it('should expose bookmark values', () => { + expect(new Bookmark(undefined).values()).toEqual([]); + expect(new Bookmark(null).values()).toEqual([]); + + const bookmarkString = 'neo4j:bookmark:v1:tx123'; + expect(new Bookmark(bookmarkString).values()).toEqual([bookmarkString]); + + const bookmarkStrings = ['neo4j:bookmark:v1:tx1', 'neo4j:bookmark:v1:tx2', 'neo4j:bookmark:v1:tx3']; + expect(new Bookmark(bookmarkStrings).values()).toEqual(bookmarkStrings); + }); + + it('should expose empty bookmark value', () => { + const bookmark = Bookmark.empty(); + expect(bookmark).toBeDefined(); + expect(bookmark.isEmpty()).toBeTruthy(); + }); + }); diff --git a/test/internal/connection.test.js b/test/internal/connection.test.js index 7b63e7b28..4d94f6532 100644 --- a/test/internal/connection.test.js +++ b/test/internal/connection.test.js @@ -28,9 +28,10 @@ import {ServerVersion} from '../../src/v1/internal/server-version'; import lolex from 'lolex'; import Logger from '../../src/v1/internal/logger'; import StreamObserver from '../../src/v1/internal/stream-observer'; -import RequestMessage from '../../src/v1/internal/request-message'; import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler'; import testUtils from '../internal/test-utils'; +import Bookmark from '../../src/v1/internal/bookmark'; +import TxConfig from '../../src/v1/internal/tx-config'; const ILLEGAL_MESSAGE = {signature: 42, fields: []}; const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]}; @@ -88,16 +89,17 @@ describe('Connection', () => { records.push(record); }, onCompleted: () => { - expect(records[0][0]).toBe(1); + expect(records[0].get(0)).toBe(1); done(); } }; + const streamObserver = new StreamObserver(); + streamObserver.subscribe(pullAllObserver); - connection._negotiateProtocol().then(() => { - connection.protocol().initialize('mydriver/0.0.0', basicAuthToken()); - connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false); - connection.write(RequestMessage.pullAll(), pullAllObserver, true); - }); + connection.connect('mydriver/0.0.0', basicAuthToken()) + .then(() => { + connection.protocol().run('RETURN 1.0', {}, Bookmark.empty(), TxConfig.empty(), streamObserver); + }); }); it('should write protocol handshake', () => { @@ -107,10 +109,11 @@ describe('Connection', () => { connection._negotiateProtocol(); const boltMagicPreamble = '60 60 b0 17'; + const protocolVersion3 = '00 00 00 03'; const protocolVersion2 = '00 00 00 02'; const protocolVersion1 = '00 00 00 01'; const noProtocolVersion = '00 00 00 00'; - expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); + expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `); }); it('should provide error message when connecting to http-port', done => { @@ -218,7 +221,8 @@ describe('Connection', () => { }); it('should not queue RUN observer when broken', done => { - testQueueingOfObserversWithBrokenConnection(connection => connection.protocol().run('RETURN 1', {}, {}), done); + testQueueingOfObserversWithBrokenConnection(connection => + connection.protocol().run('RETURN 1', {}, Bookmark.empty(), TxConfig.empty(), {}), done); }); it('should not queue RESET observer when broken', done => { diff --git a/test/internal/protocol-handshaker.test.js b/test/internal/protocol-handshaker.test.js index eb238d856..c292576e5 100644 --- a/test/internal/protocol-handshaker.test.js +++ b/test/internal/protocol-handshaker.test.js @@ -37,11 +37,12 @@ describe('ProtocolHandshaker', () => { expect(writtenBuffers.length).toEqual(1); const boltMagicPreamble = '60 60 b0 17'; + const protocolVersion3 = '00 00 00 03'; const protocolVersion2 = '00 00 00 02'; const protocolVersion1 = '00 00 00 01'; const noProtocolVersion = '00 00 00 00'; - expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `); + expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `); }); it('should create protocol with valid version', () => { diff --git a/test/internal/request-message.test.js b/test/internal/request-message.test.js index a21cce668..e4a83e68b 100644 --- a/test/internal/request-message.test.js +++ b/test/internal/request-message.test.js @@ -18,18 +18,21 @@ */ import RequestMessage from '../../src/v1/internal/request-message'; +import Bookmark from '../../src/v1/internal/bookmark'; +import TxConfig from '../../src/v1/internal/tx-config'; +import {int} from '../../src/v1'; describe('RequestMessage', () => { it('should create INIT message', () => { - const clientName = 'my-driver/1.0.2'; + const userAgent = 'my-driver/1.0.2'; const authToken = {username: 'neo4j', password: 'secret'}; - const message = RequestMessage.init(clientName, authToken); + const message = RequestMessage.init(userAgent, authToken); expect(message.signature).toEqual(0x01); - expect(message.fields).toEqual([clientName, authToken]); - expect(message.toString()).toEqual(`INIT ${clientName} {...}`); + expect(message.fields).toEqual([userAgent, authToken]); + expect(message.toString()).toEqual(`INIT ${userAgent} {...}`); }); it('should create RUN message', () => { @@ -58,4 +61,58 @@ describe('RequestMessage', () => { expect(message.fields).toEqual([]); expect(message.toString()).toEqual('RESET'); }); + + it('should create HELLO message', () => { + const userAgent = 'my-driver/1.0.2'; + const authToken = {username: 'neo4j', password: 'secret'}; + + const message = RequestMessage.hello(userAgent, authToken); + + expect(message.signature).toEqual(0x01); + expect(message.fields).toEqual([{user_agent: userAgent, username: 'neo4j', password: 'secret'}]); + expect(message.toString()).toEqual(`HELLO {user_agent: '${userAgent}', ...}`); + }); + + it('should create BEGIN message', () => { + const bookmark = new Bookmark(['neo4j:bookmark:v1:tx1', 'neo4j:bookmark:v1:tx10']); + const txConfig = new TxConfig({timeout: 42, metadata: {key: 42}}); + + const message = RequestMessage.begin(bookmark, txConfig); + + expect(message.signature).toEqual(0x11); + const expectedMetadata = {bookmarks: bookmark.values(), tx_timeout: int(42), tx_metadata: {key: 42}}; + expect(message.fields).toEqual([expectedMetadata]); + expect(message.toString()).toEqual(`BEGIN ${JSON.stringify(expectedMetadata)}`); + }); + + it('should create COMMIT message', () => { + const message = RequestMessage.commit(); + + expect(message.signature).toEqual(0x12); + expect(message.fields).toEqual([]); + expect(message.toString()).toEqual('COMMIT'); + }); + + it('should create ROLLBACK message', () => { + const message = RequestMessage.rollback(); + + expect(message.signature).toEqual(0x13); + expect(message.fields).toEqual([]); + expect(message.toString()).toEqual('ROLLBACK'); + }); + + it('should create RUN with metadata message', () => { + const statement = 'RETURN $x'; + const parameters = {x: 42}; + const bookmark = new Bookmark(['neo4j:bookmark:v1:tx1', 'neo4j:bookmark:v1:tx10', 'neo4j:bookmark:v1:tx100']); + const txConfig = new TxConfig({timeout: 999, metadata: {a: 'a', b: 'b'}}); + + const message = RequestMessage.runWithMetadata(statement, parameters, bookmark, txConfig); + + expect(message.signature).toEqual(0x10); + const expectedMetadata = {bookmarks: bookmark.values(), tx_timeout: int(999), tx_metadata: {a: 'a', b: 'b'}}; + expect(message.fields).toEqual([statement, parameters, expectedMetadata]); + expect(message.toString()).toEqual(`RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(expectedMetadata)}`); + }); + }); diff --git a/test/internal/stream-observer.test.js b/test/internal/stream-observer.test.js index 994c733c8..be322cdbc 100644 --- a/test/internal/stream-observer.test.js +++ b/test/internal/stream-observer.test.js @@ -158,6 +158,20 @@ describe('StreamObserver', () => { expect(errors).toEqual([error1]); }); + it('should be able to handle a single response', done => { + const streamObserver = new StreamObserver(); + streamObserver.prepareToHandleSingleResponse(); + + streamObserver.subscribe({ + onCompleted: metadata => { + expect(metadata.key).toEqual(42); + done(); + } + }); + + streamObserver.onCompleted({key: 42}); + }); + }); function newStreamObserver() { diff --git a/test/internal/tx-config.test.js b/test/internal/tx-config.test.js new file mode 100644 index 000000000..0094d798e --- /dev/null +++ b/test/internal/tx-config.test.js @@ -0,0 +1,101 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import TxConfig from '../../src/v1/internal/tx-config'; +import {int} from '../../src/v1'; + +describe('TxConfig', () => { + + it('should be possible to construct from null', () => { + testEmptyConfigCreation(null); + }); + + it('should be possible to construct from undefined', () => { + testEmptyConfigCreation(undefined); + }); + + it('should be possible to construct from empty object', () => { + testEmptyConfigCreation({}); + }); + + it('should fail to construct from array', () => { + expect(() => new TxConfig([])).toThrowError(TypeError); + }); + + it('should fail to construct from function', () => { + const func = () => { + }; + expect(() => new TxConfig(func)).toThrowError(TypeError); + }); + + it('should expose empty config', () => { + const config = TxConfig.empty(); + expect(config).toBeDefined(); + expect(config.isEmpty()).toBeTruthy(); + }); + + it('should fail to construct with invalid timeout', () => { + const invalidTimeoutValues = ['15s', [15], {}, 0, int(0), -42, int(-42)]; + + invalidTimeoutValues.forEach(invalidValue => + expect(() => new TxConfig({timeout: invalidValue})).toThrow()); + }); + + it('should construct with valid timeout', () => { + testConfigCreationWithTimeout(1); + testConfigCreationWithTimeout(42000); + + testConfigCreationWithTimeout(int(1)); + testConfigCreationWithTimeout(int(424242)); + }); + + it('should fail to construct with invalid metadata', () => { + const invalidMetadataValues = ['hello', [1, 2, 3], () => 'Hello', 42]; + + invalidMetadataValues.forEach(invalidValue => + expect(() => new TxConfig({metadata: invalidValue})).toThrow()); + }); + + it('should construct with valid metadata', () => { + testEmptyConfigCreation({metadata: {}}); + + testConfigCreationWithMetadata({key: 'value'}); + testConfigCreationWithMetadata({map: {key1: 1, key2: '2', key3: []}, array: [1, 2, 3, '4']}); + }); + + function testEmptyConfigCreation(value) { + const config = new TxConfig(value); + expect(config).toBeDefined(); + expect(config.isEmpty()).toBeTruthy(); + } + + function testConfigCreationWithTimeout(value) { + const config = new TxConfig({timeout: value}); + expect(config).toBeDefined(); + expect(config.isEmpty()).toBeFalsy(); + expect(config.timeout).toEqual(int(value)); + } + + function testConfigCreationWithMetadata(value) { + const config = new TxConfig({metadata: value}); + expect(config).toBeDefined(); + expect(config.isEmpty()).toBeFalsy(); + expect(config.metadata).toEqual(value); + } + +}); diff --git a/test/internal/util.test.js b/test/internal/util.test.js index 6fc420ca9..1d88d42b7 100644 --- a/test/internal/util.test.js +++ b/test/internal/util.test.js @@ -152,6 +152,26 @@ describe('util', () => { verifyInvalidDate(2019); }); + it('should check objects', () => { + expect(util.isObject(42)).toBeFalsy(); + expect(util.isObject([])).toBeFalsy(); + expect(util.isObject(() => 'Hello')).toBeFalsy(); + expect(util.isObject('string')).toBeFalsy(); + + expect(util.isObject({})).toBeTruthy(); + expect(util.isObject({key1: 1, key2: '2'})).toBeTruthy(); + }); + + it('should assert on objects', () => { + expect(() => util.assertObject(42, '')).toThrowError(TypeError); + expect(() => util.assertObject([], '')).toThrowError(TypeError); + expect(() => util.assertObject(() => 'Hello', '')).toThrowError(TypeError); + expect(() => util.assertObject('string', '')).toThrowError(TypeError); + + expect(() => util.assertObject({}, '')).not.toThrow(); + expect(() => util.assertObject({key1: 1, key2: '2'}, '')).not.toThrow(); + }); + function verifyValidString(str) { expect(util.assertString(str, 'Test string')).toBe(str); } diff --git a/test/types/v1/session.test.ts b/test/types/v1/session.test.ts index 002e221f5..8ee6f5329 100644 --- a/test/types/v1/session.test.ts +++ b/test/types/v1/session.test.ts @@ -17,17 +17,27 @@ * limitations under the License. */ -import Session from "../../../types/v1/session"; +import Session, {TransactionConfig} from "../../../types/v1/session"; import Transaction from "../../../types/v1/transaction"; import Record from "../../../types/v1/record"; import Result, {StatementResult} from "../../../types/v1/result"; import ResultSummary from "../../../types/v1/result-summary"; +import Integer from "../../../types/v1/integer"; const dummy: any = null; +const intValue: Integer = Integer.fromInt(42); const session: Session = dummy; -const tx: Transaction = session.beginTransaction(); +const txConfig1: TransactionConfig = {}; +const txConfig2: TransactionConfig = {timeout: 5000}; +const txConfig3: TransactionConfig = {timeout: intValue}; +const txConfig4: TransactionConfig = {metadata: {}}; +const txConfig5: TransactionConfig = {metadata: {key1: 'value1', key2: 5, key3: {a: 'a', b: 'b'}, key4: [1, 2, 3]}}; +const txConfig6: TransactionConfig = {timeout: 2000, metadata: {key1: 'value1', key2: 2}}; +const txConfig7: TransactionConfig = {timeout: intValue, metadata: {key1: 'value1', key2: 2}}; + +const tx1: Transaction = session.beginTransaction(); const bookmark: null | string = session.lastBookmark(); const promise1: Promise = session.readTransaction((tx: Transaction) => { @@ -101,7 +111,7 @@ result4.subscribe({ onCompleted: (summary: ResultSummary) => console.log(summary) }); -const result5: Result = session.run({text: "RETURN 1"}); +const result5: Result = session.run("RETURN $value", {value: "42"}, txConfig1); result5.then((res: StatementResult) => { const records: Record[] = res.records; const summary: ResultSummary = res.summary; @@ -111,7 +121,7 @@ result5.then((res: StatementResult) => { console.log(error); }); -const result6: Result = session.run({text: "RETURN 1"}); +const result6: Result = session.run("RETURN $value", {value: "42"}, txConfig2); result6.subscribe({}); result6.subscribe({ onNext: (record: Record) => console.log(record) @@ -126,27 +136,6 @@ result6.subscribe({ onCompleted: (summary: ResultSummary) => console.log(summary) }); -const result7: Result = session.run({text: "RETURN $value", parameters: {value: 42}}); -result7.then((res: StatementResult) => { - const records: Record[] = res.records; - const summary: ResultSummary = res.summary; - console.log(records); - console.log(summary); -}).catch((error: Error) => { - console.log(error); -}); - -const result8: Result = session.run({text: "RETURN $value", parameters: {value: 42}}); -result8.subscribe({}); -result8.subscribe({ - onNext: (record: Record) => console.log(record) -}); -result8.subscribe({ - onNext: (record: Record) => console.log(record), - onError: (error: Error) => console.log(error) -}); -result8.subscribe({ - onNext: (record: Record) => console.log(record), - onError: (error: Error) => console.log(error), - onCompleted: (summary: ResultSummary) => console.log(summary) -}); +const tx2: Transaction = session.beginTransaction(txConfig2); +const promise5: Promise = session.readTransaction((tx: Transaction) => "", txConfig3); +const promise6: Promise = session.writeTransaction((tx: Transaction) => 42, txConfig4); diff --git a/test/types/v1/transaction.test.ts b/test/types/v1/transaction.test.ts index dbab43e07..cec754d6f 100644 --- a/test/types/v1/transaction.test.ts +++ b/test/types/v1/transaction.test.ts @@ -79,56 +79,6 @@ result4.subscribe({ onCompleted: (summary: ResultSummary) => console.log(summary) }); -const result5: Result = tx.run({text: "RETURN 1"}); -result5.then((res: StatementResult) => { - const records: Record[] = res.records; - const summary: ResultSummary = res.summary; - console.log(records); - console.log(summary); -}).catch((error: Error) => { - console.log(error); -}); - -const result6: Result = tx.run({text: "RETURN 1"}); -result6.subscribe({}); -result6.subscribe({ - onNext: (record: Record) => console.log(record) -}); -result6.subscribe({ - onNext: (record: Record) => console.log(record), - onError: (error: Error) => console.log(error) -}); -result6.subscribe({ - onNext: (record: Record) => console.log(record), - onError: (error: Error) => console.log(error), - onCompleted: (summary: ResultSummary) => console.log(summary) -}); - -const result7: Result = tx.run({text: "RETURN $value", parameters: {value: 42}}); -result7.then((res: StatementResult) => { - const records: Record[] = res.records; - const summary: ResultSummary = res.summary; - console.log(records); - console.log(summary); -}).catch((error: Error) => { - console.log(error); -}); - -const result8: Result = tx.run({text: "RETURN $value", parameters: {value: 42}}); -result8.subscribe({}); -result8.subscribe({ - onNext: (record: Record) => console.log(record) -}); -result8.subscribe({ - onNext: (record: Record) => console.log(record), - onError: (error: Error) => console.log(error) -}); -result8.subscribe({ - onNext: (record: Record) => console.log(record), - onError: (error: Error) => console.log(error), - onCompleted: (summary: ResultSummary) => console.log(summary) -}); - tx.commit().then((res: StatementResult) => { console.log(res); }).catch((error: Error) => { diff --git a/test/v1/bolt-v3.test.js b/test/v1/bolt-v3.test.js new file mode 100644 index 000000000..d36931604 --- /dev/null +++ b/test/v1/bolt-v3.test.js @@ -0,0 +1,358 @@ +/** + * Copyright (c) 2002-2018 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import neo4j from '../../src/v1'; +import sharedNeo4j from '../internal/shared-neo4j'; +import {ServerVersion, VERSION_3_5_0} from '../../src/v1/internal/server-version'; + +const TX_CONFIG_WITH_METADATA = {metadata: {a: 1, b: 2}}; +const TX_CONFIG_WITH_TIMEOUT = {timeout: 42}; + +const INVALID_TIMEOUT_VALUES = [0, -1, -42, '15 seconds', [1, 2, 3]]; +const INVALID_METADATA_VALUES = ['metadata', ['1', '2', '3'], () => 'hello world']; + +describe('Bolt V3 API', () => { + + let driver; + let session; + let serverVersion; + let originalTimeout; + + beforeEach(done => { + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken); + session = driver.session(); + originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL; + jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000; + + session.run('MATCH (n) DETACH DELETE n').then(result => { + serverVersion = ServerVersion.fromString(result.summary.server.version); + done(); + }); + }); + + afterEach(() => { + jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout; + session.close(); + driver.close(); + }); + + it('should set transaction metadata for auto-commit transaction', done => { + if (!databaseSupportsBoltV3()) { + done(); + return; + } + + const metadata = { + a: 'hello world', + b: 424242, + c: [true, false, true] + }; + + // call listTransactions procedure that should list itself with the specified metadata + session.run('CALL dbms.listTransactions()', {}, {metadata: metadata}) + .then(result => { + const receivedMetadata = result.records[0].get('metaData'); + expect(receivedMetadata).toEqual(metadata); + done(); + }) + .catch(error => { + done.fail(error); + }); + }); + + it('should set transaction timeout for auto-commit transaction', done => { + if (!databaseSupportsBoltV3()) { + done(); + return; + } + + session.run('CREATE (:Node)') // create a dummy node + .then(() => { + const otherSession = driver.session(); + const tx = otherSession.beginTransaction(); + tx.run('MATCH (n:Node) SET n.prop = 1') // lock dummy node but keep the transaction open + .then(() => { + // run a query in an auto-commit transaction with timeout and try to update the locked dummy node + session.run('MATCH (n:Node) SET n.prop = $newValue', {newValue: 2}, {timeout: 1}) + .then(() => done.fail('Failure expected')) + .catch(error => { + expect(error.code.indexOf('TransientError')).toBeGreaterThan(0); + expect(error.message.indexOf('transaction has been terminated')).toBeGreaterThan(0); + + tx.rollback() + .then(() => otherSession.close()) + .then(() => done()) + .catch(error => done.fail(error)); + }); + }); + }); + }); + + it('should set transaction metadata with read transaction function', done => { + testTransactionMetadataWithTransactionFunctions(true, done); + }); + + it('should set transaction metadata with write transaction function', done => { + testTransactionMetadataWithTransactionFunctions(false, done); + }); + + it('should fail auto-commit transaction with metadata when database does not support Bolt V3', done => { + testAutoCommitTransactionConfigWhenBoltV3NotSupported(TX_CONFIG_WITH_METADATA, done); + }); + + it('should fail auto-commit transaction with timeout when database does not support Bolt V3', done => { + testAutoCommitTransactionConfigWhenBoltV3NotSupported(TX_CONFIG_WITH_TIMEOUT, done); + }); + + it('should fail read transaction function with metadata when database does not support Bolt V3', done => { + testTransactionFunctionConfigWhenBoltV3NotSupported(true, TX_CONFIG_WITH_METADATA, done); + }); + + it('should fail read transaction function with timeout when database does not support Bolt V3', done => { + testTransactionFunctionConfigWhenBoltV3NotSupported(true, TX_CONFIG_WITH_TIMEOUT, done); + }); + + it('should fail write transaction function with metadata when database does not support Bolt V3', done => { + testTransactionFunctionConfigWhenBoltV3NotSupported(false, TX_CONFIG_WITH_METADATA, done); + }); + + it('should fail write transaction function with timeout when database does not support Bolt V3', done => { + testTransactionFunctionConfigWhenBoltV3NotSupported(false, TX_CONFIG_WITH_TIMEOUT, done); + }); + + it('should set transaction metadata for explicit transactions', done => { + if (!databaseSupportsBoltV3()) { + done(); + return; + } + + const metadata = { + a: 12345, + b: 'string', + c: [1, 2, 3] + }; + + const tx = session.beginTransaction({metadata: metadata}); + // call listTransactions procedure that should list itself with the specified metadata + tx.run('CALL dbms.listTransactions()') + .then(result => { + const receivedMetadata = result.records[0].get('metaData'); + expect(receivedMetadata).toEqual(metadata); + tx.commit() + .then(() => done()) + .catch(error => done.fail(error)); + }) + .catch(error => { + done.fail(error); + }); + }); + + it('should set transaction timeout for explicit transactions', done => { + if (!databaseSupportsBoltV3()) { + done(); + return; + } + + session.run('CREATE (:Node)') // create a dummy node + .then(() => { + const otherSession = driver.session(); + const otherTx = otherSession.beginTransaction(); + otherTx.run('MATCH (n:Node) SET n.prop = 1') // lock dummy node but keep the transaction open + .then(() => { + // run a query in an explicit transaction with timeout and try to update the locked dummy node + const tx = session.beginTransaction({timeout: 1}); + tx.run('MATCH (n:Node) SET n.prop = $newValue', {newValue: 2}) + .then(() => done.fail('Failure expected')) + .catch(error => { + expect(error.code.indexOf('TransientError')).toBeGreaterThan(0); + expect(error.message.indexOf('transaction has been terminated')).toBeGreaterThan(0); + + otherTx.rollback() + .then(() => otherSession.close()) + .then(() => done()) + .catch(error => done.fail(error)); + }); + }); + }); + }); + + it('should fail to run in explicit transaction with metadata when database does not support Bolt V3', done => { + testRunInExplicitTransactionWithConfigWhenBoltV3NotSupported(TX_CONFIG_WITH_METADATA, done); + }); + + it('should fail to run in explicit transaction with timeout when database does not support Bolt V3', done => { + testRunInExplicitTransactionWithConfigWhenBoltV3NotSupported(TX_CONFIG_WITH_TIMEOUT, done); + }); + + it('should fail to commit explicit transaction with metadata when database does not support Bolt V3', done => { + testCloseExplicitTransactionWithConfigWhenBoltV3NotSupported(true, TX_CONFIG_WITH_METADATA, done); + }); + + it('should fail to commit explicit transaction with timeout when database does not support Bolt V3', done => { + testCloseExplicitTransactionWithConfigWhenBoltV3NotSupported(true, TX_CONFIG_WITH_TIMEOUT, done); + }); + + it('should fail to rollback explicit transaction with metadata when database does not support Bolt V3', done => { + testCloseExplicitTransactionWithConfigWhenBoltV3NotSupported(false, TX_CONFIG_WITH_METADATA, done); + }); + + it('should fail to rollback explicit transaction with timeout when database does not support Bolt V3', done => { + testCloseExplicitTransactionWithConfigWhenBoltV3NotSupported(false, TX_CONFIG_WITH_TIMEOUT, done); + }); + + it('should fail to run auto-commit transaction with invalid timeout', () => { + INVALID_TIMEOUT_VALUES.forEach(invalidValue => + expect(() => session.run('RETURN $x', {x: 42}, {timeout: invalidValue})).toThrow()); + }); + + it('should fail to run auto-commit transaction with invalid metadata', () => { + INVALID_METADATA_VALUES.forEach(invalidValue => + expect(() => session.run('RETURN $x', {x: 42}, {metadata: invalidValue})).toThrow()); + }); + + it('should fail to begin explicit transaction with invalid timeout', () => { + INVALID_TIMEOUT_VALUES.forEach(invalidValue => + expect(() => session.beginTransaction({timeout: invalidValue})).toThrow()); + }); + + it('should fail to begin explicit transaction with invalid metadata', () => { + INVALID_METADATA_VALUES.forEach(invalidValue => + expect(() => session.beginTransaction({metadata: invalidValue})).toThrow()); + }); + + it('should fail to run read transaction function with invalid timeout', () => { + INVALID_TIMEOUT_VALUES.forEach(invalidValue => + expect(() => session.readTransaction(tx => tx.run('RETURN 1'), {timeout: invalidValue})).toThrow()); + }); + + it('should fail to run read transaction function with invalid metadata', () => { + INVALID_METADATA_VALUES.forEach(invalidValue => + expect(() => session.readTransaction(tx => tx.run('RETURN 1'), {metadata: invalidValue})).toThrow()); + }); + + it('should fail to run write transaction function with invalid timeout', () => { + INVALID_TIMEOUT_VALUES.forEach(invalidValue => + expect(() => session.writeTransaction(tx => tx.run('RETURN 1'), {timeout: invalidValue})).toThrow()); + }); + + it('should fail to run write transaction function with invalid metadata', () => { + INVALID_METADATA_VALUES.forEach(invalidValue => + expect(() => session.writeTransaction(tx => tx.run('RETURN 1'), {metadata: invalidValue})).toThrow()); + }); + + function testTransactionMetadataWithTransactionFunctions(read, done) { + if (!databaseSupportsBoltV3()) { + done(); + return; + } + + const metadata = { + foo: 'bar', + baz: 42 + }; + + const txFunctionWithMetadata = work => read + ? session.readTransaction(work, {metadata: metadata}) + : session.writeTransaction(work, {metadata: metadata}); + + txFunctionWithMetadata(tx => tx.run('CALL dbms.listTransactions()')) + .then(result => { + const receivedMetadata = result.records[0].get('metaData'); + expect(receivedMetadata).toEqual(metadata); + done(); + }) + .catch(error => { + done.fail(error); + }); + } + + function testAutoCommitTransactionConfigWhenBoltV3NotSupported(txConfig, done) { + if (databaseSupportsBoltV3()) { + done(); + return; + } + + session.run('RETURN $x', {x: 42}, txConfig) + .then(() => done.fail('Failure expected')) + .catch(error => { + expectBoltV3NotSupportedError(error); + done(); + }); + } + + function testTransactionFunctionConfigWhenBoltV3NotSupported(read, txConfig, done) { + if (databaseSupportsBoltV3()) { + done(); + return; + } + + const txFunctionWithMetadata = work => read + ? session.readTransaction(work, txConfig) + : session.writeTransaction(work, txConfig); + + txFunctionWithMetadata(tx => tx.run('RETURN 42')) + .then(() => done.fail('Failure expected')) + .catch(error => { + expectBoltV3NotSupportedError(error); + done(); + }); + } + + function testRunInExplicitTransactionWithConfigWhenBoltV3NotSupported(txConfig, done) { + if (databaseSupportsBoltV3()) { + done(); + return; + } + + const tx = session.beginTransaction(txConfig); + tx.run('RETURN 42') + .then(() => done.fail('Failure expected')) + .catch(error => { + expectBoltV3NotSupportedError(error); + session.close(); + done(); + }); + } + + function testCloseExplicitTransactionWithConfigWhenBoltV3NotSupported(commit, txConfig, done) { + if (databaseSupportsBoltV3()) { + done(); + return; + } + + const tx = session.beginTransaction(txConfig); + const promise = commit ? tx.commit() : tx.rollback(); + + promise.then(() => done.fail('Failure expected')) + .catch(error => { + expectBoltV3NotSupportedError(error); + session.close(); + done(); + }); + } + + function expectBoltV3NotSupportedError(error) { + expect(error.message.indexOf('Driver is connected to the database that does not support transaction configuration')).toBeGreaterThan(-1); + } + + function databaseSupportsBoltV3() { + return serverVersion.compareTo(VERSION_3_5_0) >= 0; + } + +}); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index f5592efba..a14fd23d0 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -439,10 +439,10 @@ describe('session', () => { }); it('should fail nicely for illegal bookmark', () => { - expect(() => session.beginTransaction({})).toThrowError(TypeError); - expect(() => session.beginTransaction({foo: 'bar'})).toThrowError(TypeError); + expect(() => session.beginTransaction(42)).toThrowError(TypeError); expect(() => session.beginTransaction(42)).toThrowError(TypeError); expect(() => session.beginTransaction([42.0, 42.0])).toThrowError(TypeError); + expect(() => session.beginTransaction(() => ['bookmark:1', 'bookmark:2', 'bookmark:3'])).toThrowError(TypeError); }); it('should allow creation of a ' + neo4j.session.READ + ' session', done => { diff --git a/types/v1/session.d.ts b/types/v1/session.d.ts index b5fa37ab2..b4cefca73 100644 --- a/types/v1/session.d.ts +++ b/types/v1/session.d.ts @@ -18,20 +18,33 @@ */ import Transaction from "./transaction"; -import StatementRunner from "./statement-runner"; +import StatementRunner, {Parameters} from "./statement-runner"; +import Result from "./result"; +import {NumberOrInteger} from "./graph-types"; declare type TransactionWork = (tx: Transaction) => T | Promise; +declare interface TransactionConfig { + timeout?: NumberOrInteger; + metadata?: object; +} + declare interface Session extends StatementRunner { - beginTransaction(): Transaction; + run(statement: string, parameters?: Parameters, config?: TransactionConfig): Result; + + beginTransaction(config?: TransactionConfig): Transaction; lastBookmark(): string | null; - readTransaction(work: TransactionWork): Promise; + readTransaction(work: TransactionWork, config?: TransactionConfig): Promise; - writeTransaction(work: TransactionWork): Promise; + writeTransaction(work: TransactionWork, config?: TransactionConfig): Promise; close(callback?: () => void): void; } +export { + TransactionConfig +} + export default Session; diff --git a/types/v1/statement-runner.d.ts b/types/v1/statement-runner.d.ts index 701cc9c54..216ab5f32 100644 --- a/types/v1/statement-runner.d.ts +++ b/types/v1/statement-runner.d.ts @@ -23,8 +23,6 @@ declare type Parameters = { [key: string]: any }; declare interface StatementRunner { run(statement: string, parameters?: Parameters): Result; - - run(statement: { text: string, parameters?: Parameters }): Result; } export {Parameters}