diff --git a/ReadMe.md b/ReadMe.md index 2a92568..fa91166 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -35,6 +35,7 @@ The following parameters can be set in config files or in env variables: - KAFKA_GROUP_ID: the Kafka group id, default value is 'legacy-challenge-processor' - KAFKA_ERROR_TOPIC: The kafka error topic. - BUSAPI_URL: Bus API URL +- MAX_RETRIES: the number of max retries; default value: 3 - RETRY_TIMEOUT: The timeout to retry processing the same message - CREATE_CHALLENGE_TOPIC: the create challenge Kafka message topic, default value is 'challenge.notification.create' - UPDATE_CHALLENGE_TOPIC: the update challenge Kafka message topic, default value is 'challenge.notification.update' diff --git a/config/default.js b/config/default.js index a8865e3..86d84f5 100644 --- a/config/default.js +++ b/config/default.js @@ -15,6 +15,7 @@ module.exports = { KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'legacy-challenge-processor', KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting', BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5', + MAX_RETRIES: process.env.MAX_RETRIES || 3, RETRY_TIMEOUT: process.env.RETRY_TIMEOUT || 10 * 1000, // Topics to listen diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 7d17473..a26d286 100644 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -381,12 +381,19 @@ async function processUpdate (message) { // postponne kafka event logger.info('Challenge does not exist yet. Will post the same message back to the bus API') logger.error(`Error: ${JSON.stringify(e)}`) - // await new Promise((resolve) => { - // setTimeout(async () => { - // await helper.postBusEvent(config.UPDATE_CHALLENGE_TOPIC, message.payload) - // resolve() - // }, config.RETRY_TIMEOUT) - // }) + + const retryCountIdentifier = `${config.KAFKA_GROUP_ID.split(' ').join('_')}_retry_count` + let currentRetryCount = parseInt(get(messageJSON.payload, retryCountIdentifier, 1), 10) + if (currentRetryCount <= config.MAX_RETRIES) { + await new Promise((resolve) => { + setTimeout(async () => { + await helper.postBusEvent(config.UPDATE_CHALLENGE_TOPIC, { ...messageJSON.payload, [retryCountIdentifier]: currentRetryCount }) + resolve() + }, config.RETRY_TIMEOUT * currentRetryCount) + }) + } else { + logger.error(`Failed to process message after ${config.MAX_RETRIES} retries. Aborting...`) + } return }