Skip to content

Commit f4f6d6b

Browse files
author
Zhen Li
committed
Allow async to make of PULL message to pull in batches
1 parent 30b689a commit f4f6d6b

21 files changed

+196
-35
lines changed

src/internal/bolt-protocol-v4.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ export default class BoltProtocol extends BoltProtocolV3 {
7676
const observer = new ResultStreamObserver({
7777
connection: this._connection,
7878
reactive: reactive,
79-
moreFunction: reactive ? this._requestMore : this._noOp,
80-
discardFunction: reactive ? this._requestDiscard : this._noOp,
79+
fetchSize: fetchSize,
80+
moreFunction: this._requestMore,
81+
discardFunction: this._requestDiscard,
8182
beforeKeys,
8283
afterKeys,
8384
beforeError,
@@ -99,7 +100,11 @@ export default class BoltProtocol extends BoltProtocolV3 {
99100
)
100101

101102
if (!reactive) {
102-
this._connection.write(RequestMessage.pull(), observer, flush)
103+
this._connection.write(
104+
RequestMessage.pull({ n: fetchSize }),
105+
observer,
106+
flush
107+
)
103108
}
104109

105110
return observer

src/internal/stream-observers.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import Record from '../record'
2020
import Connection from './connection'
2121
import { newError, PROTOCOL_ERROR } from '../error'
22-
import { isString } from './util'
2322
import Integer from '../integer'
2423
import { ALL } from './request-message'
2524

@@ -49,7 +48,7 @@ class ResultStreamObserver extends StreamObserver {
4948
* @param {boolean} param.reactive
5049
* @param {function(connection: Connection, stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction -
5150
* @param {function(connection: Connection, stmtId: number|Integer, observer: StreamObserver)} param.discardFunction -
52-
* @param {number|Integer} param.batchSize -
51+
* @param {number|Integer} param.fetchSize -
5352
* @param {function(err: Error): Promise|void} param.beforeError -
5453
* @param {function(err: Error): Promise|void} param.afterError -
5554
* @param {function(keys: string[]): Promise|void} param.beforeKeys -
@@ -62,7 +61,7 @@ class ResultStreamObserver extends StreamObserver {
6261
reactive = false,
6362
moreFunction,
6463
discardFunction,
65-
batchSize = ALL,
64+
fetchSize = ALL,
6665
beforeError,
6766
afterError,
6867
beforeKeys,
@@ -97,7 +96,7 @@ class ResultStreamObserver extends StreamObserver {
9796
this._moreFunction = moreFunction
9897
this._discardFunction = discardFunction
9998
this._discard = false
100-
this._batchSize = batchSize
99+
this._fetchSize = fetchSize
101100
}
102101

103102
/**
@@ -228,7 +227,6 @@ class ResultStreamObserver extends StreamObserver {
228227

229228
_handleStreaming () {
230229
if (
231-
this._reactive &&
232230
this._head &&
233231
this._observers.some(o => o.onNext || o.onCompleted) &&
234232
!this._streaming
@@ -241,7 +239,7 @@ class ResultStreamObserver extends StreamObserver {
241239
this._moreFunction(
242240
this._connection,
243241
this._statementId,
244-
this._batchSize,
242+
this._fetchSize,
245243
this
246244
)
247245
}

src/result.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,12 @@ class Result {
8484
*/
8585
summary () {
8686
return new Promise((resolve, reject) => {
87-
this._streamObserverPromise.then(o =>
87+
this._streamObserverPromise.then(o => {
8888
o.subscribe({
8989
onCompleted: metadata => resolve(metadata),
9090
onError: err => reject(err)
9191
})
92-
)
92+
})
9393
})
9494
}
9595

@@ -102,8 +102,8 @@ class Result {
102102
_getOrCreatePromise () {
103103
if (!this._p) {
104104
this._p = new Promise((resolve, reject) => {
105-
let records = []
106-
let observer = {
105+
const records = []
106+
const observer = {
107107
onNext: record => {
108108
records.push(record)
109109
},

src/session.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ class Session {
180180
connectionHolder,
181181
onClose: this._transactionClosed.bind(this),
182182
onBookmark: this._updateBookmark.bind(this),
183-
reactive: this._reactive
183+
reactive: this._reactive,
184+
fetchSize: this._fetchSize
184185
})
185186
tx._begin(this._lastBookmark, txConfig)
186187
return tx

src/transaction.js

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,17 @@ class Transaction {
4343
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
4444
* @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced.
4545
* @param {boolean} reactive whether this transaction generates reactive streams
46+
* @param {number} fetchSize - the record fetch size in each pulling batch.
4647
*/
47-
constructor ({ connectionHolder, onClose, onBookmark, reactive }) {
48+
constructor ({ connectionHolder, onClose, onBookmark, reactive, fetchSize }) {
4849
this._connectionHolder = connectionHolder
4950
this._reactive = reactive
5051
this._state = _states.ACTIVE
5152
this._onClose = onClose
5253
this._onBookmark = onBookmark
5354
this._onError = this._onErrorCallback.bind(this)
5455
this._onComplete = this._onCompleteCallback.bind(this)
56+
this._fetchSize = fetchSize
5557
}
5658

5759
_begin (bookmark, txConfig) {
@@ -88,7 +90,8 @@ class Transaction {
8890
connectionHolder: this._connectionHolder,
8991
onError: this._onError,
9092
onComplete: this._onComplete,
91-
reactive: this._reactive
93+
reactive: this._reactive,
94+
fetchSize: this._fetchSize
9295
})
9396
}
9497

@@ -100,7 +103,7 @@ class Transaction {
100103
* @returns {Result} New Result
101104
*/
102105
commit () {
103-
let committed = this._state.commit({
106+
const committed = this._state.commit({
104107
connectionHolder: this._connectionHolder,
105108
onError: this._onError,
106109
onComplete: this._onComplete
@@ -124,7 +127,7 @@ class Transaction {
124127
* @returns {Result} New Result
125128
*/
126129
rollback () {
127-
let rolledback = this._state.rollback({
130+
const rolledback = this._state.rollback({
128131
connectionHolder: this._connectionHolder,
129132
onError: this._onError,
130133
onComplete: this._onComplete
@@ -164,7 +167,7 @@ class Transaction {
164167
}
165168
}
166169

167-
let _states = {
170+
const _states = {
168171
// The transaction is running with no explicit success or failure marked
169172
ACTIVE: {
170173
commit: ({ connectionHolder, onError, onComplete }) => {
@@ -182,7 +185,7 @@ let _states = {
182185
run: (
183186
statement,
184187
parameters,
185-
{ connectionHolder, onError, onComplete, reactive }
188+
{ connectionHolder, onError, onComplete, reactive, fetchSize }
186189
) => {
187190
// RUN in explicit transaction can't contain bookmarks and transaction configuration
188191
const observerPromise = connectionHolder
@@ -195,7 +198,8 @@ let _states = {
195198
database: connectionHolder.database(),
196199
beforeError: onError,
197200
afterComplete: onComplete,
198-
reactive: reactive
201+
reactive: reactive,
202+
fetchSize: fetchSize
199203
})
200204
)
201205
.catch(error => new FailedObserver({ error, onError }))

test/internal/node/direct.driver.boltkit.test.js

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,144 @@ describe('#stub-direct direct driver with stub server', () => {
447447
})
448448
})
449449

450+
describe('should allow to change fetch size', () => {
451+
async function verifyFailureOnCommit (version) {
452+
if (!boltStub.supported) {
453+
return
454+
}
455+
456+
const server = await boltStub.start(
457+
`./test/resources/boltstub/${version}/read_in_batch.script`,
458+
9001
459+
)
460+
461+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001', {
462+
fetchSize: 2
463+
})
464+
const session = driver.session({ defaultAccessMode: READ })
465+
466+
const result = await session.run('MATCH (n) RETURN n.name')
467+
const records = result.records
468+
expect(records.length).toEqual(3)
469+
expect(records[0].get(0)).toBe('Bob')
470+
expect(records[1].get(0)).toBe('Alice')
471+
expect(records[2].get(0)).toBe('Tina')
472+
473+
const connectionKey = Object.keys(openConnections(driver))[0]
474+
expect(connectionKey).toBeTruthy()
475+
476+
const connection = openConnections(driver, connectionKey)
477+
await session.close()
478+
479+
// generate a fake fatal error
480+
connection._handleFatalError(
481+
newError('connection reset', SERVICE_UNAVAILABLE)
482+
)
483+
484+
// expect that the connection to be removed from the pool
485+
expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0)
486+
expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy()
487+
// expect that the connection to be unregistered from the open connections registry
488+
expect(openConnections(driver, connectionKey)).toBeFalsy()
489+
490+
await driver.close()
491+
await server.exit()
492+
}
493+
494+
it('v4', () => verifyFailureOnCommit('v4'))
495+
})
496+
497+
describe('should stream in many batches', () => {
498+
async function verifyFailureOnCommit (version) {
499+
if (!boltStub.supported) {
500+
return
501+
}
502+
503+
const server = await boltStub.start(
504+
`./test/resources/boltstub/${version}/read_in_batch.script`,
505+
9001
506+
)
507+
508+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001')
509+
const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 })
510+
511+
const result = await session.run('MATCH (n) RETURN n.name')
512+
const records = result.records
513+
expect(records.length).toEqual(3)
514+
expect(records[0].get(0)).toBe('Bob')
515+
expect(records[1].get(0)).toBe('Alice')
516+
expect(records[2].get(0)).toBe('Tina')
517+
518+
const connectionKey = Object.keys(openConnections(driver))[0]
519+
expect(connectionKey).toBeTruthy()
520+
521+
const connection = openConnections(driver, connectionKey)
522+
await session.close()
523+
524+
// generate a fake fatal error
525+
connection._handleFatalError(
526+
newError('connection reset', SERVICE_UNAVAILABLE)
527+
)
528+
529+
// expect that the connection to be removed from the pool
530+
expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0)
531+
expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy()
532+
// expect that the connection to be unregistered from the open connections registry
533+
expect(openConnections(driver, connectionKey)).toBeFalsy()
534+
535+
await driver.close()
536+
await server.exit()
537+
}
538+
539+
it('v4', () => verifyFailureOnCommit('v4'))
540+
})
541+
542+
describe('should ignore fetchSize setting', () => {
543+
async function verifyFailureOnCommit (version) {
544+
if (!boltStub.supported) {
545+
return
546+
}
547+
548+
const server = await boltStub.start(
549+
`./test/resources/boltstub/${version}/read.script`,
550+
9001
551+
)
552+
553+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001')
554+
const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 })
555+
556+
const result = await session.run('MATCH (n) RETURN n.name')
557+
const records = result.records
558+
expect(records.length).toEqual(3)
559+
expect(records[0].get(0)).toBe('Bob')
560+
expect(records[1].get(0)).toBe('Alice')
561+
expect(records[2].get(0)).toBe('Tina')
562+
563+
const connectionKey = Object.keys(openConnections(driver))[0]
564+
expect(connectionKey).toBeTruthy()
565+
566+
const connection = openConnections(driver, connectionKey)
567+
await session.close()
568+
569+
// generate a fake fatal error
570+
connection._handleFatalError(
571+
newError('connection reset', SERVICE_UNAVAILABLE)
572+
)
573+
574+
// expect that the connection to be removed from the pool
575+
expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0)
576+
expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy()
577+
// expect that the connection to be unregistered from the open connections registry
578+
expect(openConnections(driver, connectionKey)).toBeFalsy()
579+
580+
await driver.close()
581+
await server.exit()
582+
}
583+
584+
it('v3', () => verifyFailureOnCommit('v3'))
585+
it('v2', () => verifyFailureOnCommit('v2'))
586+
})
587+
450588
function connectionPool (driver, key) {
451589
return driver._connectionProvider._connectionPool._pools[key]
452590
}

test/resources/boltstub/v4/hello_run_exit.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
C: HELLO {"credentials": "password", "scheme": "basic", "user_agent": "neo4j-javascript/0.0.0-dev", "principal": "neo4j"}
55
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
66
C: RUN "MATCH (n) RETURN n.name" {} {}
7-
PULL {"n": -1}
7+
PULL {"n": 1000}
88
S: SUCCESS {"fields": ["n.name"]}
99
RECORD ["Foo"]
1010
RECORD ["Bar"]

test/resources/boltstub/v4/query_with_error.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
!: AUTO GOODBYE
44

55
C: RUN "RETURN 10 / 0" {} {}
6-
C: PULL {"n": -1}
6+
C: PULL {"n": 1000}
77
S: FAILURE {"code": "Neo.ClientError.Statement.ArithmeticError", "message": "/ by zero"}
88
S: IGNORED
99
C: RESET

test/resources/boltstub/v4/read.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
!: AUTO GOODBYE
55

66
C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"}
7-
PULL {"n": -1}
7+
PULL {"n": 1000}
88
S: SUCCESS {"fields": ["n.name"]}
99
RECORD ["Bob"]
1010
RECORD ["Alice"]

test/resources/boltstub/v4/read_dead.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
!: AUTO RESET
44

55
C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"}
6-
C: PULL {"n": -1}
6+
C: PULL {"n": 1000}
77
S: <EXIT>

test/resources/boltstub/v4/read_from_aDatabase.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
!: AUTO GOODBYE
55

66
C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r", "db": "aDatabase"}
7-
PULL {"n": -1}
7+
PULL {"n": 1000}
88
S: SUCCESS {"fields": ["n.name"]}
99
RECORD ["Bob"]
1010
RECORD ["Alice"]

test/resources/boltstub/v4/read_from_aDatabase_with_bookmark.script

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
!: AUTO GOODBYE
55

66
C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r", "db": "aDatabase", "bookmarks": ["system:1111", "aDatabase:5555"]}
7-
PULL {"n": -1}
7+
PULL {"n": 1000}
88
S: SUCCESS {"fields": ["n.name"]}
99
RECORD ["Bob"]
1010
RECORD ["Alice"]
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
!: BOLT 4
2+
!: AUTO HELLO
3+
!: AUTO RESET
4+
!: AUTO GOODBYE
5+
6+
C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"}
7+
PULL {"n": 2}
8+
S: SUCCESS {"fields": ["n.name"]}
9+
RECORD ["Bob"]
10+
RECORD ["Alice"]
11+
SUCCESS {"has_more":true}
12+
C: PULL {"n": 2}
13+
S: RECORD ["Tina"]
14+
SUCCESS {}

0 commit comments

Comments
 (0)