Skip to content

Commit 79cf7da

Browse files
committed
Multiple bookmarks support
Previously it was possible to only supply a single bookmark when creating a new session. However there is a use-case to be able to supply multiple bookmarks when multiple concurrent tasks execute write queries and then reader should be able to observe all those writes. To achieve this driver now allows passing an array of bookmarks to `Driver#session()` function. Driver will now send: ``` { bookmark: "max", bookmarks: ["one", "two", "max"] } ``` instead of simple: ``` { bookmark: "max" } ``` this is done to maintain backwards compatibility with databases that only support a single bookmark. It forces driver to parse and compare bookmarks which violates the fact that bookmarks are opaque. This is done only to maintain backwards compatibility and should not be copied. Code doing this will eventually be removed. Related Bolt server PR: neo4j/neo4j#9404
1 parent e0bb062 commit 79cf7da

15 files changed

+424
-73
lines changed

src/v1/driver.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {connect} from './internal/connector';
2323
import StreamObserver from './internal/stream-observer';
2424
import {newError, SERVICE_UNAVAILABLE} from './error';
2525
import {DirectConnectionProvider} from './internal/connection-providers';
26+
import Bookmark from './internal/bookmark';
2627

2728
const READ = 'READ', WRITE = 'WRITE';
2829
/**
@@ -115,13 +116,14 @@ class Driver {
115116
* made available for others to use.
116117
*
117118
* @param {string} [mode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
118-
* @param {string} [bookmark=null] the initial reference to some previous transaction. Value is optional and
119-
* absence indicates that that the bookmark does not exist or is unknown.
119+
* @param {string|string[]} [bookmarkOrBookmarks=null] the initial reference or references to some previous
120+
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
120121
* @return {Session} new session.
121122
*/
122-
session(mode, bookmark) {
123+
session(mode, bookmarkOrBookmarks) {
123124
const sessionMode = Driver._validateSessionMode(mode);
124125
const connectionProvider = this._getOrCreateConnectionProvider();
126+
const bookmark = new Bookmark(bookmarkOrBookmarks);
125127
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
126128
}
127129

src/v1/internal/bookmark.js

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import * as util from './util';
21+
22+
const BOOKMARK_KEY = 'bookmark';
23+
const BOOKMARKS_KEY = 'bookmarks';
24+
const BOOKMARK_PREFIX = 'neo4j:bookmark:v1:tx';
25+
26+
const UNKNOWN_BOOKMARK_VALUE = -1;
27+
28+
export default class Bookmark {
29+
30+
/**
31+
* @constructor
32+
* @param {string|string[]} values single bookmark as string or multiple bookmarks as a string array.
33+
*/
34+
constructor(values) {
35+
this._values = asStringArray(values);
36+
this._maxValue = maxBookmark(this._values);
37+
}
38+
39+
/**
40+
* Check if the given bookmark is meaningful and can be send to the database.
41+
* @return {boolean} returns <code>true</code> bookmark has a value, <code>false</code> otherwise.
42+
*/
43+
isEmpty() {
44+
return this._maxValue === null;
45+
}
46+
47+
/**
48+
* Get maximum value of this bookmark as string.
49+
* @return {string|null} the maximum value or <code>null</code> if it is not defined.
50+
*/
51+
maxBookmarkAsString() {
52+
return this._maxValue;
53+
}
54+
55+
/**
56+
* Get this bookmark as an object for begin transaction call.
57+
* @return {object} the value of this bookmark as object.
58+
*/
59+
asBeginTransactionParameters() {
60+
if (this.isEmpty()) {
61+
return {};
62+
}
63+
64+
// Driver sends {bookmark: "max", bookmarks: ["one", "two", "max"]} instead of simple
65+
// {bookmarks: ["one", "two", "max"]} for backwards compatibility reasons. Old servers can only accept single
66+
// bookmark that is why driver has to parse and compare given list of bookmarks. This functionality will
67+
// eventually be removed.
68+
return {
69+
[BOOKMARK_KEY]: this._maxValue,
70+
[BOOKMARKS_KEY]: this._values
71+
};
72+
}
73+
}
74+
75+
/**
76+
* Converts given value to an array.
77+
* @param {string|string[]} [value=undefined] argument to convert.
78+
* @return {string[]} value converted to an array.
79+
*/
80+
function asStringArray(value) {
81+
if (!value) {
82+
return [];
83+
}
84+
85+
if (util.isString(value)) {
86+
return [value];
87+
}
88+
89+
if (Array.isArray(value)) {
90+
const result = [];
91+
for (let i = 0; i < value.length; i++) {
92+
const element = value[i];
93+
if (!util.isString(element)) {
94+
throw new TypeError(`Bookmark should be a string, given: '${element}'`);
95+
}
96+
result.push(element);
97+
}
98+
return result;
99+
}
100+
101+
throw new TypeError(`Bookmark should either be a string or a string array, given: '${value}'`);
102+
}
103+
104+
/**
105+
* Find latest bookmark in the given array of bookmarks.
106+
* @param {string[]} bookmarks array of bookmarks.
107+
* @return {string|null} latest bookmark value.
108+
*/
109+
function maxBookmark(bookmarks) {
110+
if (!bookmarks || bookmarks.length === 0) {
111+
return null;
112+
}
113+
114+
let maxBookmark = bookmarks[0];
115+
let maxValue = bookmarkValue(maxBookmark);
116+
117+
for (let i = 1; i < bookmarks.length; i++) {
118+
const bookmark = bookmarks[i];
119+
const value = bookmarkValue(bookmark);
120+
121+
if (value > maxValue) {
122+
maxBookmark = bookmark;
123+
maxValue = value;
124+
}
125+
}
126+
127+
return maxBookmark;
128+
}
129+
130+
/**
131+
* Calculate numeric value for the given bookmark.
132+
* @param {string} bookmark argument to get numeric value for.
133+
* @return {number} value of the bookmark.
134+
*/
135+
function bookmarkValue(bookmark) {
136+
if (bookmark && bookmark.indexOf(BOOKMARK_PREFIX) === 0) {
137+
const result = parseInt(bookmark.substring(BOOKMARK_PREFIX.length));
138+
return result ? result : UNKNOWN_BOOKMARK_VALUE;
139+
}
140+
return UNKNOWN_BOOKMARK_VALUE;
141+
}

src/v1/internal/util.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ function trimAndVerify(string, name, url) {
116116

117117
export {
118118
isEmptyObjectOrNull,
119+
isString,
119120
assertString,
120121
parseScheme,
121122
parseUrl,

src/v1/session.js

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {assertString} from './internal/util';
2424
import ConnectionHolder from './internal/connection-holder';
2525
import Driver, {READ, WRITE} from './driver';
2626
import TransactionExecutor from './internal/transaction-executor';
27+
import Bookmark from './internal/bookmark';
2728

2829
/**
2930
* A Session instance is used for handling the connection and
@@ -37,7 +38,7 @@ class Session {
3738
* @constructor
3839
* @param {string} mode the default access mode for this session.
3940
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
40-
* @param {string} [bookmark=undefined] - the initial bookmark for this session.
41+
* @param {Bookmark} bookmark - the initial bookmark for this session.
4142
* @param {Object} [config={}] - this driver configuration.
4243
*/
4344
constructor(mode, connectionProvider, bookmark, config) {
@@ -94,21 +95,17 @@ class Session {
9495
*
9596
* While a transaction is open the session cannot be used to run statements outside the transaction.
9697
*
97-
* @param {string} bookmark - a reference to a previous transaction. DEPRECATED: This parameter is deprecated in
98-
* favour of {@link Driver#session} that accepts an initial bookmark. Session will ensure that all nested
99-
* transactions are chained with bookmarks to guarantee causal consistency.
98+
* @param {string|string[]} [bookmarkOrBookmarks=null] - reference or references to some previous transactions.
99+
* DEPRECATED: This parameter is deprecated in favour of {@link Driver#session} that accepts an initial bookmark.
100+
* Session will ensure that all nested transactions are chained with bookmarks to guarantee causal consistency.
100101
* @returns {Transaction} - New Transaction
101102
*/
102-
beginTransaction(bookmark) {
103-
return this._beginTransaction(this._mode, bookmark);
103+
beginTransaction(bookmarkOrBookmarks) {
104+
this._updateBookmark(new Bookmark(bookmarkOrBookmarks));
105+
return this._beginTransaction(this._mode);
104106
}
105107

106-
_beginTransaction(accessMode, bookmark) {
107-
if (bookmark) {
108-
assertString(bookmark, 'Bookmark');
109-
this._updateBookmark(bookmark);
110-
}
111-
108+
_beginTransaction(accessMode) {
112109
if (this._hasTx) {
113110
throw newError('You cannot begin a transaction on a session with an open transaction; ' +
114111
'either run from within the transaction or use a different session.');
@@ -128,10 +125,10 @@ class Session {
128125
/**
129126
* Return the bookmark received following the last completed {@link Transaction}.
130127
*
131-
* @return a reference to a previous transaction
128+
* @return {string|null} a reference to a previous transaction
132129
*/
133130
lastBookmark() {
134-
return this._lastBookmark;
131+
return this._lastBookmark.maxBookmarkAsString();
135132
}
136133

137134
/**
@@ -170,13 +167,18 @@ class Session {
170167

171168
_runTransaction(accessMode, transactionWork) {
172169
return this._transactionExecutor.execute(
173-
() => this._beginTransaction(accessMode, this.lastBookmark()),
170+
() => this._beginTransaction(accessMode),
174171
transactionWork
175172
);
176173
}
177174

175+
/**
176+
* Update value of the last bookmark.
177+
* @param {Bookmark} newBookmark the new bookmark.
178+
* @private
179+
*/
178180
_updateBookmark(newBookmark) {
179-
if (newBookmark) {
181+
if (newBookmark && !newBookmark.isEmpty()) {
180182
this._lastBookmark = newBookmark;
181183
}
182184
}

src/v1/transaction.js

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import StreamObserver from './internal/stream-observer';
2020
import Result from './result';
2121
import {assertString} from './internal/util';
2222
import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder';
23+
import Bookmark from './internal/bookmark';
2324

2425
/**
2526
* Represents a transaction in the Neo4j database.
@@ -31,27 +32,23 @@ class Transaction {
3132
* @constructor
3233
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
3334
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
34-
* @param errorTransformer callback use to transform error
35-
* @param bookmark optional bookmark
36-
* @param onBookmark callback invoked when new bookmark is produced
35+
* @param {function(error: Error): Error} errorTransformer callback use to transform error.
36+
* @param {Bookmark} bookmark bookmark for transaction begin.
37+
* @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced.
3738
*/
3839
constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) {
3940
this._connectionHolder = connectionHolder;
40-
let streamObserver = new _TransactionStreamObserver(this);
41-
let params = {};
42-
if (bookmark) {
43-
params = {bookmark: bookmark};
44-
}
41+
const streamObserver = new _TransactionStreamObserver(this);
4542

4643
this._connectionHolder.getConnection(streamObserver).then(conn => {
47-
conn.run('BEGIN', params, streamObserver);
44+
conn.run('BEGIN', bookmark.asBeginTransactionParameters(), streamObserver);
4845
conn.pullAll(streamObserver);
4946
}).catch(error => streamObserver.onError(error));
5047

5148
this._state = _states.ACTIVE;
5249
this._onClose = onClose;
5350
this._errorTransformer = errorTransformer;
54-
this._onBookmark = onBookmark || (() => {});
51+
this._onBookmark = onBookmark;
5552
}
5653

5754
/**
@@ -149,7 +146,7 @@ class _TransactionStreamObserver extends StreamObserver {
149146

150147
onCompleted(meta) {
151148
super.onCompleted(meta);
152-
const bookmark = meta.bookmark;
149+
const bookmark = new Bookmark(meta.bookmark);
153150
this._tx._onBookmark(bookmark);
154151
}
155152
}

0 commit comments

Comments
 (0)