diff --git a/src/driver.js b/src/driver.js index 65c1dd33e..8d3683ecc 100644 --- a/src/driver.js +++ b/src/driver.js @@ -30,9 +30,16 @@ import { } from './internal/pool-config' import Session from './session' import RxSession from './session-rx' +import { ALL } from './internal/request-message' const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour +/** + * The default record fetch size. This is used in Bolt V4 protocol to pull query execution result in batches. + * @type {number} + */ +const DEFAULT_FETCH_SIZE = 1000 + /** * Constant that represents read session access mode. * Should be used like this: `driver.session({ defaultAccessMode: neo4j.session.READ })`. @@ -132,19 +139,23 @@ class Driver { * @param {string} param.defaultAccessMode=WRITE - the access mode of this session, allowed values are {@link READ} and {@link WRITE}. * @param {string|string[]} param.bookmarks - the initial reference or references to some previous * transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown. + * @param {number} param.fetchSize - the record fetch size of each batch of this session. + * Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config. * @param {string} param.database - the database this session will operate on. * @return {Session} new session. */ session ({ defaultAccessMode = WRITE, bookmarks: bookmarkOrBookmarks, - database = '' + database = '', + fetchSize } = {}) { return this._newSession({ defaultAccessMode, bookmarkOrBookmarks, database, - reactive: false + reactive: false, + fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize) }) } @@ -229,7 +240,13 @@ class Driver { /** * @private */ - _newSession ({ defaultAccessMode, bookmarkOrBookmarks, database, reactive }) { + _newSession ({ + defaultAccessMode, + bookmarkOrBookmarks, + database, + reactive, + fetchSize + }) { const sessionMode = Driver._validateSessionMode(defaultAccessMode) const connectionProvider = this._getOrCreateConnectionProvider() const bookmark = bookmarkOrBookmarks @@ -241,7 +258,8 @@ class Driver { connectionProvider, bookmark, config: this._config, - reactive + reactive, + fetchSize }) } @@ -277,6 +295,10 @@ function sanitizeConfig (config) { config.connectionAcquisitionTimeout, DEFAULT_ACQUISITION_TIMEOUT ) + config.fetchSize = validateFetchSizeValue( + config.fetchSize, + DEFAULT_FETCH_SIZE + ) } /** @@ -293,6 +315,23 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) { } } +/** + * @private + */ +function validateFetchSizeValue (rawValue, defaultWhenAbsent) { + const fetchSize = parseInt(rawValue, 10) + if (fetchSize > 0 || fetchSize === ALL) { + return fetchSize + } else if (fetchSize === 0 || fetchSize < 0) { + throw new Error( + 'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' + + fetchSize + ) + } else { + return defaultWhenAbsent + } +} + export { Driver, READ, WRITE } export default Driver diff --git a/src/internal/bolt-protocol-v4.js b/src/internal/bolt-protocol-v4.js index 4dd2d00bf..046e24173 100644 --- a/src/internal/bolt-protocol-v4.js +++ b/src/internal/bolt-protocol-v4.js @@ -17,7 +17,7 @@ * limitations under the License. */ import BoltProtocolV3 from './bolt-protocol-v3' -import RequestMessage from './request-message' +import RequestMessage, { ALL } from './request-message' import { ResultStreamObserver } from './stream-observers' import { BOLT_PROTOCOL_V4 } from './constants' @@ -69,14 +69,16 @@ export default class BoltProtocol extends BoltProtocolV3 { beforeComplete, afterComplete, flush = true, - reactive = false + reactive = false, + fetchSize = ALL } = {} ) { const observer = new ResultStreamObserver({ connection: this._connection, reactive: reactive, - moreFunction: reactive ? this._requestMore : this._noOp, - discardFunction: reactive ? this._requestDiscard : this._noOp, + fetchSize: fetchSize, + moreFunction: this._requestMore, + discardFunction: this._requestDiscard, beforeKeys, afterKeys, beforeError, @@ -98,7 +100,11 @@ export default class BoltProtocol extends BoltProtocolV3 { ) if (!reactive) { - this._connection.write(RequestMessage.pull(), observer, flush) + this._connection.write( + RequestMessage.pull({ n: fetchSize }), + observer, + flush + ) } return observer diff --git a/src/internal/request-message.js b/src/internal/request-message.js index e310e156c..e5463cc6b 100644 --- a/src/internal/request-message.js +++ b/src/internal/request-message.js @@ -43,7 +43,7 @@ const READ_MODE = 'r' /* eslint-enable no-unused-vars */ const NO_STATEMENT_ID = -1 -const ALL = -1 +export const ALL = -1 export default class RequestMessage { constructor (signature, fields, toString) { @@ -220,19 +220,19 @@ export default class RequestMessage { function buildTxMetadata (bookmark, txConfig, database, mode) { const metadata = {} if (!bookmark.isEmpty()) { - metadata['bookmarks'] = bookmark.values() + metadata.bookmarks = bookmark.values() } if (txConfig.timeout) { - metadata['tx_timeout'] = txConfig.timeout + metadata.tx_timeout = txConfig.timeout } if (txConfig.metadata) { - metadata['tx_metadata'] = txConfig.metadata + metadata.tx_metadata = txConfig.metadata } if (database) { - metadata['db'] = assertString(database, 'database') + metadata.db = assertString(database, 'database') } if (mode === ACCESS_MODE_READ) { - metadata['mode'] = READ_MODE + metadata.mode = READ_MODE } return metadata } @@ -246,7 +246,7 @@ function buildTxMetadata (bookmark, txConfig, database, mode) { function buildStreamMetadata (stmtId, n) { const metadata = { n: int(n) } if (stmtId !== NO_STATEMENT_ID) { - metadata['qid'] = int(stmtId) + metadata.qid = int(stmtId) } return metadata } diff --git a/src/internal/stream-observers.js b/src/internal/stream-observers.js index f731f0c3b..5107edcae 100644 --- a/src/internal/stream-observers.js +++ b/src/internal/stream-observers.js @@ -19,10 +19,8 @@ import Record from '../record' import Connection from './connection' import { newError, PROTOCOL_ERROR } from '../error' -import { isString } from './util' import Integer from '../integer' - -const DefaultBatchSize = 50 +import { ALL } from './request-message' class StreamObserver { onNext (rawRecord) {} @@ -50,7 +48,7 @@ class ResultStreamObserver extends StreamObserver { * @param {boolean} param.reactive * @param {function(connection: Connection, stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction - * @param {function(connection: Connection, stmtId: number|Integer, observer: StreamObserver)} param.discardFunction - - * @param {number|Integer} param.batchSize - + * @param {number|Integer} param.fetchSize - * @param {function(err: Error): Promise|void} param.beforeError - * @param {function(err: Error): Promise|void} param.afterError - * @param {function(keys: string[]): Promise|void} param.beforeKeys - @@ -63,7 +61,7 @@ class ResultStreamObserver extends StreamObserver { reactive = false, moreFunction, discardFunction, - batchSize = DefaultBatchSize, + fetchSize = ALL, beforeError, afterError, beforeKeys, @@ -98,7 +96,8 @@ class ResultStreamObserver extends StreamObserver { this._moreFunction = moreFunction this._discardFunction = discardFunction this._discard = false - this._batchSize = batchSize + this._fetchSize = fetchSize + this._finished = false } /** @@ -108,7 +107,7 @@ class ResultStreamObserver extends StreamObserver { * @param {Array} rawRecord - An array with the raw record */ onNext (rawRecord) { - let record = new Record(this._fieldKeys, rawRecord, this._fieldLookup) + const record = new Record(this._fieldKeys, rawRecord, this._fieldLookup) if (this._observers.some(o => o.onNext)) { this._observers.forEach(o => { if (o.onNext) { @@ -190,6 +189,7 @@ class ResultStreamObserver extends StreamObserver { delete meta.has_more } else { + this._finished = true const completionMetadata = Object.assign( this._connection ? { server: this._connection.server } : {}, this._meta, @@ -229,7 +229,6 @@ class ResultStreamObserver extends StreamObserver { _handleStreaming () { if ( - this._reactive && this._head && this._observers.some(o => o.onNext || o.onCompleted) && !this._streaming @@ -242,7 +241,7 @@ class ResultStreamObserver extends StreamObserver { this._moreFunction( this._connection, this._statementId, - this._batchSize, + this._fetchSize, this ) } @@ -282,12 +281,13 @@ class ResultStreamObserver extends StreamObserver { this._head = [] this._fieldKeys = [] this._tail = {} + this._finished = true } /** - * Discard pending record stream + * Cancel pending record stream */ - discard () { + cancel () { this._discard = true } @@ -302,6 +302,7 @@ class ResultStreamObserver extends StreamObserver { return } + this._finished = true this._hasFailed = true this._error = error @@ -357,7 +358,7 @@ class ResultStreamObserver extends StreamObserver { } this._observers.push(observer) - if (this._reactive) { + if (this._reactive && !this._finished) { this._handleStreaming() } } diff --git a/src/result-rx.js b/src/result-rx.js index bb5f3b87c..2b0fa1705 100644 --- a/src/result-rx.js +++ b/src/result-rx.js @@ -134,7 +134,7 @@ export default class RxResult { }) if (this._records.observers.length === 0) { - result._discard() + result._cancel() } result.subscribe({ diff --git a/src/result.js b/src/result.js index 47881796f..6777f04cc 100644 --- a/src/result.js +++ b/src/result.js @@ -84,12 +84,13 @@ class Result { */ summary () { return new Promise((resolve, reject) => { - this._streamObserverPromise.then(o => + this._streamObserverPromise.then(o => { + o.cancel() o.subscribe({ onCompleted: metadata => resolve(metadata), onError: err => reject(err) }) - ) + }) }) } @@ -102,8 +103,8 @@ class Result { _getOrCreatePromise () { if (!this._p) { this._p = new Promise((resolve, reject) => { - let records = [] - let observer = { + const records = [] + const observer = { onNext: record => { records.push(record) }, @@ -192,8 +193,8 @@ class Result { * @protected * @since 4.0.0 */ - _discard () { - this._streamObserverPromise.then(o => o.discard()) + _cancel () { + this._streamObserverPromise.then(o => o.cancel()) } } diff --git a/src/session.js b/src/session.js index b3e109401..03abc4524 100644 --- a/src/session.js +++ b/src/session.js @@ -49,6 +49,7 @@ class Session { * @param {string} args.database the database name * @param {Object} args.config={} - this driver configuration. * @param {boolean} args.reactive - whether this session should create reactive streams + * @param {number} args.fetchSize - defines how many records is pulled in each pulling batch */ constructor ({ mode, @@ -56,11 +57,13 @@ class Session { bookmark, database, config, - reactive + reactive, + fetchSize }) { this._mode = mode this._database = database this._reactive = reactive + this._fetchSize = fetchSize this._readConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_READ, database, @@ -107,7 +110,8 @@ class Session { mode: this._mode, database: this._database, afterComplete: this._onComplete, - reactive: this._reactive + reactive: this._reactive, + fetchSize: this._fetchSize }) ) } @@ -176,7 +180,8 @@ class Session { connectionHolder, onClose: this._transactionClosed.bind(this), onBookmark: this._updateBookmark.bind(this), - reactive: this._reactive + reactive: this._reactive, + fetchSize: this._fetchSize }) tx._begin(this._lastBookmark, txConfig) return tx diff --git a/src/transaction.js b/src/transaction.js index 908a96224..9c785f921 100644 --- a/src/transaction.js +++ b/src/transaction.js @@ -43,8 +43,9 @@ class Transaction { * @param {function()} onClose - Function to be called when transaction is committed or rolled back. * @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced. * @param {boolean} reactive whether this transaction generates reactive streams + * @param {number} fetchSize - the record fetch size in each pulling batch. */ - constructor ({ connectionHolder, onClose, onBookmark, reactive }) { + constructor ({ connectionHolder, onClose, onBookmark, reactive, fetchSize }) { this._connectionHolder = connectionHolder this._reactive = reactive this._state = _states.ACTIVE @@ -52,6 +53,8 @@ class Transaction { this._onBookmark = onBookmark this._onError = this._onErrorCallback.bind(this) this._onComplete = this._onCompleteCallback.bind(this) + this._fetchSize = fetchSize + this._results = [] } _begin (bookmark, txConfig) { @@ -84,12 +87,15 @@ class Transaction { parameters ) - return this._state.run(query, params, { + var result = this._state.run(query, params, { connectionHolder: this._connectionHolder, onError: this._onError, onComplete: this._onComplete, - reactive: this._reactive + reactive: this._reactive, + fetchSize: this._fetchSize }) + this._results.push(result) + return result } /** @@ -97,13 +103,14 @@ class Transaction { * * After committing the transaction can no longer be used. * - * @returns {Result} New Result + * @returns {Promise} An empty promise if committed successfully or error if any error happened during commit. */ commit () { - let committed = this._state.commit({ + const committed = this._state.commit({ connectionHolder: this._connectionHolder, onError: this._onError, - onComplete: this._onComplete + onComplete: this._onComplete, + pendingResults: this._results }) this._state = committed.state // clean up @@ -121,13 +128,15 @@ class Transaction { * * After rolling back, the transaction can no longer be used. * - * @returns {Result} New Result + * @returns {Promise} An empty promise if rolled back successfully or error if any error happened during + * rollback. */ rollback () { - let rolledback = this._state.rollback({ + const rolledback = this._state.rollback({ connectionHolder: this._connectionHolder, onError: this._onError, - onComplete: this._onComplete + onComplete: this._onComplete, + pendingResults: this._results }) this._state = rolledback.state // clean up @@ -164,38 +173,50 @@ class Transaction { } } -let _states = { +const _states = { // The transaction is running with no explicit success or failure marked ACTIVE: { - commit: ({ connectionHolder, onError, onComplete }) => { + commit: ({ connectionHolder, onError, onComplete, pendingResults }) => { return { - result: finishTransaction(true, connectionHolder, onError, onComplete), + result: finishTransaction( + true, + connectionHolder, + onError, + onComplete, + pendingResults + ), state: _states.SUCCEEDED } }, - rollback: ({ connectionHolder, onError, onComplete }) => { + rollback: ({ connectionHolder, onError, onComplete, pendingResults }) => { return { - result: finishTransaction(false, connectionHolder, onError, onComplete), + result: finishTransaction( + false, + connectionHolder, + onError, + onComplete, + pendingResults + ), state: _states.ROLLED_BACK } }, run: ( statement, parameters, - { connectionHolder, onError, onComplete, reactive } + { connectionHolder, onError, onComplete, reactive, fetchSize } ) => { // RUN in explicit transaction can't contain bookmarks and transaction configuration + // No need to include mode and database name as it shall be inclued in begin const observerPromise = connectionHolder .getConnection() .then(conn => conn.protocol().run(statement, parameters, { bookmark: Bookmark.empty(), txConfig: TxConfig.empty(), - mode: connectionHolder.mode(), - database: connectionHolder.database(), beforeError: onError, afterComplete: onComplete, - reactive: reactive + reactive: reactive, + fetchSize: fetchSize }) ) .catch(error => new FailedObserver({ error, onError })) @@ -352,22 +373,32 @@ let _states = { * @param {ConnectionHolder} connectionHolder * @param {function(err:Error): any} onError * @param {function(metadata:object): any} onComplete + * @param {list>}pendingResults all run results in this transaction */ -function finishTransaction (commit, connectionHolder, onError, onComplete) { +function finishTransaction ( + commit, + connectionHolder, + onError, + onComplete, + pendingResults +) { const observerPromise = connectionHolder .getConnection() .then(connection => { - if (commit) { - return connection.protocol().commitTransaction({ - beforeError: onError, - afterComplete: onComplete - }) - } else { - return connection.protocol().rollbackTransaction({ - beforeError: onError, - afterComplete: onComplete - }) - } + pendingResults.forEach(r => r._cancel()) + return Promise.all(pendingResults).then(results => { + if (commit) { + return connection.protocol().commitTransaction({ + beforeError: onError, + afterComplete: onComplete + }) + } else { + return connection.protocol().rollbackTransaction({ + beforeError: onError, + afterComplete: onComplete + }) + } + }) }) .catch(error => new FailedObserver({ error, onError })) diff --git a/test/driver.test.js b/test/driver.test.js index 478de8d59..31822a6da 100644 --- a/test/driver.test.js +++ b/test/driver.test.js @@ -253,6 +253,28 @@ describe('#integration driver', () => { ) }) + it('should validate fetch size in the config', async () => { + await validateConfigSanitizing({}, 1000) + await validateConfigSanitizing({ fetchSize: 42 }, 42) + await validateConfigSanitizing({ fetchSize: -1 }, -1) + await validateConfigSanitizing({ fetchSize: '42' }, 42) + await validateConfigSanitizing({ fetchSize: '-1' }, -1) + }) + + it('should fail when fetch size is negative', () => { + expect(() => + neo4j.driver('bolt://localhost', sharedNeo4j.authToken, { + fetchSize: -77 + }) + ).toThrow() + }) + + it('should fail when fetch size is 0', () => { + expect(() => + neo4j.driver('bolt://localhost', sharedNeo4j.authToken, { fetchSize: 0 }) + ).toThrow() + }) + it('should discard closed connections', async () => { driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken) diff --git a/test/internal/node/direct.driver.boltkit.test.js b/test/internal/node/direct.driver.boltkit.test.js index 90b745fa7..99d2784c5 100644 --- a/test/internal/node/direct.driver.boltkit.test.js +++ b/test/internal/node/direct.driver.boltkit.test.js @@ -447,6 +447,237 @@ describe('#stub-direct direct driver with stub server', () => { }) }) + describe('should allow to change fetch size', () => { + async function verifyFailureOnCommit (version) { + if (!boltStub.supported) { + return + } + + const server = await boltStub.start( + `./test/resources/boltstub/${version}/read_in_batch.script`, + 9001 + ) + + const driver = boltStub.newDriver('bolt://127.0.0.1:9001', { + fetchSize: 2 + }) + const session = driver.session({ defaultAccessMode: READ }) + + const result = await session.run('MATCH (n) RETURN n.name') + const records = result.records + expect(records.length).toEqual(3) + expect(records[0].get(0)).toBe('Bob') + expect(records[1].get(0)).toBe('Alice') + expect(records[2].get(0)).toBe('Tina') + + const connectionKey = Object.keys(openConnections(driver))[0] + expect(connectionKey).toBeTruthy() + + const connection = openConnections(driver, connectionKey) + await session.close() + + // generate a fake fatal error + connection._handleFatalError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + + // expect that the connection to be removed from the pool + expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0) + expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy() + // expect that the connection to be unregistered from the open connections registry + expect(openConnections(driver, connectionKey)).toBeFalsy() + + await driver.close() + await server.exit() + } + + it('v4', () => verifyFailureOnCommit('v4')) + }) + + describe('should stream in many batches', () => { + async function verifyFailureOnCommit (version) { + if (!boltStub.supported) { + return + } + + const server = await boltStub.start( + `./test/resources/boltstub/${version}/read_in_batch.script`, + 9001 + ) + + const driver = boltStub.newDriver('bolt://127.0.0.1:9001') + const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 }) + + const result = await session.run('MATCH (n) RETURN n.name') + const records = result.records + expect(records.length).toEqual(3) + expect(records[0].get(0)).toBe('Bob') + expect(records[1].get(0)).toBe('Alice') + expect(records[2].get(0)).toBe('Tina') + + const connectionKey = Object.keys(openConnections(driver))[0] + expect(connectionKey).toBeTruthy() + + const connection = openConnections(driver, connectionKey) + await session.close() + + // generate a fake fatal error + connection._handleFatalError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + + // expect that the connection to be removed from the pool + expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0) + expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy() + // expect that the connection to be unregistered from the open connections registry + expect(openConnections(driver, connectionKey)).toBeFalsy() + + await driver.close() + await server.exit() + } + + it('v4', () => verifyFailureOnCommit('v4')) + }) + + describe('should ignore fetchSize setting', () => { + async function verifyFailureOnCommit (version) { + if (!boltStub.supported) { + return + } + + const server = await boltStub.start( + `./test/resources/boltstub/${version}/read.script`, + 9001 + ) + + const driver = boltStub.newDriver('bolt://127.0.0.1:9001') + const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 }) + + const result = await session.run('MATCH (n) RETURN n.name') + const records = result.records + expect(records.length).toEqual(3) + expect(records[0].get(0)).toBe('Bob') + expect(records[1].get(0)).toBe('Alice') + expect(records[2].get(0)).toBe('Tina') + + const connectionKey = Object.keys(openConnections(driver))[0] + expect(connectionKey).toBeTruthy() + + const connection = openConnections(driver, connectionKey) + await session.close() + + // generate a fake fatal error + connection._handleFatalError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + + // expect that the connection to be removed from the pool + expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0) + expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy() + // expect that the connection to be unregistered from the open connections registry + expect(openConnections(driver, connectionKey)).toBeFalsy() + + await driver.close() + await server.exit() + } + + it('v3', () => verifyFailureOnCommit('v3')) + it('v2', () => verifyFailureOnCommit('v2')) + }) + + describe('should cancel stream with result summary method', () => { + async function verifyFailureOnCommit (version) { + if (!boltStub.supported) { + return + } + + const server = await boltStub.start( + `./test/resources/boltstub/${version}/read_discard.script`, + 9001 + ) + + const driver = boltStub.newDriver('bolt://127.0.0.1:9001') + const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 }) + + const result = session.run('MATCH (n) RETURN n.name') + await result.summary() + const records = (await result).records + expect(records.length).toEqual(2) + expect(records[0].get(0)).toBe('Bob') + expect(records[1].get(0)).toBe('Alice') + + const connectionKey = Object.keys(openConnections(driver))[0] + expect(connectionKey).toBeTruthy() + + const connection = openConnections(driver, connectionKey) + await session.close() + + // generate a fake fatal error + connection._handleFatalError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + + // expect that the connection to be removed from the pool + expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0) + expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy() + // expect that the connection to be unregistered from the open connections registry + expect(openConnections(driver, connectionKey)).toBeFalsy() + + await driver.close() + await server.exit() + } + + it('v4', () => verifyFailureOnCommit('v4')) + }) + + describe('should cancel stream with tx commit', () => { + async function verifyFailureOnCommit (version) { + if (!boltStub.supported) { + return + } + + const server = await boltStub.start( + `./test/resources/boltstub/${version}/read_tx_discard.script`, + 9001 + ) + + const driver = boltStub.newDriver('bolt://127.0.0.1:9001') + const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 }) + const tx = session.beginTransaction() + + const result = tx.run('MATCH (n) RETURN n.name') + await tx.commit() + + // Client will receive a partial result + const records = (await result).records + expect(records.length).toEqual(2) + expect(records[0].get(0)).toBe('Bob') + expect(records[1].get(0)).toBe('Alice') + + const connectionKey = Object.keys(openConnections(driver))[0] + expect(connectionKey).toBeTruthy() + + const connection = openConnections(driver, connectionKey) + await session.close() + + // generate a fake fatal error + connection._handleFatalError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + + // expect that the connection to be removed from the pool + expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0) + expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy() + // expect that the connection to be unregistered from the open connections registry + expect(openConnections(driver, connectionKey)).toBeFalsy() + + await driver.close() + await server.exit() + } + + it('v4', () => verifyFailureOnCommit('v4')) + }) + function connectionPool (driver, key) { return driver._connectionProvider._connectionPool._pools[key] } diff --git a/test/resources/boltstub/v3/acquire_endpoints_self_as_reader.script b/test/resources/boltstub/v3/acquire_endpoints_self_as_reader.script index 889bd4c25..d12b30803 100644 --- a/test/resources/boltstub/v3/acquire_endpoints_self_as_reader.script +++ b/test/resources/boltstub/v3/acquire_endpoints_self_as_reader.script @@ -9,7 +9,7 @@ S: SUCCESS {"fields": ["ttl", "servers"]} RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001","127.0.0.1:9009","127.0.0.1:9010"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9011"], "role": "ROUTE"}]] SUCCESS {} C: BEGIN {"mode": "r"} - RUN "MATCH (n) RETURN n.name AS name" {} {"mode": "r"} + RUN "MATCH (n) RETURN n.name AS name" {} {} PULL_ALL S: SUCCESS {} SUCCESS {"fields": ["name"]} diff --git a/test/resources/boltstub/v3/read_tx.script b/test/resources/boltstub/v3/read_tx.script index 8e3ca520a..d12a8aa3d 100644 --- a/test/resources/boltstub/v3/read_tx.script +++ b/test/resources/boltstub/v3/read_tx.script @@ -5,7 +5,7 @@ C: BEGIN {"mode": "r"} S: SUCCESS {} -C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} +C: RUN "MATCH (n) RETURN n.name" {} {} PULL_ALL S: SUCCESS {"fields": ["n.name"]} RECORD ["Bob"] diff --git a/test/resources/boltstub/v3/read_tx_dead.script b/test/resources/boltstub/v3/read_tx_dead.script index ffb0093a3..4f5f46306 100644 --- a/test/resources/boltstub/v3/read_tx_dead.script +++ b/test/resources/boltstub/v3/read_tx_dead.script @@ -3,7 +3,7 @@ !: AUTO RESET C: BEGIN {"mode": "r"} - RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} + RUN "MATCH (n) RETURN n.name" {} {} PULL_ALL S: SUCCESS {"fields": []} diff --git a/test/resources/boltstub/v3/read_tx_with_bookmarks.script b/test/resources/boltstub/v3/read_tx_with_bookmarks.script index 0af7e78e2..b097204b9 100644 --- a/test/resources/boltstub/v3/read_tx_with_bookmarks.script +++ b/test/resources/boltstub/v3/read_tx_with_bookmarks.script @@ -4,7 +4,7 @@ !: AUTO GOODBYE C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx42"], "mode": "r"} -C: RUN "MATCH (n) RETURN n.name AS name" {} {"mode": "r"} +C: RUN "MATCH (n) RETURN n.name AS name" {} {} PULL_ALL S: SUCCESS {} SUCCESS {"fields": ["name"]} diff --git a/test/resources/boltstub/v3/write_tx_not_a_leader.script b/test/resources/boltstub/v3/write_tx_not_a_leader.script index 933e2d016..ede883dbc 100644 --- a/test/resources/boltstub/v3/write_tx_not_a_leader.script +++ b/test/resources/boltstub/v3/write_tx_not_a_leader.script @@ -9,5 +9,3 @@ C: BEGIN {} S: SUCCESS {} FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"} IGNORED -C: COMMIT -S: IGNORED diff --git a/test/resources/boltstub/v4/hello_run_exit.script b/test/resources/boltstub/v4/hello_run_exit.script index c6f10a29a..204bf80fd 100644 --- a/test/resources/boltstub/v4/hello_run_exit.script +++ b/test/resources/boltstub/v4/hello_run_exit.script @@ -4,7 +4,7 @@ C: HELLO {"credentials": "password", "scheme": "basic", "user_agent": "neo4j-javascript/0.0.0-dev", "principal": "neo4j"} S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"} C: RUN "MATCH (n) RETURN n.name" {} {} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {"fields": ["n.name"]} RECORD ["Foo"] RECORD ["Bar"] diff --git a/test/resources/boltstub/v4/query_with_error.script b/test/resources/boltstub/v4/query_with_error.script index 48c37f036..ad098d98f 100644 --- a/test/resources/boltstub/v4/query_with_error.script +++ b/test/resources/boltstub/v4/query_with_error.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE C: RUN "RETURN 10 / 0" {} {} -C: PULL {"n": -1} +C: PULL {"n": 1000} S: FAILURE {"code": "Neo.ClientError.Statement.ArithmeticError", "message": "/ by zero"} S: IGNORED C: RESET diff --git a/test/resources/boltstub/v4/read.script b/test/resources/boltstub/v4/read.script index 11f0fb56a..91d0fb2c6 100644 --- a/test/resources/boltstub/v4/read.script +++ b/test/resources/boltstub/v4/read.script @@ -4,7 +4,7 @@ !: AUTO GOODBYE C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {"fields": ["n.name"]} RECORD ["Bob"] RECORD ["Alice"] diff --git a/test/resources/boltstub/v4/read_dead.script b/test/resources/boltstub/v4/read_dead.script index 98c489e73..1cac82d45 100644 --- a/test/resources/boltstub/v4/read_dead.script +++ b/test/resources/boltstub/v4/read_dead.script @@ -3,5 +3,5 @@ !: AUTO RESET C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} -C: PULL {"n": -1} +C: PULL {"n": 1000} S: diff --git a/test/resources/boltstub/v4/read_discard.script b/test/resources/boltstub/v4/read_discard.script new file mode 100644 index 000000000..82415c00c --- /dev/null +++ b/test/resources/boltstub/v4/read_discard.script @@ -0,0 +1,13 @@ +!: BOLT 4 +!: AUTO HELLO +!: AUTO RESET +!: AUTO GOODBYE + +C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} + PULL {"n": 2} +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + SUCCESS {"has_more":true} +C: DISCARD {"n": -1} +S: SUCCESS {} diff --git a/test/resources/boltstub/v4/read_from_aDatabase.script b/test/resources/boltstub/v4/read_from_aDatabase.script index 910f346c8..64ac80bed 100644 --- a/test/resources/boltstub/v4/read_from_aDatabase.script +++ b/test/resources/boltstub/v4/read_from_aDatabase.script @@ -4,7 +4,7 @@ !: AUTO GOODBYE C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r", "db": "aDatabase"} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {"fields": ["n.name"]} RECORD ["Bob"] RECORD ["Alice"] diff --git a/test/resources/boltstub/v4/read_from_aDatabase_with_bookmark.script b/test/resources/boltstub/v4/read_from_aDatabase_with_bookmark.script index 76425b105..6f6ae4a08 100644 --- a/test/resources/boltstub/v4/read_from_aDatabase_with_bookmark.script +++ b/test/resources/boltstub/v4/read_from_aDatabase_with_bookmark.script @@ -4,7 +4,7 @@ !: AUTO GOODBYE C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r", "db": "aDatabase", "bookmarks": ["system:1111", "aDatabase:5555"]} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {"fields": ["n.name"]} RECORD ["Bob"] RECORD ["Alice"] diff --git a/test/resources/boltstub/v4/read_in_batch.script b/test/resources/boltstub/v4/read_in_batch.script new file mode 100644 index 000000000..bd3b2ca06 --- /dev/null +++ b/test/resources/boltstub/v4/read_in_batch.script @@ -0,0 +1,14 @@ +!: BOLT 4 +!: AUTO HELLO +!: AUTO RESET +!: AUTO GOODBYE + +C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"} + PULL {"n": 2} +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + SUCCESS {"has_more":true} +C: PULL {"n": 2} +S: RECORD ["Tina"] + SUCCESS {} diff --git a/test/resources/boltstub/v4/read_tx_discard.script b/test/resources/boltstub/v4/read_tx_discard.script new file mode 100644 index 000000000..f6fdc62bd --- /dev/null +++ b/test/resources/boltstub/v4/read_tx_discard.script @@ -0,0 +1,17 @@ +!: BOLT 4 +!: AUTO HELLO +!: AUTO RESET +!: AUTO GOODBYE + +C: BEGIN {"mode": "r"} + RUN "MATCH (n) RETURN n.name" {} {} + PULL {"n": 2} +S: SUCCESS {} + SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + SUCCESS {"has_more":true} +C: DISCARD {"n": -1} +S: SUCCESS {} +C: COMMIT +S: SUCCESS {"bookmark": "neo4j:bookmark:v1:tx4242"} diff --git a/test/resources/boltstub/v4/read_tx_with_bookmarks.script b/test/resources/boltstub/v4/read_tx_with_bookmarks.script index 50d80c1db..3eb1c66bb 100644 --- a/test/resources/boltstub/v4/read_tx_with_bookmarks.script +++ b/test/resources/boltstub/v4/read_tx_with_bookmarks.script @@ -4,8 +4,8 @@ !: AUTO GOODBYE C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx42"], "mode": "r"} -C: RUN "MATCH (n) RETURN n.name AS name" {} {"mode": "r"} - PULL { "n": -1 } +C: RUN "MATCH (n) RETURN n.name AS name" {} {} + PULL { "n": 1000 } S: SUCCESS {} SUCCESS {"fields": ["name"]} RECORD ["Bob"] diff --git a/test/resources/boltstub/v4/reset_error.script b/test/resources/boltstub/v4/reset_error.script index 8b77172b9..4ad74ca72 100644 --- a/test/resources/boltstub/v4/reset_error.script +++ b/test/resources/boltstub/v4/reset_error.script @@ -3,7 +3,7 @@ !: AUTO GOODBYE C: RUN "RETURN 42 AS answer" {} {} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {"fields": ["answer"]} RECORD [42] SUCCESS {} diff --git a/test/resources/boltstub/v4/return_x.script b/test/resources/boltstub/v4/return_x.script index 5216b685d..18721c1dc 100644 --- a/test/resources/boltstub/v4/return_x.script +++ b/test/resources/boltstub/v4/return_x.script @@ -4,7 +4,7 @@ !: AUTO GOODBYE C: RUN "RETURN $x" {"x": 1} {} - PULL { "n": -1 } + PULL { "n": 1000 } S: SUCCESS {"fields": ["x"]} RECORD [1] SUCCESS {} diff --git a/test/resources/boltstub/v4/write.script b/test/resources/boltstub/v4/write.script index 36618e1a4..b201531e8 100644 --- a/test/resources/boltstub/v4/write.script +++ b/test/resources/boltstub/v4/write.script @@ -4,6 +4,6 @@ !: AUTO GOODBYE C: RUN "CREATE (n {name:'Bob'})" {} {} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {"fields": []} SUCCESS {} diff --git a/test/resources/boltstub/v4/write_read_tx_with_bookmarks.script b/test/resources/boltstub/v4/write_read_tx_with_bookmarks.script index f3dda0325..41794a528 100644 --- a/test/resources/boltstub/v4/write_read_tx_with_bookmarks.script +++ b/test/resources/boltstub/v4/write_read_tx_with_bookmarks.script @@ -5,7 +5,7 @@ C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx42"]} RUN "CREATE (n {name:'Bob'})" {} {} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {} SUCCESS {"fields": []} SUCCESS {} @@ -13,7 +13,7 @@ C: COMMIT S: SUCCESS {"bookmark": "neo4j:bookmark:v1:tx4242"} C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx4242"]} RUN "MATCH (n) RETURN n.name AS name" {} {} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {} SUCCESS {"fields": ["name"]} RECORD ["Bob"] diff --git a/test/resources/boltstub/v4/write_to_aDatabase.script b/test/resources/boltstub/v4/write_to_aDatabase.script index 281eb22fa..d5036b838 100644 --- a/test/resources/boltstub/v4/write_to_aDatabase.script +++ b/test/resources/boltstub/v4/write_to_aDatabase.script @@ -4,6 +4,6 @@ !: AUTO GOODBYE C: RUN "CREATE (n {name:'Bob'})" {} {"db": "aDatabase"} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {"fields": []} SUCCESS {} diff --git a/test/resources/boltstub/v4/write_tx_with_bookmarks.script b/test/resources/boltstub/v4/write_tx_with_bookmarks.script index ed2a0a334..9c6ca0cd6 100644 --- a/test/resources/boltstub/v4/write_tx_with_bookmarks.script +++ b/test/resources/boltstub/v4/write_tx_with_bookmarks.script @@ -5,7 +5,7 @@ C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx42"]} C: RUN "CREATE (n {name:'Bob'})" {} {} - PULL {"n": -1} + PULL {"n": 1000} S: SUCCESS {} SUCCESS {"fields": []} SUCCESS {} diff --git a/test/rx/summary.test.js b/test/rx/summary.test.js index bb3239058..c1ea1b88d 100644 --- a/test/rx/summary.test.js +++ b/test/rx/summary.test.js @@ -670,7 +670,7 @@ describe('#integration-rx summary', () => { const indices = await session.run('CALL db.indexes()') for (let i = 0; i < indices.records.length; i++) { - await session.run(`DROP ${getName(indices.records[i])}`) + await session.run(`DROP INDEX ${getName(indices.records[i])}`) } } finally { await session.close() diff --git a/test/session.test.js b/test/session.test.js index 4829fcf65..829667bf5 100644 --- a/test/session.test.js +++ b/test/session.test.js @@ -250,6 +250,7 @@ describe('#integration session', () => { session.run(statement).then(result => { const sum = result.summary + expect(result.records.length).toBe(10000) expect(sum.resultAvailableAfter.toInt()).not.toBeLessThan(0) expect(sum.resultConsumedAfter.toInt()).not.toBeLessThan(0) done() diff --git a/test/types/transaction.test.ts b/test/types/transaction.test.ts index 67c4ee307..4c05ba53f 100644 --- a/test/types/transaction.test.ts +++ b/test/types/transaction.test.ts @@ -83,18 +83,10 @@ result4.subscribe({ onCompleted: (summary: ResultSummary) => console.log(summary) }) -tx.commit() - .then((res: StatementResult) => { - console.log(res) - }) - .catch((error: Error) => { - console.log(error) - }) +tx.commit().then(() => { + console.log('transaction committed') +}) -tx.rollback() - .then((res: StatementResult) => { - console.log(res) - }) - .catch((error: Error) => { - console.log(error) - }) +tx.rollback().then(() => { + console.log('transaction rolled back') +}) diff --git a/types/driver.d.ts b/types/driver.d.ts index c6f541788..716bc5875 100644 --- a/types/driver.d.ts +++ b/types/driver.d.ts @@ -49,9 +49,11 @@ declare interface Config { trust?: TrustStrategy trustedCertificates?: string[] knownHosts?: string + fetchSize?: number maxConnectionPoolSize?: number maxTransactionRetryTime?: number maxConnectionLifetime?: number + connectionAcquisitionTimeout?: number connectionTimeout?: number disableLosslessIntegers?: boolean logging?: LoggingConfig @@ -71,6 +73,7 @@ declare interface Driver { }?: { defaultAccessMode?: SessionMode bookmarks?: string | string[] + fetchSize?: number database?: string }): Session diff --git a/types/transaction.d.ts b/types/transaction.d.ts index 576ad20a5..9ae2ae749 100644 --- a/types/transaction.d.ts +++ b/types/transaction.d.ts @@ -21,9 +21,9 @@ import Result from './result' import StatementRunner from './statement-runner' declare interface Transaction extends StatementRunner { - commit(): Result + commit(): Promise - rollback(): Result + rollback(): Promise isOpen(): boolean }