Skip to content

Commit d29d74e

Browse files
committed
Routing support for running on session.
1 parent f3e3d01 commit d29d74e

File tree

10 files changed

+302
-29
lines changed

10 files changed

+302
-29
lines changed

src/v1/driver.js

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,12 @@ class Driver {
108108
* it is returned to the pool, the session will be reset to a clean state and
109109
* made available for others to use.
110110
*
111+
* @param {String} mode of session - optional
111112
* @return {Session} new session.
112113
*/
113-
session() {
114-
let conn = this._pool.acquire(this._url);
115-
return this._createSession(Promise.resolve(conn));
116-
}
117-
118-
_createSession(connectionPromise) {
119-
return new Session(connectionPromise, (cb) => {
114+
session(mode) {
115+
let connectionPromise = this._acquireConnection(mode);
116+
return this._createSession(connectionPromise, (cb) => {
120117
// This gets called on Session#close(), and is where we return
121118
// the pooled 'connection' instance.
122119

@@ -134,14 +131,23 @@ class Driver {
134131
conn._release();
135132
});
136133

137-
138134
// Call user callback
139135
if (cb) {
140136
cb();
141137
}
142138
});
143139
}
144140

141+
//Extension point
142+
_acquireConnection(mode) {
143+
return Promise.resolve(this._pool.acquire(this._url));
144+
}
145+
146+
//Extension point
147+
_createSession(connectionPromise, cb) {
148+
return new Session(connectionPromise, cb);
149+
}
150+
145151
/**
146152
* Close all open sessions and other associated resources. You should
147153
* make sure to use this when you are done with this driver instance.
@@ -152,6 +158,8 @@ class Driver {
152158
if (this._openSessions.hasOwnProperty(sessionId)) {
153159
this._openSessions[sessionId].close();
154160
}
161+
162+
this._pool.purgeAll();
155163
}
156164
}
157165
}

src/v1/internal/ch-node.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,10 @@ class NodeChannel {
293293
}
294294

295295
_handleConnectionError( err ) {
296-
this._error = err;
296+
let msg = err.message || 'Failed to connect to server';
297+
this._error = newError(msg, SESSION_EXPIRED);
297298
if( this.onerror ) {
298-
this.onerror(err);
299+
this.onerror(this._error);
299300
}
300301
}
301302

src/v1/internal/pool.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ class Pool {
4141

4242
acquire(key) {
4343
let resource;
44-
let pool = this._pools[key] || [];
44+
let pool = this._pools[key];
45+
if (!pool) {
46+
pool = [];
47+
this._pools[key] = pool;
48+
}
4549
while (pool.length) {
4650
resource = pool.pop();
4751

@@ -65,15 +69,23 @@ class Pool {
6569
delete this._pools[key]
6670
}
6771

72+
purgeAll() {
73+
for (let key in this._pools.keys) {
74+
if (this._pools.hasOwnPropertykey) {
75+
this.purge(key);
76+
}
77+
}
78+
}
79+
6880
has(key) {
6981
return (key in this._pools);
7082
}
7183

7284
_release(key, resource) {
7385
let pool = this._pools[key];
7486
if (!pool) {
75-
pool = [];
76-
this._pools[key] = pool;
87+
//key has been purged, don't put it back
88+
return;
7789
}
7890
if( pool.length >= this._maxIdle || !this._validate(resource) ) {
7991
this._destroy(resource);

src/v1/internal/stream-observer.js

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,16 @@ import Record from "../record";
3232
class StreamObserver {
3333
/**
3434
* @constructor
35+
* @param errorCallback optional callback to be used for adding additional logic on error
3536
*/
36-
constructor() {
37+
constructor(errorCallback = () => {}) {
3738
this._fieldKeys = null;
3839
this._fieldLookup = null;
3940
this._queuedRecords = [];
4041
this._tail = null;
4142
this._error = null;
4243
this._hasFailed = false;
44+
this._errorCallback = errorCallback;
4345
}
4446

4547
/**
@@ -81,25 +83,30 @@ class StreamObserver {
8183
}
8284
}
8385

86+
resolveConnection(conn) {
87+
this._conn = conn;
88+
}
89+
8490
/**
8591
* Will be called on errors.
8692
* If user-provided observer is present, pass the error
8793
* to it's onError method, otherwise set instance variable _error.
8894
* @param {Object} error - An error object
8995
*/
9096
onError(error) {
97+
let transformedError = this._errorCallback(error, this._conn);
9198
if(this._hasFailed) {
9299
return;
93100
}
94101
this._hasFailed = true;
95102
if( this._observer ) {
96103
if( this._observer.onError ) {
97-
this._observer.onError( error );
104+
this._observer.onError( transformedError );
98105
} else {
99-
console.log( error );
106+
console.log( transformedError );
100107
}
101108
} else {
102-
this._error = error;
109+
this._error = transformedError;
103110
}
104111
}
105112

src/v1/routing-driver.js

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from "./error";
2323
import RoundRobinArray from './internal/round-robin-array';
2424
import "babel-polyfill";
2525

26-
27-
2826
/**
2927
* A driver that supports routing in a core-edge cluster.
3028
*/
@@ -35,9 +33,40 @@ class RoutingDriver extends Driver {
3533
this._clusterView = new ClusterView(new RoundRobinArray([url]));
3634
}
3735

38-
session(mode) {
39-
let conn = this._acquireConnection(mode);
40-
return this._createSession(conn);
36+
_createSession(connectionPromise, cb) {
37+
return new RoutingSession(connectionPromise, cb, (err, conn) => {
38+
let code = err.code;
39+
if (!code) {
40+
try {
41+
code = err.fields[0].code;
42+
} catch (e) {
43+
code = 'UNKNOWN';
44+
}
45+
}
46+
47+
if (code === SERVICE_UNAVAILABLE || code === SESSION_EXPIRED) {
48+
if (conn) {
49+
this._forget(conn.url)
50+
} else {
51+
connectionPromise.then((conn) => {
52+
this._forget(conn.url);
53+
});
54+
}
55+
return err;
56+
} else if (code === 'Neo.ClientError.Cluster.NotALeader') {
57+
let url = 'UNKNOWN';
58+
if (conn) {
59+
url = conn.url;
60+
this._clusterView.writers.remove(conn.url);
61+
} else {
62+
connectionPromise.then((conn) => {
63+
this._clusterView.writers.remove(conn.url);
64+
});
65+
}
66+
67+
return newError("No longer possible to write to server at " + url, SESSION_EXPIRED);
68+
}
69+
});
4170
}
4271

4372
_updatedClusterView() {
@@ -60,6 +89,7 @@ class RoutingDriver extends Driver {
6089
return acc;
6190
}
6291
}
92+
6393
_diff(oldView, updatedView) {
6494
let oldSet = oldView.all();
6595
let newSet = updatedView.all();
@@ -81,9 +111,17 @@ class RoutingDriver extends Driver {
81111
//update our cached view
82112
this._clusterView = view;
83113
if (m === READ) {
84-
return this._pool.acquire(view.readers.hop());
114+
let key = view.readers.hop();
115+
if (!key) {
116+
return Promise.reject(newError('No read servers available', SESSION_EXPIRED));
117+
}
118+
return this._pool.acquire(key);
85119
} else if (m === WRITE) {
86-
return this._pool.acquire(view.writers.hop());
120+
let key = view.writers.hop();
121+
if (!key) {
122+
return Promise.reject(newError('No write servers available', SESSION_EXPIRED));
123+
}
124+
return this._pool.acquire(key);
87125
} else {
88126
return Promise.reject(m + " is not a valid option");
89127
}
@@ -96,7 +134,6 @@ class RoutingDriver extends Driver {
96134
}
97135
}
98136

99-
100137
class ClusterView {
101138
constructor(routers, readers, writers, expires) {
102139
this.routers = routers || new RoundRobinArray();
@@ -133,6 +170,17 @@ class ClusterView {
133170
}
134171
}
135172

173+
class RoutingSession extends Session {
174+
constructor(connectionPromise, onClose, onFailedConnection) {
175+
super(connectionPromise, onClose);
176+
this._onFailedConnection = onFailedConnection;
177+
}
178+
179+
_onRunFailure() {
180+
return this._onFailedConnection;
181+
}
182+
}
183+
136184
let GET_SERVERS = "CALL dbms.cluster.routing.getServers";
137185

138186
/**
@@ -170,6 +218,9 @@ function newClusterView(session) {
170218
}
171219
}
172220
return new ClusterView(routers, readers, writers, expires);
221+
})
222+
.catch(() => {
223+
return Promise.reject(newError("No servers could be found at this instant.", SERVICE_UNAVAILABLE));
173224
});
174225
}
175226

src/v1/session.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ class Session {
5555
parameters = statement.parameters || {};
5656
statement = statement.text;
5757
}
58-
let streamObserver = new _RunObserver();
58+
let streamObserver = new _RunObserver(this._onRunFailure());
5959
if (!this._hasTx) {
6060
this._connectionPromise.then((conn) => {
61+
streamObserver.resolveConnection(conn);
6162
conn.run(statement, parameters, streamObserver);
6263
conn.pullAll(streamObserver);
6364
conn.sync();
@@ -105,12 +106,17 @@ class Session {
105106
cb();
106107
}
107108
}
109+
110+
//Can be overridden to add error callback on RUN
111+
_onRunFailure() {
112+
return (err) => {return err};
113+
}
108114
}
109115

110116
/** Internal stream observer used for transactional results*/
111117
class _RunObserver extends StreamObserver {
112-
constructor() {
113-
super();
118+
constructor(onError) {
119+
super(onError);
114120
this._meta = {};
115121
}
116122

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
C: PULL_ALL
7+
S: FAILURE {"code": "Neo.ClientError.Procedure.ProcedureNotFound", "message": "blabla"}
8+
S: IGNORED
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
!: AUTO RUN "ROLLBACK" {}
5+
!: AUTO RUN "BEGIN" {}
6+
!: AUTO PULL_ALL
7+
8+
C: RUN "CREATE ()" {}
9+
C: PULL_ALL
10+
S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"}
11+
S: IGNORED
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO PULL_ALL
4+
5+
C: RUN "CALL dbms.cluster.routing.getServers" {}
6+
PULL_ALL
7+
S: SUCCESS {"fields": ["ttl", "servers"]}
8+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
9+
SUCCESS {}
10+
C: RUN "CALL dbms.cluster.routing.getServers" {}
11+
PULL_ALL
12+
S: SUCCESS {"fields": ["ttl", "servers"]}
13+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9002","127.0.0.1:9003","127.0.0.1:9004"], "role": "ROUTE"}]]
14+
SUCCESS {}

0 commit comments

Comments
 (0)