diff --git a/src/driver.js b/src/driver.js index 8d3683ecc..065a9e450 100644 --- a/src/driver.js +++ b/src/driver.js @@ -178,13 +178,19 @@ class Driver { * @param {string} param.database - the database this session will operate on. * @returns {RxSession} new reactive session. */ - rxSession ({ defaultAccessMode = WRITE, bookmarks, database = '' } = {}) { + rxSession ({ + defaultAccessMode = WRITE, + bookmarks, + database = '', + fetchSize + } = {}) { return new RxSession({ session: this._newSession({ defaultAccessMode, bookmarks, database, - reactive: true + reactive: true, + fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize) }), config: this._config }) diff --git a/src/internal/connection-holder.js b/src/internal/connection-holder.js index 60eaac442..bb817fcf9 100644 --- a/src/internal/connection-holder.js +++ b/src/internal/connection-holder.js @@ -64,7 +64,7 @@ export default class ConnectionHolder { /** * Make this holder initialize new connection if none exists already. - * @return {undefined} + * @return {boolean} */ initializeConnection () { if (this._referenceCount === 0) { @@ -73,8 +73,12 @@ export default class ConnectionHolder { database: this._database, bookmark: this._bookmark }) + } else { + this._referenceCount++ + return false } this._referenceCount++ + return true } /** @@ -141,6 +145,7 @@ export default class ConnectionHolder { class EmptyConnectionHolder extends ConnectionHolder { initializeConnection () { // nothing to initialize + return true } getConnection () { diff --git a/src/result-rx.js b/src/result-rx.js index 2b0fa1705..918af16d3 100644 --- a/src/result-rx.js +++ b/src/result-rx.js @@ -127,8 +127,8 @@ export default class RxResult { subscriptions.push({ unsubscribe: () => { - if (result.discard) { - result.discard() + if (result._cancel) { + result._cancel() } } }) diff --git a/src/session.js b/src/session.js index 03abc4524..a2bf76484 100644 --- a/src/session.js +++ b/src/session.js @@ -120,8 +120,7 @@ class Session { const connectionHolder = this._connectionHolderWithMode(this._mode) let observerPromise - if (!this._hasTx) { - connectionHolder.initializeConnection() + if (!this._hasTx && connectionHolder.initializeConnection()) { observerPromise = connectionHolder .getConnection() .then(connection => customRunner(connection)) diff --git a/test/nested-statements.test.js b/test/nested-statements.test.js new file mode 100644 index 000000000..efa0d93dc --- /dev/null +++ b/test/nested-statements.test.js @@ -0,0 +1,153 @@ +/** + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import neo4j from '../src' +import { statementType } from '../src/result-summary' +import Session from '../src/session' +import { READ } from '../src/driver' +import SingleConnectionProvider from '../src/internal/connection-provider-single' +import FakeConnection from './internal/fake-connection' +import sharedNeo4j from './internal/shared-neo4j' +import _ from 'lodash' +import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version' +import { isString } from '../src/internal/util' +import testUtils from './internal/test-utils' +import { newError, PROTOCOL_ERROR, SESSION_EXPIRED } from '../src/error' +import ServerAddress from '../src/internal/server-address' +import { + bufferCount, + catchError, + concat, + flatMap, + map, + materialize, + toArray +} from 'rxjs/operators' +import { Notification, throwError } from 'rxjs' + +describe('#integration session', () => { + let driver + let session + // eslint-disable-next-line no-unused-vars + let serverVersion + let originalTimeout + + beforeEach(async () => { + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken) + session = driver.session({ fetchSize: 2 }) + originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL + jasmine.DEFAULT_TIMEOUT_INTERVAL = 30000 + + serverVersion = await sharedNeo4j.cleanupAndGetVersion(driver) + }) + + afterEach(async () => { + jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout + await driver.close() + }) + + it('should handle nested queries within one transaction', done => { + const size = 20 + let count = 0 + const tx = session.beginTransaction() + const results = [] + tx.run('UNWIND range(1, $size) AS x RETURN x', { size: size }).subscribe({ + onNext: record => { + const x = record.get('x').toInt() + let index = 0 + const result = tx.run( + 'UNWIND range (1, $x) AS x CREATE (n:Node {id: x}) RETURN n.id', + { x: x } + ) + results.push(result) + result.subscribe({ + onNext (record) { + const value = record.get('n.id') + index++ + expect(value.toInt()).toEqual(index) + }, + onCompleted (summary) { + expect(index).toEqual(x) + count += x + } + }) + }, + onCompleted: () => { + Promise.all(results).then(() => { + tx.commit().then(() => { + expect(count).toBe(((1 + size) * size) / 2) + session.close().then(() => done()) + }) + }) + }, + onError: error => { + console.log(error) + } + }) + }) + + it('should give proper error when nesting queries within one session', done => { + const size = 20 + const count = 0 + const result = session.run('UNWIND range(1, $size) AS x RETURN x', { + size: size + }) + result.subscribe({ + onNext: async record => { + const x = record.get('x').toInt() + await expectAsync( + session.run('CREATE (n:Node {id: $x}) RETURN n.id', { x: x }) + ).toBeRejectedWith( + jasmine.objectContaining({ + message: + 'Statements cannot be run directly on a session with an open transaction; ' + + 'either run from within the transaction or use a different session.' + }) + ) + }, + onCompleted: () => { + session.close().then(() => done()) + }, + onError: error => { + console.log(error) + } + }) + }) + + it('should handle sequential query runs within one session', done => { + const size = 20 + let count = 0 + session + .run('UNWIND range(1, $size) AS x RETURN x', { size: size }) + .then(async result => { + for (const record of result.records) { + const x = record.get('x') + const innerResult = await session.run( + 'CREATE (n:Node {id: $x}) RETURN n.id', + { x: x } + ) + expect(innerResult.records.length).toEqual(1) + expect(innerResult.records[0].get('n.id')).toEqual(x) + count++ + } + expect(count).toEqual(size) + session.close().then(() => done()) + }) + }) +}) diff --git a/test/rx/nested-statements.test.js b/test/rx/nested-statements.test.js new file mode 100644 index 000000000..2830885c9 --- /dev/null +++ b/test/rx/nested-statements.test.js @@ -0,0 +1,127 @@ +/** + * Copyright (c) 2002-2019 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Notification, throwError } from 'rxjs' +import { + flatMap, + materialize, + toArray, + concat, + map, + bufferCount, + catchError +} from 'rxjs/operators' +import neo4j from '../../src' +import { ServerVersion, VERSION_4_0_0 } from '../../src/internal/server-version' +import RxSession from '../../src/session-rx' +import sharedNeo4j from '../internal/shared-neo4j' +import { newError } from '../../src/error' + +describe('#integration-rx transaction', () => { + let driver + /** @type {RxSession} */ + let session + /** @type {ServerVersion} */ + let serverVersion + + beforeEach(async () => { + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken) + session = driver.rxSession() + + serverVersion = await sharedNeo4j.cleanupAndGetVersion(driver) + }) + + afterEach(async () => { + if (session) { + await session.close().toPromise() + } + await driver.close() + }) + + it('should handle nested queries within one transaction', async () => { + const size = 1024 + if (serverVersion.compareTo(VERSION_4_0_0) < 0) { + return + } + + const messages = await session + .beginTransaction() + .pipe( + flatMap(txc => + txc + .run('UNWIND RANGE(1, $size) AS x RETURN x', { size }) + .records() + .pipe( + map(r => r.get(0)), + bufferCount(50), + flatMap(x => + txc + .run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', { + x + }) + .records() + ), + map(r => r.get(0)), + concat(txc.commit()), + catchError(err => txc.rollback().pipe(concat(throwError(err)))), + materialize(), + toArray() + ) + ) + ) + .toPromise() + + expect(messages.length).toBe(size + 1) + expect(messages[size]).toEqual(Notification.createComplete()) + }) + + it('should give proper error when nesting queries within one session', async () => { + const size = 1024 + if (serverVersion.compareTo(VERSION_4_0_0) < 0) { + return + } + + const result = await session + .run('UNWIND RANGE(1, $size) AS x RETURN x', { size }) + .records() + .pipe( + map(r => r.get(0)), + bufferCount(50), + flatMap(x => + session + .run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', { + x + }) + .records() + ), + map(r => r.get(0)), + materialize(), + toArray() + ) + .toPromise() + + expect(result).toEqual([ + Notification.createError( + jasmine.stringMatching( + /Statements cannot be run directly on a session with an open transaction/ + ) + ) + ]) + }) +}) diff --git a/test/rx/transaction.test.js b/test/rx/transaction.test.js index bddb8a4a4..37d542bdc 100644 --- a/test/rx/transaction.test.js +++ b/test/rx/transaction.test.js @@ -101,7 +101,7 @@ describe('#integration-rx transaction', () => { .run('CREATE (n:Node {id: 42}) RETURN n') .records() .pipe( - map(r => r.get('n').properties['id']), + map(r => r.get('n').properties.id), concat(txc.commit()) ) ), @@ -130,7 +130,7 @@ describe('#integration-rx transaction', () => { .run('CREATE (n:Node {id: 42}) RETURN n') .records() .pipe( - map(r => r.get('n').properties['id']), + map(r => r.get('n').properties.id), concat(txc.rollback()) ) ), @@ -539,43 +539,6 @@ describe('#integration-rx transaction', () => { expect(summary).toBeTruthy() }) - it('should handle nested queries', async () => { - const size = 1024 - if (serverVersion.compareTo(VERSION_4_0_0) < 0) { - return - } - - const messages = await session - .beginTransaction() - .pipe( - flatMap(txc => - txc - .run('UNWIND RANGE(1, $size) AS x RETURN x', { size }) - .records() - .pipe( - map(r => r.get(0)), - bufferCount(50), - flatMap(x => - txc - .run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', { - x - }) - .records() - ), - map(r => r.get(0)), - concat(txc.commit()), - catchError(err => txc.rollback().pipe(concat(throwError(err)))), - materialize(), - toArray() - ) - ) - ) - .toPromise() - - expect(messages.length).toBe(size + 1) - expect(messages[size]).toEqual(Notification.createComplete()) - }) - async function verifyNoFailureIfNotExecuted (commit) { if (serverVersion.compareTo(VERSION_4_0_0) < 0) { return @@ -732,7 +695,7 @@ describe('#integration-rx transaction', () => { .run('CREATE (n:Node {id: $id}) RETURN n', { id: neo4j.int(id) }) .records() .pipe( - map(r => r.get('n').properties['id']), + map(r => r.get('n').properties.id), materialize(), toArray() ) diff --git a/test/session.test.js b/test/session.test.js index 829667bf5..9beeb8794 100644 --- a/test/session.test.js +++ b/test/session.test.js @@ -881,43 +881,6 @@ describe('#integration session', () => { }) }) - it('should be able to do nested queries', done => { - session - .run( - 'CREATE (knight:Person:Knight {name: $name1, castle: $castle})' + - 'CREATE (king:Person {name: $name2, title: $title})', - { name1: 'Lancelot', castle: 'Camelot', name2: 'Arthur', title: 'King' } - ) - .then(() => { - session - .run( - 'MATCH (knight:Person:Knight) WHERE knight.castle = $castle RETURN id(knight) AS knight_id', - { castle: 'Camelot' } - ) - .subscribe({ - onNext: record => { - session.run( - 'MATCH (knight) WHERE id(knight) = $id MATCH (king:Person) WHERE king.name = $king CREATE (knight)-[:DEFENDS]->(king)', - { id: record.get('knight_id'), king: 'Arthur' } - ) - }, - onCompleted: () => { - session - .run('MATCH (:Knight)-[:DEFENDS]->() RETURN count(*)') - .then(result => { - const count = result.records[0].get(0).toInt() - expect(count).toEqual(1) - }) - .then(() => session.close()) - .then(() => done()) - }, - onError: error => { - console.log(error) - } - }) - }) - }) - it('should send multiple bookmarks', async () => { const nodeCount = 17 const bookmarks = [] diff --git a/types/driver.d.ts b/types/driver.d.ts index 716bc5875..c4f9ffc39 100644 --- a/types/driver.d.ts +++ b/types/driver.d.ts @@ -69,7 +69,8 @@ declare interface Driver { session({ defaultAccessMode, bookmarks, - database + database, + fetchSize }?: { defaultAccessMode?: SessionMode bookmarks?: string | string[] @@ -80,10 +81,12 @@ declare interface Driver { rxSession({ defaultAccessMode, bookmarks, - database + database, + fetchSize }?: { defaultAccessMode?: SessionMode bookmarks?: string | string[] + fetchSize?: number database?: string }): RxSession