Skip to content

Commit e750e7e

Browse files
committed
add group consumer
1 parent d890d5e commit e750e7e

File tree

1 file changed

+14
-10
lines changed

1 file changed

+14
-10
lines changed

src/app.js

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ const logger = require('./common/logger');
1818
const errors = require('./common/errors');
1919
const models = require('./models');
2020

21-
let emailTries = {};
22-
2321
/**
2422
* Configure Kafka consumer.
2523
* @param {Object} handlers the handlers
@@ -32,7 +30,12 @@ function configureKafkaConsumer(handlers) {
3230
}
3331
const pauseTime = parseInt(config.EMAIL_PAUSE_TIME);
3432
const maxErrors = parseInt(config.EMAIL_MAX_ERRORS);
35-
const consumer = new Kafka.SimpleConsumer(options);
33+
34+
// email tries
35+
let emailTries = 0;
36+
37+
// Kafka Consumer
38+
const consumer = new Kafka.GroupConsumer(options);
3639

3740
// data handler
3841
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
@@ -50,10 +53,11 @@ function configureKafkaConsumer(handlers) {
5053
// return null to ignore this message
5154
return null;
5255
}
56+
5357
let emailModel = {};
5458
const messageJSON = JSON.parse(message);
5559
const handlerAsync = Promise.promisify(handler);
56-
// use handler to create notification instances for each recipient
60+
5761
return models.Email.create(
5862
Object.assign({ status: 'PENDING' }, {
5963
topicName,
@@ -72,23 +76,23 @@ function configureKafkaConsumer(handlers) {
7276
status: result.success ? 'Message accepted' : 'Message rejected',
7377
});
7478
if (result.success) {
75-
emailTries[topicName] = 0;
79+
emailTries = 0;
7680
emailModel.status = 'SUCCESS';
7781
return emailModel.save();
7882
} else {
79-
emailTries[topicName] += 1;
83+
emailTries += 1;
8084
emailModel.status = 'FAILED';
8185
return emailModel.save().then(() => {
82-
const currentTries = emailTries[topicName];
86+
const currentTries = emailTries;
8387
if (currentTries > maxErrors) {
8488
logger.debug(`Failed to send email. Will sleep for ${pauseTime}s`);
85-
emailTries[topicName] = 0;
89+
emailTries = 0;
8690

8791
schedule.scheduleJob(new Date(now.getTime() + pauseTime * 1000), () => {
88-
consumer.subscribe(topic, dataHandler);
92+
return startKafkaConsumer(consumer, handlers, dataHandler);
8993
});
9094

91-
return consumer.unsubscribe(topic, partition).then(() => {
95+
return consumer.end().then(() => {
9296
throw result.error
9397
});
9498
} else {

0 commit comments

Comments
 (0)