Skip to content

Commit 59ec92d

Browse files
committed
use retry value from payload
1 parent 3a7e4a7 commit 59ec92d

File tree

3 files changed

+22
-19
lines changed

3 files changed

+22
-19
lines changed

src/common/helper.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ const _ = require('lodash')
1212
const { Mutex } = require('async-mutex')
1313
const m2mAuth = require('tc-core-library-js').auth.m2m
1414
const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper')
15-
const ActionProcessorService = require('../services/ActionProcessorService')
1615

1716
AWS.config.region = config.esConfig.AWS_REGION
1817

@@ -185,10 +184,11 @@ async function postMessageViaWebhook (webhook, message) {
185184
* Calls ActionProcessorService to attempt to retry failed process
186185
* @param {String} topic the failed topic name
187186
* @param {Object} payload the payload
188-
* @param {String} id the id that was the subject of the operation failed
187+
* @param {String} retry how many times has it been retried
189188
*/
190-
async function retryFailedProcess (topic, payload, id) {
191-
await ActionProcessorService.processCreate(topic, payload, id)
189+
async function retryFailedProcess (topic, payload, retry) {
190+
const ActionProcessorService = require('../services/ActionProcessorService')
191+
await ActionProcessorService.processCreate(topic, payload, retry)
192192
}
193193

194194
let busApiClient

src/services/ActionProcessorService.js

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,11 @@ const Joi = require('@hapi/joi')
66
const logger = require('../common/logger')
77
const helper = require('../common/helper')
88
const config = require('config')
9-
const _ = require('lodash')
109

1110
const localLogger = {
1211
debug: ({ context, message }) => logger.debug({ component: 'ActionProcessorService', context, message })
1312
}
1413

15-
const retryMap = {}
16-
1714
/**
1815
* Process retry operation message
1916
* @param {Object} message the kafka message
@@ -25,9 +22,10 @@ async function processRetry (message, transactionId) {
2522
return
2623
}
2724
const { topicServiceMapping } = require('../app')
25+
const retry = message.payload.retry
2826
message.topic = message.payload.originalTopic
2927
message.payload = message.payload.originalPayload
30-
await topicServiceMapping[message.topic](message, transactionId)
28+
await topicServiceMapping[message.topic](message, transactionId, { retry })
3129
}
3230

3331
processRetry.schema = {
@@ -49,30 +47,29 @@ processRetry.schema = {
4947
* Analyzes the failed process and sends it to bus api to be received again.
5048
* @param {String} originalTopic the failed topic name
5149
* @param {Object} originalPayload the payload
52-
* @param {String} id the id that was the subject of the operation failed
50+
* @param {Number} retry how many times has it been retried
5351
*/
54-
async function processCreate (originalTopic, originalPayload, id) {
55-
const retry = _.defaultTo(retryMap[id], 0) + 1
52+
async function processCreate (originalTopic, originalPayload, retry) {
53+
retry = retry + 1
5654
if (retry > config.MAX_RETRY) {
57-
localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${id} exceeds the max retry: ${config.MAX_RETRY} - ignored` })
55+
localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${originalPayload.id} exceeds the max retry: ${config.MAX_RETRY} - ignored` })
5856
return
5957
}
60-
localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${id}` })
61-
retryMap[id] = retry
58+
localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${originalPayload.id}` })
6259
const payload = {
6360
originalTopic,
6461
originalPayload,
6562
retry
6663
}
67-
setTimeout(async function () {
64+
setTimeout(async () => {
6865
await helper.postEvent(config.topics.TAAS_ACTION_RETRY_TOPIC, payload)
6966
}, 2 ** retry * config.BASE_RETRY_DELAY)
7067
}
7168

7269
processCreate.schema = {
7370
originalTopic: Joi.string().required(),
7471
originalPayload: Joi.object().required(),
75-
id: Joi.string().uuid().required()
72+
retry: Joi.number().integer().min(0).required()
7673
}
7774

7875
module.exports = {

src/services/WorkPeriodProcessorService.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ const esClient = helper.getESClient()
1414
* Process create entity message
1515
* @param {Object} message the kafka message
1616
* @param {String} transactionId
17+
* @param {Object} options
1718
*/
18-
async function processCreate (message, transactionId) {
19+
async function processCreate (message, transactionId, options) {
1920
const workPeriod = message.payload
2021
// Find related resourceBooking
2122
let resourceBooking
@@ -30,7 +31,7 @@ async function processCreate (message, transactionId) {
3031
// it has not yet been created. We should send a retry request.
3132
if (err.httpStatus === 404) {
3233
logger.logFullError(err, { component: 'WorkPeriodProcessorService', context: 'processCreate' })
33-
await helper.retryFailedProcess(message.topic, workPeriod, workPeriod.resourceBookingId)
34+
await helper.retryFailedProcess(message.topic, workPeriod, options.retry)
3435
return
3536
} else {
3637
throw err
@@ -78,7 +79,12 @@ processCreate.schema = {
7879
updatedBy: Joi.string().uuid().allow(null)
7980
}).required()
8081
}).required(),
81-
transactionId: Joi.string().required()
82+
transactionId: Joi.string().required(),
83+
options: Joi.object().keys({
84+
retry: Joi.number().integer().min(0).default(0)
85+
}).default({
86+
retry: 0
87+
})
8288
}
8389

8490
/**

0 commit comments

Comments
 (0)