diff --git a/index.js b/index.js index 5c6d9a7..7a1ff11 100644 --- a/index.js +++ b/index.js @@ -144,6 +144,7 @@ function start () { schedule.scheduleJob(config.EMAIL_RETRY_SCHEDULE, async function () { try { + logger.info("Retrying failed emails") await retryEmail(handlers) } catch (err) { console.log('Error in retrying email', err) diff --git a/src/init.js b/src/init.js index 06516e6..41e01c6 100644 --- a/src/init.js +++ b/src/init.js @@ -28,7 +28,9 @@ async function configureKafkaConsumer (handlers) { const kafka = new Kafka(options) const consumer = kafka.consumer({ groupId: config.KAFKA_GROUP_ID }) + logger.info("Connecting to Kafka...") await consumer.connect() + logger.info(`Subscribing to topics: ${_.keys(handlers)}`) await consumer.subscribe({ topics: _.keys(handlers) }) dataHandler(consumer, handlers).catch((err) => { console.log('error', 'Kafka consumer error', err) @@ -93,7 +95,7 @@ async function dataHandler (consumer, handlers) { } catch (e) { await logger.endSpanWithError(span, e) logger.error(e) - } + } } }) } catch (e) { @@ -124,7 +126,7 @@ async function dataHandler (consumer, handlers) { process.kill(process.pid, type) } }) - }) + }) } /** @@ -160,7 +162,7 @@ async function retryEmail (handlers) { } else { await logger.endSpan(span) return models - } + } } async function initServer (handlers) { @@ -179,4 +181,4 @@ module.exports = { retryEmail } -logger.buildService(module.exports) +logger.buildService(module.exports)