Skip to content

Added tracing again #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module.exports = {
DATABASE_URL: process.env.DATABASE_URL,
DATABASE_OPTIONS: {
dialect: 'postgres',
logging: false,
dialectOptions: {
ssl: process.env.DATABASE_SSL != null,
},
Expand Down
33 changes: 15 additions & 18 deletions connect/connectEmailServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const config = require('config');
const emailServer = require('../index');
const service = require('./service');
const logger = require('../src/common/logger');
const { functionWrapper } = require('../src/common/wrapper');

// set configuration for the server, see ../config/default.js for available config parameters
// setConfig should be called before initDatabase and start functions
Expand All @@ -18,23 +17,20 @@ emailServer.setConfig({ LOG_LEVEL: config.LOG_LEVEL });
// the topic is topic name,
// the message is JSON event message,
const handler = async (topic, message) => {
(await functionWrapper(async () => {

let templateId = config.TEMPLATE_MAP[topic];
templateId = _.get(message, config.PAYLOAD_SENDGRID_TEMPLATE_KEY, templateId);
if (!templateId) {
return { success: false, error: `Template not found for topic ${topic}` };
}

try {
await service.sendEmail(templateId, message)
return { success: true };
} catch (err) {
logger.error("Error occurred in sendgrid api calling:", err);
return { success: false, error: err };
}

}, 'emailHandler'))(topic, message);
let templateId = config.TEMPLATE_MAP[topic];
templateId = _.get(message, config.PAYLOAD_SENDGRID_TEMPLATE_KEY, templateId);
if (!templateId) {
return { success: false, error: `Template not found for topic ${topic}` };
}

try {
await service.sendEmail(templateId, message)
return { success: true };
} catch (err) {
logger.error("Error occurred in sendgrid api calling:", err);
return { success: false, error: err };
}

};

// init all events
Expand All @@ -47,6 +43,7 @@ emailServer
.initDatabase()
.then(() => {
logger.info('Database initialized successfully.')

emailServer.start()
})
.catch((e) => {
Expand Down
103 changes: 52 additions & 51 deletions connect/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,65 @@
const sgMail = require('@sendgrid/mail');
const config = require('config');
const logger = require('../src/common/logger');
const { functionWrapper } = require('../src/common/wrapper');

// set api key for SendGrid email client
sgMail.setApiKey(config.SENDGRID_API_KEY);

const sendEmail = async (templateId, message) => { // send email
(await functionWrapper(async () => {
const span = await logger.startSpan('sendEmail');
let msg = {}
const from = message.from ? message.from : config.EMAIL_FROM;
const replyTo = message.replyTo ? message.replyTo : config.EMAIL_FROM;
const substitutions = message.data;
const categories = message.categories ? message.categories : [];
const to = message.recipients;
const cc = message.cc ? message.cc : [];
const bcc = message.bcc ? message.bcc : [];
const sendAt = message.sendAt ? message.sendAt : undefined;
const personalizations = message.personalizations ? message.personalizations : undefined
const attachments = message.attachments ? message.attachments : [];

let msg = {}
const from = message.from ? message.from : config.EMAIL_FROM;
const replyTo = message.replyTo ? message.replyTo : config.EMAIL_FROM;
const substitutions = message.data;
const categories = message.categories ? message.categories : [];
const to = message.recipients;
const cc = message.cc ? message.cc : [];
const bcc = message.bcc ? message.bcc : [];
const sendAt = message.sendAt ? message.sendAt : undefined;
const personalizations = message.personalizations ? message.personalizations : undefined
const attachments = message.attachments ? message.attachments : [];

if (message.version && message.version == "v3") {
msg = {
to,
templateId,
dynamicTemplateData: substitutions,
personalizations,
from,
replyTo,
categories,
cc,
bcc,
attachments,
sendAt
};
} else {
msg = {
to,
templateId,
substitutions,
substitutionWrappers: ['{{', '}}'],
from,
replyTo,
categories,
cc,
bcc,
};
}
logger.info(`Sending email with templateId: ${templateId} and message: ${JSON.stringify(msg)}`);
try {
const result = await sgMail.send(msg)
logger.info(`Email sent successfully with result: ${JSON.stringify(result)}`);
return result
} catch (err) {
logger.error(`Error occurred in sendgrid api calling: ${err}`);
throw err
}
}, 'sendgridSendEmail'))(templateId, message);
if (message.version && message.version == "v3") {
msg = {
to,
templateId,
dynamicTemplateData: substitutions,
personalizations,
from,
replyTo,
categories,
cc,
bcc,
attachments,
sendAt
};
} else {
msg = {
to,
templateId,
substitutions,
substitutionWrappers: ['{{', '}}'],
from,
replyTo,
categories,
cc,
bcc,
};
}
logger.info(`Sending email with templateId: ${templateId} and message: ${JSON.stringify(msg)}`);
try {
await logger.endSpan(span);
const sgSpan = await logger.startSpan('sendgrid');
const result = await sgMail.send(msg)
await logger.endSpan(sgSpan);
logger.info(`Email sent successfully with result: ${JSON.stringify(result)}`);
return result
} catch (err) {
logger.error(`Error occurred in sendgrid api calling: ${err}`);
throw err
}
}
module.exports = {
sendEmail,
}
logger.buildService(module.exports)
6 changes: 6 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ function start() {
try {
await retryEmail(handlers)
} catch (err) {
console.log('Error in retrying email', err)
logger.error(err);
}
});
app.listen(app.get('port'), () => {
logger.info(`Express server listening on port ${app.get('port')}`);
});
}).catch((err) => {

logger.error(err);
process.exit(1);
})
Expand All @@ -167,7 +169,9 @@ function start() {
async function initDatabase() {
// load models only after config is set
logger.info('Initializing database...');
const span = await logger.startSpan('initDatabase');
await models.init(true);
await logger.endSpan(span);
}

// Exports
Expand All @@ -179,3 +183,5 @@ module.exports = {
start,
initDatabase,
};

logger.buildService(module.exports)
20 changes: 0 additions & 20 deletions src/common/wrapper.js

This file was deleted.

43 changes: 24 additions & 19 deletions src/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ const _ = require('lodash');
const { Kafka } = require('kafkajs')
const logger = require('./common/logger');
const models = require('./models');
const { functionWrapper } = require('./common/wrapper');


/**
* Configure Kafka consumer.
* @param {Object} handlers the handlers
*/
async function configureKafkaConsumer(handlers) {

// create group consumer
let brokers = ['']
if (config.KAFKA_URL.startsWith('ssl://')) {
Expand All @@ -28,24 +26,23 @@ async function configureKafkaConsumer(handlers) {
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY };
}


const kafka = new Kafka(options)
const consumer = kafka.consumer({ groupId: config.KAFKA_GROUP_ID });

await consumer.connect()
await consumer.subscribe({ topics: _.keys(handlers) });
dataHandler(consumer, handlers).catch((err) => {
console.log('error', 'Kafka consumer error', err);
logger.error(err);
});
}


async function dataHandler(consumer, handlers) {

(await functionWrapper(async () => {


try {
await consumer.run({
eachMessage: async (data) => {
const span = await logger.startSpan('dataHandler');
const topic = data.topic
const msg = data.message
const partition = data.partition
Expand Down Expand Up @@ -94,21 +91,17 @@ async function dataHandler(consumer, handlers) {
logger.error('error', 'Send email error details', result.error);
}
}
await logger.endSpan(span);
} catch (e) {
await logger.endSpanWithError(span, e);
logger.error(e)
}




},
})



}, 'dataHandler'))(consumer, handlers);


} catch (e) {
logger.error(e)
}

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
Expand Down Expand Up @@ -146,40 +139,52 @@ async function dataHandler(consumer, handlers) {
* @param {Object} handlers the handlers
*/
async function retryEmail(handlers) {
const span = await logger.startSpan('retryEmail');
const loader = await models.loadEmailModule()
const emailModel = await loader.findAll({ where: { status: 'FAILED', createdAt: { $gt: new Date(new Date() - config.EMAIL_RETRY_MAX_AGE) } } })

if (emailModel.length > 0) {
logger.info(`Found ${emailModel.length} e-mails to be resent`);
emailModel.map(async m => {
// find handler
const handler = handlers[m.topicName];
if (!handler) {
logger.warn(`No handler configured for topic: ${m.topicName}`);
await logger.endSpan(span);
return m;
}
const messageJSON = { data: JSON.parse(m.data), recipients: JSON.parse(m.recipients) };
const result = await handler(m.topicName, messageJSON);
if (result.success) {
logger.info(`Email model with ${m.id} was sent correctly`);
m.status = 'SUCCESS';
await logger.endSpan(span);
return m.save();
}
logger.info(`Email model with ${m.id} wasn't sent correctly`);
await logger.endSpan(span);
return m;
});
} else {
await logger.endSpan(span);
return models;
}

}

async function initServer(handlers) {
await models.init()
await configureKafkaConsumer(handlers)
try {
const span = await logger.startSpan('initServer');
await models.init()
await configureKafkaConsumer(handlers)
await logger.endSpan(span);
} catch (e) {
await logger.endSpanWithError(span, e);
}
}
// Exports
module.exports = {
initServer,
retryEmail,
};

logger.buildService(module.exports)
5 changes: 5 additions & 0 deletions src/models/datasource.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ let sequelizeInstance = null;
async function getSequelize() {
if (!sequelizeInstance) {
sequelizeInstance = new Sequelize(config.DATABASE_URL, config.DATABASE_OPTIONS);
const span = await logger.startSpan('getSequelize');
try {
await sequelizeInstance.authenticate()
await logger.endSpan(span);
logger.info('Database connection has been established successfully.');
} catch (e) {
await logger.endSpanWithErr(span, e);
logger.error('Unable to connect to the database:', err);
}
}
Expand All @@ -34,3 +37,5 @@ async function getSequelize() {
module.exports = {
getSequelize,
};

logger.buildService(module.exports)
Loading