Skip to content

Byte array support #250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions gulpfile.babel.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var file = require('gulp-file');
var semver = require('semver');
var sharedNeo4j = require('./test/internal/shared-neo4j').default;

/**
* Useful to investigate resource leaks in tests. Enable to see active sockets and file handles after the 'test' task.
*/
var enableActiveNodeHandlesLogging = false;

gulp.task('default', ["test"]);

gulp.task('browser', function(cb){
Expand Down Expand Up @@ -165,15 +170,15 @@ gulp.task('test-nodejs', ['install-driver-into-sandbox'], function () {
.pipe(jasmine({
includeStackTrace: true,
verbose: true
}));
})).on('end', logActiveNodeHandles);
});

gulp.task('test-boltkit', ['nodejs'], function () {
return gulp.src('test/**/*.boltkit.it.js')
.pipe(jasmine({
includeStackTrace: true,
verbose: true
}));
})).on('end', logActiveNodeHandles);
});

gulp.task('test-browser', function (cb) {
Expand Down Expand Up @@ -210,7 +215,7 @@ gulp.task('run-tck', ['download-tck', 'nodejs'], function() {
'steps': 'test/v1/tck/steps/*.js',
'format': 'progress',
'tags' : ['~@fixed_session_pool', '~@db', '~@equality', '~@streaming_and_cursor_navigation']
}));
})).on('end', logActiveNodeHandles);
});

/** Set the project version, controls package.json and version.js */
Expand Down Expand Up @@ -248,5 +253,11 @@ gulp.task('run-stress-tests', function () {
.pipe(jasmine({
includeStackTrace: true,
verbose: true
}));
})).on('end', logActiveNodeHandles);
});

function logActiveNodeHandles() {
if (enableActiveNodeHandlesLogging) {
console.log('-- Active NodeJS handles START\n', process._getActiveHandles(), '\n-- Active NodeJS handles END');
}
}
2 changes: 1 addition & 1 deletion src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class Driver {
/**
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
* @type {ConnectionProvider}
* @private
* @protected
*/
this._connectionProvider = null;
}
Expand Down
8 changes: 8 additions & 0 deletions src/v1/internal/ch-dummy.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

import {CombinedBuffer} from './buf';

const observer = {
instance: null,
updateInstance: (instance) => {
Expand Down Expand Up @@ -55,6 +56,13 @@ class DummyChannel {
toBuffer () {
return new CombinedBuffer( this.written );
}

close(cb) {
this.written = [];
if (cb) {
return cb();
}
}
}

const channel = DummyChannel;
Expand Down
10 changes: 7 additions & 3 deletions src/v1/internal/connection-holder.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ export default class ConnectionHolder {

/**
* Get the current connection promise.
* @param {StreamObserver} streamObserver an observer for this connection.
* @return {Promise<Connection>} promise resolved with the current connection.
*/
getConnection() {
return this._connectionPromise;
getConnection(streamObserver) {
return this._connectionPromise.then(connection => {
streamObserver.resolveConnection(connection);
return connection.initializationCompleted();
});
}

/**
Expand Down Expand Up @@ -117,7 +121,7 @@ class EmptyConnectionHolder extends ConnectionHolder {
// nothing to initialize
}

getConnection() {
getConnection(streamObserver) {
return Promise.reject(newError('This connection holder does not serve connections'));
}

Expand Down
73 changes: 46 additions & 27 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {newError} from './../error';
import ChannelConfig from './ch-config';
import {parseHost, parsePort} from './util';
import StreamObserver from './stream-observer';
import {ServerVersion, VERSION_3_2_0} from './server-version';

let Channel;
if( NodeChannel.available ) {
Expand Down Expand Up @@ -472,8 +473,18 @@ class Connection {
return this._packer.packable(value, (err) => this._handleFatalError(err));
}

setServerVersion(version) {
this.server.version = version;
/**
* @protected
*/
_markInitialized(metadata) {
const serverVersion = metadata ? metadata.server : null;
if (!this.server.version) {
this.server.version = serverVersion;
const version = ServerVersion.fromString(serverVersion);
if (version.compareTo(VERSION_3_2_0) < 0) {
this._packer.disableByteArrays();
}
}
}
}

Expand All @@ -486,11 +497,15 @@ class ConnectionState {
constructor(connection) {
this._connection = connection;

this._initialized = false;
this._initializationError = null;
this._initRequested = false;
this._initError = null;

this._resolvePromise = null;
this._rejectPromise = null;
this._resolveInitPromise = null;
this._rejectInitPromise = null;
this._initPromise = new Promise((resolve, reject) => {
this._resolveInitPromise = resolve;
this._rejectInitPromise = reject;
});
}

/**
Expand All @@ -507,11 +522,7 @@ class ConnectionState {
}
},
onError: error => {
this._initializationError = error;
if (this._rejectPromise) {
this._rejectPromise(error);
this._rejectPromise = null;
}
this._processFailure(error);

this._connection._updateCurrentObserver(); // make sure this same observer will not be called again
try {
Expand All @@ -523,14 +534,9 @@ class ConnectionState {
}
},
onCompleted: metaData => {
if (metaData && metaData.server) {
this._connection.setServerVersion(metaData.server);
}
this._initialized = true;
if (this._resolvePromise) {
this._resolvePromise(this._connection);
this._resolvePromise = null;
}
this._connection._markInitialized(metaData);
this._resolveInitPromise(this._connection);

if (observer && observer.onCompleted) {
observer.onCompleted(metaData);
}
Expand All @@ -543,15 +549,28 @@ class ConnectionState {
* @return {Promise<Connection>} the result of connection initialization.
*/
initializationCompleted() {
if (this._initialized) {
return Promise.resolve(this._connection);
} else if (this._initializationError) {
return Promise.reject(this._initializationError);
this._initRequested = true;

if (this._initError) {
const error = this._initError;
this._initError = null; // to reject initPromise only once
this._rejectInitPromise(error);
}

return this._initPromise;
}

/**
* @private
*/
_processFailure(error) {
if (this._initRequested) {
// someone is waiting for initialization to complete, reject the promise
this._rejectInitPromise(error);
} else {
return new Promise((resolve, reject) => {
this._resolvePromise = resolve;
this._rejectPromise = reject;
});
// no one is waiting for initialization, memorize the error but do not reject the promise
// to avoid unnecessary unhandled promise rejection warnings
this._initError = error;
}
}
}
Expand Down
Loading