Skip to content
This repository was archived by the owner on Jan 23, 2025. It is now read-only.

Implement retry functionality if a challenge was not created on legac… #14

Merged
merged 2 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ The following parameters can be set in config files or in env variables:
if provided, it can be either path to private key file or private key content
- KAFKA_GROUP_ID: the Kafka group id, default value is 'legacy-challenge-processor'
- KAFKA_ERROR_TOPIC: The kafka error topic.
- BUSAPI_URL: Bus API URL
- RETRY_TIMEOUT: The timeout to retry processing the same message
- CREATE_CHALLENGE_TOPIC: the create challenge Kafka message topic, default value is 'challenge.notification.create'
- UPDATE_CHALLENGE_TOPIC: the update challenge Kafka message topic, default value is 'challenge.notification.update'
- AUTH0_URL: Auth0 URL, used to get TC M2M token
Expand Down
2 changes: 2 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ module.exports = {
// Kafka group id
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'legacy-challenge-processor',
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
RETRY_TIMEOUT: process.env.RETRY_TIMEOUT || 10 * 1000,

// Topics to listen
CREATE_CHALLENGE_TOPIC: process.env.CREATE_CHALLENGE_TOPIC || 'challenge.notification.create',
Expand Down
132 changes: 0 additions & 132 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
},
"dependencies": {
"@hapi/joi": "^15.0.2",
"@topcoder-platform/topcoder-bus-api-wrapper": "github:topcoder-platform/tc-bus-api-wrapper#master",
"topcoder-bus-api-wrapper": "topcoder-platform/tc-bus-api-wrapper.git",
"async-mutex": "^0.1.4",
"bluebird": "^3.7.2",
"config": "^3.3.1",
Expand Down
40 changes: 39 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ const _ = require('lodash')
const config = require('config')
const request = require('superagent')
const m2mAuth = require('tc-core-library-js').auth.m2m
const busApi = require('topcoder-bus-api-wrapper')
const constants = require('../constants')
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))

// Bus API Client
let busApiClient

/**
* Get Kafka options
* @return {Object} the Kafka options
Expand Down Expand Up @@ -90,11 +95,44 @@ async function postRequest (url, body, m2mToken) {
.set('Accept', 'application/json')
}

/**
* Get Bus API Client
* @return {Object} Bus API Client Instance
*/
function getBusApiClient () {
// if there is no bus API client instance, then create a new instance
if (!busApiClient) {
busApiClient = busApi(_.pick(config,
['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
}

return busApiClient
}

/**
* Post bus event.
* @param {String} topic the event topic
* @param {Object} payload the event payload
*/
async function postBusEvent (topic, payload) {
const client = getBusApiClient()
await client.postEvent({
topic,
originator: constants.EVENT_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': constants.EVENT_MIME_TYPE,
payload
})
}

module.exports = {
getKafkaOptions,
getM2MToken,
patchRequest,
getRequest,
putRequest,
postRequest
postRequest,
postBusEvent
}
15 changes: 14 additions & 1 deletion src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,22 @@ async function processUpdate (message) {

const saveDraftContestDTO = await parsePayload(message.payload, m2mToken, false)
logger.debug('Parsed Payload', saveDraftContestDTO)
let challenge
try {
// ensure challenge existed
const challenge = await getChallengeById(m2mToken, message.payload.legacyId)
challenge = await getChallengeById(m2mToken, message.payload.legacyId)
} catch (e) {
// postponne kafka event
logger.info('Challenge does not exist yet. Will post the same message back to the bus API')
await new Promise((resolve) => {
setTimeout(async () => {
await helper.postBusEvent(config.UPDATE_CHALLENGE_TOPIC, message.payload)
resolve()
}, config.RETRY_TIMEOUT)
})
return
}
try {
if (!challenge) {
throw new Error(`Could not find challenge ${message.payload.legacyId}`)
}
Expand Down