Skip to content
This repository was archived by the owner on Jan 23, 2025. It is now read-only.

Sup 1481 java bridge reads #460

Merged
merged 8 commits into from
Sep 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 75 additions & 66 deletions initializers/dataAccess.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ var fs = require("fs");
var async = require("async");
var java = require('java');
var Jdbc = require('informix-wrapper');
var req = require('request');
var helper;

var javaReadBridge = process.env.JAVA_READ_BRIDGE || "http://localhost:8082/bridge";

/**
* Regex for sql paramters e.g @param_name@
*/
Expand Down Expand Up @@ -110,6 +113,67 @@ function parameterizeQuery(query, params, callback) {
});
}

function executePreparedStatement(api, sql, parameters, connection, next, db) {
async.waterfall([
function (cb) {
parameterizeQuery(sql, parameters, cb);
},
function (parametrizedQuery, cb) {
sql = parametrizedQuery;

if (api.helper.readTransaction) {
api.log("Calling Java Bridge", "debug");

api.log(sql, "debug");

var body = {
"sql": new Buffer(sql).toString('base64'),
"db": db
};

api.log(JSON.stringify(body), "debug");

req({ url: javaReadBridge, method: "POST", body: body, json: true }, function(error, response, body) {
if (error) {
api.log(error, "error");
cb(error);
}

if (response.statusCode != 200) {
api.log(response, "error");
cb(response.statusMessage);
}

api.log("Response:" + JSON.stringify(body), "debug");
cb(null, body.results);
});
} else {
api.log("Database connected", 'debug');
// the connection might have been closed due to other errors, so this check must be done
if (connection.isConnected()) {
// Run the query
connection.query(sql, cb, {
start: function (q) {
api.log('Start to execute ' + q, 'debug');
},
finish: function (f) {
api.log('Finish executing ' + f, 'debug');
}
}).execute();
} else cb("Connection closed unexpectedly");
}
}
], function (err, result) {
if (err) {
api.log("Error occurred: " + err + " " + (err.stack || ''), 'error');
} else {
api.log("Query executed", "debug");
}

next(err, result);
});
}


/**
* Expose the "dataAccess" utility.
Expand Down Expand Up @@ -239,9 +303,10 @@ exports.dataAccess = function (api, next) {
return;
}

connection = connectionMap[queries[queryName].db];

error = helper.checkObject(connection, "connection");
if (!api.helper.readTransaction) {
connection = connectionMap[queries[queryName].db];
error = helper.checkObject(connection, "connection");
}

if (error) {
next(error);
Expand All @@ -254,36 +319,8 @@ exports.dataAccess = function (api, next) {
next('The query for name ' + queryName + ' is not registered');
return;
}

async.waterfall([
function (cb) {
parameterizeQuery(sql, parameters, cb);
}, function (parametrizedQuery, cb) {
sql = parametrizedQuery;
api.log("Database connected", 'debug');

// the connection might have been closed due to other errors, so this check must be done
if (connection.isConnected()) {
// Run the query
connection.query(sql, cb, {
start: function (q) {
api.log('Start to execute ' + q, 'debug');
},
finish: function (f) {
api.log('Finish executing ' + f, 'debug');
}
}).execute();
} else cb("Connection closed unexpectedly");
}
], function (err, result) {
if (err) {
api.log("Error occurred: " + err + " " + (err.stack || ''), 'error');
} else {
api.log("Query executed", "debug");
}

next(err, result);
});

executePreparedStatement(api, sql, parameters, connection, next, queries[queryName].db);
},

/**
Expand Down Expand Up @@ -316,45 +353,17 @@ exports.dataAccess = function (api, next) {
return;
}

connection = connectionMap[dbName];

error = helper.checkObject(connection, "connection");
if (!api.helper.readTransaction) {
connection = connectionMap[dbName];
error = helper.checkObject(connection, "connection");
}

if (error) {
next(error);
return;
}

async.waterfall([
function (cb) {
parameterizeQuery(sql, parameters, cb);
}, function (parametrizedQuery, cb) {
sql = parametrizedQuery;
api.log("Database connected", 'info');

// the connection might have been closed due to other errors, so this check must be done
if (connection.isConnected()) {
// Run the query
connection.query(sql, cb, {
start: function (q) {
api.log('Start to execute ' + q, 'debug');
},
finish: function (f) {
api.log('Finish executing ' + f, 'debug');
}
}).execute();
} else cb("Connection closed unexpectedly");
}
], function (err, result) {
if (err) {
api.log("Error occurred: " + err + " " + (err.stack || ''), 'error');
} else {
api.log("Query executed", "debug");
}

next(err, result);
});

executePreparedStatement(api, sql, parameters, connection, next, dbName);
}
};
next();
Expand Down
20 changes: 14 additions & 6 deletions initializers/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ exports.transaction = function (api, next) {
* @param {Function} next - The callback function
*/
transactionPreProcessor = function (connection, actionTemplate, next) {
if (actionTemplate.transaction === "read" || actionTemplate.transaction === "write") {
var dbConnectionMap = {}, dbConnection, callback, connectionOpenedCount = 0;
var dbConnectionMap = {}, dbConnection, callback, connectionOpenedCount = 0;

if (actionTemplate.transaction === "write") {
api.helper.readTransaction = false;

var connectTimeout = function() {
api.log("Timed out without obtaining all DB connections", "error");
Expand Down Expand Up @@ -120,6 +122,8 @@ exports.transaction = function (api, next) {
});

} else {
connection.dbConnectionMap = dbConnectionMap;
api.helper.readTransaction = true;
next(connection, true);
}
};
Expand All @@ -136,16 +140,20 @@ exports.transaction = function (api, next) {
* @param {Function} next - The callback function
*/
transactionPostProcessor = function (connection, actionTemplate, toRender, next) {

var disconnectTimeout = function() {
api.error("Timed out without closing all DB connections", "error");
// I dont want to call next(connection); here because I want to allow the execution to to continue in case connection can be closed after timeout
}

var clearMe = setTimeout(disconnectTimeout, DISCONN_TIMEOUT);

var connectionClosedCount = 0;
if (connection.dbConnectionMap !== null && connection.dbConnectionMap !== undefined && actionTemplate.transaction !== null && actionTemplate.transaction !== undefined) {
if (connection.dbConnectionMap !== null
&& connection.dbConnectionMap !== undefined
&& actionTemplate.transaction !== null
&& actionTemplate.transaction !== undefined
&& actionTemplate.transaction === "write") {

var clearMe = setTimeout(disconnectTimeout, DISCONN_TIMEOUT);

actionTemplate.databases.forEach(function (databaseName) {
var callback;
callback = function (err, result) {
Expand Down