diff --git a/src/v1/internal/http/http-driver.js b/src/v1/internal/http/http-driver.js index 2e83a5a4b..3db3c2421 100644 --- a/src/v1/internal/http/http-driver.js +++ b/src/v1/internal/http/http-driver.js @@ -19,30 +19,20 @@ import Driver from '../../driver'; import HttpSession from './http-session'; +import HttpSessionTracker from './http-session-tracker'; export default class HttpDriver extends Driver { constructor(url, userAgent, token, config) { super(url, userAgent, token, config); - this._sessionIdGenerator = 0; - this._openSessions = {}; + this._sessionTracker = new HttpSessionTracker(); } session() { - const id = this._sessionIdGenerator; - this._sessionIdGenerator++; - const session = new HttpSession(this._url, this._token, this._config); - this._openSessions[id] = session; - return session; + return new HttpSession(this._url, this._token, this._config, this._sessionTracker); } close() { - Object.keys(this._openSessions).forEach(id => { - const session = this._openSessions[id]; - if (session) { - session.close(); - } - delete this._openSessions[id]; - }); + return this._sessionTracker.close(); } } diff --git a/src/v1/internal/http/http-request-runner.js b/src/v1/internal/http/http-request-runner.js new file mode 100644 index 000000000..251a08c84 --- /dev/null +++ b/src/v1/internal/http/http-request-runner.js @@ -0,0 +1,194 @@ +/** + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import StreamObserver from '../stream-observer'; +import HttpResponseConverter from './http-response-converter'; +import {Neo4jError, SERVICE_UNAVAILABLE} from '../../error'; + +export default class HttpRequestRunner { + + constructor(url, authToken) { + this._url = url; + this._authToken = authToken; + this._converter = new HttpResponseConverter(); + } + + /** + * Send a HTTP request to begin a transaction. + * @return {Promise} promise resolved with the transaction id or rejected with an error. + */ + beginTransaction() { + const url = beginTransactionUrl(this._url); + return sendRequest('POST', url, null, this._authToken).then(responseJson => { + const neo4jError = this._converter.extractError(responseJson); + if (neo4jError) { + throw neo4jError; + } + return this._converter.extractTransactionId(responseJson); + }); + } + + /** + * Send a HTTP request to commit a transaction. + * @param {number} transactionId id of the transaction to commit. + * @return {Promise} promise resolved if transaction got committed or rejected when commit failed. + */ + commitTransaction(transactionId) { + const url = commitTransactionUrl(this._url, transactionId); + return sendRequest('POST', url, null, this._authToken).then(responseJson => { + const neo4jError = this._converter.extractError(responseJson); + if (neo4jError) { + throw neo4jError; + } + }); + } + + /** + * Send a HTTP request to rollback a transaction. + * @param {number} transactionId id of the transaction to rollback. + * @return {Promise} promise resolved if transaction got rolled back or rejected when rollback failed. + */ + rollbackTransaction(transactionId) { + const url = transactionUrl(this._url, transactionId); + return sendRequest('DELETE', url, null, this._authToken).then(responseJson => { + const neo4jError = this._converter.extractError(responseJson); + if (neo4jError) { + throw neo4jError; + } + }); + } + + /** + * Send a HTTP request to execute a query in a transaction with the given id. + * @param {number} transactionId the transaction id. + * @param {string} statement the cypher query. + * @param {object} parameters the cypher query parameters. + * @return {Promise} a promise resolved with {@link StreamObserver} containing either records or error. + */ + runQuery(transactionId, statement, parameters) { + const streamObserver = new StreamObserver(); + const url = transactionUrl(this._url, transactionId); + const body = createStatementJson(statement, parameters, this._converter, streamObserver); + if (!body) { + // unable to encode given statement and parameters, return a failed stream observer + return Promise.resolve(streamObserver); + } + + return sendRequest('POST', url, body, this._authToken).then(responseJson => { + processResponseJson(responseJson, this._converter, streamObserver); + }).catch(error => { + streamObserver.onError(error); + }).then(() => { + return streamObserver; + }); + } +} + +function sendRequest(method, url, bodyString, authToken) { + try { + const options = { + method: method, + headers: createHttpHeaders(authToken), + body: bodyString + }; + + return new Promise((resolve, reject) => { + fetch(url, options) + .then(response => response.json()) + .then(responseJson => resolve(responseJson)) + .catch(error => reject(new Neo4jError(error.message, SERVICE_UNAVAILABLE))); + }); + } catch (e) { + return Promise.reject(e); + } +} + +function createHttpHeaders(authToken) { + const headers = new Headers(); + headers.append('Accept', 'application/json; charset=UTF-8'); + headers.append('Content-Type', 'application/json'); + headers.append('Authorization', 'Basic ' + btoa(authToken.principal + ':' + authToken.credentials)); + return headers; +} + +function createStatementJson(statement, parameters, converter, streamObserver) { + try { + return createStatementJsonOrThrow(statement, parameters, converter); + } catch (e) { + streamObserver.onError(e); + return null; + } +} + +function createStatementJsonOrThrow(statement, parameters, converter) { + const encodedParameters = converter.encodeStatementParameters(parameters); + return JSON.stringify({ + statements: [{ + statement: statement, + parameters: encodedParameters, + resultDataContents: ['row', 'graph'], + includeStats: true + }] + }); +} + +function processResponseJson(responseJson, converter, streamObserver) { + if (!responseJson) { + // request failed and there is no response + return; + } + + try { + processResponseJsonOrThrow(responseJson, converter, streamObserver); + } catch (e) { + streamObserver.onError(e); + } +} + +function processResponseJsonOrThrow(responseJson, converter, streamObserver) { + const neo4jError = converter.extractError(responseJson); + if (neo4jError) { + streamObserver.onError(neo4jError); + } else { + const recordMetadata = converter.extractRecordMetadata(responseJson); + streamObserver.onCompleted(recordMetadata); + + const rawRecords = converter.extractRawRecords(responseJson); + rawRecords.forEach(rawRecord => streamObserver.onNext(rawRecord)); + + const statementMetadata = converter.extractStatementMetadata(responseJson); + streamObserver.onCompleted(statementMetadata); + } +} + +function beginTransactionUrl(baseUrl) { + return createUrl(baseUrl, '/db/data/transaction'); +} + +function commitTransactionUrl(baseUrl, transactionId) { + return transactionUrl(baseUrl, transactionId) + '/commit'; +} + +function transactionUrl(baseUrl, transactionId) { + return beginTransactionUrl(baseUrl) + '/' + transactionId; +} + +function createUrl(baseUrl, path) { + return `${baseUrl.scheme}://${baseUrl.host}:${baseUrl.port}${path}`; +} diff --git a/src/v1/internal/http/http-data-converter.js b/src/v1/internal/http/http-response-converter.js similarity index 72% rename from src/v1/internal/http/http-data-converter.js rename to src/v1/internal/http/http-response-converter.js index df1827724..5638b15b7 100644 --- a/src/v1/internal/http/http-data-converter.js +++ b/src/v1/internal/http/http-response-converter.js @@ -19,25 +19,18 @@ import {isInt} from '../../integer'; import {Node, Path, PathSegment, Relationship} from '../../graph-types'; -import {Neo4jError, SERVICE_UNAVAILABLE} from '../../error'; +import {Neo4jError, PROTOCOL_ERROR} from '../../error'; +import {isPoint, Point} from '../../spatial-types'; +import {isDate, isDateTime, isDuration, isLocalDateTime, isLocalTime, isTime} from '../../temporal-types'; const CREDENTIALS_EXPIRED_CODE = 'Neo.ClientError.Security.CredentialsExpired'; -export default class HttpDataConverter { +export default class HttpResponseConverter { encodeStatementParameters(parameters) { return encodeQueryParameters(parameters); } - /** - * Convert network error to a {@link Neo4jError}. - * @param {object} error the error to convert. - * @return {Neo4jError} new driver friendly error. - */ - convertNetworkError(error) { - return new Neo4jError(error.message, SERVICE_UNAVAILABLE); - } - /** * Attempts to extract error from transactional cypher endpoint response and convert it to {@link Neo4jError}. * @param {object} response the response. @@ -59,6 +52,25 @@ export default class HttpDataConverter { return null; } + /** + * Extracts transaction id from the db/data/transaction endpoint response. + * @param {object} response the response. + * @return {number} the transaction id. + */ + extractTransactionId(response) { + const commitUrl = response.commit; + if (commitUrl) { + // extract id 42 from commit url like 'http://localhost:7474/db/data/transaction/42/commit' + const url = commitUrl.replace('/commit', ''); + const transactionIdString = url.substring(url.lastIndexOf('/') + 1); + const transactionId = parseInt(transactionIdString, 10); + if (transactionId || transactionId === 0) { + return transactionId; + } + } + throw new Neo4jError(`Unable to extract transaction id from the response JSON: ${JSON.stringify(response)}`); + } + /** * Extracts record metadata (array of column names) from transactional cypher endpoint response. * @param {object} response the response. @@ -127,11 +139,25 @@ function encodeQueryParameters(parameters) { function encodeQueryParameter(value) { if (value instanceof Node) { - throw new Neo4jError('It is not allowed to pass nodes in query parameters'); + throw new Neo4jError('It is not allowed to pass nodes in query parameters', PROTOCOL_ERROR); } else if (value instanceof Relationship) { - throw new Neo4jError('It is not allowed to pass relationships in query parameters'); + throw new Neo4jError('It is not allowed to pass relationships in query parameters', PROTOCOL_ERROR); } else if (value instanceof Path) { - throw new Neo4jError('It is not allowed to pass paths in query parameters'); + throw new Neo4jError('It is not allowed to pass paths in query parameters', PROTOCOL_ERROR); + } else if (isPoint(value)) { + throw newUnsupportedParameterError('points'); + } else if (isDate(value)) { + throw newUnsupportedParameterError('dates'); + } else if (isDateTime(value)) { + throw newUnsupportedParameterError('date-time'); + } else if (isDuration(value)) { + throw newUnsupportedParameterError('durations'); + } else if (isLocalDateTime(value)) { + throw newUnsupportedParameterError('local date-time'); + } else if (isLocalTime(value)) { + throw newUnsupportedParameterError('local time'); + } else if (isTime(value)) { + throw newUnsupportedParameterError('time'); } else if (isInt(value)) { return value.toNumber(); } else if (Array.isArray(value)) { @@ -143,6 +169,11 @@ function encodeQueryParameter(value) { } } +function newUnsupportedParameterError(name) { + return new Neo4jError(`It is not allowed to pass ${name} in query parameters when using HTTP endpoint. ` + + `Please consider using Cypher functions to create ${name} so that query parameters are plain objects.`, PROTOCOL_ERROR); +} + function extractResult(response) { const results = response.results; if (results) { @@ -212,23 +243,25 @@ function extractRawRecordElement(index, data, nodesById, relationshipsById) { const elementMetadata = data.meta ? data.meta[index] : null; if (elementMetadata) { - // element is either a Node, Relationship or Path - return convertComplexValue(elementMetadata, nodesById, relationshipsById); + // element is either a graph, spatial or temporal type + return convertComplexValue(element, elementMetadata, nodesById, relationshipsById); } else { // element is a primitive, like number, string, array or object return convertPrimitiveValue(element); } } -function convertComplexValue(elementMetadata, nodesById, relationshipsById) { +function convertComplexValue(element, elementMetadata, nodesById, relationshipsById) { if (isNodeMetadata(elementMetadata)) { return nodesById[elementMetadata.id]; } else if (isRelationshipMetadata(elementMetadata)) { return relationshipsById[elementMetadata.id]; } else if (isPathMetadata(elementMetadata)) { return convertPath(elementMetadata, nodesById, relationshipsById); + } else if (isPointMetadata(elementMetadata)) { + return convertPoint(element); } else { - return null; + return element; } } @@ -285,6 +318,42 @@ function createPath(pathSegments) { return new Path(pathStartNode, pathEndNode, pathSegments); } +function convertPoint(element) { + const type = element.type; + if (type !== 'Point') { + throw new Neo4jError(`Unexpected Point type received: ${JSON.stringify(element)}`); + } + + const coordinates = element.coordinates; + if (!Array.isArray(coordinates) && (coordinates.length !== 2 || coordinates.length !== 3)) { + throw new Neo4jError(`Unexpected Point coordinates received: ${JSON.stringify(element)}`); + } + + const srid = convertCrsToId(element); + + return new Point(srid, ...coordinates); +} + +function convertCrsToId(element) { + const crs = element.crs; + if (!crs || !crs.name) { + throw new Neo4jError(`Unexpected Point crs received: ${JSON.stringify(element)}`); + } + const name = crs.name.toLowerCase(); + + if (name === 'wgs-84') { + return 4326; + } else if (name === 'wgs-84-3d') { + return 4979; + } else if (name === 'cartesian') { + return 7203; + } else if (name === 'cartesian-3d') { + return 9157; + } else { + throw new Neo4jError(`Unexpected Point crs received: ${JSON.stringify(element)}`); + } +} + function convertPrimitiveValue(element) { if (element == null || element === undefined) { return null; @@ -307,11 +376,19 @@ function convertNumber(value) { } function isNodeMetadata(metadata) { - return !Array.isArray(metadata) && typeof metadata === 'object' && metadata.type === 'node'; + return isMetadataForType('node', metadata); } function isRelationshipMetadata(metadata) { - return !Array.isArray(metadata) && typeof metadata === 'object' && metadata.type === 'relationship'; + return isMetadataForType('relationship', metadata); +} + +function isPointMetadata(metadata) { + return isMetadataForType('point', metadata); +} + +function isMetadataForType(name, metadata) { + return !Array.isArray(metadata) && typeof metadata === 'object' && metadata.type === name; } function isPathMetadata(metadata) { diff --git a/src/v1/internal/http/http-session-tracker.js b/src/v1/internal/http/http-session-tracker.js new file mode 100644 index 000000000..b3f87df0c --- /dev/null +++ b/src/v1/internal/http/http-session-tracker.js @@ -0,0 +1,63 @@ +/** + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export default class HttpSessionTracker { + + constructor() { + this._openSessions = new Set(); + } + + /** + * Record given session as open. + * @param {HttpSession} session the newly open session. + */ + sessionOpened(session) { + this._openSessions.add(session); + } + + /** + * Record given session as close. + * @param {HttpSession} session the just closed session. + */ + sessionClosed(session) { + this._openSessions.delete(session); + } + + /** + * Close this tracker and all open sessions. + */ + close() { + const sessions = Array.from(this._openSessions); + this._openSessions.clear(); + return Promise.all(sessions.map(session => closeSession(session))); + } +} + +/** + * Close given session and get a promise back. + * @param {HttpSession} session the session to close. + * @return {Promise} promise resolved when session is closed. + */ +function closeSession(session) { + return new Promise(resolve => { + session.close(() => { + resolve(); + }); + }); +} diff --git a/src/v1/internal/http/http-session.js b/src/v1/internal/http/http-session.js index 601ee0da6..0e455ae6a 100644 --- a/src/v1/internal/http/http-session.js +++ b/src/v1/internal/http/http-session.js @@ -21,13 +21,19 @@ import {WRITE} from '../../driver'; import Session from '../../session'; import {assertCypherStatement} from '../util'; import {Neo4jError} from '../../error'; -import HttpStatementRunner from './http-statement-runner'; +import HttpRequestRunner from './http-request-runner'; +import {EMPTY_CONNECTION_HOLDER} from '../connection-holder'; +import Result from '../../result'; export default class HttpSession extends Session { - constructor(url, authToken, config) { + constructor(url, authToken, config, sessionTracker) { super(WRITE, null, null, config); - this._statementRunner = new HttpStatementRunner(url, authToken); + this._ongoingTransactionIds = []; + this._serverInfoSupplier = createServerInfoSupplier(url); + this._requestRunner = new HttpRequestRunner(url, authToken); + this._sessionTracker = sessionTracker; + this._sessionTracker.sessionOpened(this); } run(statement, parameters = {}) { @@ -37,7 +43,21 @@ export default class HttpSession extends Session { } assertCypherStatement(statement); - return this._statementRunner.run(statement, parameters); + return this._requestRunner.beginTransaction().then(transactionId => { + this._ongoingTransactionIds.push(transactionId); + const queryPromise = this._requestRunner.runQuery(transactionId, statement, parameters); + + return queryPromise.then(streamObserver => { + if (streamObserver.hasFailed()) { + return rollbackTransactionAfterQueryFailure(transactionId, streamObserver, this._requestRunner); + } else { + return commitTransactionAfterQuerySuccess(transactionId, streamObserver, this._requestRunner); + } + }).then(streamObserver => { + this._ongoingTransactionIds = this._ongoingTransactionIds.filter(id => id !== transactionId); + return new Result(streamObserver, statement, parameters, this._serverInfoSupplier, EMPTY_CONNECTION_HOLDER); + }); + }); } beginTransaction() { @@ -57,10 +77,42 @@ export default class HttpSession extends Session { } close(callback = (() => null)) { - callback(); + const rollbackAllOngoingTransactions = this._ongoingTransactionIds.map(transactionId => + rollbackTransactionSilently(transactionId, this._requestRunner) + ); + + Promise.all(rollbackAllOngoingTransactions) + .then(() => { + this._sessionTracker.sessionClosed(this); + callback(); + }); } } +function rollbackTransactionAfterQueryFailure(transactionId, streamObserver, requestRunner) { + return rollbackTransactionSilently(transactionId, requestRunner).then(() => streamObserver); +} + +function commitTransactionAfterQuerySuccess(transactionId, streamObserver, requestRunner) { + return requestRunner.commitTransaction(transactionId).catch(error => { + streamObserver.onError(error); + }).then(() => { + return streamObserver; + }); +} + +function rollbackTransactionSilently(transactionId, requestRunner) { + return requestRunner.rollbackTransaction(transactionId).catch(error => { + // ignore all rollback errors + }); +} + +function createServerInfoSupplier(url) { + return () => { + return {server: {address: url.hostAndPort}}; + }; +} + function throwTransactionsNotSupported() { throw new Neo4jError('Experimental HTTP driver does not support transactions'); } diff --git a/src/v1/internal/http/http-statement-runner.js b/src/v1/internal/http/http-statement-runner.js deleted file mode 100644 index 3e92a7584..000000000 --- a/src/v1/internal/http/http-statement-runner.js +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Copyright (c) 2002-2018 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import StreamObserver from '../stream-observer'; -import Result from '../../result'; -import {EMPTY_CONNECTION_HOLDER} from '../connection-holder'; -import HttpDataConverter from './http-data-converter'; - -export default class HttpStatementRunner { - - constructor(url, authToken) { - this._serverInfoSupplier = createServerInfoSupplier(url); - this._transactionCommitUrl = createTransactionCommitUrl(url); - this._headers = createHttpHeaders(authToken); - this._converter = new HttpDataConverter(); - } - - run(statement, parameters) { - const streamObserver = new StreamObserver(); - sendPostRequest(statement, parameters, streamObserver, this._transactionCommitUrl, this._headers, this._converter); - return new Result(streamObserver, statement, parameters, this._serverInfoSupplier, EMPTY_CONNECTION_HOLDER); - } -} - -function createServerInfoSupplier(url) { - return () => { - return {server: {address: url.hostAndPort}}; - }; -} - -function createTransactionCommitUrl(url) { - return url.scheme + '://' + url.host + ':' + url.port + '/db/data/transaction/commit'; -} - -function createHttpHeaders(authToken) { - const headers = new Headers(); - headers.append('Accept', 'application/json; charset=UTF-8'); - headers.append('Content-Type', 'application/json'); - headers.append('Authorization', 'Basic ' + btoa(authToken.principal + ':' + authToken.credentials)); - return headers; -} - -function sendPostRequest(statement, parameters, streamObserver, transactionCommitUrl, headers, converter) { - try { - const options = { - method: 'POST', - headers: headers, - body: createStatementJson(statement, parameters, converter) - }; - - fetch(transactionCommitUrl, options) - .then(response => response.json()) - .catch(error => processResponseError(error, converter, streamObserver)) - .then(responseJson => processResponseJson(responseJson, converter, streamObserver)); - - } catch (e) { - streamObserver.onError(e); - } -} - -function createStatementJson(statement, parameters, converter, streamObserver) { - try { - return createStatementJsonOrThrow(statement, parameters, converter); - } catch (e) { - streamObserver.onError(e); - } -} - -function createStatementJsonOrThrow(statement, parameters, converter) { - const encodedParameters = converter.encodeStatementParameters(parameters); - return JSON.stringify({ - statements: [{ - statement: statement, - parameters: encodedParameters, - resultDataContents: ['row', 'graph'], - includeStats: true - }] - }); -} - -function processResponseError(error, converter, streamObserver) { - const neo4jError = converter.convertNetworkError(error); - streamObserver.onError(neo4jError); -} - -function processResponseJson(responseJson, converter, streamObserver) { - if (!responseJson) { - // request failed and there is no response - return; - } - - try { - processResponseJsonOrThrow(responseJson, converter, streamObserver); - } catch (e) { - streamObserver.onError(e); - } -} - -function processResponseJsonOrThrow(responseJson, converter, streamObserver) { - const neo4jError = converter.extractError(responseJson); - if (neo4jError) { - streamObserver.onError(neo4jError); - } else { - const recordMetadata = converter.extractRecordMetadata(responseJson); - streamObserver.onCompleted(recordMetadata); - - const rawRecords = converter.extractRawRecords(responseJson); - rawRecords.forEach(rawRecord => streamObserver.onNext(rawRecord)); - - const statementMetadata = converter.extractStatementMetadata(responseJson); - streamObserver.onCompleted(statementMetadata); - } -} diff --git a/src/v1/internal/stream-observer.js b/src/v1/internal/stream-observer.js index b8863b228..71168a267 100644 --- a/src/v1/internal/stream-observer.js +++ b/src/v1/internal/stream-observer.js @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import Record from "../record"; +import Record from '../record'; /** * Handles a RUN/PULL_ALL, or RUN/DISCARD_ALL requests, maps the responses @@ -148,6 +148,10 @@ class StreamObserver { } this._observer = observer; } + + hasFailed() { + return this._hasFailed; + } } export default StreamObserver; diff --git a/test/internal/http-driver.test.js b/test/internal/http/http-driver.test.js similarity index 52% rename from test/internal/http-driver.test.js rename to test/internal/http/http-driver.test.js index 822dfdb0e..38411fd58 100644 --- a/test/internal/http-driver.test.js +++ b/test/internal/http/http-driver.test.js @@ -17,21 +17,24 @@ * limitations under the License. */ -import neo4j from '../../src/v1'; -import sharedNeo4j from '../internal/shared-neo4j'; -import testUtils from './test-utils'; +import neo4j from '../../../src/v1'; +import sharedNeo4j from '../../internal/shared-neo4j'; +import testUtils from '.././test-utils'; +import {ServerVersion, VERSION_3_1_0, VERSION_3_4_0} from '../../../src/v1/internal/server-version'; describe('http driver', () => { let boltDriver; let httpDriver; + let serverVersion; beforeEach(done => { boltDriver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, {disableLosslessIntegers: true}); httpDriver = neo4j.driver('http://localhost:7474', sharedNeo4j.authToken); const session = boltDriver.session(); - session.run('MATCH (n) DETACH DELETE n').then(() => { + session.run('MATCH (n) DETACH DELETE n').then(result => { + serverVersion = ServerVersion.fromString(result.summary.server.version); session.close(() => { done(); }); @@ -240,6 +243,258 @@ describe('http driver', () => { }); }); + it('should terminate query waiting on a lock when session is closed', done => { + if (testUtils.isServer() || !databaseSupportsTransactionTerminationInLocks()) { + done(); + return; + } + + const boltSession = boltDriver.session(); + boltSession.run(`CREATE (:Node {name: 'foo'})`).then(() => { + const boltTx = boltSession.beginTransaction(); + boltTx.run(`MATCH (n:Node {name: 'foo'}) SET n.name = 'bar'`).then(() => { + // node should now be locked + + const httpSession = httpDriver.session(); + httpSession.run(`MATCH (n:Node {name: 'foo'}) SET n.name = 'baz'`).then(() => { + boltSession.close(() => done.fail('HTTP query was successful but failure expected')); + }).catch(error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual('Neo.DatabaseError.Statement.ExecutionFailed'); + expect(error.message.indexOf('transaction has been terminated')).not.toBeLessThan(0); + boltSession.close(() => done()); + }); + + setTimeout(() => { + httpSession.close(); + }, 2000); + + }); + }); + }, 20000); + + it('should fail to pass node as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.Node(neo4j.int(1), ['Person'], {name: 'Bob'}), done); + }); + + it('should fail to pass relationship as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.Relationship(neo4j.int(1), neo4j.int(2), neo4j.int(3), 'KNOWS', {since: 42}), done); + }); + + it('should fail to pass path as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const node1 = new neo4j.types.Node(neo4j.int(1), ['Person'], {name: 'Alice'}); + const node2 = new neo4j.types.Node(neo4j.int(2), ['Person'], {name: 'Bob'}); + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.Path(node1, node2, []), done); + }); + + it('should fail to pass point as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.Point(neo4j.int(42), 1, 2, 3), done); + }); + + it('should fail to pass date as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.Date(2000, 10, 12), done); + }); + + it('should fail to pass date-time as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.DateTime(2000, 10, 12, 12, 12, 0, 0, 0, null), done); + }); + + it('should fail to pass duration as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.Duration(1, 1, 1, 1), done); + }); + + it('should fail to pass local date-time as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.LocalDateTime(2000, 10, 12, 10, 10, 10), done); + }); + + it('should fail to pass local time as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.LocalTime(12, 12, 12, 0), done); + }); + + it('should fail to pass time as a query parameter', done => { + if (testUtils.isServer()) { + done(); + return; + } + + testUnsupportedQueryParameterWithHttpDriver(new neo4j.types.Time(12, 12, 12, 0, 0), done); + }); + + it('should receive points', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceivingOfResults([ + 'RETURN point({x: 42.341, y: 125.0})', + 'RETURN point({x: 13.2, y: 22.2, z: 33.3})', + 'RETURN point({x: 92.3, y: 71.2, z: 2.12345, crs: "wgs-84-3d"})', + 'RETURN point({longitude: 56.7, latitude: 12.78})' + ], done); + }); + + it('should receive date', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceiveSingleValueWithHttpDriver( + 'RETURN date({year: 2019, month: 9, day: 28})', + '2019-09-28', + done); + }); + + it('should receive date-time with time zone id', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceiveSingleValueWithHttpDriver( + 'RETURN datetime({year: 1976, month: 11, day: 1, hour: 19, minute: 20, second: 55, nanosecond: 999111, timezone: "UTC"})', + '1976-11-01T19:20:55.000999111Z[UTC]', + done); + }); + + it('should receive date-time with time zone name', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceiveSingleValueWithHttpDriver( + 'RETURN datetime({year: 2012, month: 12, day: 12, hour: 1, minute: 9, second: 2, nanosecond: 123, timezone: "-08:30"})', + '2012-12-12T01:09:02.000000123-08:30', + done); + }); + + it('should receive duration', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceiveSingleValueWithHttpDriver( + 'RETURN duration({months: 3, days: 35, seconds: 19, nanoseconds: 937139})', + 'P3M35DT19.000937139S', + done); + }); + + it('should receive local date-time', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceiveSingleValueWithHttpDriver( + 'RETURN localdatetime({year: 2032, month: 5, day: 17, hour: 13, minute: 56, second: 51, nanosecond: 999888111})', + '2032-05-17T13:56:51.999888111', + done); + }); + + it('should receive local time', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceiveSingleValueWithHttpDriver( + 'RETURN localtime({hour: 17, minute: 2, second: 21, nanosecond: 123456789})', + '17:02:21.123456789', + done); + }); + + it('should receive time', done => { + if (testUtils.isServer() || !databaseSupportsSpatialAndTemporalTypes()) { + done(); + return; + } + + testReceiveSingleValueWithHttpDriver( + 'RETURN time({hour: 21, minute: 19, second: 1, nanosecond: 111, timezone: "+03:15"})', + '21:19:01.000000111+03:15', + done); + }); + + it('should close all open sessions when closed', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const session1 = withFakeClose(httpDriver.session()); + const session2 = withFakeClose(httpDriver.session()); + const session3 = withFakeClose(httpDriver.session()); + + expect(session1.closed).toBeFalsy(); + expect(session2.closed).toBeFalsy(); + expect(session3.closed).toBeFalsy(); + + httpDriver.close().then(() => { + expect(session1.closed).toBeTruthy(); + expect(session2.closed).toBeTruthy(); + expect(session3.closed).toBeTruthy(); + done(); + }); + }); + + function testReceiveSingleValueWithHttpDriver(query, expectedValue, done) { + runQueryAndGetResults(query, {}, httpDriver).then(results => { + const receivedValue = results[0][0]; + expect(expectedValue).toEqual(receivedValue); + done(); + }).catch(error => { + done.fail(error); + }); + } + function testSendAndReceiveWithReturnQuery(values, done) { const query = 'RETURN $value'; @@ -304,4 +559,35 @@ describe('http driver', () => { }); } + function testUnsupportedQueryParameterWithHttpDriver(value, done) { + const session = httpDriver.session(); + session.run('RETURN $value', {value: value}).then(() => { + done.fail('Should not be possible to send ' + value); + }).catch(error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual(neo4j.error.PROTOCOL_ERROR); + session.close(() => { + done(); + }); + }); + } + + function databaseSupportsTransactionTerminationInLocks() { + return serverVersion.compareTo(VERSION_3_1_0) >= 0; + } + + function databaseSupportsSpatialAndTemporalTypes() { + return serverVersion.compareTo(VERSION_3_4_0) >= 0; + } + + function withFakeClose(httpSession) { + httpSession.closed = false; + const originalClose = httpSession.close.bind(httpSession); + httpSession.close = callback => { + httpSession.closed = true; + originalClose(callback); + }; + return httpSession; + } + }); diff --git a/test/internal/http/http-request-runner.test.js b/test/internal/http/http-request-runner.test.js new file mode 100644 index 000000000..8075bf9a1 --- /dev/null +++ b/test/internal/http/http-request-runner.test.js @@ -0,0 +1,312 @@ +/** + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import HttpRequestRunner from '../../../src/v1/internal/http/http-request-runner'; +import neo4j from '../../../src/v1'; +import sharedNeo4j from '../../internal/shared-neo4j'; +import urlUtil from '../../../src/v1/internal/url-util'; +import testUtils from '.././test-utils'; +import _ from 'lodash'; + +const VALID_URI = 'http://localhost'; +const INVALID_URI = 'http://not-localhost'; + +describe('http request runner', () => { + + it('should begin transaction', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.beginTransaction().then(transactionId => { + verifyTransactionId(transactionId); + done(); + }).catch(error => { + done.fail(error); + }); + }); + + it('should begin and commit transaction', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.beginTransaction().then(transactionId => { + verifyTransactionId(transactionId); + runner.commitTransaction(transactionId).then(() => { + done(); + }).catch(error => { + done.fail(error); + }); + }).catch(error => { + done.fail(error); + }); + }); + + it('should begin and rollback transaction', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.beginTransaction().then(transactionId => { + verifyTransactionId(transactionId); + runner.rollbackTransaction(transactionId).then(() => { + done(); + }).catch(error => { + done.fail(error); + }); + }).catch(error => { + done.fail(error); + }); + }); + + it('should fail to begin transaction with invalid uri', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(INVALID_URI); + + runner.beginTransaction().then(transactionId => { + done.fail(new Error('Should not be possible to begin a transaction with invalid URI, received transactionId: ' + transactionId)); + }).catch(error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); + done(); + }); + }); + + it('should fail to commit transaction with invalid uri', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(INVALID_URI); + + runner.commitTransaction(42).then(() => { + done.fail(new Error('Should not be possible to commit a transaction with invalid URI')); + }).catch(error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); + done(); + }); + }); + + it('should fail to rollback transaction with invalid uri', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(INVALID_URI); + + runner.rollbackTransaction(42).then(() => { + done.fail(new Error('Should not be possible to rollback a transaction with invalid URI')); + }).catch(error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); + done(); + }); + }); + + it('should fail to commit transaction with invalid id', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.commitTransaction(424242).then(() => { + done.fail(new Error('Should not be possible to commit a transaction with invalid id')); + }).catch(error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual('Neo.ClientError.Transaction.TransactionNotFound'); + done(); + }); + }); + + it('should fail to rollback transaction with invalid id', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.rollbackTransaction(424242).then(() => { + done.fail(new Error('Should not be possible to rollback a transaction with invalid id')); + }).catch(error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual('Neo.ClientError.Transaction.TransactionNotFound'); + done(); + }); + }); + + it('should run query in transaction', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.beginTransaction().then(transactionId => { + verifyTransactionId(transactionId); + runner.runQuery(transactionId, 'RETURN 42', {}).then(streamObserver => { + streamObserver.subscribe({ + onNext: record => { + expect(record.get(0)).toEqual(42); + }, + onError: error => { + done.fail(error); + }, + onCompleted: () => { + runner.rollbackTransaction(transactionId).catch(error => { + }).then(() => { + done(); + }); + } + }); + }).catch(error => { + done.fail(error); + }); + done(); + }).catch(error => { + done.fail(error); + }); + }); + + it('should fail to run invalid query in transaction', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.beginTransaction().then(transactionId => { + verifyTransactionId(transactionId); + runner.runQuery(transactionId, 'WRONG QUERY', {}).then(streamObserver => { + streamObserver.subscribe({ + onNext: () => { + done.fail(new Error('Should not receive records')); + }, + onError: error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual('Neo.ClientError.Statement.SyntaxError'); + + runner.rollbackTransaction(transactionId).catch(error => { + }).then(() => { + done(); + }); + }, + onCompleted: () => { + done.fail(new Error('Should not complete')); + } + }); + }).catch(error => { + done.fail(error); + }); + done(); + }).catch(error => { + done.fail(error); + }); + }); + + it('should fail to run query in transaction with invalid uri', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(INVALID_URI); + + runner.runQuery(424242, 'RETURN 42', {}).then(streamObserver => { + expect(streamObserver.hasFailed()).toBeTruthy(); + streamObserver.subscribe({ + onNext: () => { + done.fail(new Error('Should not receive records')); + }, + onError: error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); + done(); + }, + onCompleted: () => { + done.fail(new Error('Should not complete')); + } + }); + }).catch(error => { + done.fail(error); + }); + }); + + it('should fail to run query in transaction with invalid id', done => { + if (testUtils.isServer()) { + done(); + return; + } + + const runner = newRunner(VALID_URI); + + runner.runQuery(424242, 'RETURN 42', {}).then(streamObserver => { + expect(streamObserver.hasFailed()).toBeTruthy(); + streamObserver.subscribe({ + onNext: () => { + done.fail(new Error('Should not receive records')); + }, + onError: error => { + expect(error.name).toEqual('Neo4jError'); + expect(error.code).toEqual('Neo.ClientError.Transaction.TransactionNotFound'); + done(); + }, + onCompleted: () => { + done.fail(new Error('Should not complete')); + } + }); + }).catch(error => { + done.fail(error); + }); + }); + +}); + +function verifyTransactionId(transactionId) { + expect(transactionId).toBeDefined(); + expect(transactionId).not.toBeNull(); + expect(_.isNumber(transactionId)).toBeTruthy(); +} + +function newRunner(url, username, password) { + username = username ? username : sharedNeo4j.username; + password = password ? password : sharedNeo4j.password; + return new HttpRequestRunner(urlUtil.parseDatabaseUrl(url), neo4j.auth.basic(username, password)); +} diff --git a/test/internal/http/http-session-tracker.test.js b/test/internal/http/http-session-tracker.test.js new file mode 100644 index 000000000..e0c342d30 --- /dev/null +++ b/test/internal/http/http-session-tracker.test.js @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import sharedNeo4j from '../../internal/shared-neo4j'; +import HttpSession from '../../../src/v1/internal/http/http-session'; +import urlUtil from '../../../src/v1/internal/url-util'; +import HttpSessionTracker from '../../../src/v1/internal/http/http-session-tracker'; + +describe('http session tracker', () => { + + it('should close open sessions', done => { + const tracker = new HttpSessionTracker(); + + const session1 = new FakeHttpSession(tracker); + const session2 = new FakeHttpSession(tracker); + const session3 = new FakeHttpSession(tracker); + + tracker.sessionOpened(session1); + tracker.sessionOpened(session2); + tracker.sessionOpened(session3); + + tracker.close().then(() => { + expect(session1.timesClosed).toEqual(1); + expect(session2.timesClosed).toEqual(1); + expect(session3.timesClosed).toEqual(1); + done(); + }); + }); + + it('should not close closed sessions', done => { + const tracker = new HttpSessionTracker(); + + const session1 = new FakeHttpSession(tracker); + const session2 = new FakeHttpSession(tracker); + const session3 = new FakeHttpSession(tracker); + const session4 = new FakeHttpSession(tracker); + + tracker.sessionOpened(session1); + tracker.sessionOpened(session2); + tracker.sessionOpened(session3); + tracker.sessionOpened(session4); + + tracker.sessionClosed(session2); + tracker.sessionClosed(session4); + + tracker.close().then(() => { + expect(session1.timesClosed).toEqual(1); + expect(session2.timesClosed).toEqual(0); + expect(session3.timesClosed).toEqual(1); + expect(session4.timesClosed).toEqual(0); + done(); + }); + }); + +}); + +class FakeHttpSession extends HttpSession { + + constructor(sessionTracker) { + super(urlUtil.parseDatabaseUrl('http://localhost:7474'), sharedNeo4j.authToken, {}, sessionTracker); + this.timesClosed = 0; + } + + close(callback) { + this.timesClosed++; + callback(); + } +}