Skip to content

Multiple bookmarks support #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {connect} from './internal/connector';
import StreamObserver from './internal/stream-observer';
import {newError, SERVICE_UNAVAILABLE} from './error';
import {DirectConnectionProvider} from './internal/connection-providers';
import Bookmark from './internal/bookmark';

const READ = 'READ', WRITE = 'WRITE';
/**
Expand Down Expand Up @@ -115,13 +116,14 @@ class Driver {
* made available for others to use.
*
* @param {string} [mode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string} [bookmark=null] the initial reference to some previous transaction. Value is optional and
* absence indicates that that the bookmark does not exist or is unknown.
* @param {string|string[]} [bookmarkOrBookmarks=null] the initial reference or references to some previous
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
* @return {Session} new session.
*/
session(mode, bookmark) {
session(mode, bookmarkOrBookmarks) {
const sessionMode = Driver._validateSessionMode(mode);
const connectionProvider = this._getOrCreateConnectionProvider();
const bookmark = new Bookmark(bookmarkOrBookmarks);
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
}

Expand Down
141 changes: 141 additions & 0 deletions src/v1/internal/bookmark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/**
* Copyright (c) 2002-2017 "Neo Technology,","
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as util from './util';

const BOOKMARK_KEY = 'bookmark';
const BOOKMARKS_KEY = 'bookmarks';
const BOOKMARK_PREFIX = 'neo4j:bookmark:v1:tx';

const UNKNOWN_BOOKMARK_VALUE = -1;

export default class Bookmark {

/**
* @constructor
* @param {string|string[]} values single bookmark as string or multiple bookmarks as a string array.
*/
constructor(values) {
this._values = asStringArray(values);
this._maxValue = maxBookmark(this._values);
}

/**
* Check if the given bookmark is meaningful and can be send to the database.
* @return {boolean} returns <code>true</code> bookmark has a value, <code>false</code> otherwise.
*/
isEmpty() {
return this._maxValue === null;
}

/**
* Get maximum value of this bookmark as string.
* @return {string|null} the maximum value or <code>null</code> if it is not defined.
*/
maxBookmarkAsString() {
return this._maxValue;
}

/**
* Get this bookmark as an object for begin transaction call.
* @return {object} the value of this bookmark as object.
*/
asBeginTransactionParameters() {
if (this.isEmpty()) {
return {};
}

// Driver sends {bookmark: "max", bookmarks: ["one", "two", "max"]} instead of simple
// {bookmarks: ["one", "two", "max"]} for backwards compatibility reasons. Old servers can only accept single
// bookmark that is why driver has to parse and compare given list of bookmarks. This functionality will
// eventually be removed.
return {
[BOOKMARK_KEY]: this._maxValue,
[BOOKMARKS_KEY]: this._values
};
}
}

/**
* Converts given value to an array.
* @param {string|string[]} [value=undefined] argument to convert.
* @return {string[]} value converted to an array.
*/
function asStringArray(value) {
if (!value) {
return [];
}

if (util.isString(value)) {
return [value];
}

if (Array.isArray(value)) {
const result = [];
for (let i = 0; i < value.length; i++) {
const element = value[i];
if (!util.isString(element)) {
throw new TypeError(`Bookmark should be a string, given: '${element}'`);
}
result.push(element);
}
return result;
}

throw new TypeError(`Bookmark should either be a string or a string array, given: '${value}'`);
}

/**
* Find latest bookmark in the given array of bookmarks.
* @param {string[]} bookmarks array of bookmarks.
* @return {string|null} latest bookmark value.
*/
function maxBookmark(bookmarks) {
if (!bookmarks || bookmarks.length === 0) {
return null;
}

let maxBookmark = bookmarks[0];
let maxValue = bookmarkValue(maxBookmark);

for (let i = 1; i < bookmarks.length; i++) {
const bookmark = bookmarks[i];
const value = bookmarkValue(bookmark);

if (value > maxValue) {
maxBookmark = bookmark;
maxValue = value;
}
}

return maxBookmark;
}

