Skip to content

Commit 50a2df6

Browse files
author
Zhen Li
committed
Throws helpful error when nesting transactions in one session
1 parent a17eee1 commit 50a2df6

File tree

9 files changed

+305
-86
lines changed

9 files changed

+305
-86
lines changed

src/driver.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,19 @@ class Driver {
178178
* @param {string} param.database - the database this session will operate on.
179179
* @returns {RxSession} new reactive session.
180180
*/
181-
rxSession ({ defaultAccessMode = WRITE, bookmarks, database = '' } = {}) {
181+
rxSession ({
182+
defaultAccessMode = WRITE,
183+
bookmarks,
184+
database = '',
185+
fetchSize
186+
} = {}) {
182187
return new RxSession({
183188
session: this._newSession({
184189
defaultAccessMode,
185190
bookmarks,
186191
database,
187-
reactive: true
192+
reactive: true,
193+
fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
188194
}),
189195
config: this._config
190196
})

src/internal/connection-holder.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export default class ConnectionHolder {
6464

6565
/**
6666
* Make this holder initialize new connection if none exists already.
67-
* @return {undefined}
67+
* @return {boolean}
6868
*/
6969
initializeConnection () {
7070
if (this._referenceCount === 0) {
@@ -73,8 +73,12 @@ export default class ConnectionHolder {
7373
database: this._database,
7474
bookmark: this._bookmark
7575
})
76+
} else {
77+
this._referenceCount++
78+
return false
7679
}
7780
this._referenceCount++
81+
return true
7882
}
7983

8084
/**
@@ -141,6 +145,7 @@ export default class ConnectionHolder {
141145
class EmptyConnectionHolder extends ConnectionHolder {
142146
initializeConnection () {
143147
// nothing to initialize
148+
return true
144149
}
145150

146151
getConnection () {

src/result-rx.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ export default class RxResult {
127127

128128
subscriptions.push({
129129
unsubscribe: () => {
130-
if (result.discard) {
131-
result.discard()
130+
if (result._cancel) {
131+
result._cancel()
132132
}
133133
}
134134
})

src/session.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ class Session {
120120
const connectionHolder = this._connectionHolderWithMode(this._mode)
121121

122122
let observerPromise
123-
if (!this._hasTx) {
124-
connectionHolder.initializeConnection()
123+
if (!this._hasTx && connectionHolder.initializeConnection()) {
125124
observerPromise = connectionHolder
126125
.getConnection()
127126
.then(connection => customRunner(connection))

test/nested-statements.test.js

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/**
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import neo4j from '../src'
21+
import { statementType } from '../src/result-summary'
22+
import Session from '../src/session'
23+
import { READ } from '../src/driver'
24+
import SingleConnectionProvider from '../src/internal/connection-provider-single'
25+
import FakeConnection from './internal/fake-connection'
26+
import sharedNeo4j from './internal/shared-neo4j'
27+
import _ from 'lodash'
28+
import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version'
29+
import { isString } from '../src/internal/util'
30+
import testUtils from './internal/test-utils'
31+
import { newError, PROTOCOL_ERROR, SESSION_EXPIRED } from '../src/error'
32+
import ServerAddress from '../src/internal/server-address'
33+
import {
34+
bufferCount,
35+
catchError,
36+
concat,
37+
flatMap,
38+
map,
39+
materialize,
40+
toArray
41+
} from 'rxjs/operators'
42+
import { Notification, throwError } from 'rxjs'
43+
44+
describe('#integration session', () => {
45+
let driver
46+
let session
47+
// eslint-disable-next-line no-unused-vars
48+
let serverVersion
49+
let originalTimeout
50+
51+
beforeEach(async () => {
52+
driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken)
53+
session = driver.session({ fetchSize: 2 })
54+
originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL
55+
jasmine.DEFAULT_TIMEOUT_INTERVAL = 30000
56+
57+
serverVersion = await sharedNeo4j.cleanupAndGetVersion(driver)
58+
})
59+
60+
afterEach(async () => {
61+
jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout
62+
await driver.close()
63+
})
64+
65+
it('should handle nested queries within one transaction', done => {
66+
const size = 20
67+
let count = 0
68+
const tx = session.beginTransaction()
69+
const results = []
70+
tx.run('UNWIND range(1, $size) AS x RETURN x', { size: size }).subscribe({
71+
onNext: record => {
72+
const x = record.get('x').toInt()
73+
let index = 0
74+
const result = tx.run(
75+
'UNWIND range (1, $x) AS x CREATE (n:Node {id: x}) RETURN n.id',
76+
{ x: x }
77+
)
78+
results.push(result)
79+
result.subscribe({
80+
onNext (record) {
81+
const value = record.get('n.id')
82+
index++
83+
expect(value.toInt()).toEqual(index)
84+
},
85+
onCompleted (summary) {
86+
expect(index).toEqual(x)
87+
count += x
88+
}
89+
})
90+
},
91+
onCompleted: () => {
92+
Promise.all(results).then(() => {
93+
tx.commit().then(() => {
94+
expect(count).toBe(((1 + size) * size) / 2)
95+
session.close().then(() => done())
96+
})
97+
})
98+
},
99+
onError: error => {
100+
console.log(error)
101+
}
102+
})
103+
})
104+
105+
it('should give proper error when nesting queries within one session', done => {
106+
const size = 20
107+
const count = 0
108+
const result = session.run('UNWIND range(1, $size) AS x RETURN x', {
109+
size: size
110+
})
111+
result.subscribe({
112+
onNext: async record => {
113+
const x = record.get('x').toInt()
114+
await expectAsync(
115+
session.run('CREATE (n:Node {id: $x}) RETURN n.id', { x: x })
116+
).toBeRejectedWith(
117+
jasmine.objectContaining({
118+
message:
119+
'Statements cannot be run directly on a session with an open transaction; ' +
120+
'either run from within the transaction or use a different session.'
121+
})
122+
)
123+
},
124+
onCompleted: () => {
125+
session.close().then(() => done())
126+
},
127+
onError: error => {
128+
console.log(error)
129+
}
130+
})
131+
})
132+
133+
it('should handle sequential query runs within one session', done => {
134+
const size = 20
135+
let count = 0
136+
session
137+
.run('UNWIND range(1, $size) AS x RETURN x', { size: size })
138+
.then(async result => {
139+
for (const record of result.records) {
140+
const x = record.get('x')
141+
const innerResult = await session.run(
142+
'CREATE (n:Node {id: $x}) RETURN n.id',
143+
{ x: x }
144+
)
145+
expect(innerResult.records.length).toEqual(1)
146+
expect(innerResult.records[0].get('n.id')).toEqual(x)
147+
count++
148+
}
149+
expect(count).toEqual(size)
150+
session.close().then(() => done())
151+
})
152+
})
153+
})

test/rx/nested-statements.test.js

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import { Notification, throwError } from 'rxjs'
21+
import {
22+
flatMap,
23+
materialize,
24+
toArray,
25+
concat,
26+
map,
27+
bufferCount,
28+
catchError
29+
} from 'rxjs/operators'
30+
import neo4j from '../../src'
31+
import { ServerVersion, VERSION_4_0_0 } from '../../src/internal/server-version'
32+
import RxSession from '../../src/session-rx'
33+
import sharedNeo4j from '../internal/shared-neo4j'
34+
import { newError } from '../../src/error'
35+
36+
describe('#integration-rx transaction', () => {
37+
let driver
38+
/** @type {RxSession} */
39+
let session
40+
/** @type {ServerVersion} */
41+
let serverVersion
42+
43+
beforeEach(async () => {
44+
driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken)
45+
session = driver.rxSession()
46+
47+
serverVersion = await sharedNeo4j.cleanupAndGetVersion(driver)
48+
})
49+
50+
afterEach(async () => {
51+
if (session) {
52+
await session.close().toPromise()
53+
}
54+
await driver.close()
55+
})
56+
57+
it('should handle nested queries within one transaction', async () => {
58+
const size = 1024
59+
if (serverVersion.compareTo(VERSION_4_0_0) < 0) {
60+
return
61+
}
62+
63+
const messages = await session
64+
.beginTransaction()
65+
.pipe(
66+
flatMap(txc =>
67+
txc
68+
.run('UNWIND RANGE(1, $size) AS x RETURN x', { size })
69+
.records()
70+
.pipe(
71+
map(r => r.get(0)),
72+
bufferCount(50),
73+
flatMap(x =>
74+
txc
75+
.run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', {
76+
x
77+
})
78+
.records()
79+
),
80+
map(r => r.get(0)),
81+
concat(txc.commit()),
82+
catchError(err => txc.rollback().pipe(concat(throwError(err)))),
83+
materialize(),
84+
toArray()
85+
)
86+
)
87+
)
88+
.toPromise()
89+
90+
expect(messages.length).toBe(size + 1)
91+
expect(messages[size]).toEqual(Notification.createComplete())
92+
})
93+
94+
it('should give proper error when nesting queries within one session', async () => {
95+
const size = 1024
96+
if (serverVersion.compareTo(VERSION_4_0_0) < 0) {
97+
return
98+
}
99+
100+
const result = await session
101+
.run('UNWIND RANGE(1, $size) AS x RETURN x', { size })
102+
.records()
103+
.pipe(
104+
map(r => r.get(0)),
105+
bufferCount(50),
106+
flatMap(x =>
107+
session
108+
.run('UNWIND $x AS id CREATE (n:Node {id: id}) RETURN n.id', {
109+
x
110+
})
111+
.records()
112+
),
113+
map(r => r.get(0)),
114+
materialize(),
115+
toArray()
116+
)
117+
.toPromise()
118+
119+
expect(result).toEqual([
120+
Notification.createError(
121+
jasmine.stringMatching(
122+
/Statements cannot be run directly on a session with an open transaction/
123+
)
124+
)
125+
])
126+
})
127+
})

0 commit comments

Comments
 (0)