diff --git a/config/default.js b/config/default.js index c6ab49b..bf27a3b 100644 --- a/config/default.js +++ b/config/default.js @@ -9,6 +9,7 @@ module.exports = { DATABASE_URL: process.env.DATABASE_URL, DATABASE_OPTIONS: { dialect: 'postgres', + logging: false, dialectOptions: { ssl: process.env.DATABASE_SSL != null, }, diff --git a/connect/connectEmailServer.js b/connect/connectEmailServer.js index 3e8dbf2..81b76af 100644 --- a/connect/connectEmailServer.js +++ b/connect/connectEmailServer.js @@ -6,7 +6,6 @@ const config = require('config'); const emailServer = require('../index'); const service = require('./service'); const logger = require('../src/common/logger'); -const { functionWrapper } = require('../src/common/wrapper'); // set configuration for the server, see ../config/default.js for available config parameters // setConfig should be called before initDatabase and start functions @@ -18,23 +17,20 @@ emailServer.setConfig({ LOG_LEVEL: config.LOG_LEVEL }); // the topic is topic name, // the message is JSON event message, const handler = async (topic, message) => { - (await functionWrapper(async () => { - - let templateId = config.TEMPLATE_MAP[topic]; - templateId = _.get(message, config.PAYLOAD_SENDGRID_TEMPLATE_KEY, templateId); - if (!templateId) { - return { success: false, error: `Template not found for topic ${topic}` }; - } - - try { - await service.sendEmail(templateId, message) - return { success: true }; - } catch (err) { - logger.error("Error occurred in sendgrid api calling:", err); - return { success: false, error: err }; - } - - }, 'emailHandler'))(topic, message); + let templateId = config.TEMPLATE_MAP[topic]; + templateId = _.get(message, config.PAYLOAD_SENDGRID_TEMPLATE_KEY, templateId); + if (!templateId) { + return { success: false, error: `Template not found for topic ${topic}` }; + } + + try { + await service.sendEmail(templateId, message) + return { success: true }; + } catch (err) { + logger.error("Error occurred in sendgrid api calling:", err); + return { success: false, error: err }; + } + }; // init all events @@ -47,6 +43,7 @@ emailServer .initDatabase() .then(() => { logger.info('Database initialized successfully.') + emailServer.start() }) .catch((e) => { diff --git a/connect/service.js b/connect/service.js index 502ab29..7e591c5 100644 --- a/connect/service.js +++ b/connect/service.js @@ -5,64 +5,65 @@ const sgMail = require('@sendgrid/mail'); const config = require('config'); const logger = require('../src/common/logger'); -const { functionWrapper } = require('../src/common/wrapper'); // set api key for SendGrid email client sgMail.setApiKey(config.SENDGRID_API_KEY); const sendEmail = async (templateId, message) => { // send email - (await functionWrapper(async () => { + const span = await logger.startSpan('sendEmail'); + let msg = {} + const from = message.from ? message.from : config.EMAIL_FROM; + const replyTo = message.replyTo ? message.replyTo : config.EMAIL_FROM; + const substitutions = message.data; + const categories = message.categories ? message.categories : []; + const to = message.recipients; + const cc = message.cc ? message.cc : []; + const bcc = message.bcc ? message.bcc : []; + const sendAt = message.sendAt ? message.sendAt : undefined; + const personalizations = message.personalizations ? message.personalizations : undefined + const attachments = message.attachments ? message.attachments : []; - let msg = {} - const from = message.from ? message.from : config.EMAIL_FROM; - const replyTo = message.replyTo ? message.replyTo : config.EMAIL_FROM; - const substitutions = message.data; - const categories = message.categories ? message.categories : []; - const to = message.recipients; - const cc = message.cc ? message.cc : []; - const bcc = message.bcc ? message.bcc : []; - const sendAt = message.sendAt ? message.sendAt : undefined; - const personalizations = message.personalizations ? message.personalizations : undefined - const attachments = message.attachments ? message.attachments : []; - - if (message.version && message.version == "v3") { - msg = { - to, - templateId, - dynamicTemplateData: substitutions, - personalizations, - from, - replyTo, - categories, - cc, - bcc, - attachments, - sendAt - }; - } else { - msg = { - to, - templateId, - substitutions, - substitutionWrappers: ['{{', '}}'], - from, - replyTo, - categories, - cc, - bcc, - }; - } - logger.info(`Sending email with templateId: ${templateId} and message: ${JSON.stringify(msg)}`); - try { - const result = await sgMail.send(msg) - logger.info(`Email sent successfully with result: ${JSON.stringify(result)}`); - return result - } catch (err) { - logger.error(`Error occurred in sendgrid api calling: ${err}`); - throw err - } - }, 'sendgridSendEmail'))(templateId, message); + if (message.version && message.version == "v3") { + msg = { + to, + templateId, + dynamicTemplateData: substitutions, + personalizations, + from, + replyTo, + categories, + cc, + bcc, + attachments, + sendAt + }; + } else { + msg = { + to, + templateId, + substitutions, + substitutionWrappers: ['{{', '}}'], + from, + replyTo, + categories, + cc, + bcc, + }; + } + logger.info(`Sending email with templateId: ${templateId} and message: ${JSON.stringify(msg)}`); + try { + await logger.endSpan(span); + const sgSpan = await logger.startSpan('sendgrid'); + const result = await sgMail.send(msg) + await logger.endSpan(sgSpan); + logger.info(`Email sent successfully with result: ${JSON.stringify(result)}`); + return result + } catch (err) { + logger.error(`Error occurred in sendgrid api calling: ${err}`); + throw err + } } module.exports = { sendEmail, } +logger.buildService(module.exports) \ No newline at end of file diff --git a/index.js b/index.js index 913fb9d..3b93e9e 100644 --- a/index.js +++ b/index.js @@ -148,6 +148,7 @@ function start() { try { await retryEmail(handlers) } catch (err) { + console.log('Error in retrying email', err) logger.error(err); } }); @@ -155,6 +156,7 @@ function start() { logger.info(`Express server listening on port ${app.get('port')}`); }); }).catch((err) => { + logger.error(err); process.exit(1); }) @@ -167,7 +169,9 @@ function start() { async function initDatabase() { // load models only after config is set logger.info('Initializing database...'); + const span = await logger.startSpan('initDatabase'); await models.init(true); + await logger.endSpan(span); } // Exports @@ -179,3 +183,5 @@ module.exports = { start, initDatabase, }; + +logger.buildService(module.exports) \ No newline at end of file diff --git a/src/common/wrapper.js b/src/common/wrapper.js deleted file mode 100644 index 6eaa6d6..0000000 --- a/src/common/wrapper.js +++ /dev/null @@ -1,20 +0,0 @@ -const logger = require('../common/logger') - -function functionWrapper (fn, fnName) { - return async function () { - const span = await logger.startSpan(fnName ?? fn.name) - try { - const result = await fn.apply(this, arguments) - await logger.endSpan(span) - return result - } catch (e) { - await logger.endSpanWithError(span, e) - throw e - } - } -} - -module.exports = { - functionWrapper -} -logger.buildService(module.exports) diff --git a/src/init.js b/src/init.js index 666ddf5..daeab37 100644 --- a/src/init.js +++ b/src/init.js @@ -7,7 +7,6 @@ const _ = require('lodash'); const { Kafka } = require('kafkajs') const logger = require('./common/logger'); const models = require('./models'); -const { functionWrapper } = require('./common/wrapper'); /** @@ -15,7 +14,6 @@ const { functionWrapper } = require('./common/wrapper'); * @param {Object} handlers the handlers */ async function configureKafkaConsumer(handlers) { - // create group consumer let brokers = [''] if (config.KAFKA_URL.startsWith('ssl://')) { @@ -28,24 +26,23 @@ async function configureKafkaConsumer(handlers) { options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }; } - const kafka = new Kafka(options) const consumer = kafka.consumer({ groupId: config.KAFKA_GROUP_ID }); + await consumer.connect() await consumer.subscribe({ topics: _.keys(handlers) }); dataHandler(consumer, handlers).catch((err) => { + console.log('error', 'Kafka consumer error', err); logger.error(err); }); } async function dataHandler(consumer, handlers) { - - (await functionWrapper(async () => { - - + try { await consumer.run({ eachMessage: async (data) => { + const span = await logger.startSpan('dataHandler'); const topic = data.topic const msg = data.message const partition = data.partition @@ -94,21 +91,17 @@ async function dataHandler(consumer, handlers) { logger.error('error', 'Send email error details', result.error); } } + await logger.endSpan(span); } catch (e) { + await logger.endSpanWithError(span, e); logger.error(e) } - - - }, }) - - - - }, 'dataHandler'))(consumer, handlers); - - + } catch (e) { + logger.error(e) + } const errorTypes = ['unhandledRejection', 'uncaughtException'] const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] @@ -146,9 +139,9 @@ async function dataHandler(consumer, handlers) { * @param {Object} handlers the handlers */ async function retryEmail(handlers) { + const span = await logger.startSpan('retryEmail'); const loader = await models.loadEmailModule() const emailModel = await loader.findAll({ where: { status: 'FAILED', createdAt: { $gt: new Date(new Date() - config.EMAIL_RETRY_MAX_AGE) } } }) - if (emailModel.length > 0) { logger.info(`Found ${emailModel.length} e-mails to be resent`); emailModel.map(async m => { @@ -156,6 +149,7 @@ async function retryEmail(handlers) { const handler = handlers[m.topicName]; if (!handler) { logger.warn(`No handler configured for topic: ${m.topicName}`); + await logger.endSpan(span); return m; } const messageJSON = { data: JSON.parse(m.data), recipients: JSON.parse(m.recipients) }; @@ -163,23 +157,34 @@ async function retryEmail(handlers) { if (result.success) { logger.info(`Email model with ${m.id} was sent correctly`); m.status = 'SUCCESS'; + await logger.endSpan(span); return m.save(); } logger.info(`Email model with ${m.id} wasn't sent correctly`); + await logger.endSpan(span); return m; }); } else { + await logger.endSpan(span); return models; } } async function initServer(handlers) { - await models.init() - await configureKafkaConsumer(handlers) + try { + const span = await logger.startSpan('initServer'); + await models.init() + await configureKafkaConsumer(handlers) + await logger.endSpan(span); + } catch (e) { + await logger.endSpanWithError(span, e); + } } // Exports module.exports = { initServer, retryEmail, }; + +logger.buildService(module.exports) \ No newline at end of file diff --git a/src/models/datasource.js b/src/models/datasource.js index fa9ab28..5b41d13 100644 --- a/src/models/datasource.js +++ b/src/models/datasource.js @@ -21,10 +21,13 @@ let sequelizeInstance = null; async function getSequelize() { if (!sequelizeInstance) { sequelizeInstance = new Sequelize(config.DATABASE_URL, config.DATABASE_OPTIONS); + const span = await logger.startSpan('getSequelize'); try { await sequelizeInstance.authenticate() + await logger.endSpan(span); logger.info('Database connection has been established successfully.'); } catch (e) { + await logger.endSpanWithErr(span, e); logger.error('Unable to connect to the database:', err); } } @@ -34,3 +37,5 @@ async function getSequelize() { module.exports = { getSequelize, }; + +logger.buildService(module.exports) \ No newline at end of file diff --git a/src/models/index.js b/src/models/index.js index ae95b00..1ea3fa5 100644 --- a/src/models/index.js +++ b/src/models/index.js @@ -9,6 +9,7 @@ const DataTypes = require('sequelize/lib/data-types'); const logger = require('../common/logger'); async function defineEmailModel(sequelize, DataTypes) { + const span = await logger.startSpan('defineEmailModel'); const Email = sequelize.define('Email', { id: { type: DataTypes.BIGINT, primaryKey: true, autoIncrement: true }, topicName: { type: DataTypes.STRING, allowNull: true, field: 'topic_name' }, @@ -17,21 +18,29 @@ async function defineEmailModel(sequelize, DataTypes) { status: { type: DataTypes.STRING, allowNull: false }, }); await Email.sync(); + await logger.endSpan(span); return Email; } async function loadSequelizeModule() { - return await sequelizeInstance.getSequelize(); + const span = await logger.startSpan('loadSequelizeModule'); + const res = await sequelizeInstance.getSequelize(); + await logger.endSpan(span); + return res; } async function loadEmailModule() { + const span = await logger.startSpan('loadEmailModule'); const sequelize = await loadSequelizeModule(); + await logger.endSpan(span); return defineEmailModel(sequelize, DataTypes); } async function init() { logger.info("Initializing models"); + const span = await logger.startSpan('init'); const sequelize = await loadSequelizeModule(); await sequelize.sync(); + await logger.endSpan(span); } @@ -39,3 +48,5 @@ module.exports = { loadEmailModule, init, }; + +logger.buildService(module.exports) \ No newline at end of file