From 6a51799e07d063cf1d4136e2f855404200221b62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Wed, 23 Dec 2020 10:46:27 +0100 Subject: [PATCH] Providing protocol version in Result get by Transaction.run The result summary was not filled with the protocol version because the transactions were creating the Result object with a empty connection holder. Since the Result releases the connection from holder when it completes, the original holder could not be passed to it. The ReadOnlyConnectionHolder provides a safe way to pass the holder forward without let the consumer initialize, close or release the connection by simulating this methods without do the action and delegating part of the requests to the original ConnectionHolder. These changes also made shared-neo4j cluster friendly. --- src/internal/connection-holder-readonly.js | 77 +++++ src/result.js | 26 +- src/transaction.js | 49 ++- .../connection-holder-readonly.test.js | 327 ++++++++++++++++++ test/internal/shared-neo4j.js | 8 +- test/transaction.test.js | 15 + 6 files changed, 479 insertions(+), 23 deletions(-) create mode 100644 src/internal/connection-holder-readonly.js create mode 100644 test/internal/connection-holder-readonly.test.js diff --git a/src/internal/connection-holder-readonly.js b/src/internal/connection-holder-readonly.js new file mode 100644 index 000000000..831c37477 --- /dev/null +++ b/src/internal/connection-holder-readonly.js @@ -0,0 +1,77 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import ConnectionHolder from './connection-holder' + +/** + * Provides a interaction with a ConnectionHolder without change it state by + * releasing or initilizing + */ +export default class ReadOnlyConnectionHolder extends ConnectionHolder { + /** + * Contructor + * @param {ConnectionHolder} connectionHolder the connection holder which will treat the requests + */ + constructor (connectionHolder) { + super({ + mode: connectionHolder._mode, + database: connectionHolder._database, + bookmark: connectionHolder._bookmark, + connectionProvider: connectionHolder._connectionProvider + }) + this._connectionHolder = connectionHolder + } + + /** + * Return the true if the connection is suppose to be initilized with the command. + * + * @return {boolean} + */ + initializeConnection () { + if (this._connectionHolder._referenceCount === 0) { + return false + } + return true + } + + /** + * Get the current connection promise. + * @return {Promise} promise resolved with the current connection. + */ + getConnection () { + return this._connectionHolder.getConnection() + } + + /** + * Get the current connection promise, doesn't performs the release + * @return {Promise} promise with the resolved current connection + */ + releaseConnection () { + return this._connectionHolder.getConnection().catch(() => Promise.resolve()) + } + + /** + * Get the current connection promise, doesn't performs the connection close + * @return {Promise} promise with the resolved current connection + */ + close () { + return this._connectionHolder + .getConnection() + .catch(() => () => Promise.resolve()) + } +} diff --git a/src/result.js b/src/result.js index 6dc77859f..565b3ba19 100644 --- a/src/result.js +++ b/src/result.js @@ -166,26 +166,34 @@ class Result { const query = this._query const parameters = this._parameters - function release (protocolVersion) { + function complete (protocolVersion) { + onCompletedOriginal.call( + observer, + new ResultSummary(query, parameters, metadata, protocolVersion) + ) + } + + function release () { // notify connection holder that the used connection is not needed any more because result has // been fully consumed; call the original onCompleted callback after that - connectionHolder.releaseConnection().then(() => { - onCompletedOriginal.call( - observer, - new ResultSummary(query, parameters, metadata, protocolVersion) - ) - }) + return connectionHolder.releaseConnection() } connectionHolder.getConnection().then( // onFulfilled: connection => { - release(connection ? connection.protocol().version : undefined) + release().then(() => + complete( + connection !== undefined + ? connection.protocol().version + : undefined + ) + ) }, // onRejected: _ => { - release() + complete() } ) } diff --git a/src/transaction.js b/src/transaction.js index ccb02a8a6..2e7e57a0e 100644 --- a/src/transaction.js +++ b/src/transaction.js @@ -21,6 +21,7 @@ import { validateQueryAndParameters } from './internal/util' import ConnectionHolder, { EMPTY_CONNECTION_HOLDER } from './internal/connection-holder' +import ReadOnlyConnectionHolder from './internal/connection-holder-readonly' import Bookmark from './internal/bookmark' import TxConfig from './internal/tx-config' @@ -257,7 +258,12 @@ const _states = { }) .catch(error => new FailedObserver({ error, onError })) - return newCompletedResult(observerPromise, query, parameters) + return newCompletedResult( + observerPromise, + query, + parameters, + connectionHolder + ) } }, @@ -274,14 +280,20 @@ const _states = { onError }), 'COMMIT', - {} + {}, + connectionHolder ), state: _states.FAILED } }, rollback: ({ connectionHolder, onError, onComplete }) => { return { - result: newCompletedResult(new CompletedObserver(), 'ROLLBACK', {}), + result: newCompletedResult( + new CompletedObserver(), + 'ROLLBACK', + {}, + connectionHolder + ), state: _states.FAILED } }, @@ -294,7 +306,8 @@ const _states = { onError }), query, - parameters + parameters, + connectionHolder ) } }, @@ -313,7 +326,8 @@ const _states = { 'COMMIT', {} ), - state: _states.SUCCEEDED + state: _states.SUCCEEDED, + connectionHolder } }, rollback: ({ connectionHolder, onError, onComplete }) => { @@ -328,7 +342,8 @@ const _states = { 'ROLLBACK', {} ), - state: _states.SUCCEEDED + state: _states.SUCCEEDED, + connectionHolder } }, run: (query, parameters, { connectionHolder, onError, onComplete }) => { @@ -340,7 +355,8 @@ const _states = { onError }), query, - parameters + parameters, + connectionHolder ) } }, @@ -357,7 +373,8 @@ const _states = { onError }), 'COMMIT', - {} + {}, + connectionHolder ), state: _states.ROLLED_BACK } @@ -371,7 +388,8 @@ const _states = { ) }), 'ROLLBACK', - {} + {}, + connectionHolder ), state: _states.ROLLED_BACK } @@ -385,7 +403,8 @@ const _states = { onError }), query, - parameters + parameters, + connectionHolder ) } } @@ -446,15 +465,21 @@ function finishTransaction ( * @param {ResultStreamObserver} observer - an observer for the created result. * @param {string} query - the cypher query that produced the result. * @param {Object} parameters - the parameters for cypher query that produced the result. + * @param {ConnectionHolder} connectionHolder - the connection holder used to get the result * @return {Result} new result. * @private */ -function newCompletedResult (observerPromise, query, parameters) { +function newCompletedResult ( + observerPromise, + query, + parameters, + connectionHolder = EMPTY_CONNECTION_HOLDER +) { return new Result( Promise.resolve(observerPromise), query, parameters, - EMPTY_CONNECTION_HOLDER + new ReadOnlyConnectionHolder(connectionHolder || EMPTY_CONNECTION_HOLDER) ) } diff --git a/test/internal/connection-holder-readonly.test.js b/test/internal/connection-holder-readonly.test.js new file mode 100644 index 000000000..442da07d2 --- /dev/null +++ b/test/internal/connection-holder-readonly.test.js @@ -0,0 +1,327 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ConnectionHolder, { + EMPTY_CONNECTION_HOLDER +} from '../../src/internal/connection-holder' +import SingleConnectionProvider from '../../src/internal/connection-provider-single' +import { READ, WRITE } from '../../src/driver' +import FakeConnection from './fake-connection' +import Connection from '../../src/internal/connection' +import ReadOnlyConnectionHolder from '../../src/internal/connection-holder-readonly' + +describe('#unit ReadOnlyConnectionHolder wrapping EmptyConnectionHolder', () => { + it('should return rejected promise instead of connection', done => { + EMPTY_CONNECTION_HOLDER.getConnection().catch(() => { + done() + }) + }) + + it('should return resolved promise on release', done => { + EMPTY_CONNECTION_HOLDER.releaseConnection().then(() => { + done() + }) + }) + + it('should return resolved promise on close', done => { + EMPTY_CONNECTION_HOLDER.close().then(() => { + done() + }) + }) +}) + +describe('#unit ReadOnlyConnectionHolder wrapping ConnectionHolder', () => { + it('should return connection promise', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.getConnection().then(conn => { + expect(conn).toBe(connection) + done() + }) + }) + + it('should return connection promise with version', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/9.9.9') + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.getConnection().then(conn => { + verifyConnection(conn, 'Neo4j/9.9.9') + done() + }) + }) + + it('should propagate connection acquisition failure', done => { + const errorMessage = 'Failed to acquire or initialize the connection' + const connectionPromise = Promise.reject(new Error(errorMessage)) + const connectionProvider = newSingleConnectionProvider(connectionPromise) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.getConnection().catch(error => { + expect(error.message).toEqual(errorMessage) + done() + }) + }) + + it('should release not connection with single user', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should not release connection with multiple users', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => { + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + } + ) + + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should not release connection with multiple users when all users release', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => { + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + } + ) + + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + connectionHolder.releaseConnection().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + }) + }) + + it('should do nothing when closed and not initialized', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.close().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should not close even when users exist', done => { + const connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => { + connectionHolder.initializeConnection() + connectionHolder.initializeConnection() + } + ) + + connectionHolder.close().then(() => { + expect(connection.isNeverReleased()).toBeTruthy() + done() + }) + }) + + it('should initialize new connection after releasing current one', done => { + const connection1 = new FakeConnection() + const connection2 = new FakeConnection() + const connectionProvider = new RecordingConnectionProvider([ + connection1, + connection2 + ]) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.releaseConnection().then(() => { + expect(connection1.isReleasedOnce()).toBeFalsy() + + connectionHolder.initializeConnection() + connectionHolder.releaseConnection().then(() => { + expect(connection2.isReleasedOnce()).toBeFalsy() + done() + }) + }) + }) + + it('should initialize new connection after being closed', done => { + const connection1 = new FakeConnection() + const connection2 = new FakeConnection() + const connectionProvider = new RecordingConnectionProvider([ + connection1, + connection2 + ]) + const connectionHolder = newConnectionHolder( + { + mode: READ, + connectionProvider + }, + connectionHolder => connectionHolder.initializeConnection() + ) + + connectionHolder.close().then(() => { + expect(connection1.isNeverReleased()).toBeTruthy() + + connectionHolder.initializeConnection() + connectionHolder.close().then(() => { + expect(connection2.isNeverReleased()).toBeTruthy() + done() + }) + }) + }) + + it('should return passed mode', () => { + function verifyMode (connectionProvider, mode) { + expect(connectionProvider.mode()).toBe(mode) + } + + verifyMode(newConnectionHolder(), WRITE) + verifyMode(newConnectionHolder({ mode: WRITE }), WRITE) + verifyMode(newConnectionHolder({ mode: READ }), READ) + }) + + it('should default to empty database', () => { + function verifyDefault (connectionProvider) { + expect(connectionProvider.database()).toBe('') + } + + const connectionProvider = newSingleConnectionProvider(new FakeConnection()) + + verifyDefault(newConnectionHolder()) + verifyDefault(newConnectionHolder({ mode: READ, connectionProvider })) + verifyDefault(newConnectionHolder({ mode: WRITE, connectionProvider })) + verifyDefault( + newConnectionHolder({ mode: WRITE, database: '', connectionProvider }) + ) + verifyDefault( + newConnectionHolder({ mode: WRITE, database: null, connectionProvider }) + ) + verifyDefault( + newConnectionHolder({ + mode: WRITE, + database: undefined, + connectionProvider + }) + ) + }) + + it('should return passed database', () => { + const connectionProvider = newSingleConnectionProvider(new FakeConnection()) + const connectionHolder = newConnectionHolder({ + database: 'testdb', + connectionProvider + }) + + expect(connectionHolder.database()).toBe('testdb') + }) +}) + +function newConnectionHolder (params, connectionHolderInit = () => {}) { + const connectionHolder = new ConnectionHolder(params) + connectionHolderInit(connectionHolder) + return new ReadOnlyConnectionHolder(connectionHolder) +} + +class RecordingConnectionProvider extends SingleConnectionProvider { + constructor (connections) { + super(Promise.resolve()) + this.connectionPromises = connections.map(conn => Promise.resolve(conn)) + this.acquireConnectionInvoked = 0 + } + + acquireConnection (mode, database) { + return this.connectionPromises[this.acquireConnectionInvoked++] + } +} + +function newSingleConnectionProvider (connection) { + return new SingleConnectionProvider(Promise.resolve(connection)) +} + +/** + * @param {Connection} connection + * @param {*} expectedServerVersion + */ +function verifyConnection (connection, expectedServerVersion) { + expect(connection).toBeDefined() + expect(connection.server).toBeDefined() + expect(connection.server.version).toEqual(expectedServerVersion) +} diff --git a/test/internal/shared-neo4j.js b/test/internal/shared-neo4j.js index 0c7e56cc4..13dffc88c 100644 --- a/test/internal/shared-neo4j.js +++ b/test/internal/shared-neo4j.js @@ -310,7 +310,9 @@ function restart (configOverride) { async function cleanupAndGetProtocolVersion (driver) { const session = driver.session({ defaultAccessMode: neo4j.session.WRITE }) try { - const result = await session.run('MATCH (n) DETACH DELETE n') + const result = await session.writeTransaction(tx => + tx.run('MATCH (n) DETACH DELETE n') + ) return result.summary.server.protocolVersion } finally { await session.close() @@ -320,7 +322,9 @@ async function cleanupAndGetProtocolVersion (driver) { async function getEdition (driver) { const session = driver.session({ defaultAccessMode: neo4j.session.READ }) try { - const result = await session.run('CALL dbms.components() YIELD edition') + const result = await session.readTransaction(tx => + tx.run('CALL dbms.components() YIELD edition') + ) const singleRecord = result.records[0] return singleRecord.get(0) } finally { diff --git a/test/transaction.test.js b/test/transaction.test.js index ed0b397f1..954c976a2 100644 --- a/test/transaction.test.js +++ b/test/transaction.test.js @@ -87,6 +87,21 @@ describe('#integration transaction', () => { .catch(console.log) }, 60000) + it('should populate result.summary.server.protocolVersion for transaction#run', done => { + const tx = session.beginTransaction() + tx.run('CREATE (:TXNode1)') + .then(result => { + tx.commit() + .then(() => { + expect(result.summary.server.protocolVersion).toBeDefined() + expect(result.summary.server.protocolVersion).not.toBeLessThan(0) + done() + }) + .catch(done.fail.bind(done)) + }) + .catch(done.fail.bind(done)) + }, 60000) + it('should handle interactive session', done => { const tx = session.beginTransaction() tx.run("RETURN 'foo' AS res")