/**
* Calculate numeric value for the given bookmark.
* @param {string} bookmark argument to get numeric value for.
* @return {number} value of the bookmark.
*/
function bookmarkValue(bookmark) {
if (bookmark && bookmark.indexOf(BOOKMARK_PREFIX) === 0) {
const result = parseInt(bookmark.substring(BOOKMARK_PREFIX.length));
return result ? result : UNKNOWN_BOOKMARK_VALUE;
}
return UNKNOWN_BOOKMARK_VALUE;
}
1 change: 1 addition & 0 deletions src/v1/internal/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ function trimAndVerify(string, name, url) {

export {
isEmptyObjectOrNull,
isString,
assertString,
parseScheme,
parseUrl,
Expand Down
34 changes: 18 additions & 16 deletions src/v1/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {assertString} from './internal/util';
import ConnectionHolder from './internal/connection-holder';
import Driver, {READ, WRITE} from './driver';
import TransactionExecutor from './internal/transaction-executor';
import Bookmark from './internal/bookmark';

/**
* A Session instance is used for handling the connection and
Expand All @@ -37,7 +38,7 @@ class Session {
* @constructor
* @param {string} mode the default access mode for this session.
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
* @param {string} [bookmark=undefined] - the initial bookmark for this session.
* @param {Bookmark} bookmark - the initial bookmark for this session.
* @param {Object} [config={}] - this driver configuration.
*/
constructor(mode, connectionProvider, bookmark, config) {
Expand Down Expand Up @@ -94,21 +95,17 @@ class Session {
*
* While a transaction is open the session cannot be used to run statements outside the transaction.
*
* @param {string} bookmark - a reference to a previous transaction. DEPRECATED: This parameter is deprecated in
* favour of {@link Driver#session} that accepts an initial bookmark. Session will ensure that all nested
* transactions are chained with bookmarks to guarantee causal consistency.
* @param {string|string[]} [bookmarkOrBookmarks=null] - reference or references to some previous transactions.
* DEPRECATED: This parameter is deprecated in favour of {@link Driver#session} that accepts an initial bookmark.
* Session will ensure that all nested transactions are chained with bookmarks to guarantee causal consistency.
* @returns {Transaction} - New Transaction
*/
beginTransaction(bookmark) {
return this._beginTransaction(this._mode, bookmark);
beginTransaction(bookmarkOrBookmarks) {
this._updateBookmark(new Bookmark(bookmarkOrBookmarks));
return this._beginTransaction(this._mode);
}

_beginTransaction(accessMode, bookmark) {
if (bookmark) {
assertString(bookmark, 'Bookmark');
this._updateBookmark(bookmark);
}

_beginTransaction(accessMode) {
if (this._hasTx) {
throw newError('You cannot begin a transaction on a session with an open transaction; ' +
'either run from within the transaction or use a different session.');
Expand All @@ -128,10 +125,10 @@ class Session {
/**
* Return the bookmark received following the last completed {@link Transaction}.
*
* @return a reference to a previous transaction
* @return {string|null} a reference to a previous transaction
*/
lastBookmark() {
return this._lastBookmark;
return this._lastBookmark.maxBookmarkAsString();
}

/**
Expand Down Expand Up @@ -170,13 +167,18 @@ class Session {

_runTransaction(accessMode, transactionWork) {
return this._transactionExecutor.execute(
() => this._beginTransaction(accessMode, this.lastBookmark()),
() => this._beginTransaction(accessMode),
transactionWork
);
}

/**
* Update value of the last bookmark.
* @param {Bookmark} newBookmark the new bookmark.
* @private
*/
_updateBookmark(newBookmark) {
if (newBookmark) {
if (newBookmark && !newBookmark.isEmpty()) {
this._lastBookmark = newBookmark;
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/v1/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import StreamObserver from './internal/stream-observer';
import Result from './result';
import {assertString} from './internal/util';
import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder';
import Bookmark from './internal/bookmark';

/**
* Represents a transaction in the Neo4j database.
Expand All @@ -31,27 +32,23 @@ class Transaction {
* @constructor
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
* @param errorTransformer callback use to transform error
* @param bookmark optional bookmark
* @param onBookmark callback invoked when new bookmark is produced
* @param {function(error: Error): Error} errorTransformer callback use to transform error.
* @param {Bookmark} bookmark bookmark for transaction begin.
* @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced.
*/
constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) {
this._connectionHolder = connectionHolder;
let streamObserver = new _TransactionStreamObserver(this);
let params = {};
if (bookmark) {
params = {bookmark: bookmark};
}
const streamObserver = new _TransactionStreamObserver(this);

this._connectionHolder.getConnection(streamObserver).then(conn => {
conn.run('BEGIN', params, streamObserver);
conn.run('BEGIN', bookmark.asBeginTransactionParameters(), streamObserver);
conn.pullAll(streamObserver);
}).catch(error => streamObserver.onError(error));

this._state = _states.ACTIVE;
this._onClose = onClose;
this._errorTransformer = errorTransformer;
this._onBookmark = onBookmark || (() => {});
this._onBookmark = onBookmark;
}

/**
Expand Down Expand Up @@ -149,7 +146,7 @@ class _TransactionStreamObserver extends StreamObserver {

onCompleted(meta) {
super.onCompleted(meta);
const bookmark = meta.bookmark;
const bookmark = new Bookmark(meta.bookmark);
this._tx._onBookmark(bookmark);
}
}
Expand Down
Loading