Skip to content

Commit 8763126

Browse files
committed
More byte arrays stuff
1 parent 021e9a5 commit 8763126

File tree

5 files changed

+201
-22
lines changed

5 files changed

+201
-22
lines changed

src/v1/internal/connection-providers.js

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ export class DirectConnectionProvider extends ConnectionProvider {
5454
}
5555

5656
acquireConnection(mode) {
57-
const connection = this._connectionPool.acquire(this._address);
58-
const connectionPromise = Promise.resolve(connection);
57+
const connectionPromise = acquireConnectionFromPool(this._connectionPool, this._address);
5958
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
6059
}
6160
}
@@ -102,7 +101,7 @@ export class LoadBalancer extends ConnectionProvider {
102101
`Failed to obtain connection towards ${serverName} server. Known routing table is: ${this._routingTable}`,
103102
SESSION_EXPIRED));
104103
}
105-
return this._connectionPool.acquire(address);
104+
return acquireConnectionFromPool(this._connectionPool, address);
106105
}
107106

108107
_freshRoutingTable(accessMode) {
@@ -199,10 +198,9 @@ export class LoadBalancer extends ConnectionProvider {
199198
}
200199

201200
_createSessionForRediscovery(routerAddress) {
202-
const connection = this._connectionPool.acquire(routerAddress);
203201
// initialized connection is required for routing procedure call
204202
// server version needs to be known to decide which routing procedure to use
205-
const initializedConnectionPromise = connection.initializationCompleted();
203+
const initializedConnectionPromise = acquireConnectionFromPool(this._connectionPool, routerAddress);
206204
const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise);
207205
return new Session(READ, connectionProvider);
208206
}
@@ -263,3 +261,19 @@ export class SingleConnectionProvider extends ConnectionProvider {
263261
return connectionPromise;
264262
}
265263
}
264+
265+
// todo: test that all connection providers return initialized connections
266+
267+
/**
268+
* Acquire an initialized connection from the given connection pool for the given address. Returned connection
269+
* promise will be resolved by a connection which completed initialization, i.e. received a SUCCESS response
270+
* for it's INIT message.
271+
* @param {Pool} connectionPool the connection pool to acquire connection from.
272+
* @param {string} address the server address.
273+
* @return {Promise<Connection>} the initialized connection.
274+
*/
275+
function acquireConnectionFromPool(connectionPool, address) {
276+
const connection = connectionPool.acquire(address);
277+
// initialized connection is required to be able to perform subsequent server version checks
278+
return connection.initializationCompleted();
279+
}

src/v1/internal/connector.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {newError} from './../error';
2727
import ChannelConfig from './ch-config';
2828
import {parseHost, parsePort} from './util';
2929
import StreamObserver from './stream-observer';
30+
import {ServerVersion, VERSION_3_2_0} from './server-version';
3031

