Skip to content

Commit 98ac8d5

Browse files
committed
Multiple bookmarks support
Previously it was possible to only supply a single bookmark when creating a new session. However there is a use-case to be able to supply multiple bookmarks when multiple concurrent tasks execute write queries and then reader should be able to observe all those writes. To achieve this driver now allows passing an array of bookmarks to `Driver#session()` function. Driver will now send: ``` { bookmark: "max", bookmarks: ["one", "two", "max"] } ``` instead of simple: ``` { bookmark: "max" } ``` this is done to maintain backwards compatibility with databases that only support a single bookmark. It forces driver to parse and compare bookmarks which violates the fact that bookmarks are opaque. This is done only to maintain backwards compatibility and should not be copied. Code doing this will eventually be removed. Related Bolt server PR: neo4j/neo4j#9404
1 parent 4578c2d commit 98ac8d5

15 files changed

+424
-73
lines changed

src/v1/driver.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {connect} from './internal/connector';
2323
import StreamObserver from './internal/stream-observer';
2424
import {newError, SERVICE_UNAVAILABLE} from './error';
2525
import {DirectConnectionProvider} from './internal/connection-providers';
26+
import Bookmark from './internal/bookmark';
2627

2728
const READ = 'READ', WRITE = 'WRITE';
2829
/**
@@ -115,13 +116,14 @@ class Driver {
115116
* made available for others to use.
116117
*
117118
* @param {string} [mode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
118-
* @param {string} [bookmark=null] the initial reference to some previous transaction. Value is optional and
119-
* absence indicates that that the bookmark does not exist or is unknown.
119+
* @param {string|string[]} [bookmarkOrBookmarks=null] the initial reference or references to some previous
120+
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
120121
* @return {Session} new session.
121122
*/
122-
session(mode, bookmark) {
123+
session(mode, bookmarkOrBookmarks) {
123124
const sessionMode = Driver._validateSessionMode(mode);
124125
const connectionProvider = this._getOrCreateConnectionProvider();
126+
const bookmark = new Bookmark(bookmarkOrBookmarks);
125127
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
126128
}
127129

src/v1/internal/bookmark.js

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import * as util from './util';
21+
22+
const BOOKMARK_KEY = 'bookmark';
23+
const BOOKMARKS_KEY = 'bookmarks';
24+
const BOOKMARK_PREFIX = 'neo4j:bookmark:v1:tx';
25+
26+
const UNKNOWN_BOOKMARK_VALUE = -1;
27+
28+
export default class Bookmark {
29+
30+
/**
31+
* @constructor
32+
* @param {string|string[]} values single bookmark as string or multiple bookmarks as a string array.
33+
*/
34+
constructor(values) {
35+
this._values = asStringArray(values);
36+
this._maxValue = maxBookmark(this._values);
37+
}
38+
39+
/**
40+
* Check if the given bookmark is meaningful and can be send to the database.
41+
* @return {boolean} returns <code>true</code> bookmark has a value, <code>false</code> otherwise.
42+
*/
43+
isEmpty() {
44+
return this._maxValue === null;
45+
}
46+
47+
/**
48+
* Get maximum value of this bookmark as string.
49+
* @return {string|null} the maximum value or <code>null</code> if it is not defined.
50+
*/
51+
maxBookmarkAsString() {
52+
return this._maxValue;
53+
}
54+
55+
/**
56+
* Get this bookmark as an object for begin transaction call.
57+
* @return {object} the value of this bookmark as object.
58+
*/
59+
asBeginTransactionParameters() {
60+
if (this.isEmpty()) {
61+
return {};
62+
}
63+
64+
// Driver sends {bookmark: "max", bookmarks: ["one", "two", "max"]} instead of simple
65+
// {bookmarks: ["one", "two", "max"]} for backwards compatibility reasons. Old servers can only accept single
66+
// bookmark that is why driver has to parse and compare given list of bookmarks. This functionality will
67+
// eventually be removed.
68+
return {
69+
[BOOKMARK_KEY]: this._maxValue,
70+
[BOOKMARKS_KEY]: this._values
71+
};
72+
}
73+
}
74+
75+
/**
76+
* Converts given value to an array.
77+
* @param {string|string[]} [value=undefined] argument to convert.
78+
* @return {string[]} value converted to an array.
79+
*/
80+
function asStringArray(value) {
81+
if (!value) {
82+
return [];
83+
}
84+
85+
if (util.isString(value)) {
86+
return [value];
87+
}
88+
89+
if (Array.isArray(value)) {
90+
const result = [];
91+
for (let i = 0; i < value.length; i++) {
92+
const element = value[i];
93+
if (!util.isString(element)) {
94+
throw new TypeError(`Bookmark should be a string, given: '${element}'`);
95+
}
96+
result.push(element);
97+
}
98+
return result;
99+
}
100+
101+
throw new TypeError(`Bookmark should either be a string or a string array, given: '${value}'`);
102+
}
103+
104+
/**
105+
* Find latest bookmark in the given array of bookmarks.
106+
* @param {string[]} bookmarks array of bookmarks.
107+
* @return {string|null} latest bookmark value.
108+
*/
109+
function maxBookmark(bookmarks) {
110+
if (!bookmarks || bookmarks.length === 0) {
111+
return null;
112+
}
113+
114+
let maxBookmark = bookmarks[0];
115+
let maxValue = bookmarkValue(maxBookmark);
116+
117+
for (let i = 1; i < bookmarks.length; i++) {
118+
const bookmark = bookmarks[i];
119+
const value = bookmarkValue(bookmark);
120+
121+
if (value > maxValue) {
122+
maxBookmark = bookmark;
123+
maxValue = value;
124+
}
125+
}
126+
127+
return maxBookmark;
128+
}
129+
130+
/**
131+
* Calculate numeric value for the given bookmark.
132+
* @param {string} bookmark argument to get numeric value for.
133+
* @return {number} value of the bookmark.
134+
*/
135+
function bookmarkValue(bookmark) {
136+
if (bookmark && bookmark.indexOf(BOOKMARK_PREFIX) === 0) {
137+
const result = parseInt(bookmark.substring(BOOKMARK_PREFIX.length));
138+
return result ? result : UNKNOWN_BOOKMARK_VALUE;
139+
}
140+
return UNKNOWN_BOOKMARK_VALUE;
141+
}

src/v1/internal/util.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ function trimAndVerify(string, name, url) {
116116

117117
export {
118118
isEmptyObjectOrNull,
119+
isString,
119120
assertString,
120121
parseScheme,
121122
parseUrl,

src/v1/session.js

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {assertString} from './internal/util';
2424
import ConnectionHolder from './internal/connection-holder';
2525
import Driver, {READ, WRITE} from './driver';
2626
import TransactionExecutor from './internal/transaction-executor';
27+
import Bookmark from './internal/bookmark';
2728

2829
/**
2930
* A Session instance is used for handling the connection and
@@ -37,7 +38,7 @@ class Session {
3738
* @constructor
3839
* @param {string} mode the default access mode for this session.
3940
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
40-
* @param {string} [bookmark=undefined] - the initial bookmark for this session.
41+
* @param {Bookmark} bookmark - the initial bookmark for this session.
4142
* @param {Object} [config={}] - this driver configuration.
4243
*/
4344
constructor(mode, connectionProvider, bookmark, config) {
@@ -95,21 +96,17 @@ class Session {
9596
*
9697
* While a transaction is open the session cannot be used to run statements outside the transaction.
9798
*
98-
* @param {string} bookmark - a reference to a previous transaction. DEPRECATED: This parameter is deprecated in
99-
* favour of {@link Driver#session} that accepts an initial bookmark. Session will ensure that all nested
100-
* transactions are chained with bookmarks to guarantee causal consistency.
99+
* @param {string|string[]} [bookmarkOrBookmarks=null] - reference or references to some previous transactions.
100+
* DEPRECATED: This parameter is deprecated in favour of {@link Driver#session} that accepts an initial bookmark.
101+
* Session will ensure that all nested transactions are chained with bookmarks to guarantee causal consistency.
101102
* @returns {Transaction} - New Transaction
102103
*/
103-
beginTransaction(bookmark) {
104-
return this._beginTransaction(this._mode, bookmark);
104+
beginTransaction(bookmarkOrBookmarks) {
105+
this._updateBookmark(new Bookmark(bookmarkOrBookmarks));
106+
return this._beginTransaction(this._mode);
105107
}
106108

107-
_beginTransaction(accessMode, bookmark) {
108-
if (bookmark) {
109-
assertString(bookmark, 'Bookmark');
110-
this._updateBookmark(bookmark);
111-
}
112-
109+
_beginTransaction(accessMode) {
113110
if (this._hasTx) {
114111
throw newError('You cannot begin a transaction on a session with an open transaction; ' +
115112
'either run from within the transaction or use a different session.');
@@ -129,10 +126,10 @@ class Session {
129126
/**
130127
* Return the bookmark received following the last completed {@link Transaction}.
131128
*
132-
* @return a reference to a previous transaction
129+
* @return {string|null} a reference to a previous transaction
133130
*/
134131
lastBookmark() {
135-
return this._lastBookmark;
132+
return this._lastBookmark.maxBookmarkAsString();
136133
}
137134

138135
/**
@@ -171,13 +168,18 @@ class Session {
171168

172169
_runTransaction(accessMode, transactionWork) {
173170
return this._transactionExecutor.execute(
174-
() => this._beginTransaction(accessMode, this.lastBookmark()),
171+
() => this._beginTransaction(accessMode),
175172
transactionWork
176173
);
177174
}
178175

176+
/**
177+
* Update value of the last bookmark.
178+
* @param {Bookmark} newBookmark the new bookmark.
179+
* @private
180+
*/
179181
_updateBookmark(newBookmark) {
180-
if (newBookmark) {
182+
if (newBookmark && !newBookmark.isEmpty()) {
181183
this._lastBookmark = newBookmark;
182184
}
183185
}

src/v1/transaction.js

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import StreamObserver from './internal/stream-observer';
2020
import Result from './result';
2121
import {assertString} from './internal/util';
2222
import {EMPTY_CONNECTION_HOLDER} from './internal/connection-holder';
23+
import Bookmark from './internal/bookmark';
2324

2425
/**
2526
* Represents a transaction in the Neo4j database.
@@ -31,28 +32,24 @@ class Transaction {
3132
* @constructor
3233
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
3334
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
34-
* @param errorTransformer callback use to transform error
35-
* @param bookmark optional bookmark
36-
* @param onBookmark callback invoked when new bookmark is produced
35+
* @param {function(error: Error): Error} errorTransformer callback use to transform error.
36+
* @param {Bookmark} bookmark bookmark for transaction begin.
37+
* @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced.
3738
*/
3839
constructor(connectionHolder, onClose, errorTransformer, bookmark, onBookmark) {
3940
this._connectionHolder = connectionHolder;
40-
let streamObserver = new _TransactionStreamObserver(this);
41-
let params = {};
42-
if (bookmark) {
43-
params = {bookmark: bookmark};
44-
}
41+
const streamObserver = new _TransactionStreamObserver(this);
4542

4643
this._connectionHolder.getConnection().then(conn => {
4744
streamObserver.resolveConnection(conn);
48-
conn.run('BEGIN', params, streamObserver);
45+
conn.run('BEGIN', bookmark.asBeginTransactionParameters(), streamObserver);
4946
conn.pullAll(streamObserver);
5047
}).catch(error => streamObserver.onError(error));
5148

5249
this._state = _states.ACTIVE;
5350
this._onClose = onClose;
5451
this._errorTransformer = errorTransformer;
55-
this._onBookmark = onBookmark || (() => {});
52+
this._onBookmark = onBookmark;
5653
}
5754

5855
/**
@@ -150,7 +147,7 @@ class _TransactionStreamObserver extends StreamObserver {
150147

151148
onCompleted(meta) {
152149
super.onCompleted(meta);
153-
const bookmark = meta.bookmark;
150+
const bookmark = new Bookmark(meta.bookmark);
154151
this._tx._onBookmark(bookmark);
155152
}
156153
}

0 commit comments

Comments
 (0)