Skip to content

Throws helpful error when nesting transactions in one session #497

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,19 @@ class Driver {
* @param {string} param.database - the database this session will operate on.
* @returns {RxSession} new reactive session.
*/
rxSession ({ defaultAccessMode = WRITE, bookmarks, database = '' } = {}) {
rxSession ({
defaultAccessMode = WRITE,
bookmarks,
database = '',
fetchSize
} = {}) {
return new RxSession({
session: this._newSession({
defaultAccessMode,
bookmarks,
database,
reactive: true
reactive: true,
fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
}),
config: this._config
})
Expand Down
7 changes: 6 additions & 1 deletion src/internal/connection-holder.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export default class ConnectionHolder {

/**
* Make this holder initialize new connection if none exists already.
* @return {undefined}
* @return {boolean}
*/
initializeConnection () {
if (this._referenceCount === 0) {
Expand All @@ -73,8 +73,12 @@ export default class ConnectionHolder {
database: this._database,
bookmark: this._bookmark
})
} else {
this._referenceCount++
return false
}
this._referenceCount++
return true
}

/**
Expand Down Expand Up @@ -141,6 +145,7 @@ export default class ConnectionHolder {
class EmptyConnectionHolder extends ConnectionHolder {
initializeConnection () {
// nothing to initialize
return true
}

getConnection () {
Expand Down
4 changes: 2 additions & 2 deletions src/result-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ export default class RxResult {

subscriptions.push({
unsubscribe: () => {
if (result.discard) {
result.discard()
if (result._cancel) {
result._cancel()
}
}
})
Expand Down
3 changes: 1 addition & 2 deletions src/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ class Session {
const connectionHolder = this._connectionHolderWithMode(this._mode)

let observerPromise
if (!this._hasTx) {
connectionHolder.initializeConnection()
if (!this._hasTx && connectionHolder.initializeConnection()) {
observerPromise = connectionHolder
.getConnection()
.then(connection => customRunner(connection))
Expand Down
153 changes: 153 additions & 0 deletions test/nested-statements.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import neo4j from '../src'
import { statementType } from '../src/result-summary'
import Session from '../src/session'
import { READ } from '../src/driver'
import SingleConnectionProvider from '../src/internal/connection-provider-single'
import FakeConnection from './internal/fake-connection'
import sharedNeo4j from './internal/shared-neo4j'
import _ from 'lodash'
import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version'
import { isString } from '../src/internal/util'
import testUtils from './internal/test-utils'
import { newError, PROTOCOL_ERROR, SESSION_EXPIRED } from '../src/error'
import ServerAddress from '../src/internal/server-address'
import {
bufferCount,
catchError,
concat,
flatMap,
map,
materialize,
toArray
} from 'rxjs/operators'
import { Notification, throwError } from 'rxjs'

describe('#integration session', () => {
let driver
let session
// eslint-disable-next-line no-unused-vars
let serverVersion
let originalTimeout

beforeEach(async () => {
driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken)
session = driver.session({ fetchSize: 2 })
originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL
jasmine.DEFAULT_TIMEOUT_INTERVAL = 30000

serverVersion = await sharedNeo4j.cleanupAndGetVersion(driver)
})

afterEach(async () => {
jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout
await driver.close()
})

it('should handle nested queries within one transaction', done => {
const size = 20
let count = 0
const tx = session.beginTransaction()
const results = []
tx.run('UNWIND range(1, $size) AS x RETURN x', { size: size }).subscribe({
onNext: record => {
const x = record.get('x').toInt()
let index = 0
const result = tx.run(
'UNWIND range (1, $x) AS x CREATE (n:Node {id: x}) RETURN n.id',
{ x: x }
)
results.push(result)
result.subscribe({
onNext (record) {
const value = record.get('n.id')
index++
expect(value.toInt()).toEqual(index)
},
onCompleted (summary) {
expect(index).toEqual(x)
count += x
}
})
},
onCompleted: () => {
Promise.all(results).then(() => {
tx.commit().then(() => {
expect(count).toBe(((1 + size) * size) / 2)
session.close().then(() => done())
})
})
},
onError: error => {
console.log(error)
}
})
})

it('should give proper error when nesting queries within one session', done => {
const size = 20
const count = 0
const result = session.run('UNWIND range(1, $size) AS x RETURN x', {
size: size
})
result.subscribe({
onNext: async record => {
const x = record.get('x').toInt()
await expectAsync(
session.run('CREATE (n:Node {id: $x}) RETURN n.id', { x: x })
).toBeRejectedWith(
jasmine.objectContaining({
message:
'Statements cannot be run directly on a session with an open transaction; ' +
'either run from within the transaction or use a different session.'
})
)
},
onCompleted: () => {
session.close().then(() => done())
},
onError: error => {
console.log(error)
}
})
})

it('should handle sequential query runs within one session', done => {
const size = 20
let count = 0
session
.run('UNWIND range(1, $size) AS x RETURN x', { size: size })
.then(async result => {
for (const record of result.records) {
const x = record.get('x')
const innerResult = await session.run(
'CREATE (n:Node {id: $x}) RETURN n.id',
{ x: x }
)
expect(innerResult.records.length).toEqual(1)
expect(innerResult.records[0].get('n.id')).toEqual(x)
count++
}
expect(count).toEqual(size)
session.close().then(() => done())
})
})
})
127 changes: 127 additions & 0 deletions test/rx/nested-statements.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Notification, throwError } from 'rxjs'
import {
flatMap,
materialize,
toArray,
concat,
map,
bufferCount,
catchError
} from 'rxjs/operators'
import neo4j from '../../src'
import { ServerVersion, VERSION_4_0_0 } from '../../src/internal/server-version'
import RxSession from '../../src/session-rx'
import sharedNeo4j from '../internal/shared-neo4j'
import { newError } from '../../src/error'

describe('#integration-rx transaction', () => {
let driver
/** @type {RxSession} */
let session
/** @type {ServerVersion} */
let serverVersion

beforeEach(async () => {
driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken)
session = driver.rxSession()

serverVersion = await sharedNeo4j.cleanupAndGetVersion(driver)
})

afterEach(async () => {
if (session) {
await session.close().toPromise()
}
await driver.close()
})

it('should handle nested queries within one transaction', async () => {
const size = 1024
if (serverVersion.compareTo(VERSION_4_0_0) < 0) {
return
}

const messages = await session
.beginTransaction()
.pipe(
flatMap(txc =>
txc
.run('UNWIND RANGE(1, $size) AS x RETURN x', { size })
.records()
.pipe(
map(r => r.get(0)),
bufferCount(50),
flatMap(x =>
txc
.run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', {
x
})
.records()
),
map(r => r.get(0)),
concat(txc.commit()),
catchError(err => txc.rollback().pipe(concat(throwError(err)))),
materialize(),
toArray()
)
)
)
.toPromise()

expect(messages.length).toBe(size + 1)
expect(messages[size]).toEqual(Notification.createComplete())
})

it('should give proper error when nesting queries within one session', async () => {
const size = 1024
if (serverVersion.compareTo(VERSION_4_0_0) < 0) {
return
}

const result = await session
.run('UNWIND RANGE(1, $size) AS x RETURN x', { size })
.records()
.pipe(
map(r => r.get(0)),
bufferCount(50),
flatMap(x =>
session
.run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', {
x
})
.records()
),
map(r => r.get(0)),
materialize(),
toArray()
)
.toPromise()

expect(result).toEqual([
Notification.createError(
jasmine.stringMatching(
/Statements cannot be run directly on a session with an open transaction/
)
)
])
})
})
Loading