3132
let Channel;
3233
if( NodeChannel.available ) {
@@ -472,8 +473,17 @@ class Connection {
472473
return this._packer.packable(value, (err) => this._handleFatalError(err));
473474
}
474475

475-
setServerVersion(version) {
476-
this.server.version = version;
476+
/**
477+
* @protected
478+
*/
479+
_markInitialized(metadata) {
480+
const serverVersion = metadata.server;
481+
if (!this.server.version) {
482+
this.server.version = serverVersion;
483+
if (ServerVersion.fromString(serverVersion).compareTo(VERSION_3_2_0) < 0) {
484+
this._packer.disableByteArrays();
485+
}
486+
}
477487
}
478488
}
479489

@@ -524,7 +534,7 @@ class ConnectionState {
524534
},
525535
onCompleted: metaData => {
526536
if (metaData && metaData.server) {
527-
this._connection.setServerVersion(metaData.server);
537+
this._connection._markInitialized(metaData);
528538
}
529539
this._initialized = true;
530540
if (this._resolvePromise) {

src/v1/internal/packstream.js

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
import utf8 from "./utf8";
20-
import Integer, {int, isInt} from "../integer";
21-
import {newError} from "./../error";
19+
import utf8 from './utf8';
20+
import Integer, {int, isInt} from '../integer';
21+
import {newError} from './../error';
2222

2323
const TINY_STRING = 0x80;
2424
const TINY_LIST = 0x90;
@@ -38,6 +38,9 @@ const STRING_32 = 0xD2;
3838
const LIST_8 = 0xD4;
3939
const LIST_16 = 0xD5;
4040
const LIST_32 = 0xD6;
41+
const BYTES_8 = 0xCC;
42+
const BYTES_16 = 0xCD;
43+
const BYTES_32 = 0xCE;
4144
const MAP_8 = 0xD8;
4245
const MAP_16 = 0xD9;
4346
const MAP_32 = 0xDA;
@@ -74,6 +77,7 @@ class Structure {
7477
class Packer {
7578
constructor (channel) {
7679
this._ch = channel;
80+
this._byteArraysSupported = true;
7781
}
7882

7983
/**
@@ -95,6 +99,8 @@ class Packer {
9599
return () => this.packString(x, onError);
96100
} else if (isInt(x)) {
97101
return () => this.packInteger( x );
102+
} else if (x instanceof Int8Array) {
103+
return () => this.packBytes(x, onError);
98104
} else if (x instanceof Array) {
99105
return () => {
100106
this.packListHeader(x.length, onError);
@@ -225,6 +231,36 @@ class Packer {
225231
}
226232
}
227233

234+
packBytes(array, onError) {
235+
if(this._byteArraysSupported) {
236+
this.packBytesHeader(array.length, onError);
237+
for (let i = 0; i < array.length; i++) {
238+
this._ch.writeInt8(array[i]);
239+
}
240+
}else {
241+
onError(newError("Byte arrays are not supported by the database this driver is connected to"));
242+
}
243+
}
244+
245+
packBytesHeader(size, onError) {
246+
if (size < 0x100) {
247+
this._ch.writeUInt8(BYTES_8);
248+
this._ch.writeUInt8(size);
249+
} else if (size < 0x10000) {
250+
this._ch.writeUInt8(BYTES_16);
251+
this._ch.writeUInt8((size / 256 >> 0) % 256);
252+
this._ch.writeUInt8(size % 256);
253+
} else if (size < 0x100000000) {
254+
this._ch.writeUInt8(BYTES_32);
255+
this._ch.writeUInt8((size / 16777216 >> 0) % 256);
256+
this._ch.writeUInt8((size / 65536 >> 0) % 256);
257+
this._ch.writeUInt8((size / 256 >> 0) % 256);
258+
this._ch.writeUInt8(size % 256);
259+
} else {
260+
onError(newError('Byte arrays of size ' + size + ' are not supported'));
261+
}
262+
}
263+
228264
packMapHeader (size, onError) {
229265
if (size < 0x10) {
230266
this._ch.writeUInt8(TINY_MAP | size);
@@ -262,6 +298,10 @@ class Packer {
262298
onError(newError("Structures of size " + size + " are not supported"));
263299
}
264300
}
301+
302+
disableByteArrays() {
303+
this._byteArraysSupported = false;
304+
}
265305
}
266306

267307
/**
@@ -284,6 +324,14 @@ class Unpacker {
284324
return value;
285325
}
286326

327+
unpackBytes(size, buffer) {
328+
const value = new Int8Array(size);
329+
for (let i = 0; i < size; i++) {
330+
value[i] = buffer.readInt8();
331+
}
332+
return value;
333+
}
334+
287335
unpackMap (size, buffer) {
288336
let value = {};
289337
for(let i = 0; i < size; i++) {
@@ -344,6 +392,12 @@ class Unpacker {
344392
return this.unpackList(buffer.readUInt16(), buffer);
345393
} else if (marker == LIST_32) {
346394
return this.unpackList(buffer.readUInt32(), buffer);
395+
} else if (marker == BYTES_8) {
396+
return this.unpackBytes(buffer.readUInt8(), buffer);
397+
} else if (marker == BYTES_16) {
398+
return this.unpackBytes(buffer.readUInt16(), buffer);
399+
} else if (marker == BYTES_32) {
400+
return this.unpackBytes(buffer.readUInt32(), buffer);
347401
} else if (marker == MAP_8) {
348402
return this.unpackMap(buffer.readUInt8(), buffer);
349403
} else if (marker == MAP_16) {

test/internal/shared-neo4j.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ const password = 'password';
9595
const authToken = neo4j.auth.basic(username, password);
9696

9797
const neoCtrlVersionParam = '-e';
98-
const defaultNeo4jVersion = '3.1.3';
98+
const defaultNeo4jVersion = '3.2.0';
9999
const defaultNeoCtrlArgs = `${neoCtrlVersionParam} ${defaultNeo4jVersion}`;
100100

101101
function neo4jCertPath(dir) {

test/v1/types.test.js

Lines changed: 111 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import neo4j from '../../src/v1';
2121
import sharedNeo4j from '../internal/shared-neo4j';
22+
import _ from 'lodash';
23+
import {ServerVersion, VERSION_3_2_0} from '../../src/v1/internal/server-version';
2224

2325
describe('floating point values', () => {
2426
it('should support float 1.0 ', testValue(1));
@@ -136,18 +138,117 @@ describe('path values', () => {
136138
});
137139
});
138140

139-
function testValue(actual, expected) {
140-
return done => {
141+
describe('byte arrays', () => {
142+
143+
let originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL;
144+
let serverSupportsByteArrays = false;
145+
146+
beforeEach(done => {
147+
jasmine.DEFAULT_TIMEOUT_INTERVAL = 60000;
148+
141149
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
142150
const session = driver.session();
151+
session.run('RETURN 1').then(result => {
152+
driver.close();
153+
const serverVersion = ServerVersion.fromString(result.summary.server.version);
154+
serverSupportsByteArrays = serverVersion.compareTo(VERSION_3_2_0) >= 0;
155+
done();
156+
});
157+
});
143158

144-
session.run('RETURN {val} as v', {val: actual})
145-
.then(result => {
146-
expect(result.records[0].get('v')).toEqual(expected || actual);
147-
driver.close();
148-
done();
149-
}).catch(err => {
150-
console.log(err);
159+
afterEach(() => {
160+
jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout;
161+
});
162+
163+
it('should support returning empty byte array', done => {
164+
if(!serverSupportsByteArrays) {
165+
done();
166+
return;
167+
}
168+
169+
testValue(new Int8Array(0))(done);
170+
});
171+
172+
it('should support returning empty byte array', conditionalTestValues(serverSupportsByteArrays, new Int8Array(0)));
173+
174+
it('should support returning short byte arrays', conditionalTestValues(serverSupportsByteArrays, randomByteArrays(100, 1, 255)));
175+
176+
it('should support returning medium byte arrays', conditionalTestValues(serverSupportsByteArrays, randomByteArrays(50, 256, 65535)));
177+
178+
it('should support returning long byte arrays', conditionalTestValues(serverSupportsByteArrays, randomByteArrays(10, 65536, 2 * 65536)));
179+
180+
it('should fail to return byte array', done => {
181+
if (serverSupportsByteArrays) {
182+
done();
183+
return;
184+
}
185+
186+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
187+
const session = driver.session();
188+
session.run('RETURN {array}', {array: randomByteArray(42)}).catch(error => {
189+
driver.close();
190+
expect(error.message).toEqual('Byte arrays are not supported by the database this driver is connected to');
191+
done();
192+
});
193+
});
194+
});
195+
196+
function conditionalTestValues(condition, values) {
197+
if (!condition) {
198+
return done => done();
199+
}
200+
201+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
202+
const queriesPromise = values.reduce((acc, value) =>
203+
acc.then(() => runReturnQuery(driver, value)), Promise.resolve());
204+
return asTestFunction(queriesPromise, driver);
205+
}
206+
207+
function testValue(actual, expected) {
208+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
209+
const queryPromise = runReturnQuery(driver, actual, expected);
210+
return asTestFunction(queryPromise, driver);
211+
}
212+
213+
function testValues(values) {
214+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken);
215+
const queriesPromise = values.reduce((acc, value) =>
216+
acc.then(() => runReturnQuery(driver, value)), Promise.resolve());
217+
return asTestFunction(queriesPromise, driver);
218+
}
219+
220+
function runReturnQuery(driver, actual, expected) {
221+
const session = driver.session();
222+
return new Promise((resolve, reject) => {
223+
session.run('RETURN {val} as v', {val: actual}).then(result => {
224+
expect(result.records[0].get('v')).toEqual(expected || actual);
225+
session.close();
226+
resolve();
227+
}).catch(error => {
228+
reject(error);
151229
});
152-
};
230+
});
231+
}
232+
233+
function asTestFunction(promise, driver) {
234+
return done =>
235+
promise.then(() => {
236+
driver.close();
237+
done();
238+
}).catch(error => {
239+
driver.close();
240+
console.log(error);
241+
});
242+
}
243+
244+
function randomByteArrays(count, minLength, maxLength) {
245+
return _.range(count).map(() => {
246+
const length = _.random(minLength, maxLength);
247+
return randomByteArray(length);
248+
});
249+
}
250+
251+
function randomByteArray(length) {
252+
const array = _.range(length).map(() => _.random(-128, 127));
253+
return new Int8Array(array);
153254
}

0 commit comments

Comments
 (0)