Skip to content

Commit a17eee1

Browse files
authored
Merge pull request #495 from zhenlineo/4.0-pull-in-batches
Pull Results in Batches
2 parents f9e528d + c945ecc commit a17eee1

36 files changed

+480
-106
lines changed

src/driver.js

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,16 @@ import {
3030
} from './internal/pool-config'
3131
import Session from './session'
3232
import RxSession from './session-rx'
33+
import { ALL } from './internal/request-message'
3334

3435
const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour
3536

37+
/**
38+
* The default record fetch size. This is used in Bolt V4 protocol to pull query execution result in batches.
39+
* @type {number}
40+
*/
41+
const DEFAULT_FETCH_SIZE = 1000
42+
3643
/**
3744
* Constant that represents read session access mode.
3845
* Should be used like this: `driver.session({ defaultAccessMode: neo4j.session.READ })`.
@@ -132,19 +139,23 @@ class Driver {
132139
* @param {string} param.defaultAccessMode=WRITE - the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
133140
* @param {string|string[]} param.bookmarks - the initial reference or references to some previous
134141
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
142+
* @param {number} param.fetchSize - the record fetch size of each batch of this session.
143+
* Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config.
135144
* @param {string} param.database - the database this session will operate on.
136145
* @return {Session} new session.
137146
*/
138147
session ({
139148
defaultAccessMode = WRITE,
140149
bookmarks: bookmarkOrBookmarks,
141-
database = ''
150+
database = '',
151+
fetchSize
142152
} = {}) {
143153
return this._newSession({
144154
defaultAccessMode,
145155
bookmarkOrBookmarks,
146156
database,
147-
reactive: false
157+
reactive: false,
158+
fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
148159
})
149160
}
150161

@@ -229,7 +240,13 @@ class Driver {
229240
/**
230241
* @private
231242
*/
232-
_newSession ({ defaultAccessMode, bookmarkOrBookmarks, database, reactive }) {
243+
_newSession ({
244+
defaultAccessMode,
245+
bookmarkOrBookmarks,
246+
database,
247+
reactive,
248+
fetchSize
249+
}) {
233250
const sessionMode = Driver._validateSessionMode(defaultAccessMode)
234251
const connectionProvider = this._getOrCreateConnectionProvider()
235252
const bookmark = bookmarkOrBookmarks
@@ -241,7 +258,8 @@ class Driver {
241258
connectionProvider,
242259
bookmark,
243260
config: this._config,
244-
reactive
261+
reactive,
262+
fetchSize
245263
})
246264
}
247265

@@ -277,6 +295,10 @@ function sanitizeConfig (config) {
277295
config.connectionAcquisitionTimeout,
278296
DEFAULT_ACQUISITION_TIMEOUT
279297
)
298+
config.fetchSize = validateFetchSizeValue(
299+
config.fetchSize,
300+
DEFAULT_FETCH_SIZE
301+
)
280302
}
281303

282304
/**
@@ -293,6 +315,23 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) {
293315
}
294316
}
295317

318+
/**
319+
* @private
320+
*/
321+
function validateFetchSizeValue (rawValue, defaultWhenAbsent) {
322+
const fetchSize = parseInt(rawValue, 10)
323+
if (fetchSize > 0 || fetchSize === ALL) {
324+
return fetchSize
325+
} else if (fetchSize === 0 || fetchSize < 0) {
326+
throw new Error(
327+
'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' +
328+
fetchSize
329+
)
330+
} else {
331+
return defaultWhenAbsent
332+
}
333+
}
334+
296335
export { Driver, READ, WRITE }
297336

298337
export default Driver

src/internal/bolt-protocol-v4.js

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919
import BoltProtocolV3 from './bolt-protocol-v3'
20-
import RequestMessage from './request-message'
20+
import RequestMessage, { ALL } from './request-message'
2121
import { ResultStreamObserver } from './stream-observers'
2222
import { BOLT_PROTOCOL_V4 } from './constants'
2323

