Skip to content

Commit dfb04df

Browse files
author
Hamid Tavakoli
authored
Merge pull request #63 from topcoder-platform/tracing
Added tracing again
2 parents 06d2e09 + 5d54523 commit dfb04df

File tree

8 files changed

+115
-109
lines changed

8 files changed

+115
-109
lines changed

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ module.exports = {
99
DATABASE_URL: process.env.DATABASE_URL,
1010
DATABASE_OPTIONS: {
1111
dialect: 'postgres',
12+
logging: false,
1213
dialectOptions: {
1314
ssl: process.env.DATABASE_SSL != null,
1415
},

connect/connectEmailServer.js

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const config = require('config');
66
const emailServer = require('../index');
77
const service = require('./service');
88
const logger = require('../src/common/logger');
9-
const { functionWrapper } = require('../src/common/wrapper');
109

1110
// set configuration for the server, see ../config/default.js for available config parameters
1211
// setConfig should be called before initDatabase and start functions
@@ -18,23 +17,20 @@ emailServer.setConfig({ LOG_LEVEL: config.LOG_LEVEL });
1817
// the topic is topic name,
1918
// the message is JSON event message,
2019
const handler = async (topic, message) => {
21-
(await functionWrapper(async () => {
22-
23-
let templateId = config.TEMPLATE_MAP[topic];
24-
templateId = _.get(message, config.PAYLOAD_SENDGRID_TEMPLATE_KEY, templateId);
25-
if (!templateId) {
26-
return { success: false, error: `Template not found for topic ${topic}` };
27-
}
28-
29-
try {
30-
await service.sendEmail(templateId, message)
31-
return { success: true };
32-
} catch (err) {
33-
logger.error("Error occurred in sendgrid api calling:", err);
34-
return { success: false, error: err };
35-
}
36-
37-
}, 'emailHandler'))(topic, message);
20+
let templateId = config.TEMPLATE_MAP[topic];
21+
templateId = _.get(message, config.PAYLOAD_SENDGRID_TEMPLATE_KEY, templateId);
22+
if (!templateId) {
23+
return { success: false, error: `Template not found for topic ${topic}` };
24+
}
25+
26+
try {
27+
await service.sendEmail(templateId, message)
28+
return { success: true };
29+
} catch (err) {
30+
logger.error("Error occurred in sendgrid api calling:", err);
31+
return { success: false, error: err };
32+
}
33+
3834
};
3935

4036
// init all events
@@ -47,6 +43,7 @@ emailServer
4743
.initDatabase()
4844
.then(() => {
4945
logger.info('Database initialized successfully.')
46+
5047
emailServer.start()
5148
})
5249
.catch((e) => {

connect/service.js

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,64 +5,65 @@
55
const sgMail = require('@sendgrid/mail');
66
const config = require('config');
77
const logger = require('../src/common/logger');
8-
const { functionWrapper } = require('../src/common/wrapper');
98

109
// set api key for SendGrid email client
1110
sgMail.setApiKey(config.SENDGRID_API_KEY);
1211

1312
const sendEmail = async (templateId, message) => { // send email
14-
(await functionWrapper(async () => {
13+
const span = await logger.startSpan('sendEmail');
14+
let msg = {}
15+
const from = message.from ? message.from : config.EMAIL_FROM;
16+
const replyTo = message.replyTo ? message.replyTo : config.EMAIL_FROM;
17+
const substitutions = message.data;
18+
const categories = message.categories ? message.categories : [];
19+
const to = message.recipients;
20+
const cc = message.cc ? message.cc : [];
21+
const bcc = message.bcc ? message.bcc : [];
22+
const sendAt = message.sendAt ? message.sendAt : undefined;
23+
const personalizations = message.personalizations ? message.personalizations : undefined
24+
const attachments = message.attachments ? message.attachments : [];
1525

16-
let msg = {}
17-
const from = message.from ? message.from : config.EMAIL_FROM;
18-
const replyTo = message.replyTo ? message.replyTo : config.EMAIL_FROM;
19-
const substitutions = message.data;
20-
const categories = message.categories ? message.categories : [];
21-
const to = message.recipients;
22-
const cc = message.cc ? message.cc : [];
23-
const bcc = message.bcc ? message.bcc : [];
24-
const sendAt = message.sendAt ? message.sendAt : undefined;
25-
const personalizations = message.personalizations ? message.personalizations : undefined
26-
const attachments = message.attachments ? message.attachments : [];
27-
28-
if (message.version && message.version == "v3") {
29-
msg = {
30-
to,
31-
templateId,
32-
dynamicTemplateData: substitutions,
33-
personalizations,
34-
from,
35-
replyTo,
36-
categories,
37-
cc,
38-
bcc,
39-
attachments,
40-
sendAt
41-
};
42-
} else {
43-
msg = {
44-
to,
45-
templateId,
46-
substitutions,
47-
substitutionWrappers: ['{{', '}}'],
48-
from,
49-
replyTo,
50-
categories,
51-
cc,
52-
bcc,
53-
};
54-
}
55-
logger.info(`Sending email with templateId: ${templateId} and message: ${JSON.stringify(msg)}`);
56-
try {
57-
const result = await sgMail.send(msg)
58-
logger.info(`Email sent successfully with result: ${JSON.stringify(result)}`);
59-
return result
60-
} catch (err) {
61-
logger.error(`Error occurred in sendgrid api calling: ${err}`);
62-
throw err
63-
}
64-
}, 'sendgridSendEmail'))(templateId, message);
26+
if (message.version && message.version == "v3") {
27+
msg = {
28+
to,
29+
templateId,
30+
dynamicTemplateData: substitutions,
31+
personalizations,
32+
from,
33+
replyTo,
34+
categories,
35+
cc,
36+
bcc,
37+
attachments,
38+
sendAt
39+
};
40+
} else {
41+
msg = {
42+
to,
43+
templateId,
44+
substitutions,
45+
substitutionWrappers: ['{{', '}}'],
46+
from,
47+
replyTo,
48+
categories,
49+
cc,
50+
bcc,
51+
};
52+
}
53+
logger.info(`Sending email with templateId: ${templateId} and message: ${JSON.stringify(msg)}`);
54+
try {
55+
await logger.endSpan(span);
56+
const sgSpan = await logger.startSpan('sendgrid');
57+
const result = await sgMail.send(msg)
58+
await logger.endSpan(sgSpan);
59+
logger.info(`Email sent successfully with result: ${JSON.stringify(result)}`);
60+
return result
61+
} catch (err) {
62+
logger.error(`Error occurred in sendgrid api calling: ${err}`);
63+
throw err
64+
}
6565
}
6666
module.exports = {
6767
sendEmail,
6868
}
69+
logger.buildService(module.exports)

index.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,15 @@ function start() {
148148
try {
149149
await retryEmail(handlers)
150150
} catch (err) {
151+
console.log('Error in retrying email', err)
151152
logger.error(err);
152153
}
153154
});
154155
app.listen(app.get('port'), () => {
155156
logger.info(`Express server listening on port ${app.get('port')}`);
156157
});
157158
}).catch((err) => {
159+
158160
logger.error(err);
159161
process.exit(1);
160162
})
@@ -167,7 +169,9 @@ function start() {
167169
async function initDatabase() {
168170
// load models only after config is set
169171
logger.info('Initializing database...');
172+
const span = await logger.startSpan('initDatabase');
170173
await models.init(true);
174+
await logger.endSpan(span);
171175
}
172176

173177
// Exports
@@ -179,3 +183,5 @@ module.exports = {
179183
start,
180184
initDatabase,
181185
};
186+
187+
logger.buildService(module.exports)

src/common/wrapper.js

Lines changed: 0 additions & 20 deletions
This file was deleted.

src/init.js

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@ const _ = require('lodash');
77
const { Kafka } = require('kafkajs')
88
const logger = require('./common/logger');
99
const models = require('./models');
10-
const { functionWrapper } = require('./common/wrapper');
1110

1211

1312
/**
1413
* Configure Kafka consumer.
1514
* @param {Object} handlers the handlers
1615
*/
1716
async function configureKafkaConsumer(handlers) {
18-
1917
// create group consumer
2018
let brokers = ['']
2119
if (config.KAFKA_URL.startsWith('ssl://')) {
@@ -28,24 +26,23 @@ async function configureKafkaConsumer(handlers) {
2826
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY };
2927
}
3028

31-
3229
const kafka = new Kafka(options)
3330
const consumer = kafka.consumer({ groupId: config.KAFKA_GROUP_ID });
31+
3432
await consumer.connect()
3533
await consumer.subscribe({ topics: _.keys(handlers) });
3634
dataHandler(consumer, handlers).catch((err) => {
35+
console.log('error', 'Kafka consumer error', err);
3736
logger.error(err);
3837
});
3938
}
4039

4140

4241
async function dataHandler(consumer, handlers) {
43-
44-
(await functionWrapper(async () => {
45-
46-
42+
try {
4743
await consumer.run({
4844
eachMessage: async (data) => {
45+
const span = await logger.startSpan('dataHandler');
4946
const topic = data.topic
5047
const msg = data.message
5148
const partition = data.partition
@@ -94,21 +91,17 @@ async function dataHandler(consumer, handlers) {
9491
logger.error('error', 'Send email error details', result.error);
9592
}
9693
}
94+
await logger.endSpan(span);
9795
} catch (e) {
96+
await logger.endSpanWithError(span, e);
9897
logger.error(e)
9998
}
10099

101-
102-
103-
104100
},
105101
})
106-
107-
108-
109-
}, 'dataHandler'))(consumer, handlers);
110-
111-
102+
} catch (e) {
103+
logger.error(e)
104+
}
112105

113106
const errorTypes = ['unhandledRejection', 'uncaughtException']
114107
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
@@ -146,40 +139,52 @@ async function dataHandler(consumer, handlers) {
146139
* @param {Object} handlers the handlers
147140
*/
148141
async function retryEmail(handlers) {
142+
const span = await logger.startSpan('retryEmail');
149143
const loader = await models.loadEmailModule()
150144
const emailModel = await loader.findAll({ where: { status: 'FAILED', createdAt: { $gt: new Date(new Date() - config.EMAIL_RETRY_MAX_AGE) } } })
151-
152145
if (emailModel.length > 0) {
153146
logger.info(`Found ${emailModel.length} e-mails to be resent`);
154147
emailModel.map(async m => {
155148
// find handler
156149
const handler = handlers[m.topicName];
157150
if (!handler) {
158151
logger.warn(`No handler configured for topic: ${m.topicName}`);
152+
await logger.endSpan(span);
159153
return m;
160154
}
161155
const messageJSON = { data: JSON.parse(m.data), recipients: JSON.parse(m.recipients) };
162156
const result = await handler(m.topicName, messageJSON);
163157
if (result.success) {
164158
logger.info(`Email model with ${m.id} was sent correctly`);
165159
m.status = 'SUCCESS';
160+
await logger.endSpan(span);
166161
return m.save();
167162
}
168163
logger.info(`Email model with ${m.id} wasn't sent correctly`);
164+
await logger.endSpan(span);
169165
return m;
170166
});
171167
} else {
168+
await logger.endSpan(span);
172169
return models;
173170
}
174171

175172
}
176173

177174
async function initServer(handlers) {
178-
await models.init()
179-
await configureKafkaConsumer(handlers)
175+
try {
176+
const span = await logger.startSpan('initServer');
177+
await models.init()
178+
await configureKafkaConsumer(handlers)
179+
await logger.endSpan(span);
180+
} catch (e) {
181+
await logger.endSpanWithError(span, e);
182+
}
180183
}
181184
// Exports
182185
module.exports = {
183186
initServer,
184187
retryEmail,
185188
};
189+
190+
logger.buildService(module.exports)

src/models/datasource.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ let sequelizeInstance = null;
2121
async function getSequelize() {
2222
if (!sequelizeInstance) {
2323
sequelizeInstance = new Sequelize(config.DATABASE_URL, config.DATABASE_OPTIONS);
24+
const span = await logger.startSpan('getSequelize');
2425
try {
2526
await sequelizeInstance.authenticate()
27+
await logger.endSpan(span);
2628
logger.info('Database connection has been established successfully.');
2729
} catch (e) {
30+
await logger.endSpanWithErr(span, e);
2831
logger.error('Unable to connect to the database:', err);
2932
}
3033
}
@@ -34,3 +37,5 @@ async function getSequelize() {
3437
module.exports = {
3538
getSequelize,
3639
};
40+
41+
logger.buildService(module.exports)

0 commit comments

Comments
 (0)