Skip to content

Commit 3d278b6

Browse files
committed
Driver.executeQuery optimization
Pipeline begin and run when using `executeQuery` to reduce the number of round trips.
1 parent 1e11be2 commit 3d278b6

File tree

12 files changed

+476
-329
lines changed

12 files changed

+476
-329
lines changed

packages/core/src/driver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ class Driver {
444444
config: DriverConfig = {},
445445
createConnectionProvider: CreateConnectionProvider,
446446
createSession: CreateSession = args => new Session(args),
447-
createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery)
447+
createQueryExecutor: CreateQueryExecutor = createSession => new QueryExecutor(createSession)
448448
) {
449449
sanitizeConfig(config)
450450

packages/core/src/internal/query-executor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ export default class QueryExecutor {
4646
bookmarkManager: config.bookmarkManager,
4747
impersonatedUser: config.impersonatedUser
4848
})
49+
50+
// @ts-expect-error The method is private for external users
51+
session._setTxExecutorToPipelineBegin(true)
52+
4953
try {
5054
const executeInTransaction: TransactionFunction<T> = config.routing === 'READ'
5155
? session.executeRead.bind(session)

packages/core/src/internal/transaction-executor.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export class TransactionExecutor {
3939
private readonly _multiplier: number
4040
private readonly _jitterFactor: number
4141
private _inFlightTimeoutIds: Timeout[]
42+
public pipelineBegin: boolean
4243

4344
constructor (
4445
maxRetryTimeMs?: number | null,
@@ -64,6 +65,7 @@ export class TransactionExecutor {
6465
)
6566

6667
this._inFlightTimeoutIds = []
68+
this.pipelineBegin = false
6769

6870
this._verifyAfterConstruction()
6971
}
@@ -154,7 +156,8 @@ export class TransactionExecutor {
154156
): Promise<void> {
155157
let tx: Transaction
156158
try {
157-
tx = await transactionCreator()
159+
const txPromise = transactionCreator()
160+
tx = this.pipelineBegin ? txPromise : await txPromise
158161
} catch (error) {
159162
// failed to create a transaction
160163
reject(error)
@@ -192,7 +195,7 @@ export class TransactionExecutor {
192195

193196
_handleTransactionWorkSuccess<T>(
194197
result: T,
195-
tx: Transaction,
198+
tx: Transaction | TransactionPromise,
196199
resolve: Resolve<T>,
197200
reject: Reject
198201
): void {
@@ -215,7 +218,7 @@ export class TransactionExecutor {
215218
}
216219
}
217220

218-
_handleTransactionWorkFailure (error: any, tx: Transaction, reject: Reject): void {
221+
_handleTransactionWorkFailure (error: any, tx: Transaction | TransactionPromise, reject: Reject): void {
219222
if (tx.isOpen()) {
220223
// transaction work failed and the transaction is still open, roll it back and propagate the failure
221224
tx.rollback()

packages/core/src/session.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,15 @@ class Session {
587587
}
588588
}
589589

590+
/**
591+
* Configure the transaction executor to pipeline transaction begin.
592+
*
593+
* @private
594+
*/
595+
private _setTxExecutorToPipelineBegin (pipelined: boolean): void {
596+
this._transactionExecutor.pipelineBegin = pipelined
597+
}
598+
590599
/**
591600
* @protected
592601
*/

packages/core/test/internal/query-executor.test.ts

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,21 @@ describe('QueryExecutor', () => {
8686
expect(spyOnExecuteRead).toHaveBeenCalled()
8787
})
8888

89+
it('should configure the session with pipeline begin', async () => {
90+
const { queryExecutor, sessionsCreated } = createExecutor()
91+
92+
await queryExecutor.execute(baseConfig, 'query')
93+
94+
expect(sessionsCreated.length).toBe(1)
95+
const [{ spyOnSetTxExecutorToPipelineBegin, spyOnExecuteRead }] = sessionsCreated
96+
97+
expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledTimes(1)
98+
expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledWith(true)
99+
expect(spyOnExecuteRead.mock.invocationCallOrder[0]).toBeGreaterThan(
100+
spyOnSetTxExecutorToPipelineBegin.mock.invocationCallOrder[0]
101+
)
102+
})
103+
89104
it('should call not call executeWrite', async () => {
90105
const { queryExecutor, sessionsCreated } = createExecutor()
91106

@@ -213,6 +228,21 @@ describe('QueryExecutor', () => {
213228
expect(spyOnExecuteWrite).toHaveBeenCalled()
214229
})
215230

231+
it('should configure the session with pipeline begin', async () => {
232+
const { queryExecutor, sessionsCreated } = createExecutor()
233+
234+
await queryExecutor.execute(baseConfig, 'query')
235+
236+
expect(sessionsCreated.length).toBe(1)
237+
const [{ spyOnSetTxExecutorToPipelineBegin, spyOnExecuteWrite }] = sessionsCreated
238+
239+
expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledTimes(1)
240+
expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledWith(true)
241+
expect(spyOnExecuteWrite.mock.invocationCallOrder[0]).toBeGreaterThan(
242+
spyOnSetTxExecutorToPipelineBegin.mock.invocationCallOrder[0]
243+
)
244+
})
245+
216246
it('should call not call executeRead', async () => {
217247
const { queryExecutor, sessionsCreated } = createExecutor()
218248

@@ -316,7 +346,7 @@ describe('QueryExecutor', () => {
316346
spyOnExecuteRead: jest.SpyInstance<any>
317347
spyOnExecuteWrite: jest.SpyInstance<any>
318348
spyOnClose: jest.SpyInstance<Promise<void>>
319-
349+
spyOnSetTxExecutorToPipelineBegin: jest.SpyInstance<void>
320350
}>
321351
createSession: jest.Mock<Session, [args: any]>
322352
} {
@@ -329,15 +359,17 @@ describe('QueryExecutor', () => {
329359
spyOnExecuteRead: jest.SpyInstance<any>
330360
spyOnExecuteWrite: jest.SpyInstance<any>
331361
spyOnClose: jest.SpyInstance<Promise<void>>
332-
362+
spyOnSetTxExecutorToPipelineBegin: jest.SpyInstance<void>
333363
}> = []
334364
const createSession = jest.fn((args) => {
335365
const session = new Session(args)
336366
const sessionCreated = {
337367
session,
338368
spyOnExecuteRead: jest.spyOn(session, 'executeRead'),
339369
spyOnExecuteWrite: jest.spyOn(session, 'executeWrite'),
340-
spyOnClose: jest.spyOn(session, 'close')
370+
spyOnClose: jest.spyOn(session, 'close'),
371+
// @ts-expect-error
372+
spyOnSetTxExecutorToPipelineBegin: jest.spyOn(session, '_setTxExecutorToPipelineBegin')
341373
}
342374
sessionsCreated.push(sessionCreated)
343375
_mockSessionExecuteRead(sessionCreated.spyOnExecuteRead)

packages/core/test/session.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { ConnectionProvider, Session, Connection, TransactionPromise, Transactio
2020
import { bookmarks } from '../src/internal'
2121
import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants'
2222
import { Logger } from '../src/internal/logger'
23+
import { TransactionExecutor } from '../src/internal/transaction-executor'
2324
import ManagedTransaction from '../src/transaction-managed'
2425
import { AuthToken, LoggerFunction } from '../src/types'
2526
import FakeConnection from './utils/connection.fake'
@@ -1150,6 +1151,25 @@ describe('session', () => {
11501151
)
11511152
})
11521153
})
1154+
1155+
describe('Pipeline Begin on TxFunc', () => {
1156+
it('session should not change the default on session creation', () => {
1157+
const session = newSessionWithConnection(new FakeConnection())
1158+
1159+
// @ts-expect-error
1160+
expect(session._transactionExecutor.pipelineBegin).toEqual(new TransactionExecutor().pipelineBegin)
1161+
})
1162+
1163+
it.each([true, false])('_setTxExecutorToPipelineBegin(%s) => configure executor', (pipelined) => {
1164+
const session = newSessionWithConnection(new FakeConnection())
1165+
1166+
// @ts-expect-error
1167+
session._setTxExecutorToPipelineBegin(pipelined)
1168+
1169+
// @ts-expect-error
1170+
expect(session._transactionExecutor.pipelineBegin).toEqual(pipelined)
1171+
})
1172+
})
11531173
})
11541174

11551175
function mockBeginWithSuccess (connection: FakeConnection): FakeConnection {

packages/neo4j-driver-deno/lib/core/driver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ class Driver {
444444
config: DriverConfig = {},
445445
createConnectionProvider: CreateConnectionProvider,
446446
createSession: CreateSession = args => new Session(args),
447-
createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery)
447+
createQueryExecutor: CreateQueryExecutor = createSession => new QueryExecutor(createSession)
448448
) {
449449
sanitizeConfig(config)
450450

packages/neo4j-driver-deno/lib/core/internal/query-executor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ export default class QueryExecutor {
4646
bookmarkManager: config.bookmarkManager,
4747
impersonatedUser: config.impersonatedUser
4848
})
49+
50+
// @ts-expect-error The method is private for external users
51+
session._setTxExecutorToPipelineBegin(true)
52+
4953
try {
5054
const executeInTransaction: TransactionFunction<T> = config.routing === 'READ'
5155
? session.executeRead.bind(session)

packages/neo4j-driver-deno/lib/core/internal/transaction-executor.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export class TransactionExecutor {
3939
private readonly _multiplier: number
4040
private readonly _jitterFactor: number
4141
private _inFlightTimeoutIds: Timeout[]
42+
public pipelineBegin: boolean
4243

4344
constructor (
4445
maxRetryTimeMs?: number | null,
@@ -64,6 +65,7 @@ export class TransactionExecutor {
6465
)
6566

6667
this._inFlightTimeoutIds = []
68+
this.pipelineBegin = false
6769

6870
this._verifyAfterConstruction()
6971
}
@@ -154,7 +156,8 @@ export class TransactionExecutor {
154156
): Promise<void> {
155157
let tx: Transaction
156158
try {
157-
tx = await transactionCreator()
159+
const txPromise = transactionCreator()
160+
tx = this.pipelineBegin ? txPromise : await txPromise
158161
} catch (error) {
159162
// failed to create a transaction
160163
reject(error)
@@ -192,7 +195,7 @@ export class TransactionExecutor {
192195

193196
_handleTransactionWorkSuccess<T>(
194197
result: T,
195-
tx: Transaction,
198+
tx: Transaction | TransactionPromise,
196199
resolve: Resolve<T>,
197200
reject: Reject
198201
): void {
@@ -215,7 +218,7 @@ export class TransactionExecutor {
215218
}
216219
}
217220

218-
_handleTransactionWorkFailure (error: any, tx: Transaction, reject: Reject): void {
221+
_handleTransactionWorkFailure (error: any, tx: Transaction | TransactionPromise, reject: Reject): void {
219222
if (tx.isOpen()) {
220223
// transaction work failed and the transaction is still open, roll it back and propagate the failure
221224
tx.rollback()

packages/neo4j-driver-deno/lib/core/session.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,15 @@ class Session {
587587
}
588588
}
589589

590+
/**
591+
* Configure the transaction executor to pipeline transaction begin.
592+
*
593+
* @private
594+
*/
595+
private _setTxExecutorToPipelineBegin (pipelined: boolean): void {
596+
this._transactionExecutor.pipelineBegin = pipelined
597+
}
598+
590599
/**
591600
* @protected
592601
*/

0 commit comments

Comments
 (0)