Skip to content
This repository was archived by the owner on Jan 23, 2025. It is now read-only.

Commit 5983700

Browse files
update the retry logic
1 parent 096a826 commit 5983700

File tree

3 files changed

+15
-6
lines changed

3 files changed

+15
-6
lines changed

ReadMe.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ The following parameters can be set in config files or in env variables:
3535
- KAFKA_GROUP_ID: the Kafka group id, default value is 'legacy-challenge-processor'
3636
- KAFKA_ERROR_TOPIC: The kafka error topic.
3737
- BUSAPI_URL: Bus API URL
38+
- MAX_RETRIES: the number of max retries; default value: 3
3839
- RETRY_TIMEOUT: The timeout to retry processing the same message
3940
- CREATE_CHALLENGE_TOPIC: the create challenge Kafka message topic, default value is 'challenge.notification.create'
4041
- UPDATE_CHALLENGE_TOPIC: the update challenge Kafka message topic, default value is 'challenge.notification.update'

config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module.exports = {
1515
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'legacy-challenge-processor',
1616
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
1717
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
18+
MAX_RETRIES: process.env.MAX_RETRIES || 3,
1819
RETRY_TIMEOUT: process.env.RETRY_TIMEOUT || 10 * 1000,
1920

2021
// Topics to listen

src/services/ProcessorService.js

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -381,12 +381,19 @@ async function processUpdate (message) {
381381
// postponne kafka event
382382
logger.info('Challenge does not exist yet. Will post the same message back to the bus API')
383383
logger.error(`Error: ${JSON.stringify(e)}`)
384-
// await new Promise((resolve) => {
385-
// setTimeout(async () => {
386-
// await helper.postBusEvent(config.UPDATE_CHALLENGE_TOPIC, message.payload)
387-
// resolve()
388-
// }, config.RETRY_TIMEOUT)
389-
// })
384+
385+
const retryCountIdentifier = `${config.KAFKA_GROUP_ID.split(' ').join('_')}_retry_count`
386+
let currentRetryCount = parseInt(get(messageJSON.payload, retryCountIdentifier, 1), 10)
387+
if (currentRetryCount <= config.MAX_RETRIES) {
388+
await new Promise((resolve) => {
389+
setTimeout(async () => {
390+
await helper.postBusEvent(config.UPDATE_CHALLENGE_TOPIC, { ...messageJSON.payload, [retryCountIdentifier]: currentRetryCount })
391+
resolve()
392+
}, config.RETRY_TIMEOUT * currentRetryCount)
393+
})
394+
} else {
395+
logger.error(`Failed to process message after ${config.MAX_RETRIES} retries. Aborting...`)
396+
}
390397
return
391398
}
392399

0 commit comments

Comments
 (0)