Skip to content

Driver.executeQuery optimization #1124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/core/src/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class Driver {
config: DriverConfig = {},
createConnectionProvider: CreateConnectionProvider,
createSession: CreateSession = args => new Session(args),
createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery)
createQueryExecutor: CreateQueryExecutor = createSession => new QueryExecutor(createSession)
) {
sanitizeConfig(config)

Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/internal/query-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export default class QueryExecutor {
bookmarkManager: config.bookmarkManager,
impersonatedUser: config.impersonatedUser
})

// @ts-expect-error The method is private for external users
session._setTxExecutorToPipelineBegin(true)

try {
const executeInTransaction: TransactionFunction<T> = config.routing === 'READ'
? session.executeRead.bind(session)
Expand Down
9 changes: 6 additions & 3 deletions packages/core/src/internal/transaction-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export class TransactionExecutor {
private readonly _multiplier: number
private readonly _jitterFactor: number
private _inFlightTimeoutIds: Timeout[]
public pipelineBegin: boolean

constructor (
maxRetryTimeMs?: number | null,
Expand All @@ -64,6 +65,7 @@ export class TransactionExecutor {
)

this._inFlightTimeoutIds = []
this.pipelineBegin = false

this._verifyAfterConstruction()
}
Expand Down Expand Up @@ -154,7 +156,8 @@ export class TransactionExecutor {
): Promise<void> {
let tx: Transaction
try {
tx = await transactionCreator()
const txPromise = transactionCreator()
tx = this.pipelineBegin ? txPromise : await txPromise
} catch (error) {
// failed to create a transaction
reject(error)
Expand Down Expand Up @@ -192,7 +195,7 @@ export class TransactionExecutor {

_handleTransactionWorkSuccess<T>(
result: T,
tx: Transaction,
tx: Transaction | TransactionPromise,
resolve: Resolve<T>,
reject: Reject
): void {
Expand All @@ -215,7 +218,7 @@ export class TransactionExecutor {
}
}

_handleTransactionWorkFailure (error: any, tx: Transaction, reject: Reject): void {
_handleTransactionWorkFailure (error: any, tx: Transaction | TransactionPromise, reject: Reject): void {
if (tx.isOpen()) {
// transaction work failed and the transaction is still open, roll it back and propagate the failure
tx.rollback()
Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,15 @@ class Session {
}
}

/**
* Configure the transaction executor to pipeline transaction begin.
*
* @private
*/
private _setTxExecutorToPipelineBegin (pipelined: boolean): void {
this._transactionExecutor.pipelineBegin = pipelined
}

/**
* @protected
*/
Expand Down
38 changes: 35 additions & 3 deletions packages/core/test/internal/query-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ describe('QueryExecutor', () => {
expect(spyOnExecuteRead).toHaveBeenCalled()
})

it('should configure the session with pipeline begin', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()

await queryExecutor.execute(baseConfig, 'query')

expect(sessionsCreated.length).toBe(1)
const [{ spyOnSetTxExecutorToPipelineBegin, spyOnExecuteRead }] = sessionsCreated

expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledTimes(1)
expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledWith(true)
expect(spyOnExecuteRead.mock.invocationCallOrder[0]).toBeGreaterThan(
spyOnSetTxExecutorToPipelineBegin.mock.invocationCallOrder[0]
)
})

it('should call not call executeWrite', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()

Expand Down Expand Up @@ -213,6 +228,21 @@ describe('QueryExecutor', () => {
expect(spyOnExecuteWrite).toHaveBeenCalled()
})

it('should configure the session with pipeline begin', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()

await queryExecutor.execute(baseConfig, 'query')

expect(sessionsCreated.length).toBe(1)
const [{ spyOnSetTxExecutorToPipelineBegin, spyOnExecuteWrite }] = sessionsCreated

expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledTimes(1)
expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledWith(true)
expect(spyOnExecuteWrite.mock.invocationCallOrder[0]).toBeGreaterThan(
spyOnSetTxExecutorToPipelineBegin.mock.invocationCallOrder[0]
)
})

it('should call not call executeRead', async () => {
const { queryExecutor, sessionsCreated } = createExecutor()

Expand Down Expand Up @@ -316,7 +346,7 @@ describe('QueryExecutor', () => {
spyOnExecuteRead: jest.SpyInstance<any>
spyOnExecuteWrite: jest.SpyInstance<any>
spyOnClose: jest.SpyInstance<Promise<void>>

spyOnSetTxExecutorToPipelineBegin: jest.SpyInstance<void>
}>
createSession: jest.Mock<Session, [args: any]>
} {
Expand All @@ -329,15 +359,17 @@ describe('QueryExecutor', () => {
spyOnExecuteRead: jest.SpyInstance<any>
spyOnExecuteWrite: jest.SpyInstance<any>
spyOnClose: jest.SpyInstance<Promise<void>>

spyOnSetTxExecutorToPipelineBegin: jest.SpyInstance<void>
}> = []
const createSession = jest.fn((args) => {
const session = new Session(args)
const sessionCreated = {
session,
spyOnExecuteRead: jest.spyOn(session, 'executeRead'),
spyOnExecuteWrite: jest.spyOn(session, 'executeWrite'),
spyOnClose: jest.spyOn(session, 'close')
spyOnClose: jest.spyOn(session, 'close'),
// @ts-expect-error
spyOnSetTxExecutorToPipelineBegin: jest.spyOn(session, '_setTxExecutorToPipelineBegin')
}
sessionsCreated.push(sessionCreated)
_mockSessionExecuteRead(sessionCreated.spyOnExecuteRead)
Expand Down
20 changes: 20 additions & 0 deletions packages/core/test/session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { ConnectionProvider, Session, Connection, TransactionPromise, Transactio
import { bookmarks } from '../src/internal'
import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants'
import { Logger } from '../src/internal/logger'
import { TransactionExecutor } from '../src/internal/transaction-executor'
import ManagedTransaction from '../src/transaction-managed'
import { AuthToken, LoggerFunction } from '../src/types'
import FakeConnection from './utils/connection.fake'
Expand Down Expand Up @@ -1150,6 +1151,25 @@ describe('session', () => {
)
})
})

describe('Pipeline Begin on TxFunc', () => {
it('session should not change the default on session creation', () => {
const session = newSessionWithConnection(new FakeConnection())

// @ts-expect-error
expect(session._transactionExecutor.pipelineBegin).toEqual(new TransactionExecutor().pipelineBegin)
})

it.each([true, false])('_setTxExecutorToPipelineBegin(%s) => configure executor', (pipelined) => {
const session = newSessionWithConnection(new FakeConnection())

// @ts-expect-error
session._setTxExecutorToPipelineBegin(pipelined)

// @ts-expect-error
expect(session._transactionExecutor.pipelineBegin).toEqual(pipelined)
})
})
})

function mockBeginWithSuccess (connection: FakeConnection): FakeConnection {
Expand Down
2 changes: 1 addition & 1 deletion packages/neo4j-driver-deno/lib/core/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class Driver {
config: DriverConfig = {},
createConnectionProvider: CreateConnectionProvider,
createSession: CreateSession = args => new Session(args),
createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery)
createQueryExecutor: CreateQueryExecutor = createSession => new QueryExecutor(createSession)
) {
sanitizeConfig(config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export default class QueryExecutor {
bookmarkManager: config.bookmarkManager,
impersonatedUser: config.impersonatedUser
})

// @ts-expect-error The method is private for external users
session._setTxExecutorToPipelineBegin(true)

try {
const executeInTransaction: TransactionFunction<T> = config.routing === 'READ'
? session.executeRead.bind(session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export class TransactionExecutor {
private readonly _multiplier: number
private readonly _jitterFactor: number
private _inFlightTimeoutIds: Timeout[]
public pipelineBegin: boolean

constructor (
maxRetryTimeMs?: number | null,
Expand All @@ -64,6 +65,7 @@ export class TransactionExecutor {
)

this._inFlightTimeoutIds = []
this.pipelineBegin = false

this._verifyAfterConstruction()
}
Expand Down Expand Up @@ -154,7 +156,8 @@ export class TransactionExecutor {
): Promise<void> {
let tx: Transaction
try {
tx = await transactionCreator()
const txPromise = transactionCreator()
tx = this.pipelineBegin ? txPromise : await txPromise
} catch (error) {
// failed to create a transaction
reject(error)
Expand Down Expand Up @@ -192,7 +195,7 @@ export class TransactionExecutor {

_handleTransactionWorkSuccess<T>(
result: T,
tx: Transaction,
tx: Transaction | TransactionPromise,
resolve: Resolve<T>,
reject: Reject
): void {
Expand All @@ -215,7 +218,7 @@ export class TransactionExecutor {
}
}

_handleTransactionWorkFailure (error: any, tx: Transaction, reject: Reject): void {
_handleTransactionWorkFailure (error: any, tx: Transaction | TransactionPromise, reject: Reject): void {
if (tx.isOpen()) {
// transaction work failed and the transaction is still open, roll it back and propagate the failure
tx.rollback()
Expand Down
9 changes: 9 additions & 0 deletions packages/neo4j-driver-deno/lib/core/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,15 @@ class Session {
}
}

/**
* Configure the transaction executor to pipeline transaction begin.
*
* @private
*/
private _setTxExecutorToPipelineBegin (pipelined: boolean): void {
this._transactionExecutor.pipelineBegin = pipelined
}

/**
* @protected
*/
Expand Down
Loading