diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index c61f049b2..dc772cfda 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -146,7 +146,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider const routingTable = await this._freshRoutingTable({ accessMode, database: context.database, - bookmarks: bookmarks, + bookmarks, impersonatedUser, onDatabaseNameResolved: (databaseName) => { context.database = context.database || databaseName diff --git a/packages/core/src/bookmark-manager.ts b/packages/core/src/bookmark-manager.ts new file mode 100644 index 000000000..3d2a36168 --- /dev/null +++ b/packages/core/src/bookmark-manager.ts @@ -0,0 +1,187 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Interface for the piece of software responsible for keeping track of current active bookmarks accross the driver. + * @interface + * @since 5.0 + * @experimental + */ +export default class BookmarkManager { + /** + * @constructor + * @private + */ + private constructor () { + throw new Error('Not implemented') + } + + /** + * Method called when the bookmarks get updated when a transaction finished. + * + * This method will be called when auto-commit queries finish and when explicit transactions + * get commited. + * + * @param {string} database The database which the bookmarks belongs to + * @param {Iterable} previousBookmarks The bookmarks used when starting the transaction + * @param {Iterable} newBookmarks The new bookmarks received at the end of the transaction. + * @returns {void} + */ + async updateBookmarks (database: string, previousBookmarks: Iterable, newBookmarks: Iterable): Promise { + throw new Error('Not implemented') + } + + /** + * Method called by the driver to get the bookmarks for one specific database + * + * @param {string} database The database which the bookmarks belong to + * @returns {Iterable} The set of bookmarks + */ + async getBookmarks (database: string): Promise> { + throw new Error('Not implemented') + } + + /** + * Method called by the driver for getting all the bookmarks. + * + * This method should return all bookmarks for all databases present in the BookmarkManager. + * + * @returns {Iterable} The set of bookmarks + */ + async getAllBookmarks (): Promise> { + throw new Error('Not implemented') + } + + /** + * Forget the databases and its bookmarks + * + * This method is not called by the driver. Forgetting unused databases is the user's responsibility. + * + * @param {Iterable} databases The databases which the bookmarks will be removed for. + */ + async forget (databases: Iterable): Promise { + throw new Error('Not implemented') + } +} + +export interface BookmarkManagerConfig { + initialBookmarks?: Map> + bookmarksSupplier?: (database?: string) => Promise> + bookmarksConsumer?: (database: string, bookmarks: Iterable) => Promise +} + +/** + * @typedef {Object} BookmarkManagerConfig + * + * @since 5.0 + * @experimental + * @property {Map>} [initialBookmarks@experimental] Defines the initial set of bookmarks. The key is the database name and the values are the bookmarks. + * @property {function([database]: string):Promise>} [bookmarksSupplier] Called for supplying extra bookmarks to the BookmarkManager + * 1. supplying bookmarks from the given database when the default BookmarkManager's `.getBookmarks(database)` gets called. + * 2. supplying all the bookmarks when the default BookmarkManager's `.getAllBookmarks()` gets called + * @property {function(database: string, bookmarks: Iterable): Promise} [bookmarksConsumer] Called when the set of bookmarks for database get updated + */ +/** + * Provides an configured {@link BookmarkManager} instance. + * + * @since 5.0 + * @experimental + * @param {BookmarkManagerConfig} [config={}] + * @returns {BookmarkManager} + */ +export function bookmarkManager (config: BookmarkManagerConfig = {}): BookmarkManager { + const initialBookmarks = new Map>() + + config.initialBookmarks?.forEach((v, k) => initialBookmarks.set(k, new Set(v))) + + return new Neo4jBookmarkManager( + initialBookmarks, + config.bookmarksSupplier, + config.bookmarksConsumer + ) +} + +class Neo4jBookmarkManager implements BookmarkManager { + constructor ( + private readonly _bookmarksPerDb: Map>, + private readonly _bookmarksSupplier?: (database?: string) => Promise>, + private readonly _bookmarksConsumer?: (database: string, bookmark: Iterable) => Promise + ) { + + } + + async updateBookmarks (database: string, previousBookmarks: Iterable, newBookmarks: Iterable): Promise { + const bookmarks = this._getOrInitializeBookmarks(database) + for (const bm of previousBookmarks) { + bookmarks.delete(bm) + } + for (const bm of newBookmarks) { + bookmarks.add(bm) + } + if (typeof this._bookmarksConsumer === 'function') { + await this._bookmarksConsumer(database, [...bookmarks]) + } + } + + private _getOrInitializeBookmarks (database: string): Set { + let maybeBookmarks = this._bookmarksPerDb.get(database) + if (maybeBookmarks === undefined) { + maybeBookmarks = new Set() + this._bookmarksPerDb.set(database, maybeBookmarks) + } + return maybeBookmarks + } + + async getBookmarks (database: string): Promise> { + const bookmarks = new Set(this._bookmarksPerDb.get(database)) + + if (typeof this._bookmarksSupplier === 'function') { + const suppliedBookmarks = await this._bookmarksSupplier(database) ?? [] + for (const bm of suppliedBookmarks) { + bookmarks.add(bm) + } + } + + return [...bookmarks] + } + + async getAllBookmarks (): Promise> { + const bookmarks = new Set() + + for (const [, dbBookmarks] of this._bookmarksPerDb) { + for (const bm of dbBookmarks) { + bookmarks.add(bm) + } + } + if (typeof this._bookmarksSupplier === 'function') { + const suppliedBookmarks = await this._bookmarksSupplier() ?? [] + for (const bm of suppliedBookmarks) { + bookmarks.add(bm) + } + } + + return bookmarks + } + + async forget (databases: Iterable): Promise { + for (const database of databases) { + this._bookmarksPerDb.delete(database) + } + } +} diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index 241115e8d..3c07d3c44 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -40,6 +40,7 @@ import { SessionMode } from './types' import { ServerAddress } from './internal/server-address' +import BookmarkManager from './bookmark-manager' const DEFAULT_MAX_CONNECTION_LIFETIME: number = 60 * 60 * 1000 // 1 hour @@ -87,6 +88,7 @@ type CreateSession = (args: { reactive: boolean fetchSize: number impersonatedUser?: string + bookmarkManager?: BookmarkManager }) => Session interface DriverConfig { @@ -96,6 +98,101 @@ interface DriverConfig { logging?: LoggingConfig } +/** + * The session configuration + * + * @interface + */ +class SessionConfig { + defaultAccessMode?: SessionMode + bookmarks?: string | string[] + database?: string + impersonatedUser?: string + fetchSize?: number + bookmarkManager?: BookmarkManager + + /** + * @constructor + * @private + */ + constructor () { + /** + * The access mode of this session, allowed values are {@link READ} and {@link WRITE}. + * **Default**: {@link WRITE} + * @type {string} + */ + this.defaultAccessMode = WRITE + /** + * The initial reference or references to some previous + * transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown. + * @type {string|string[]|undefined} + */ + this.bookmarks = [] + + /** + * The database this session will operate on. + * + * @type {string|undefined} + */ + this.database = '' + + /** + * The username which the user wants to impersonate for the duration of the session. + * + * @type {string|undefined} + */ + this.impersonatedUser = undefined + + /** + * The record fetch size of each batch of this session. + * + * Use {@link FETCH_ALL} to always pull all records in one batch. This will override the config value set on driver config. + * + * @type {number|undefined} + */ + this.fetchSize = undefined + /** + * Configure a BookmarkManager for the session to use + * + * A BookmarkManager is a piece of software responsible for keeping casual consistency between different sessions by sharing bookmarks + * between the them. + * Enabling it is done by supplying an BookmarkManager implementation instance to this param. + * A default implementation could be acquired by calling the factory function {@link bookmarkManager}. + * + * **Warning**: Share the same BookmarkManager instance accross all session can have a negative impact + * on performance since all the queries will wait for the latest changes being propagated across the cluster. + * For keeping consistency between a group of queries, use {@link Session} for grouping them. + * For keeping consistency between a group of sessions, use {@link BookmarkManager} instance for groupping them. + * + * @example + * const bookmarkManager = neo4j.bookmarkManager() + * const linkedSession1 = driver.session({ database:'neo4j', bookmarkManager }) + * const linkedSession2 = driver.session({ database:'neo4j', bookmarkManager }) + * const unlinkedSession = driver.session({ database:'neo4j' }) + * + * // Creating Driver User + * const createUserQueryResult = await linkedSession1.run('CREATE (p:Person {name: $name})', { name: 'Driver User'}) + * + * // Reading Driver User will *NOT* wait of the changes being propagated to the server before RUN the query + * // So the 'Driver User' person might not exist in the Result + * const unlinkedReadResult = await unlinkedSession.run('CREATE (p:Person {name: $name}) RETURN p', { name: 'Driver User'}) + * + * // Reading Driver User will wait of the changes being propagated to the server before RUN the query + * // So the 'Driver User' person should exist in the Result, unless deleted. + * const linkedSesssion2 = await linkedSession2.run('CREATE (p:Person {name: $name}) RETURN p', { name: 'Driver User'}) + * + * await linkedSession1.close() + * await linkedSession2.close() + * await unlinkedSession.close() + * + * @experimental + * @type {BookmarkManager|undefined} + * @since 5.0 + */ + this.bookmarkManager = undefined + } +} + /** * A driver maintains one or more {@link Session}s with a remote * Neo4j instance. Through the {@link Session}s you can send queries @@ -282,14 +379,7 @@ class Driver { * pool and made available for others to use. * * @public - * @param {Object} param - The object parameter - * @param {string} param.defaultAccessMode=WRITE - The access mode of this session, allowed values are {@link READ} and {@link WRITE}. - * @param {string|string[]} param.bookmarks - The initial reference or references to some previous - * transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown. - * @param {number} param.fetchSize - The record fetch size of each batch of this session. - * Use {@link FETCH_ALL} to always pull all records in one batch. This will override the config value set on driver config. - * @param {string} param.database - The database this session will operate on. - * @param {string} param.impersonatedUser - The username which the user wants to impersonate for the duration of the session. + * @param {SessionConfig} param - The session configuration * @return {Session} new session. */ session ({ @@ -297,14 +387,9 @@ class Driver { bookmarks: bookmarkOrBookmarks, database = '', impersonatedUser, - fetchSize - }: { - defaultAccessMode?: SessionMode - bookmarks?: string | string[] - database?: string - impersonatedUser?: string - fetchSize?: number - } = {}): Session { + fetchSize, + bookmarkManager + }: SessionConfig = {}): Session { return this._newSession({ defaultAccessMode, bookmarkOrBookmarks, @@ -312,7 +397,8 @@ class Driver { reactive: false, impersonatedUser, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize!) + fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize!), + bookmarkManager }) } @@ -348,7 +434,8 @@ class Driver { database, reactive, impersonatedUser, - fetchSize + fetchSize, + bookmarkManager }: { defaultAccessMode: SessionMode bookmarkOrBookmarks?: string | string[] @@ -356,12 +443,14 @@ class Driver { reactive: boolean impersonatedUser?: string fetchSize: number + bookmarkManager?: BookmarkManager }): Session { const sessionMode = Session._validateSessionMode(defaultAccessMode) const connectionProvider = this._getOrCreateConnectionProvider() const bookmarks = bookmarkOrBookmarks != null ? new Bookmarks(bookmarkOrBookmarks) : Bookmarks.empty() + return this._createSession({ mode: sessionMode, database: database ?? '', @@ -370,7 +459,8 @@ class Driver { config: this._config, reactive, impersonatedUser, - fetchSize + fetchSize, + bookmarkManager }) } @@ -473,7 +563,7 @@ function validateFetchSizeValue ( /** * @private */ -function extractConnectionTimeout (config: any): number|null { +function extractConnectionTimeout (config: any): number | null { const configuredTimeout = parseInt(config.connectionTimeout, 10) if (configuredTimeout === 0) { // timeout explicitly configured to 0 @@ -500,5 +590,5 @@ function createHostNameResolver (config: any): ConfiguredCustomResolver { } export { Driver, READ, WRITE } - +export type { SessionConfig } export default Driver diff --git a/packages/core/src/error.ts b/packages/core/src/error.ts index d1212acc3..7b565fee6 100644 --- a/packages/core/src/error.ts +++ b/packages/core/src/error.ts @@ -71,7 +71,8 @@ class Neo4jError extends Error { * @param {string} code - Optional error code. Will be populated when error originates in the database. */ constructor (message: string, code: Neo4jErrorCode, cause?: Error) { - // @ts-expect-error + // eslint-disable-next-line + // @ts-ignore: not available in ES6 yet super(message, cause != null ? { cause } : undefined) this.constructor = Neo4jError // eslint-disable-next-line no-proto diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 47cf53a9b..8f0bb0ff1 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -75,6 +75,8 @@ import TransactionPromise from './transaction-promise' import Session, { TransactionConfig } from './session' import Driver, * as driver from './driver' import auth from './auth' +import BookmarkManager, { BookmarkManagerConfig, bookmarkManager } from './bookmark-manager' +import { SessionConfig } from './driver' import * as types from './types' import * as json from './json' import * as internal from './internal' // todo: removed afterwards @@ -146,7 +148,8 @@ const forExport = { types, driver, json, - auth + auth, + bookmarkManager } export { @@ -205,7 +208,8 @@ export { types, driver, json, - auth + auth, + bookmarkManager } export type { @@ -214,7 +218,10 @@ export type { NotificationPosition, QueryResult, ResultObserver, - TransactionConfig + TransactionConfig, + BookmarkManager, + BookmarkManagerConfig, + SessionConfig } export default forExport diff --git a/packages/core/src/internal/bookmarks.ts b/packages/core/src/internal/bookmarks.ts index 11524191a..411ee7cae 100644 --- a/packages/core/src/internal/bookmarks.ts +++ b/packages/core/src/internal/bookmarks.ts @@ -28,7 +28,7 @@ export class Bookmarks { * @constructor * @param {string|string[]} values single bookmark as string or multiple bookmarks as a string array. */ - constructor (values?: string | string[] | string[] | null) { + constructor (values?: string | string[] | null) { this._values = asStringArray(values) } @@ -52,6 +52,10 @@ export class Bookmarks { return this._values } + [Symbol.iterator] (): IterableIterator { + return this._values[Symbol.iterator]() + } + /** * Get these bookmarks as an object for begin transaction call. * @return {Object} the value of this bookmarks holder as object. @@ -79,7 +83,7 @@ const EMPTY_BOOKMARK = new Bookmarks(null) * @return {string[]} value converted to an array. */ function asStringArray ( - value?: string | string[] | string[] | null + value?: string | string[] | null ): string[] { if (value == null || value === '') { return [] @@ -90,7 +94,7 @@ function asStringArray ( } if (Array.isArray(value)) { - const result = [] + const result = new Set() const flattenedValue = flattenArray(value) for (let i = 0; i < flattenedValue.length; i++) { const element = flattenedValue[i] @@ -102,10 +106,10 @@ function asStringArray ( `Bookmark value should be a string, given: '${element}'` ) } - result.push(element) + result.add(element) } } - return result + return [...result] } throw new TypeError( diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 6b608fedf..17b5690d1 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -83,6 +83,7 @@ class ConnectionHolder implements ConnectionHolderInterface { private _referenceCount: number private _connectionPromise: Promise private readonly _impersonatedUser?: string + private readonly _getConnectionAcquistionBookmarks: () => Promise private readonly _onDatabaseNameResolved?: (databaseName?: string) => void /** @@ -94,6 +95,7 @@ class ConnectionHolder implements ConnectionHolderInterface { * @property {ConnectionProvider} params.connectionProvider - the connection provider to acquire connections from. * @property {string?} params.impersonatedUser - the user which will be impersonated * @property {function(databaseName:string)} params.onDatabaseNameResolved - callback called when the database name is resolved + * @property {function():Promise} params.getConnectionAcquistionBookmarks - called for getting Bookmarks for acquiring connections */ constructor ({ mode = ACCESS_MODE_WRITE, @@ -101,7 +103,8 @@ class ConnectionHolder implements ConnectionHolderInterface { bookmarks, connectionProvider, impersonatedUser, - onDatabaseNameResolved + onDatabaseNameResolved, + getConnectionAcquistionBookmarks }: { mode?: string database?: string @@ -109,6 +112,7 @@ class ConnectionHolder implements ConnectionHolderInterface { connectionProvider?: ConnectionProvider impersonatedUser?: string onDatabaseNameResolved?: (databaseName?: string) => void + getConnectionAcquistionBookmarks?: () => Promise } = {}) { this._mode = mode this._database = database != null ? assertString(database, 'database') : '' @@ -118,6 +122,7 @@ class ConnectionHolder implements ConnectionHolderInterface { this._referenceCount = 0 this._connectionPromise = Promise.resolve(null) this._onDatabaseNameResolved = onDatabaseNameResolved + this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty())) } mode (): string | undefined { @@ -146,13 +151,7 @@ class ConnectionHolder implements ConnectionHolderInterface { initializeConnection (): boolean { if (this._referenceCount === 0 && (this._connectionProvider != null)) { - this._connectionPromise = this._connectionProvider.acquireConnection({ - accessMode: this._mode, - database: this._database, - bookmarks: this._bookmarks, - impersonatedUser: this._impersonatedUser, - onDatabaseNameResolved: this._onDatabaseNameResolved - }) + this._connectionPromise = this._createConnectionPromise(this._connectionProvider) } else { this._referenceCount++ return false @@ -161,6 +160,20 @@ class ConnectionHolder implements ConnectionHolderInterface { return true } + private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { + return await connectionProvider.acquireConnection({ + accessMode: this._mode, + database: this._database, + bookmarks: await this._getBookmarks(), + impersonatedUser: this._impersonatedUser, + onDatabaseNameResolved: this._onDatabaseNameResolved + }) + } + + private async _getBookmarks (): Promise { + return await this._getConnectionAcquistionBookmarks() + } + getConnection (): Promise { return this._connectionPromise } @@ -230,6 +243,8 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { mode: connectionHolder.mode(), database: connectionHolder.database(), bookmarks: connectionHolder.bookmarks(), + // @ts-expect-error + getConnectionAcquistionBookmarks: connectionHolder._getConnectionAcquistionBookmarks, connectionProvider: connectionHolder.connectionProvider() }) this._connectionHolder = connectionHolder diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index f785c120a..dfa588d3f 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -35,8 +35,9 @@ import Connection from './connection' import { NumberOrInteger } from './graph-types' import TransactionPromise from './transaction-promise' import ManagedTransaction from './transaction-managed' +import BookmarkManager from './bookmark-manager' -type ConnectionConsumer = (connection: Connection | null) => any | undefined +type ConnectionConsumer = (connection: Connection | null) => any | undefined | Promise | Promise type TransactionWork = (tx: Transaction) => Promise | T type ManagedTransactionWork = (tx: ManagedTransaction) => Promise | T @@ -62,13 +63,14 @@ class Session { private _open: boolean private _hasTx: boolean private _lastBookmarks: Bookmarks + private _configuredBookmarks: Bookmarks private readonly _transactionExecutor: TransactionExecutor private readonly _impersonatedUser?: string - private readonly _onComplete: (meta: any) => void private _databaseNameResolved: boolean private readonly _lowRecordWatermark: number private readonly _highRecordWatermark: number private readonly _results: Result[] + private readonly _bookmarkManager?: BookmarkManager /** * @constructor * @protected @@ -90,7 +92,8 @@ class Session { config, reactive, fetchSize, - impersonatedUser + impersonatedUser, + bookmarkManager }: { mode: SessionMode connectionProvider: ConnectionProvider @@ -100,19 +103,22 @@ class Session { reactive: boolean fetchSize: number impersonatedUser?: string + bookmarkManager?: BookmarkManager }) { this._mode = mode this._database = database this._reactive = reactive this._fetchSize = fetchSize this._onDatabaseNameResolved = this._onDatabaseNameResolved.bind(this) + this._getConnectionAcquistionBookmarks = this._getConnectionAcquistionBookmarks.bind(this) this._readConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_READ, database, bookmarks, connectionProvider, impersonatedUser, - onDatabaseNameResolved: this._onDatabaseNameResolved + onDatabaseNameResolved: this._onDatabaseNameResolved, + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks }) this._writeConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_WRITE, @@ -120,19 +126,21 @@ class Session { bookmarks, connectionProvider, impersonatedUser, - onDatabaseNameResolved: this._onDatabaseNameResolved + onDatabaseNameResolved: this._onDatabaseNameResolved, + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks }) this._open = true this._hasTx = false this._impersonatedUser = impersonatedUser this._lastBookmarks = bookmarks ?? Bookmarks.empty() + this._configuredBookmarks = this._lastBookmarks this._transactionExecutor = _createTransactionExecutor(config) - this._onComplete = this._onCompleteCallback.bind(this) this._databaseNameResolved = this._database !== '' const calculatedWatermaks = this._calculateWatermaks() this._lowRecordWatermark = calculatedWatermaks.low this._highRecordWatermark = calculatedWatermaks.high this._results = [] + this._bookmarkManager = bookmarkManager } /** @@ -159,15 +167,16 @@ class Session { ? new TxConfig(transactionConfig) : TxConfig.empty() - const result = this._run(validatedQuery, params, connection => { + const result = this._run(validatedQuery, params, async connection => { + const bookmarks = await this._bookmarks() this._assertSessionIsOpen() return (connection as Connection).protocol().run(validatedQuery, params, { - bookmarks: this._lastBookmarks, + bookmarks, txConfig: autoCommitTxConfig, mode: this._mode, database: this._database, impersonatedUser: this._impersonatedUser, - afterComplete: this._onComplete, + afterComplete: (meta: any) => this._onCompleteCallback(meta, bookmarks), reactive: this._reactive, fetchSize: this._fetchSize, lowRecordWatermark: this._lowRecordWatermark, @@ -283,14 +292,14 @@ class Session { connectionHolder, impersonatedUser: this._impersonatedUser, onClose: this._transactionClosed.bind(this), - onBookmarks: this._updateBookmarks.bind(this), + onBookmarks: (newBm, oldBm, db) => this._updateBookmarks(newBm, oldBm, db), onConnection: this._assertSessionIsOpen.bind(this), reactive: this._reactive, fetchSize: this._fetchSize, lowRecordWatermark: this._lowRecordWatermark, highRecordWatermark: this._highRecordWatermark }) - tx._begin(this._lastBookmarks, txConfig) + tx._begin(() => this._bookmarks(), txConfig) return tx } @@ -332,6 +341,14 @@ class Session { return this._lastBookmarks.values() } + private async _bookmarks (): Promise { + const bookmarks = await this._bookmarkManager?.getAllBookmarks() + if (bookmarks === undefined) { + return this._lastBookmarks + } + return new Bookmarks([...bookmarks, ...this._configuredBookmarks]) + } + /** * Execute given unit of work in a {@link READ} transaction. * @@ -470,15 +487,29 @@ class Session { } } + private async _getConnectionAcquistionBookmarks (): Promise { + const bookmarks = await this._bookmarkManager?.getBookmarks('system') + if (bookmarks === undefined) { + return this._lastBookmarks + } + return new Bookmarks([...this._configuredBookmarks, ...bookmarks]) + } + /** * Update value of the last bookmarks. * @private * @param {Bookmarks} newBookmarks - The new bookmarks. * @returns {void} */ - _updateBookmarks (newBookmarks?: Bookmarks): void { + _updateBookmarks (newBookmarks?: Bookmarks, previousBookmarks?: Bookmarks, database?: string): void { if ((newBookmarks != null) && !newBookmarks.isEmpty()) { + this._bookmarkManager?.updateBookmarks( + database ?? this._database, + previousBookmarks?.values() ?? [], + newBookmarks?.values() ?? [] + ) this._lastBookmarks = newBookmarks + this._configuredBookmarks = Bookmarks.empty() } } @@ -514,8 +545,8 @@ class Session { * @param {Object} meta Connection metadatada * @returns {void} */ - _onCompleteCallback (meta: { bookmark: string | string[] }): void { - this._updateBookmarks(new Bookmarks(meta.bookmark)) + _onCompleteCallback (meta: { bookmark: string | string[], db?: string }, previousBookmarks?: Bookmarks): void { + this._updateBookmarks(new Bookmarks(meta.bookmark), previousBookmarks, meta.db) } /** diff --git a/packages/core/src/transaction-promise.ts b/packages/core/src/transaction-promise.ts index 344755e82..04798195c 100644 --- a/packages/core/src/transaction-promise.ts +++ b/packages/core/src/transaction-promise.ts @@ -69,7 +69,7 @@ class TransactionPromise extends Transaction implements Promise { }: { connectionHolder: ConnectionHolder onClose: () => void - onBookmarks: (bookmarks: Bookmarks) => void + onBookmarks: (newBookmarks: Bookmarks, previousBookmarks: Bookmarks, database?: string) => void onConnection: () => void reactive: boolean fetchSize: number @@ -162,7 +162,7 @@ class TransactionPromise extends Transaction implements Promise { /** * @access private */ - _begin (bookmarks: string | Bookmarks | string[], txConfig: TxConfig): void { + _begin (bookmarks: () => Promise, txConfig: TxConfig): void { return super._begin(bookmarks, txConfig, { onError: this._onBeginError.bind(this), onComplete: this._onBeginMetadata.bind(this) diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index d22876bf5..45d3c9ba9 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -48,15 +48,18 @@ class Transaction { private readonly _reactive: boolean private _state: any private readonly _onClose: () => void - private readonly _onBookmarks: (bookmarks: Bookmarks) => void + private readonly _onBookmarks: (newBookmarks: Bookmarks, previousBookmarks: Bookmarks, database?: string) => void private readonly _onConnection: () => void private readonly _onError: (error: Error) => Promise - private readonly _onComplete: (metadata: any) => void + private readonly _onComplete: (metadata: any, previousBookmarks?: Bookmarks) => void private readonly _fetchSize: number private readonly _results: any[] private readonly _impersonatedUser?: string private readonly _lowRecordWatermak: number private readonly _highRecordWatermark: number + private _bookmarks: Bookmarks + private readonly _activePromise: Promise + private _acceptActive: () => void /** * @constructor @@ -84,7 +87,7 @@ class Transaction { }: { connectionHolder: ConnectionHolder onClose: () => void - onBookmarks: (bookmarks: Bookmarks) => void + onBookmarks: (newBookmarks: Bookmarks, previousBookmarks: Bookmarks, database?: string) => void onConnection: () => void reactive: boolean fetchSize: number @@ -99,12 +102,16 @@ class Transaction { this._onBookmarks = onBookmarks this._onConnection = onConnection this._onError = this._onErrorCallback.bind(this) - this._onComplete = this._onCompleteCallback.bind(this) this._fetchSize = fetchSize + this._onComplete = this._onCompleteCallback.bind(this) this._results = [] this._impersonatedUser = impersonatedUser this._lowRecordWatermak = lowRecordWatermark this._highRecordWatermark = highRecordWatermark + this._bookmarks = Bookmarks.empty() + this._activePromise = new Promise((resolve, reject) => { + this._acceptActive = resolve + }) } /** @@ -113,17 +120,18 @@ class Transaction { * @param {TxConfig} txConfig * @returns {void} */ - _begin (bookmarks: Bookmarks | string | string[], txConfig: TxConfig, events?: { + _begin (getBookmarks: () => Promise, txConfig: TxConfig, events?: { onError: (error: Error) => void onComplete: (metadata: any) => void }): void { this._connectionHolder .getConnection() - .then(connection => { + .then(async connection => { this._onConnection() if (connection != null) { + this._bookmarks = await getBookmarks() return connection.protocol().beginTransaction({ - bookmarks: bookmarks, + bookmarks: this._bookmarks, txConfig: txConfig, mode: this._connectionHolder.mode(), database: this._connectionHolder.database(), @@ -151,6 +159,10 @@ class Transaction { } this._onError(error).catch(() => {}) }) + // It should make the transaction active anyway + // further errors will be treated by the existing + // observers + .finally(() => this._acceptActive()) } /** @@ -175,7 +187,8 @@ class Transaction { reactive: this._reactive, fetchSize: this._fetchSize, highRecordWatermark: this._highRecordWatermark, - lowRecordWatermark: this._lowRecordWatermak + lowRecordWatermark: this._lowRecordWatermak, + preparationJob: this._activePromise }) this._results.push(result) return result @@ -192,9 +205,10 @@ class Transaction { const committed = this._state.commit({ connectionHolder: this._connectionHolder, onError: this._onError, - onComplete: this._onComplete, + onComplete: (meta: any) => this._onCompleteCallback(meta, this._bookmarks), onConnection: this._onConnection, - pendingResults: this._results + pendingResults: this._results, + preparationJob: this._activePromise }) this._state = committed.state // clean up @@ -221,7 +235,8 @@ class Transaction { onError: this._onError, onComplete: this._onComplete, onConnection: this._onConnection, - pendingResults: this._results + pendingResults: this._results, + preparationJob: this._activePromise }) this._state = rolledback.state // clean up @@ -271,8 +286,8 @@ class Transaction { * @param {object} meta The meta with bookmarks * @returns {void} */ - _onCompleteCallback (meta: { bookmark?: string | string[] }): void { - this._onBookmarks(new Bookmarks(meta.bookmark)) + _onCompleteCallback (meta: { bookmark?: string | string[], db?: string }, previousBookmarks?: Bookmarks): void { + this._onBookmarks(new Bookmarks(meta?.bookmark), previousBookmarks ?? Bookmarks.empty(), meta?.db) } } @@ -290,6 +305,7 @@ interface StateTransitionParams { fetchSize: number highRecordWatermark: number lowRecordWatermark: number + preparationJob?: Promise } const _states = { @@ -300,7 +316,8 @@ const _states = { onError, onComplete, onConnection, - pendingResults + pendingResults, + preparationJob }: StateTransitionParams): any => { return { result: finishTransaction( @@ -309,7 +326,8 @@ const _states = { onError, onComplete, onConnection, - pendingResults + pendingResults, + preparationJob ), state: _states.SUCCEEDED } @@ -319,7 +337,8 @@ const _states = { onError, onComplete, onConnection, - pendingResults + pendingResults, + preparationJob }: StateTransitionParams): any => { return { result: finishTransaction( @@ -328,7 +347,8 @@ const _states = { onError, onComplete, onConnection, - pendingResults + pendingResults, + preparationJob ), state: _states.ROLLED_BACK } @@ -344,31 +364,35 @@ const _states = { reactive, fetchSize, highRecordWatermark, - lowRecordWatermark + lowRecordWatermark, + preparationJob }: StateTransitionParams ): any => { // RUN in explicit transaction can't contain bookmarks and transaction configuration - // No need to include mode and database name as it shall be inclued in begin - const observerPromise = connectionHolder - .getConnection() - .then(conn => { - onConnection() - if (conn != null) { - return conn.protocol().run(query, parameters, { - bookmarks: Bookmarks.empty(), - txConfig: TxConfig.empty(), - beforeError: onError, - afterComplete: onComplete, - reactive: reactive, - fetchSize: fetchSize, - highRecordWatermark: highRecordWatermark, - lowRecordWatermark: lowRecordWatermark - }) - } else { - throw newError('No connection available') - } - }) - .catch(error => new FailedObserver({ error, onError })) + // No need to include mode and database name as it shall be included in begin + const requirements = preparationJob ?? Promise.resolve() + + const observerPromise = + connectionHolder.getConnection() + .then(conn => requirements.then(() => conn)) + .then(conn => { + onConnection() + if (conn != null) { + return conn.protocol().run(query, parameters, { + bookmarks: Bookmarks.empty(), + txConfig: TxConfig.empty(), + beforeError: onError, + afterComplete: onComplete, + reactive: reactive, + fetchSize: fetchSize, + highRecordWatermark: highRecordWatermark, + lowRecordWatermark: lowRecordWatermark + }) + } else { + throw newError('No connection available') + } + }) + .catch(error => new FailedObserver({ error, onError })) return newCompletedResult( observerPromise, @@ -595,32 +619,36 @@ function finishTransaction ( onError: (err: Error) => any, onComplete: (metadata: any) => any, onConnection: () => any, - pendingResults: Result[] + pendingResults: Result[], + preparationJob?: Promise ): Result { - const observerPromise = connectionHolder - .getConnection() - .then(connection => { - onConnection() - pendingResults.forEach(r => r._cancel()) - return Promise.all(pendingResults.map(result => result.summary())).then(results => { - if (connection != null) { - if (commit) { - return connection.protocol().commitTransaction({ - beforeError: onError, - afterComplete: onComplete - }) + const requirements = preparationJob ?? Promise.resolve() + + const observerPromise = + connectionHolder.getConnection() + .then(conn => requirements.then(() => conn)) + .then(connection => { + onConnection() + pendingResults.forEach(r => r._cancel()) + return Promise.all(pendingResults.map(result => result.summary())).then(results => { + if (connection != null) { + if (commit) { + return connection.protocol().commitTransaction({ + beforeError: onError, + afterComplete: onComplete + }) + } else { + return connection.protocol().rollbackTransaction({ + beforeError: onError, + afterComplete: onComplete + }) + } } else { - return connection.protocol().rollbackTransaction({ - beforeError: onError, - afterComplete: onComplete - }) + throw newError('No connection available') } - } else { - throw newError('No connection available') - } + }) }) - }) - .catch(error => new FailedObserver({ error, onError })) + .catch(error => new FailedObserver({ error, onError })) // for commit & rollback we need result that uses real connection holder and notifies it when // connection is not needed and can be safely released to the pool diff --git a/packages/core/test/bookmark-manager.test.ts b/packages/core/test/bookmark-manager.test.ts new file mode 100644 index 000000000..0ff6ae73f --- /dev/null +++ b/packages/core/test/bookmark-manager.test.ts @@ -0,0 +1,292 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { + bookmarkManager +} from '../src/bookmark-manager' +import { installMatchers } from './utils/matchers' + +describe('BookmarkManager', () => { + const systemBookmarks = ['system:bm01', 'system:bm02'] + const neo4jBookmarks = ['neo4j:bm01', 'neo4j:bm02'] + + beforeAll(() => { + installMatchers() + }) + + describe('getBookmarks()', () => { + it('should return empty if db doesnt exists', async () => { + const manager = bookmarkManager({}) + + const bookmarks = await manager.getBookmarks('neo4j') + + expect(bookmarks).toEqual([]) + }) + + it('should return bookmarks for the given db', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const bookmarks = await manager.getBookmarks('neo4j') + + expect(bookmarks).toBeSortedEqual(neo4jBookmarks) + }) + + it('should return get bookmarks from bookmarkSupplier', async () => { + const extraBookmarks = ['neo4j:bm03', 'neo4j:bm04'] + + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksSupplier: async () => await Promise.resolve(extraBookmarks) + }) + + const bookmarks = await manager.getBookmarks('neo4j') + + expect(bookmarks).toBeSortedEqual([...neo4jBookmarks, ...extraBookmarks]) + }) + + it('should return not duplicate bookmarks if bookmarkSupplier returns existing bm', async () => { + const extraBookmarks = ['neo4j:bm03', 'neo4j:bm04'] + + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksSupplier: async () => await Promise.resolve([...extraBookmarks, ...neo4jBookmarks]) + }) + + const bookmarks = await manager.getBookmarks('neo4j') + + expect(bookmarks).toBeSortedEqual([...neo4jBookmarks, ...extraBookmarks]) + }) + + it('should return call from bookmarkSupplier with correct database', async () => { + const bookmarksSupplier = jest.fn() + + const manager = bookmarkManager({ + bookmarksSupplier + }) + + await manager.getBookmarks('neo4j') + + expect(bookmarksSupplier).toBeCalledWith('neo4j') + }) + }) + + describe('getAllBookmarks()', () => { + it('should return all bookmarks ', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const bookmarks = await manager.getAllBookmarks() + + expect(bookmarks).toBeSortedEqual([...neo4jBookmarks, ...systemBookmarks]) + }) + + it('should return empty if there are no bookmarks for any db', async () => { + const manager = bookmarkManager({}) + + const bookmarks = await manager.getAllBookmarks() + + expect(bookmarks).toBeSortedEqual([]) + }) + + it('should return enriched bookmarks list with supplied bookmarks', async () => { + const extraBookmarks = ['neo4j:bmextra', 'system:bmextra', 'adb:bmextra'] + const bookmarksSupplier = jest.fn(async (database?: string) => await Promise.resolve(extraBookmarks)) + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksSupplier + }) + + const bookmarks = await manager.getAllBookmarks() + + expect(bookmarks).toBeSortedEqual( + [...neo4jBookmarks, ...systemBookmarks, ...extraBookmarks] + ) + }) + + it('should return duplicate bookmarks if bookmarksSupplier returns already existing bm', async () => { + const extraBookmarks = ['neo4j:bmextra', 'system:bmextra', 'adb:bmextra'] + const bookmarksSupplier = jest.fn(async (database?: string) => await Promise.resolve([...extraBookmarks, ...systemBookmarks])) + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksSupplier + }) + + const bookmarks = await manager.getAllBookmarks() + + expect(bookmarks).toBeSortedEqual( + [...neo4jBookmarks, ...systemBookmarks, ...extraBookmarks] + ) + }) + + it('should call bookmarkSupplier for getting all bookmarks', async () => { + const bookmarksSupplier = jest.fn() + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksSupplier + }) + + await manager.getAllBookmarks() + + expect(bookmarksSupplier).toBeCalledWith() + }) + }) + + describe('updateBookmarks()', () => { + it('should remove previous bookmarks and new bookmarks for an existing db', async () => { + const newBookmarks = ['neo4j:bm03'] + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + await manager.updateBookmarks( + 'neo4j', + await manager.getAllBookmarks(), + newBookmarks + ) + + await expect(manager.getBookmarks('neo4j')).resolves.toBeSortedEqual(newBookmarks) + await expect(manager.getBookmarks('system')).resolves.toBeSortedEqual(systemBookmarks) + }) + + it('should not remove bookmarks not present in the original list', async () => { + const newBookmarks = ['neo4j:bm03'] + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const [bookmarkNotUsedInTx, ...bookmarksUsedInTx] = neo4jBookmarks + await manager.updateBookmarks( + 'neo4j', + bookmarksUsedInTx, + newBookmarks + ) + + await expect(manager.getBookmarks('neo4j')) + .resolves.toBeSortedEqual([bookmarkNotUsedInTx, ...newBookmarks]) + await expect(manager.getBookmarks('system')).resolves.toBeSortedEqual(systemBookmarks) + }) + + it('should add bookmarks to a non-existing database', async () => { + const newBookmarks = ['neo4j:bm03'] + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['system', systemBookmarks] + ]) + }) + + await manager.updateBookmarks( + 'neo4j', + [], + newBookmarks + ) + + await expect(manager.getBookmarks('neo4j')).resolves.toBeSortedEqual(newBookmarks) + await expect(manager.getBookmarks('system')).resolves.toBeSortedEqual(systemBookmarks) + }) + + it('should notify new bookmarks', async () => { + const bookmarksConsumer = jest.fn() + const newBookmarks = ['neo4j:bm03'] + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksConsumer + }) + + await manager.updateBookmarks( + 'neo4j', + await manager.getAllBookmarks(), + newBookmarks + ) + + expect(bookmarksConsumer).toBeCalledWith('neo4j', newBookmarks) + }) + }) + + describe('forget()', () => { + it('should forget database', async () => { + const extraBookmarks = ['system:bmextra', 'adb:bmextra'] + const bookmarksSupplier = jest.fn(async () => extraBookmarks) + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksSupplier + }) + + await manager.forget(['neo4j', 'adb']) + const bookmarks = await manager.getAllBookmarks() + + expect(bookmarks).toBeSortedEqual( + [...systemBookmarks, ...extraBookmarks] + ) + }) + + it('should forget what never reminded', async () => { + const extraBookmarks = ['system:bmextra', 'adb:bmextra'] + const bookmarksSupplier = jest.fn(async () => extraBookmarks) + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]), + bookmarksSupplier + }) + + await manager.forget(['unexisting-db']) + const bookmarks = await manager.getAllBookmarks() + + expect(bookmarks).toBeSortedEqual( + [...systemBookmarks, ...neo4jBookmarks, ...extraBookmarks] + ) + }) + }) +}) diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index c465167c8..c7c4c4b71 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -17,7 +17,7 @@ * limitations under the License. */ /* eslint-disable @typescript-eslint/promise-function-async */ -import { ConnectionProvider, newError, ServerInfo, Session } from '../src' +import { bookmarkManager, ConnectionProvider, newError, ServerInfo, Session } from '../src' import Driver, { READ } from '../src/driver' import { Bookmarks } from '../src/internal/bookmarks' import { Logger } from '../src/internal/logger' @@ -83,6 +83,78 @@ describe('Driver', () => { expect(session?.lastBookmarks()).toEqual(expectedBookmarks.values()) }) + + describe('when bookmark manager configured', () => { + it('should create session with bookmark manager when no bookmark set', async () => { + const manager = bookmarkManager() + const driver = new Driver( + META_INFO, + { ...CONFIG }, + mockCreateConnectonProvider(connectionProvider), + createSession + ) + + const session = driver.session({ bookmarkManager: manager }) + + try { + expect(createSession).toBeCalledWith(expect.objectContaining({ + bookmarkManager: manager, + bookmarks: Bookmarks.empty() + })) + } finally { + await session.close() + await driver.close() + } + }) + + it.each([ + [[], Bookmarks.empty()], + ['bookmark', new Bookmarks('bookmark')], + [['bookmark'], new Bookmarks(['bookmark'])], + [['bookmark1', 'bookmark2'], new Bookmarks(['bookmark1', 'bookmark2'])] + ])('should create session with bookmark manager when bookmark set', async (bookmarks, expectedBookmarks) => { + const manager = bookmarkManager() + const driver = new Driver( + META_INFO, + { ...CONFIG }, + mockCreateConnectonProvider(connectionProvider), + createSession + ) + + const session = driver.session({ bookmarks, bookmarkManager: manager }) + + try { + expect(createSession).toBeCalledWith(expect.objectContaining({ + bookmarkManager: manager, + bookmarks: expectedBookmarks + })) + } finally { + await session.close() + await driver.close() + } + }) + + it('should create session without bookmark manager when no bookmark manager is set', async () => { + const driver = new Driver( + META_INFO, + { ...CONFIG }, + mockCreateConnectonProvider(connectionProvider), + createSession + ) + + const session = driver.session() + + try { + expect(createSession).toBeCalledWith(expect.objectContaining({ + bookmarkManager: undefined, + bookmarks: Bookmarks.empty() + })) + } finally { + await session.close() + await driver.close() + } + }) + }) }) it.each([ diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 6a561be52..6d1d1f417 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -16,13 +16,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction } from '../src' +import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction, BookmarkManager, bookmarkManager } from '../src' import { bookmarks } from '../src/internal' import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants' import ManagedTransaction from '../src/transaction-managed' import FakeConnection from './utils/connection.fake' describe('session', () => { + const systemBookmarks = ['sys:bm01', 'sys:bm02'] + const neo4jBookmarks = ['neo4j:bm01', 'neo4j:bm03'] + const customBookmarks = ['neo4j:bm02'] + it('close should return promise', done => { const connection = newFakeConnection() const session = newSessionWithConnection(connection) @@ -310,6 +314,199 @@ describe('session', () => { expect(tx).toBeDefined() }) + + it('should acquire connection with system bookmarks from the bookmark manager', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const connection = mockBeginWithSuccess(newFakeConnection()) + + const { session, connectionProvider } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j' + }) + + await session.beginTransaction() + + expect(connectionProvider.acquireConnection).toBeCalledWith( + expect.objectContaining({ bookmarks: new bookmarks.Bookmarks(systemBookmarks) }) + ) + }) + + it('should acquire connection with system bookmarks from the bookmark manager + lastBookmarks', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const connection = mockBeginWithSuccess(newFakeConnection()) + + const { session, connectionProvider } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + lastBookmarks: new bookmarks.Bookmarks(customBookmarks), + database: 'neo4j' + }) + + await session.beginTransaction() + + expect(connectionProvider.acquireConnection).toBeCalledWith( + expect.objectContaining({ bookmarks: new bookmarks.Bookmarks([...customBookmarks, ...systemBookmarks]) }) + ) + }) + + it.each([ + [[]], + [customBookmarks] + ])('should call getAllBookmarks for the relevant database', async (lastBookmarks) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const getAllBookmarksSpy = jest.spyOn(manager, 'getAllBookmarks') + + const connection = mockBeginWithSuccess(newFakeConnection()) + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j', + lastBookmarks: new bookmarks.Bookmarks(lastBookmarks) + }) + + await session.beginTransaction() + + expect(getAllBookmarksSpy).toBeCalledWith() + }) + + it.each([ + [[]], + [customBookmarks] + ])('should call begin query with getAllBookmarks + lastBookmarks', async (lastBookmarks) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const connection = mockBeginWithSuccess(newFakeConnection()) + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j', + lastBookmarks: new bookmarks.Bookmarks(lastBookmarks) + }) + + await session.beginTransaction() + + expect(connection.seenBeginTransaction[0]).toEqual([ + expect.objectContaining({ + bookmarks: new bookmarks.Bookmarks([...neo4jBookmarks, ...systemBookmarks, ...lastBookmarks]) + }) + ]) + }) + + it.each([ + [undefined], + ['neo4j'], + ['adb'] + ])('should not call updateBookmarks when server returns no bookmarks', async (metaDb) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const updateBookmarksSpy = jest.spyOn(manager, 'updateBookmarks') + + const connection = mockBeginWithSuccess(newFakeConnection()) + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j' + }) + + await session.beginTransaction() + + expect(updateBookmarksSpy).not.toBeCalled() + }) + }) + + describe('.commit()', () => { + it.each([ + [undefined, 'neo4j'], + ['neo4j', 'neo4j'], + ['adb', 'adb'] + ])('should call updateBookmarks when server returns non-empty bookmarks', async (metaDb, updateDb) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const updateBookmarksSpy = jest.spyOn(manager, 'updateBookmarks') + + const connection = mockCommitWithSuccess(mockBeginWithSuccess(newFakeConnection()), { db: metaDb, bookmark: customBookmarks }) + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j' + }) + + const tx = await session.beginTransaction() + await tx.commit() + + expect(updateBookmarksSpy).toBeCalledTimes(1) + expect(updateBookmarksSpy).toBeCalledWith(updateDb, [...neo4jBookmarks, ...systemBookmarks], customBookmarks) + }) + + it.each([ + [undefined], + ['neo4j'], + ['adb'] + ])('should not call updateBookmarks when server returns no bookmarks', async (metaDb) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const updateBookmarksSpy = jest.spyOn(manager, 'updateBookmarks') + + const connection = mockCommitWithSuccess(mockBeginWithSuccess(newFakeConnection()), { db: metaDb }) + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j' + }) + + const tx = await session.beginTransaction() + await tx.commit() + + expect(updateBookmarksSpy).not.toBeCalled() + }) }) describe.each([ @@ -349,6 +546,300 @@ describe('session', () => { expect(run).toHaveBeenCalledWith(query, params) }) }) + + describe('.run()', () => { + it('should acquire connection with system bookmarks from the bookmark manager', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const connection = newFakeConnection() + + const { session, connectionProvider } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j' + }) + + await session.run('query') + + expect(connectionProvider.acquireConnection).toBeCalledWith( + expect.objectContaining({ bookmarks: new bookmarks.Bookmarks(systemBookmarks) }) + ) + }) + + it('should acquire connection with system bookmarks from the bookmark manager + lastBookmarks', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const connection = newFakeConnection() + + const { session, connectionProvider } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + lastBookmarks: new bookmarks.Bookmarks(customBookmarks), + database: 'neo4j' + }) + + await session.run('query') + + expect(connectionProvider.acquireConnection).toBeCalledWith( + expect.objectContaining({ bookmarks: new bookmarks.Bookmarks([...customBookmarks, ...systemBookmarks]) }) + ) + }) + + it('should acquire connection with system bookmarks from the bookmark manager when bookmarks already updated', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const connection = newFakeConnection() + + const { session, connectionProvider } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + lastBookmarks: new bookmarks.Bookmarks(customBookmarks), + database: 'neo4j' + }) + + await session.run('query') + const { afterComplete } = connection.seenProtocolOptions[0] + afterComplete({ db: 'neo4j', bookmark: ['other-bookmark'] }) + + await session.run('query') + + expect(connectionProvider.acquireConnection).toHaveBeenNthCalledWith(2, + expect.objectContaining({ bookmarks: new bookmarks.Bookmarks(systemBookmarks) })) + }) + + it('should acquire connection with system bookmarks from the bookmark manager + lastBookmarks when bookmarks not updated', async () => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const connection = newFakeConnection() + + const { session, connectionProvider } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + lastBookmarks: new bookmarks.Bookmarks(customBookmarks), + database: 'neo4j' + }) + + await session.run('query') + const { afterComplete } = connection.seenProtocolOptions[0] + afterComplete({ db: 'neo4j', bookmark: [] }) + + await session.run('query') + + expect(connectionProvider.acquireConnection).toHaveBeenNthCalledWith(2, + expect.objectContaining({ bookmarks: new bookmarks.Bookmarks([...customBookmarks, ...systemBookmarks]) }) + ) + }) + + it.each([ + [[]], + [customBookmarks] + ])('should call getAllBookmarks for the relevant database', async (lastBookmarks) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + const getAllBookmarksSpy = jest.spyOn(manager, 'getAllBookmarks') + + const connection = newFakeConnection() + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j', + lastBookmarks: new bookmarks.Bookmarks(lastBookmarks) + }) + + await session.run('query') + + expect(getAllBookmarksSpy).toBeCalledWith() + }) + + it.each([ + [[]], + [customBookmarks] + ])('should call run query with getAllBookmarks + lastBookmarks', async (lastBookmarks) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const connection = newFakeConnection() + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j', + lastBookmarks: new bookmarks.Bookmarks(lastBookmarks) + }) + + await session.run('query') + + expect(connection.seenProtocolOptions[0]).toEqual( + expect.objectContaining({ + bookmarks: new bookmarks.Bookmarks([...neo4jBookmarks, ...systemBookmarks, ...lastBookmarks]) + }) + ) + }) + + it.each([ + [[]], + [customBookmarks] + ])('should call run query with getAllBookmarks + lastBookmarks when bookmarks not updated', async (lastBookmarks) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const connection = newFakeConnection() + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j', + lastBookmarks: new bookmarks.Bookmarks(lastBookmarks) + }) + + await session.run('query') + const { afterComplete } = connection.seenProtocolOptions[0] + afterComplete({ db: 'neo4j', bookmark: [] }) + + await session.run('query') + + expect(connection.seenProtocolOptions[1]).toEqual( + expect.objectContaining({ + bookmarks: new bookmarks.Bookmarks([...neo4jBookmarks, ...systemBookmarks, ...lastBookmarks]) + }) + ) + }) + + it.each([ + [[]], + [customBookmarks] + ])('should call run query with getAllBookmarks when bookmarks updated', async (lastBookmarks) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const connection = newFakeConnection() + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j', + lastBookmarks: new bookmarks.Bookmarks(lastBookmarks) + }) + + await session.run('query') + const { afterComplete } = connection.seenProtocolOptions[0] + afterComplete({ db: 'neo4j', bookmark: 'abc' }) + await manager.updateBookmarks('neo4j', ['abc'], neo4jBookmarks) + + await session.run('query') + + expect(connection.seenProtocolOptions[1]).toEqual( + expect.objectContaining({ + bookmarks: new bookmarks.Bookmarks([...neo4jBookmarks, ...systemBookmarks]) + }) + ) + }) + + it.each([ + [undefined, 'neo4j'], + ['neo4j', 'neo4j'], + ['adb', 'adb'] + ])('should call updateBookmarks when server returns non-empty bookmarks', async (metaDb, updateDb) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const updateBookmarksSpy = jest.spyOn(manager, 'updateBookmarks') + + const connection = newFakeConnection() + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j' + }) + + await session.run('query') + + const { afterComplete } = connection.seenProtocolOptions[0] + + afterComplete({ db: metaDb, bookmark: customBookmarks }) + + expect(updateBookmarksSpy).toBeCalledWith(updateDb, [...neo4jBookmarks, ...systemBookmarks], customBookmarks) + }) + + it.each([ + [undefined], + ['neo4j'], + ['adb'] + ])('should not call updateBookmarks when server returns no bookmarks', async (metaDb) => { + const manager = bookmarkManager({ + initialBookmarks: new Map([ + ['neo4j', neo4jBookmarks], + ['system', systemBookmarks] + ]) + }) + + const updateBookmarksSpy = jest.spyOn(manager, 'updateBookmarks') + + const connection = newFakeConnection() + + const { session } = setupSession({ + connection, + bookmarkManager: manager, + beginTx: false, + database: 'neo4j' + }) + + await session.run('query') + + const { afterComplete } = connection.seenProtocolOptions[0] + + afterComplete({ db: metaDb }) + + expect(updateBookmarksSpy).not.toBeCalled() + }) + }) }) function mockBeginWithSuccess (connection: FakeConnection): FakeConnection { @@ -356,7 +847,8 @@ function mockBeginWithSuccess (connection: FakeConnection): FakeConnection { connection.protocol = () => { return { ...protocol, - beginTransaction: (params: { afterComplete: () => {}}) => { + beginTransaction: (params: { afterComplete: () => {}}, ...args: any[]) => { + protocol.beginTransaction([params, ...args]) params.afterComplete() } } @@ -364,30 +856,68 @@ function mockBeginWithSuccess (connection: FakeConnection): FakeConnection { return connection } +function mockCommitWithSuccess (connection: FakeConnection, metadata: any): FakeConnection { + const protocol = connection.protocol() + connection.protocol = () => { + return { + ...protocol, + commitTransaction: (params: { afterComplete: (metadata: any) => {}}, ...args: any[]) => { + const observer = protocol.commitTransaction(...[params, ...args]) + params.afterComplete(metadata) + return observer + } + } + } + return connection +} + function newSessionWithConnection ( connection: Connection, beginTx: boolean = true, fetchSize: number = 1000, - lastBookmarks: bookmarks.Bookmarks = bookmarks.Bookmarks.empty() + lastBookmarks: bookmarks.Bookmarks = bookmarks.Bookmarks.empty(), + bookmarkManager?: BookmarkManager ): Session { + const { session } = setupSession({ + connection, beginTx, fetchSize, lastBookmarks, bookmarkManager + }) + return session +} + +function setupSession ({ + connection, + beginTx = true, + fetchSize = 1000, + database = '', + lastBookmarks = bookmarks.Bookmarks.empty(), + bookmarkManager +}: { + connection: Connection + beginTx?: boolean + fetchSize?: number + lastBookmarks?: bookmarks.Bookmarks + database?: string + bookmarkManager?: BookmarkManager +}): { session: Session, connectionProvider: ConnectionProvider } { const connectionProvider = new ConnectionProvider() - connectionProvider.acquireConnection = async () => await Promise.resolve(connection) + connectionProvider.acquireConnection = jest.fn(async () => await Promise.resolve(connection)) connectionProvider.close = async () => await Promise.resolve() const session = new Session({ mode: ACCESS_MODE_READ, connectionProvider, - database: '', + database, fetchSize, config: {}, reactive: false, - bookmarks: lastBookmarks + bookmarks: lastBookmarks, + bookmarkManager }) if (beginTx) { session.beginTransaction().catch(e => {}) // force session to acquire new connection } - return session + return { session, connectionProvider } } function newFakeConnection (): FakeConnection { diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts index 9e61a5a3d..d61459ff3 100644 --- a/packages/core/test/transaction.test.ts +++ b/packages/core/test/transaction.test.ts @@ -151,7 +151,7 @@ testTx('TransactionPromise', newTransactionPromise, () => { connection }) - tx._begin(Bookmarks.empty(), TxConfig.empty()) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) return [tx] } @@ -266,7 +266,7 @@ testTx('TransactionPromise', newTransactionPromise, () => { connection }) - tx._begin(Bookmarks.empty(), TxConfig.empty()) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) return [tx, expectedError] } }) @@ -280,7 +280,7 @@ testTx('TransactionPromise', newTransactionPromise, () => { connection: undefined }) - tx._begin(Bookmarks.empty(), TxConfig.empty()) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) try { await tx @@ -300,7 +300,7 @@ testTx('TransactionPromise', newTransactionPromise, () => { errorResolvingConnection: expectedError }) - tx._begin(Bookmarks.empty(), TxConfig.empty()) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) try { await tx @@ -325,6 +325,8 @@ function testTx (transactionName: string, newTransaction: lowRecordWatermark: 300 }) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) + await tx.run('RETURN 1') expect(connection.seenProtocolOptions[0]).toMatchObject({ @@ -343,11 +345,36 @@ function testTx (transactionName: string, newTransaction: lowRecordWatermark: 300 }) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) + const result = tx.run('RETURN 1') // @ts-expect-error expect(result._watermarks).toEqual({ high: 700, low: 300 }) }) + + it('should wait until begin message be sent', async () => { + const connection = newFakeConnection() + const tx = newTransaction({ + connection + }) + + const bookmarksPromise: Promise = new Promise((resolve) => { + setTimeout(() => resolve(Bookmarks.empty()), 1000) + }) + + tx._begin(async () => await bookmarksPromise, TxConfig.empty()) + + const result = tx.run('RETURN 1') + + expect(connection.seenBeginTransaction.length).toEqual(0) + expect(connection.seenQueries.length).toEqual(0) + + await result + + expect(connection.seenBeginTransaction.length).toEqual(1) + expect(connection.seenQueries.length).toEqual(1) + }) }) describe('.close()', () => { @@ -356,6 +383,7 @@ function testTx (transactionName: string, newTransaction: const connection = newFakeConnection() const tx = newTransaction({ connection }) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) await tx.run('RETURN 1') await tx.close() @@ -367,6 +395,7 @@ function testTx (transactionName: string, newTransaction: const connection = newFakeConnection().withRollbackError(expectedError) const tx = newTransaction({ connection }) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) await tx.run('RETURN 1') try { @@ -376,6 +405,29 @@ function testTx (transactionName: string, newTransaction: expect(error).toEqual(expectedError) } }) + + it('should wait until begin message be sent', async () => { + const connection = newFakeConnection() + const tx = newTransaction({ + connection + }) + + const bookmarksPromise: Promise = new Promise((resolve) => { + setTimeout(() => resolve(Bookmarks.empty()), 1000) + }) + + tx._begin(async () => await bookmarksPromise, TxConfig.empty()) + + const result = tx.close() + + expect(connection.seenBeginTransaction.length).toEqual(0) + expect(connection.rollbackInvoked).toEqual(0) + + await result + + expect(connection.seenBeginTransaction.length).toEqual(1) + expect(connection.rollbackInvoked).toEqual(1) + }) }) describe('when transaction is closed', () => { @@ -394,6 +446,8 @@ function testTx (transactionName: string, newTransaction: const connection = newFakeConnection() const tx = newTransaction({ connection }) + tx._begin(async () => Bookmarks.empty(), TxConfig.empty()) + await operation(tx, connection) const rollbackInvokedAfterOperation = connection.rollbackInvoked diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index f1157cede..6f5a5c024 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -45,6 +45,7 @@ export default class FakeConnection extends Connection { public seenRequestRoutingInformation: any[] public rollbackInvoked: number public _rollbackError: Error | null + public seenBeginTransaction: any[] constructor () { super() @@ -67,6 +68,7 @@ export default class FakeConnection extends Connection { this.seenRequestRoutingInformation = [] this.rollbackInvoked = 0 this._rollbackError = null + this.seenBeginTransaction = [] } get id (): string { @@ -106,7 +108,8 @@ export default class FakeConnection extends Connection { commitTransaction: () => { return mockResultStreamObserver('COMMIT', {}) }, - beginTransaction: async () => { + beginTransaction: async (...args: any) => { + this.seenBeginTransaction.push(...args) return await Promise.resolve() }, rollbackTransaction: () => { @@ -180,7 +183,7 @@ export default class FakeConnection extends Connection { return this } - hasOngoingObservableRequests(): boolean { + hasOngoingObservableRequests (): boolean { return true } } diff --git a/packages/core/test/utils/matchers.ts b/packages/core/test/utils/matchers.ts new file mode 100644 index 000000000..0005280e8 --- /dev/null +++ b/packages/core/test/utils/matchers.ts @@ -0,0 +1,43 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +declare global { + // eslint-disable-next-line @typescript-eslint/no-namespace + namespace jest { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + interface Matchers { + toBeSortedEqual: (expected: Iterable) => CustomMatcherResult + } + } +} + +export function installMatchers (): void { + expect.extend({ + toBeSortedEqual (this: jest.MatcherContext, received: Iterable, expected: Iterable): jest.CustomMatcherResult { + const sortedReceived = [...received].sort() + const sortedExpected = [...expected].sort() + + return { + pass: this.equals(sortedReceived, sortedExpected), + message: () => + `Expected sorted:\n\n\t${JSON.stringify(sortedExpected)}\n\nGot sorted:\n\n\t${JSON.stringify(sortedReceived)}` + } + } + }) +} diff --git a/packages/neo4j-driver-lite/src/index.ts b/packages/neo4j-driver-lite/src/index.ts index 3456b190c..ed5abc899 100644 --- a/packages/neo4j-driver-lite/src/index.ts +++ b/packages/neo4j-driver-lite/src/index.ts @@ -69,7 +69,11 @@ import { Connection, driver as coreDriver, types as coreTypes, - auth + auth, + BookmarkManager, + bookmarkManager, + BookmarkManagerConfig, + SessionConfig } from 'neo4j-driver-core' import { DirectConnectionProvider, @@ -460,7 +464,8 @@ const forExport = { LocalDateTime, DateTime, ConnectionProvider, - Connection + Connection, + bookmarkManager } export { @@ -512,7 +517,8 @@ export { LocalDateTime, DateTime, ConnectionProvider, - Connection + Connection, + bookmarkManager } export type { QueryResult, @@ -522,6 +528,9 @@ export type { TrustStrategy, SessionMode, ResultObserver, - NotificationPosition + NotificationPosition, + BookmarkManager, + BookmarkManagerConfig, + SessionConfig } export default forExport diff --git a/packages/neo4j-driver-lite/test/unit/index.test.ts b/packages/neo4j-driver-lite/test/unit/index.test.ts index 9899a1e16..c19f37ae6 100644 --- a/packages/neo4j-driver-lite/test/unit/index.test.ts +++ b/packages/neo4j-driver-lite/test/unit/index.test.ts @@ -16,7 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { +import neo4j, { Result, Record, QueryResult, @@ -49,7 +49,8 @@ import { Time, Date, LocalDateTime, - DateTime + DateTime, + BookmarkManager } from '../../' describe('index', () => { @@ -84,6 +85,37 @@ describe('index', () => { expect(resultSummary).toBeDefined() }) + it('should export neo4j.bookmarkManager', () => { + const bookmarkManager = neo4j.bookmarkManager() + + expect(bookmarkManager).toBeDefined() + }) + + it('should treat BookmarkManager as an interface', () => { + const bookmarkManager: BookmarkManager = { + async getAllBookmarks (): Promise { + return [] + }, + async getBookmarks (database: string): Promise { + return [] + }, + async updateBookmarks (database: string, previousBookmarks: string[], newBookmarks: string[]): Promise { + + }, + async forget (databases: string[]): Promise { + + } + } + + const driver = neo4j.driver('neo4j://localhost', neo4j.auth.basic('neo4j', 'neo4j')) + + expect(driver).toBeDefined() + + const session = driver.session({ bookmarkManager }) + + expect(session).toBeDefined() + }) + it('should export AuthToken', () => { const authToken: AuthToken = { scheme: 'basic', diff --git a/packages/neo4j-driver/src/driver.js b/packages/neo4j-driver/src/driver.js index e72365ad6..491c43f02 100644 --- a/packages/neo4j-driver/src/driver.js +++ b/packages/neo4j-driver/src/driver.js @@ -49,12 +49,7 @@ class Driver extends CoreDriver { * pool and made available for others to use. * * @public - * @param {Object} param - * @param {string} param.defaultAccessMode=WRITE - The access mode of this session, allowed values are {@link READ} and {@link WRITE}. - * @param {string|string[]} param.bookmarks - The initial reference or references to some previous transactions. Value is optional and - * absence indicates that the bookmarks do not exist or are unknown. - * @param {string} param.database - The database this session will operate on. - * @param {string} param.impersonatedUser - The name of the user which should be impersonated for the duration of the session. + * @param {SessionConfig} config * @returns {RxSession} new reactive session. */ rxSession ({ @@ -62,7 +57,8 @@ class Driver extends CoreDriver { bookmarks, database = '', fetchSize, - impersonatedUser + impersonatedUser, + bookmarkManager } = {}) { return new RxSession({ session: this._newSession({ @@ -71,7 +67,8 @@ class Driver extends CoreDriver { database, impersonatedUser, reactive: false, - fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize) + fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize), + bookmarkManager }), config: this._config }) diff --git a/packages/neo4j-driver/src/index.js b/packages/neo4j-driver/src/index.js index d706e5add..101e5ab6e 100644 --- a/packages/neo4j-driver/src/index.js +++ b/packages/neo4j-driver/src/index.js @@ -60,7 +60,8 @@ import { auth, Session, Transaction, - ManagedTransaction + ManagedTransaction, + bookmarkManager } from 'neo4j-driver-core' import { DirectConnectionProvider, @@ -335,7 +336,7 @@ const USER_AGENT = 'neo4j-javascript/' + VERSION const logging = { console: level => { return { - level: level, + level, logger: (level, message) => console.log(`${global.Date.now()} ${level.toUpperCase()} ${message}`) } @@ -453,7 +454,8 @@ const forExport = { Time, Date, LocalDateTime, - DateTime + DateTime, + bookmarkManager } export { @@ -506,6 +508,7 @@ export { Time, Date, LocalDateTime, - DateTime + DateTime, + bookmarkManager } export default forExport diff --git a/packages/neo4j-driver/test/driver.test.js b/packages/neo4j-driver/test/driver.test.js index 1883a541c..36b16c712 100644 --- a/packages/neo4j-driver/test/driver.test.js +++ b/packages/neo4j-driver/test/driver.test.js @@ -25,7 +25,7 @@ import { DEFAULT_MAX_SIZE } from '../../bolt-connection/lib/pool/pool-config' import testUtils from './internal/test-utils' -import { json, internal } from 'neo4j-driver-core' +import { json, internal, bookmarkManager } from 'neo4j-driver-core' const { bookmarks: { Bookmarks } @@ -125,6 +125,7 @@ describe('#unit driver', () => { }) describe('.rxSession()', () => { + const manager = bookmarkManager() ;[ [undefined, Bookmarks.empty()], [null, Bookmarks.empty()], @@ -143,6 +144,22 @@ describe('#unit driver', () => { expect(session.lastBookmarks()).toEqual(expectedBookmarks.values()) }) }) + + ;[ + [manager, manager], + [undefined, undefined] + ].forEach(([bookmarkManager, configuredBookmarkManager]) => { + it('should create session using param bookmark manager', () => { + driver = neo4j.driver( + `neo4j+ssc://${sharedNeo4j.hostname}`, + sharedNeo4j.authToken + ) + + const session = driver.rxSession({ bookmarkManager }) + + expect(session._session._bookmarkManager).toEqual(configuredBookmarkManager) + }) + }) }) }) diff --git a/packages/neo4j-driver/test/internal/connection-holder.test.js b/packages/neo4j-driver/test/internal/connection-holder.test.js index 8adc4f0df..5ba4a4dd4 100644 --- a/packages/neo4j-driver/test/internal/connection-holder.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder.test.js @@ -47,7 +47,7 @@ describe('#unit EmptyConnectionHolder', () => { }) describe('#unit ConnectionHolder', () => { - it('should acquire new connection during initialization', () => { + it('should acquire new connection during initialization', async () => { const connectionProvider = new RecordingConnectionProvider([ new FakeConnection() ]) @@ -58,6 +58,8 @@ describe('#unit ConnectionHolder', () => { connectionHolder.initializeConnection() + await connectionHolder.getConnection() + expect(connectionProvider.acquireConnectionInvoked).toBe(1) }) diff --git a/packages/neo4j-driver/types/driver.d.ts b/packages/neo4j-driver/types/driver.d.ts index ca6cc2094..9f48468f0 100644 --- a/packages/neo4j-driver/types/driver.d.ts +++ b/packages/neo4j-driver/types/driver.d.ts @@ -20,7 +20,8 @@ import RxSession from './session-rx' import { Driver as CoreDriver, - types + types, + SessionConfig } from 'neo4j-driver-core' declare type AuthToken = types.AuthToken @@ -34,12 +35,7 @@ declare const READ: SessionMode declare const WRITE: SessionMode declare interface Driver extends CoreDriver { - rxSession: (sessionParams?: { - defaultAccessMode?: SessionMode - bookmarks?: string | string[] - fetchSize?: number - database?: string - }) => RxSession + rxSession: (sessionParams?: SessionConfig) => RxSession } export { diff --git a/packages/neo4j-driver/types/index.d.ts b/packages/neo4j-driver/types/index.d.ts index 89937a80e..16a570dfd 100644 --- a/packages/neo4j-driver/types/index.d.ts +++ b/packages/neo4j-driver/types/index.d.ts @@ -60,7 +60,11 @@ import { Transaction, ManagedTransaction, Session, - ConnectionProvider + ConnectionProvider, + BookmarkManager, + bookmarkManager, + BookmarkManagerConfig, + SessionConfig } from 'neo4j-driver-core' import { AuthToken, @@ -220,6 +224,7 @@ declare const forExport: { isDate: typeof isDate isLocalDateTime: typeof isLocalDateTime isDateTime: typeof isDateTime + bookmarkManager: typeof bookmarkManager } export { @@ -280,7 +285,14 @@ export { isTime, isDate, isLocalDateTime, - isDateTime + isDateTime, + bookmarkManager +} + +export type { + BookmarkManager, + BookmarkManagerConfig, + SessionConfig } export default forExport diff --git a/packages/testkit-backend/src/context.js b/packages/testkit-backend/src/context.js index 1153c2fe1..fad3a5329 100644 --- a/packages/testkit-backend/src/context.js +++ b/packages/testkit-backend/src/context.js @@ -9,6 +9,9 @@ export default class Context { this._shouldRunTest = shouldRunTest this._getFeatures = getFeatures this._results = {} + this._bookmarkSupplierRequests = {} + this._notifyBookmarksRequests = {} + this._bookmarksManagers = {} } addDriver (driver) { @@ -106,6 +109,48 @@ export default class Context { return this._getFeatures() } + addBookmarkSupplierRequest (resolve, reject) { + return this._add(this._bookmarkSupplierRequests, { + resolve, reject + }) + } + + removeBookmarkSupplierRequest (id) { + delete this._bookmarkSupplierRequests[id] + } + + getBookmarkSupplierRequest (id) { + return this._bookmarkSupplierRequests[id] + } + + addNotifyBookmarksRequest (resolve, reject) { + return this._add(this._notifyBookmarksRequests, { + resolve, reject + }) + } + + removeNotifyBookmarksRequest (id) { + delete this._notifyBookmarksRequests[id] + } + + getNotifyBookmarksRequest (id) { + return this._notifyBookmarksRequests[id] + } + + addBookmarkManager (bookmarkManagerFactory) { + this._id++ + this._bookmarksManagers[this._id] = bookmarkManagerFactory(this._id) + return this._id + } + + getBookmarkManager (id) { + return this._bookmarksManagers[id] + } + + removeBookmarkManager (id) { + delete this._bookmarksManagers[id] + } + _add (map, object) { this._id++ map[this._id] = object diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index 4cddbf559..d941af6e2 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -17,6 +17,7 @@ const features = [ 'Feature:Auth:Custom', 'Feature:Auth:Kerberos', 'Feature:Auth:Bearer', + 'Feature:API:BookmarkManager', 'Feature:API:SSLConfig', 'Feature:API:SSLSchemes', 'Feature:API:Type.Temporal', @@ -35,6 +36,7 @@ const features = [ 'Feature:API:Driver.VerifyConnectivity', 'Optimization:EagerTransactionBegin', 'Optimization:ImplicitDefaultArguments', + 'Optimization:MinimalBookmarksSet', 'Optimization:MinimalResets', ...SUPPORTED_TLS ] diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index db3305c1d..088b89aff 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -20,11 +20,16 @@ export { ForcedRoutingTableUpdate, ResultNext, RetryablePositive, - RetryableNegative + RetryableNegative, + NewBookmarkManager, + BookmarkManagerClose, + BookmarksSupplierCompleted, + BookmarksConsumerCompleted, + StartSubTest } from './request-handlers.js' export function NewSession (context, data, wire) { - let { driverId, accessMode, bookmarks, database, fetchSize, impersonatedUser } = data + let { driverId, accessMode, bookmarks, database, fetchSize, impersonatedUser, bookmarkManagerId } = data switch (accessMode) { case 'r': accessMode = neo4j.session.READ @@ -36,13 +41,22 @@ export function NewSession (context, data, wire) { wire.writeBackendError('Unknown accessmode: ' + accessMode) return } + let bookmarkManager + if (bookmarkManagerId != null) { + bookmarkManager = context.getBookmarkManager(bookmarkManagerId) + if (bookmarkManager == null) { + wire.writeBackendError(`Bookmark manager ${bookmarkManagerId} not found`) + return + } + } const driver = context.getDriver(driverId) const session = driver.rxSession({ defaultAccessMode: accessMode, bookmarks, database, fetchSize, - impersonatedUser + impersonatedUser, + bookmarkManager }) const id = context.addSession(session) wire.writeResponse(responses.Session({ id })) diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 9abc93dd8..b845c42b1 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -43,10 +43,10 @@ export function NewDriver (context, data, wire) { } const resolver = resolverRegistered ? address => - new Promise((resolve, reject) => { - const id = context.addResolverRequest(resolve, reject) - wire.writeResponse(responses.ResolverResolutionRequired({ id, address })) - }) + new Promise((resolve, reject) => { + const id = context.addResolverRequest(resolve, reject) + wire.writeResponse(responses.ResolverResolutionRequired({ id, address })) + }) : undefined const config = { userAgent, @@ -107,7 +107,7 @@ export function DriverClose (context, data, wire) { } export function NewSession (context, data, wire) { - let { driverId, accessMode, bookmarks, database, fetchSize, impersonatedUser } = data + let { driverId, accessMode, bookmarks, database, fetchSize, impersonatedUser, bookmarkManagerId } = data switch (accessMode) { case 'r': accessMode = neo4j.session.READ @@ -119,13 +119,22 @@ export function NewSession (context, data, wire) { wire.writeBackendError('Unknown accessmode: ' + accessMode) return } + let bookmarkManager + if (bookmarkManagerId != null) { + bookmarkManager = context.getBookmarkManager(bookmarkManagerId) + if (bookmarkManager == null) { + wire.writeBackendError(`Bookmark manager ${bookmarkManagerId} not found`) + return + } + } const driver = context.getDriver(driverId) const session = driver.session({ defaultAccessMode: accessMode, bookmarks, database, fetchSize, - impersonatedUser + impersonatedUser, + bookmarkManager }) const id = context.addSession(session) wire.writeResponse(responses.Session({ id })) @@ -406,6 +415,80 @@ export function ResolverResolutionCompleted ( request.resolve(addresses) } +export function NewBookmarkManager ( + context, + { + initialBookmarks, + bookmarksSupplierRegistered, + bookmarksConsumerRegistered + }, + wire +) { + let bookmarkManager + const id = context.addBookmarkManager((bookmarkManagerId) => { + let bookmarksSupplier + let bookmarksConsumer + if (initialBookmarks != null) { + initialBookmarks = new Map(Object.entries(initialBookmarks)) + } + if (bookmarksSupplierRegistered === true) { + bookmarksSupplier = (database) => + new Promise((resolve, reject) => { + const id = context.addBookmarkSupplierRequest(resolve, reject) + wire.writeResponse(responses.BookmarksSupplierRequest({ id, bookmarkManagerId, database })) + }) + } + if (bookmarksConsumerRegistered === true) { + bookmarksConsumer = (database, bookmarks) => + new Promise((resolve, reject) => { + const id = context.addNotifyBookmarksRequest(resolve, reject) + wire.writeResponse(responses.BookmarksConsumerRequest({ id, bookmarkManagerId, database, bookmarks })) + }) + } + bookmarkManager = neo4j.bookmarkManager({ + initialBookmarks, + bookmarksConsumer, + bookmarksSupplier + }) + + return bookmarkManager + }) + + wire.writeResponse(responses.BookmarkManager({ id })) +} + +export function BookmarkManagerClose ( + context, + { + id + }, + wire +) { + context.removeBookmarkManager(id) + wire.writeResponse(responses.BookmarkManager({ id })) +} + +export function BookmarksSupplierCompleted ( + context, + { + requestId, + bookmarks + } +) { + const bookmarkSupplierRequest = context.getBookmarkSupplierRequest(requestId) + bookmarkSupplierRequest.resolve(bookmarks) +} + +export function BookmarksConsumerCompleted ( + context, + { + requestId + } +) { + const notifyBookmarksRequest = context.getNotifyBookmarksRequest(requestId) + notifyBookmarksRequest.resolve() +} + export function GetRoutingTable (context, { driverId, database }, wire) { const driver = context.getDriver(driverId) const routingTable = @@ -439,7 +522,7 @@ export function ForcedRoutingTableUpdate (context, { driverId, database, bookmar return provider._freshRoutingTable({ accessMode: 'READ', database, - bookmarks: bookmarks, + bookmarks, onDatabaseNameResolved: () => {} }) .then(() => wire.writeResponse(responses.Driver({ id: driverId }))) diff --git a/packages/testkit-backend/src/responses.js b/packages/testkit-backend/src/responses.js index 03a9f0487..000d906e4 100644 --- a/packages/testkit-backend/src/responses.js +++ b/packages/testkit-backend/src/responses.js @@ -14,6 +14,18 @@ export function ResolverResolutionRequired ({ id, address }) { return response('ResolverResolutionRequired', { id, address }) } +export function BookmarkManager ({ id }) { + return response('BookmarkManager', { id }) +} + +export function BookmarksSupplierRequest ({ id, bookmarkManagerId, database }) { + return response('BookmarksSupplierRequest', { id, bookmarkManagerId, database }) +} + +export function BookmarksConsumerRequest ({ id, bookmarkManagerId, database, bookmarks }) { + return response('BookmarksConsumerRequest', { id, bookmarkManagerId, database, bookmarks }) +} + export function Session ({ id }) { return response('Session', { id }) }