Skip to content

Commit b349be1

Browse files
committed
refactor: improve retry logic
1 parent 3538258 commit b349be1

File tree

5 files changed

+56
-16
lines changed

5 files changed

+56
-16
lines changed

config/test.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@ module.exports = {
66
zapier: {
77
ZAPIER_SWITCH: process.env.ZAPIER_SWITCH || 'ON',
88
ZAPIER_JOB_CANDIDATE_SWITCH: process.env.ZAPIER_JOB_CANDIDATE_SWITCH || 'ON'
9-
}
9+
},
10+
// don't retry actions during tests because tests for now don't expect it and should be updated first
11+
MAX_RETRY: 0,
1012
}

src/common/helper.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,18 @@ async function postEvent (topic, payload) {
225225
await client.postEvent(message)
226226
}
227227

228+
/**
229+
* Sleep for a given number of milliseconds.
230+
*
231+
* @param {Number} milliseconds the sleep time
232+
* @returns {undefined}
233+
*/
234+
async function sleep (milliseconds) {
235+
return new Promise((resolve) => setTimeout(resolve, milliseconds))
236+
}
237+
228238
module.exports = {
239+
sleep,
229240
getKafkaOptions,
230241
getESClient,
231242
checkEsMutexRelease,

src/common/logger.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,24 @@ logger.logFullError = (err, context = {}) => {
5656
err.logged = true
5757
}
5858

59+
/**
60+
* Log warning details
61+
* @param {Object} err the error
62+
* @param {Object} context contains extra info about errors
63+
*/
64+
logger.logFullWarning = (err, context = {}) => {
65+
if (!err) {
66+
return
67+
}
68+
if (err.logged) {
69+
return
70+
}
71+
const signature = context.signature ? `${context.signature} : ` : ''
72+
const errMessage = err.message || util.inspect(err).split('\n')[0]
73+
logger.warn({ ..._.pick(context, ['component', 'context']), message: `${signature}${errMessage}` })
74+
err.logged = true
75+
}
76+
5977
/**
6078
* Remove invalid properties from the object and hide long arrays
6179
* @param {Object} obj the object

src/services/ActionProcessorService.js

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,33 +48,36 @@ processRetry.schema = {
4848
* @param {String} originalTopic the failed topic name
4949
* @param {Object} originalPayload the payload
5050
* @param {Number} retry how many times has it been retried
51+
*
52+
* @returns {Promise|null} returns Promise which would be resolved when retry event sent to Kafka,
53+
* or `null` if it would not be scheduled
5154
*/
52-
async function processCreate (originalTopic, originalPayload, retry) {
55+
function scheduleRetry (originalTopic, originalPayload, retry) {
5356
retry = retry + 1
5457
if (retry > config.MAX_RETRY) {
55-
localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${originalPayload.id} exceeds the max retry: ${config.MAX_RETRY} - ignored` })
58+
localLogger.debug({ context: 'scheduleRetry', message: `retry: ${retry} for topic: ${originalTopic} id: ${originalPayload.id} exceeds the max retry: ${config.MAX_RETRY} - ignored` })
5659
return
5760
}
58-
localLogger.debug({ context: 'processCreate', message: `retry: ${retry} for ${originalPayload.id}` })
61+
62+
localLogger.debug({ context: 'scheduleRetry', message: `retry: ${retry} for topic: ${originalTopic} id: ${originalPayload.id}` })
63+
5964
const payload = {
6065
originalTopic,
6166
originalPayload,
6267
retry
6368
}
64-
setTimeout(async () => {
65-
await helper.postEvent(config.topics.TAAS_ACTION_RETRY_TOPIC, payload)
66-
}, 2 ** retry * config.BASE_RETRY_DELAY)
67-
}
6869

69-
processCreate.schema = {
70-
originalTopic: Joi.string().required(),
71-
originalPayload: Joi.object().required(),
72-
retry: Joi.number().integer().min(0).required()
70+
return helper.sleep(2 ** retry * config.BASE_RETRY_DELAY).then(() =>
71+
helper.postEvent(config.topics.TAAS_ACTION_RETRY_TOPIC, payload)
72+
)
7373
}
7474

7575
module.exports = {
76-
processRetry,
77-
processCreate
76+
processRetry
7877
}
7978

8079
logger.buildService(module.exports, 'ActionProcessorService')
80+
81+
// we don't want to wrap this method into service wrappers
82+
// because it would transform this method to `async` while we want to keep it sync
83+
module.exports.scheduleRetry = scheduleRetry

src/services/WorkPeriodProcessorService.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,14 @@ async function processCreate (message, transactionId, options) {
3131
// if resource booking was not found, it may be because
3232
// it has not yet been created. We should send a retry request.
3333
if (err.httpStatus === 404) {
34-
logger.logFullError(err, { component: 'WorkPeriodProcessorService', context: 'processCreate' })
35-
await ActionProcessorService.processCreate(message.topic, workPeriod, options.retry)
34+
const schedulePromise = ActionProcessorService.scheduleRetry(message.topic, workPeriod, options.retry)
35+
if (schedulePromise) {
36+
// as retry was scheduled, log this error as warning
37+
logger.logFullWarning(err, { component: 'WorkPeriodProcessorService', context: 'processCreate' })
38+
} else {
39+
// as retry was not scheduled, then log this error as error
40+
logger.logFullError(err, { component: 'WorkPeriodProcessorService', context: 'processCreate' })
41+
}
3642
return
3743
} else {
3844
throw err

0 commit comments

Comments
 (0)