@@ -69,14 +69,16 @@ export default class BoltProtocol extends BoltProtocolV3 {
6969
beforeComplete,
7070
afterComplete,
7171
flush = true,
72-
reactive = false
72+
reactive = false,
73+
fetchSize = ALL
7374
} = {}
7475
) {
7576
const observer = new ResultStreamObserver({
7677
connection: this._connection,
7778
reactive: reactive,
78-
moreFunction: reactive ? this._requestMore : this._noOp,
79-
discardFunction: reactive ? this._requestDiscard : this._noOp,
79+
fetchSize: fetchSize,
80+
moreFunction: this._requestMore,
81+
discardFunction: this._requestDiscard,
8082
beforeKeys,
8183
afterKeys,
8284
beforeError,
@@ -98,7 +100,11 @@ export default class BoltProtocol extends BoltProtocolV3 {
98100
)
99101

100102
if (!reactive) {
101-
this._connection.write(RequestMessage.pull(), observer, flush)
103+
this._connection.write(
104+
RequestMessage.pull({ n: fetchSize }),
105+
observer,
106+
flush
107+
)
102108
}
103109

104110
return observer

src/internal/request-message.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const READ_MODE = 'r'
4343
/* eslint-enable no-unused-vars */
4444

4545
const NO_STATEMENT_ID = -1
46-
const ALL = -1
46+
export const ALL = -1
4747

4848
export default class RequestMessage {
4949
constructor (signature, fields, toString) {
@@ -220,19 +220,19 @@ export default class RequestMessage {
220220
function buildTxMetadata (bookmark, txConfig, database, mode) {
221221
const metadata = {}
222222
if (!bookmark.isEmpty()) {
223-
metadata['bookmarks'] = bookmark.values()
223+
metadata.bookmarks = bookmark.values()
224224
}
225225
if (txConfig.timeout) {
226-
metadata['tx_timeout'] = txConfig.timeout
226+
metadata.tx_timeout = txConfig.timeout
227227
}
228228
if (txConfig.metadata) {
229-
metadata['tx_metadata'] = txConfig.metadata
229+
metadata.tx_metadata = txConfig.metadata
230230
}
231231
if (database) {
232-
metadata['db'] = assertString(database, 'database')
232+
metadata.db = assertString(database, 'database')
233233
}
234234
if (mode === ACCESS_MODE_READ) {
235-
metadata['mode'] = READ_MODE
235+
metadata.mode = READ_MODE
236236
}
237237
return metadata
238238
}
@@ -246,7 +246,7 @@ function buildTxMetadata (bookmark, txConfig, database, mode) {
246246
function buildStreamMetadata (stmtId, n) {
247247
const metadata = { n: int(n) }
248248
if (stmtId !== NO_STATEMENT_ID) {
249-
metadata['qid'] = int(stmtId)
249+
metadata.qid = int(stmtId)
250250
}
251251
return metadata
252252
}

src/internal/stream-observers.js

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
import Record from '../record'
2020
import Connection from './connection'
2121
import { newError, PROTOCOL_ERROR } from '../error'
22-
import { isString } from './util'
2322
import Integer from '../integer'
24-
25-
const DefaultBatchSize = 50
23+
import { ALL } from './request-message'
2624

2725
class StreamObserver {
2826
onNext (rawRecord) {}
@@ -50,7 +48,7 @@ class ResultStreamObserver extends StreamObserver {
5048
* @param {boolean} param.reactive
5149
* @param {function(connection: Connection, stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction -
5250
* @param {function(connection: Connection, stmtId: number|Integer, observer: StreamObserver)} param.discardFunction -
53-
* @param {number|Integer} param.batchSize -
51+
* @param {number|Integer} param.fetchSize -
5452
* @param {function(err: Error): Promise|void} param.beforeError -
5553
* @param {function(err: Error): Promise|void} param.afterError -
5654
* @param {function(keys: string[]): Promise|void} param.beforeKeys -
@@ -63,7 +61,7 @@ class ResultStreamObserver extends StreamObserver {
6361
reactive = false,
6462
moreFunction,
6563
discardFunction,
66-
batchSize = DefaultBatchSize,
64+
fetchSize = ALL,
6765
beforeError,
6866
afterError,
6967
beforeKeys,
@@ -98,7 +96,8 @@ class ResultStreamObserver extends StreamObserver {
9896
this._moreFunction = moreFunction
9997
this._discardFunction = discardFunction
10098
this._discard = false
101-
this._batchSize = batchSize
99+
this._fetchSize = fetchSize
100+
this._finished = false
102101
}
103102

104103
/**
@@ -108,7 +107,7 @@ class ResultStreamObserver extends StreamObserver {
108107
* @param {Array} rawRecord - An array with the raw record
109108
*/
110109
onNext (rawRecord) {
111-
let record = new Record(this._fieldKeys, rawRecord, this._fieldLookup)
110+
const record = new Record(this._fieldKeys, rawRecord, this._fieldLookup)
112111
if (this._observers.some(o => o.onNext)) {
113112
this._observers.forEach(o => {
114113
if (o.onNext) {
@@ -190,6 +189,7 @@ class ResultStreamObserver extends StreamObserver {
190189

191190
delete meta.has_more
192191
} else {
192+
this._finished = true
193193
const completionMetadata = Object.assign(
194194
this._connection ? { server: this._connection.server } : {},
195195
this._meta,
@@ -229,7 +229,6 @@ class ResultStreamObserver extends StreamObserver {
229229

230230
_handleStreaming () {
231231
if (
232-
this._reactive &&
233232
this._head &&
234233
this._observers.some(o => o.onNext || o.onCompleted) &&
235234
!this._streaming
@@ -242,7 +241,7 @@ class ResultStreamObserver extends StreamObserver {
242241
this._moreFunction(
243242
this._connection,
244243
this._statementId,
245-
this._batchSize,
244+
this._fetchSize,
246245
this
247246
)
248247
}
@@ -282,12 +281,13 @@ class ResultStreamObserver extends StreamObserver {
282281
this._head = []
283282
this._fieldKeys = []
284283
this._tail = {}
284+
this._finished = true
285285
}
286286

287287
/**
288-
* Discard pending record stream
288+
* Cancel pending record stream
289289
*/
290-
discard () {
290+
cancel () {
291291
this._discard = true
292292
}
293293

@@ -302,6 +302,7 @@ class ResultStreamObserver extends StreamObserver {
302302
return
303303
}
304304

305+
this._finished = true
305306
this._hasFailed = true
306307
this._error = error
307308

@@ -357,7 +358,7 @@ class ResultStreamObserver extends StreamObserver {
357358
}
358359
this._observers.push(observer)
359360

360-
if (this._reactive) {
361+
if (this._reactive && !this._finished) {
361362
this._handleStreaming()
362363
}
363364
}

src/result-rx.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ export default class RxResult {
134134
})
135135

136136
if (this._records.observers.length === 0) {
137-
result._discard()
137+
result._cancel()
138138
}
139139

140140
result.subscribe({

src/result.js

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,13 @@ class Result {
8484
*/
8585
summary () {
8686
return new Promise((resolve, reject) => {
87-
this._streamObserverPromise.then(o =>
87+
this._streamObserverPromise.then(o => {
88+
o.cancel()
8889
o.subscribe({
8990
onCompleted: metadata => resolve(metadata),
9091
onError: err => reject(err)
9192
})
92-
)
93+
})
9394
})
9495
}
9596

@@ -102,8 +103,8 @@ class Result {
102103
_getOrCreatePromise () {
103104
if (!this._p) {
104105
this._p = new Promise((resolve, reject) => {
105-
let records = []
106-
let observer = {
106+
const records = []
107+
const observer = {
107108
onNext: record => {
108109
records.push(record)
109110
},
@@ -192,8 +193,8 @@ class Result {
192193
* @protected
193194
* @since 4.0.0
194195
*/
195-
_discard () {
196-
this._streamObserverPromise.then(o => o.discard())
196+
_cancel () {
197+
this._streamObserverPromise.then(o => o.cancel())
197198
}
198199
}
199200

src/session.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,21 @@ class Session {
4949
* @param {string} args.database the database name
5050
* @param {Object} args.config={} - this driver configuration.
5151
* @param {boolean} args.reactive - whether this session should create reactive streams
52+
* @param {number} args.fetchSize - defines how many records is pulled in each pulling batch
5253
*/
5354
constructor ({
5455
mode,
5556
connectionProvider,
5657
bookmark,
5758
database,
5859
config,
59-
reactive
60+
reactive,
61+
fetchSize
6062
}) {
6163
this._mode = mode
6264
this._database = database
6365
this._reactive = reactive
66+
this._fetchSize = fetchSize
6467
this._readConnectionHolder = new ConnectionHolder({
6568
mode: ACCESS_MODE_READ,
6669
database,
@@ -107,7 +110,8 @@ class Session {
107110
mode: this._mode,
108111
database: this._database,
109112
afterComplete: this._onComplete,
110-
reactive: this._reactive
113+
reactive: this._reactive,
114+
fetchSize: this._fetchSize
111115
})
112116
)
113117
}
@@ -176,7 +180,8 @@ class Session {
176180
connectionHolder,
177181
onClose: this._transactionClosed.bind(this),
178182
onBookmark: this._updateBookmark.bind(this),
179-
reactive: this._reactive
183+
reactive: this._reactive,
184+
fetchSize: this._fetchSize
180185
})
181186
tx._begin(this._lastBookmark, txConfig)
182187
return tx

0 commit comments

Comments
 (0)