Skip to content

Commit 0946965

Browse files
authored
Merge pull request #235 from lutovich/1.3-routing-context
Routing context in URI
2 parents 6299153 + 4dc299d commit 0946965

24 files changed

+941
-130
lines changed

src/v1/driver.js

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,13 @@ class Driver {
5858
Driver._validateConnection.bind(this),
5959
config.connectionPoolSize
6060
);
61-
this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this));
61+
62+
/**
63+
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
64+
* @type {ConnectionProvider}
65+
* @private
66+
*/
67+
this._connectionProvider = null;
6268
}
6369

6470
/**
@@ -115,7 +121,8 @@ class Driver {
115121
*/
116122
session(mode, bookmark) {
117123
const sessionMode = Driver._validateSessionMode(mode);
118-
return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config);
124+
const connectionProvider = this._getOrCreateConnectionProvider();
125+
return this._createSession(sessionMode, connectionProvider, bookmark, this._config);
119126
}
120127

121128
static _validateSessionMode(rawMode) {
@@ -142,6 +149,14 @@ class Driver {
142149
return SERVICE_UNAVAILABLE;
143150
}
144151

152+
_getOrCreateConnectionProvider() {
153+
if (!this._connectionProvider) {
154+
const driverOnErrorCallback = this._driverOnErrorCallback.bind(this);
155+
this._connectionProvider = this._createConnectionProvider(this._url, this._pool, driverOnErrorCallback);
156+
}
157+
return this._connectionProvider;
158+
}
159+
145160
_driverOnErrorCallback(error) {
146161
const userDefinedOnErrorCallback = this.onError;
147162
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
@@ -189,8 +204,9 @@ class _ConnectionStreamObserver extends StreamObserver {
189204
if (this._driver.onCompleted) {
190205
this._driver.onCompleted(message);
191206
}
192-
if (this._conn && message && message.server) {
193-
this._conn.setServerVersion(message.server);
207+
208+
if (this._observer && this._observer.onComplete) {
209+
this._observer.onCompleted(message);
194210
}
195211
}
196212
}

src/v1/index.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ import Record from './record';
2626
import {Driver, READ, WRITE} from './driver';
2727
import RoutingDriver from './routing-driver';
2828
import VERSION from '../version';
29-
import {parseScheme, parseUrl} from './internal/connector';
30-
import {assertString} from './internal/util';
31-
29+
import {assertString, isEmptyObjectOrNull, parseRoutingContext, parseScheme, parseUrl} from './internal/util';
3230

3331
const auth = {
3432
basic: (username, password, realm = undefined) => {
@@ -129,17 +127,19 @@ const USER_AGENT = "neo4j-javascript/" + VERSION;
129127
function driver(url, authToken, config = {}) {
130128
assertString(url, 'Bolt URL');
131129
const scheme = parseScheme(url);
132-
if (scheme === "bolt+routing://") {
133-
return new RoutingDriver(parseUrl(url), USER_AGENT, authToken, config);
134-
} else if (scheme === "bolt://") {
130+
const routingContext = parseRoutingContext(url);
131+
if (scheme === 'bolt+routing://') {
132+
return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config);
133+
} else if (scheme === 'bolt://') {
134+
if (!isEmptyObjectOrNull(routingContext)) {
135+
throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
136+
}
135137
return new Driver(parseUrl(url), USER_AGENT, authToken, config);
136138
} else {
137-
throw new Error("Unknown scheme: " + scheme);
138-
139+
throw new Error(`Unknown scheme: ${scheme}`);
139140
}
140141
}
141142

142-
143143
const types ={
144144
Node,
145145
Relationship,

src/v1/internal/connection-providers.js

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import RoutingTable from './routing-table';
2525
import Rediscovery from './rediscovery';
2626
import hasFeature from './features';
2727
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
28+
import RoutingUtil from './routing-util';
2829

2930
class ConnectionProvider {
3031

@@ -61,11 +62,11 @@ export class DirectConnectionProvider extends ConnectionProvider {
6162

6263
export class LoadBalancer extends ConnectionProvider {
6364

64-
constructor(address, connectionPool, driverOnErrorCallback) {
65+
constructor(address, routingContext, connectionPool, driverOnErrorCallback) {
6566
super();
6667
this._seedRouter = address;
6768
this._routingTable = new RoutingTable(new RoundRobinArray([this._seedRouter]));
68-
this._rediscovery = new Rediscovery();
69+
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
6970
this._connectionPool = connectionPool;
7071
this._driverOnErrorCallback = driverOnErrorCallback;
7172
this._hostNameResolver = LoadBalancer._createHostNameResolver();
@@ -171,8 +172,10 @@ export class LoadBalancer extends ConnectionProvider {
171172

172173
_createSessionForRediscovery(routerAddress) {
173174
const connection = this._connectionPool.acquire(routerAddress);
174-
const connectionPromise = Promise.resolve(connection);
175-
const connectionProvider = new SingleConnectionProvider(connectionPromise);
175+
// initialized connection is required for routing procedure call
176+
// server version needs to be known to decide which routing procedure to use
177+
const initializedConnectionPromise = connection.initializationCompleted();
178+
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
176179
return new Session(READ, connectionProvider);
177180
}
178181

src/v1/internal/connector.js

Lines changed: 91 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19+
1920
import WebSocketChannel from './ch-websocket';
2021
import NodeChannel from './ch-node';
2122
import {Chunker, Dechunker} from './chunking';
@@ -24,6 +25,8 @@ import {alloc} from './buf';
2425
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
2526
import {newError} from './../error';
2627
import ChannelConfig from './ch-config';
28+
import {parseHost, parsePort} from './util';
29+
import StreamObserver from './stream-observer';
2730

2831
let Channel;
2932
if( NodeChannel.available ) {
@@ -59,29 +62,6 @@ PATH = 0x50,
5962
MAGIC_PREAMBLE = 0x6060B017,
6063
DEBUG = false;
6164

62-
let URLREGEX = new RegExp([
63-
"([^/]+//)?", // scheme
64-
"(([^:/?#]*)", // hostname
65-
"(?::([0-9]+))?)", // port (optional)
66-
".*"].join("")); // everything else
67-
68-
function parseScheme( url ) {
69-
let scheme = url.match(URLREGEX)[1] || '';
70-
return scheme.toLowerCase();
71-
}
72-
73-
function parseUrl(url) {
74-
return url.match( URLREGEX )[2];
75-
}
76-
77-
function parseHost( url ) {
78-
return url.match( URLREGEX )[3];
79-
}
80-
81-
function parsePort( url ) {
82-
return url.match( URLREGEX )[4];
83-
}
84-
8565
/**
8666
* Very rudimentary log handling, should probably be replaced by something proper at some point.
8767
* @param actor the part that sent the message, 'S' for server and 'C' for client
@@ -204,6 +184,8 @@ class Connection {
204184
this._isHandlingFailure = false;
205185
this._currentFailure = null;
206186

187+
this._state = new ConnectionState(this);
188+
207189
// Set to true on fatal errors, to get this out of session pool.
208190
this._isBroken = false;
209191

@@ -351,7 +333,8 @@ class Connection {
351333
/** Queue an INIT-message to be sent to the database */
352334
initialize( clientName, token, observer ) {
353335
log("C", "INIT", clientName, token);
354-
this._queueObserver(observer);
336+
const initObserver = this._state.wrap(observer);
337+
this._queueObserver(initObserver);
355338
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
356339
(err) => this._handleFatalError(err) );
357340
this._chunker.messageBoundary();
@@ -437,6 +420,15 @@ class Connection {
437420
}
438421
}
439422

423+
/**
424+
* Get promise resolved when connection initialization succeed or rejected when it fails.
425+
* Connection is initialized using {@link initialize} function.
426+
* @return {Promise<Connection>} the result of connection initialization.
427+
*/
428+
initializationCompleted() {
429+
return this._state.initializationCompleted();
430+
}
431+
440432
/**
441433
* Synchronize - flush all queued outgoing messages and route their responses
442434
* to their respective handlers.
@@ -471,6 +463,78 @@ class Connection {
471463
}
472464
}
473465

466+
class ConnectionState {
467+
468+
/**
469+
* @constructor
470+
* @param {Connection} connection the connection to track state for.
471+
*/
472+
constructor(connection) {
473+
this._connection = connection;
474+
475+
this._initialized = false;
476+
this._initializationError = null;
477+
478+
this._resolvePromise = null;
479+
this._rejectPromise = null;
480+
}
481+
482+
/**
483+
* Wrap the given observer to track connection's initialization state.
484+
* @param {StreamObserver} observer the observer used for INIT message.
485+
* @return {StreamObserver} updated observer.
486+
*/
487+
wrap(observer) {
488+
return {
489+
onNext: record => {
490+
if (observer && observer.onNext) {
491+
observer.onNext(record);
492+
}
493+
},
494+
onError: error => {
495+
this._initializationError = error;
496+
if (this._rejectPromise) {
497+
this._rejectPromise(error);
498+
this._rejectPromise = null;
499+
}
500+
if (observer && observer.onError) {
501+
observer.onError(error);
502+
}
503+
},
504+
onCompleted: metaData => {
505+
if (metaData && metaData.server) {
506+
this._connection.setServerVersion(metaData.server);
507+
}
508+
this._initialized = true;
509+
if (this._resolvePromise) {
510+
this._resolvePromise(this._connection);
511+
this._resolvePromise = null;
512+
}
513+
if (observer && observer.onCompleted) {
514+
observer.onCompleted(metaData);
515+
}
516+
}
517+
};
518+
}
519+
520+
/**
521+
* Get promise resolved when connection initialization succeed or rejected when it fails.
522+
* @return {Promise<Connection>} the result of connection initialization.
523+
*/
524+
initializationCompleted() {
525+
if (this._initialized) {
526+
return Promise.resolve(this._connection);
527+
} else if (this._initializationError) {
528+
return Promise.reject(this._initializationError);
529+
} else {
530+
return new Promise((resolve, reject) => {
531+
this._resolvePromise = resolve;
532+
this._rejectPromise = reject;
533+
});
534+
}
535+
}
536+
}
537+
474538
/**
475539
* Crete new connection to the provided url.
476540
* @access private
@@ -490,10 +554,6 @@ function connect(url, config = {}, connectionErrorCode = null) {
490554
}
491555

492556
export {
493-
connect,
494-
parseScheme,
495-
parseUrl,
496-
parseHost,
497-
parsePort,
498-
Connection
499-
}
557+
connect,
558+
Connection
559+
};

src/v1/internal/host-name-resolvers.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20-
import {parseHost, parsePort} from './connector';
20+
import {parseHost, parsePort} from './util';
2121

2222
class HostNameResolver {
2323

src/v1/internal/rediscovery.js

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,27 @@
1717
* limitations under the License.
1818
*/
1919

20-
import GetServersUtil from "./get-servers-util";
21-
import RoutingTable from "./routing-table";
22-
import {newError, PROTOCOL_ERROR} from "../error";
20+
import RoutingTable from './routing-table';
21+
import {newError, PROTOCOL_ERROR} from '../error';
2322

2423
export default class Rediscovery {
2524

26-
constructor(getServersUtil) {
27-
this._getServersUtil = getServersUtil || new GetServersUtil();
25+
/**
26+
* @constructor
27+
* @param {RoutingUtil} routingUtil the util to use.
28+
*/
29+
constructor(routingUtil) {
30+
this._routingUtil = routingUtil;
2831
}
2932

33+
/**
34+
* Try to fetch new routing table from the given router.
35+
* @param {Session} session the session to use.
36+
* @param {string} routerAddress the URL of the router.
37+
* @return {Promise<RoutingTable>} promise resolved with new routing table or null when connection error happened.
38+
*/
3039
lookupRoutingTableOnRouter(session, routerAddress) {
31-
return this._getServersUtil.callGetServers(session, routerAddress).then(records => {
40+
return this._routingUtil.callRoutingProcedure(session, routerAddress).then(records => {
3241
if (records === null) {
3342
// connection error happened, unable to retrieve routing table from this router, next one should be queried
3443
return null;
@@ -42,8 +51,8 @@ export default class Rediscovery {
4251

4352
const record = records[0];
4453

45-
const expirationTime = this._getServersUtil.parseTtl(record, routerAddress);
46-
const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress);
54+
const expirationTime = this._routingUtil.parseTtl(record, routerAddress);
55+
const {routers, readers, writers} = this._routingUtil.parseServers(record, routerAddress);
4756

4857
Rediscovery._assertNonEmpty(routers, 'routers', routerAddress);
4958
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress);

0 commit comments

Comments
 (0)