Skip to content
This repository was archived by the owner on Jan 23, 2025. It is now read-only.

Commit 398c71e

Browse files
committed
Merge branch 'develop'
2 parents 7975fc5 + c6d9f60 commit 398c71e

File tree

6 files changed

+58
-135
lines changed

6 files changed

+58
-135
lines changed

ReadMe.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ The following parameters can be set in config files or in env variables:
3434
if provided, it can be either path to private key file or private key content
3535
- KAFKA_GROUP_ID: the Kafka group id, default value is 'legacy-challenge-processor'
3636
- KAFKA_ERROR_TOPIC: The kafka error topic.
37+
- BUSAPI_URL: Bus API URL
38+
- RETRY_TIMEOUT: The timeout to retry processing the same message
3739
- CREATE_CHALLENGE_TOPIC: the create challenge Kafka message topic, default value is 'challenge.notification.create'
3840
- UPDATE_CHALLENGE_TOPIC: the update challenge Kafka message topic, default value is 'challenge.notification.update'
3941
- AUTH0_URL: Auth0 URL, used to get TC M2M token

config/default.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ module.exports = {
1414
// Kafka group id
1515
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'legacy-challenge-processor',
1616
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
17+
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
18+
RETRY_TIMEOUT: process.env.RETRY_TIMEOUT || 10 * 1000,
1719

1820
// Topics to listen
1921
CREATE_CHALLENGE_TOPIC: process.env.CREATE_CHALLENGE_TOPIC || 'challenge.notification.create',

package-lock.json

Lines changed: 0 additions & 132 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
},
2222
"dependencies": {
2323
"@hapi/joi": "^15.0.2",
24-
"@topcoder-platform/topcoder-bus-api-wrapper": "github:topcoder-platform/tc-bus-api-wrapper#master",
24+
"topcoder-bus-api-wrapper": "topcoder-platform/tc-bus-api-wrapper.git",
2525
"async-mutex": "^0.1.4",
2626
"bluebird": "^3.7.2",
2727
"config": "^3.3.1",

src/common/helper.js

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@ const _ = require('lodash')
66
const config = require('config')
77
const request = require('superagent')
88
const m2mAuth = require('tc-core-library-js').auth.m2m
9+
const busApi = require('topcoder-bus-api-wrapper')
10+
const constants = require('../constants')
911
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
1012

13+
// Bus API Client
14+
let busApiClient
15+
1116
/**
1217
* Get Kafka options
1318
* @return {Object} the Kafka options
@@ -90,11 +95,44 @@ async function postRequest (url, body, m2mToken) {
9095
.set('Accept', 'application/json')
9196
}
9297

98+
/**
99+
* Get Bus API Client
100+
* @return {Object} Bus API Client Instance
101+
*/
102+
function getBusApiClient () {
103+
// if there is no bus API client instance, then create a new instance
104+
if (!busApiClient) {
105+
busApiClient = busApi(_.pick(config,
106+
['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
107+
'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
108+
'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
109+
}
110+
111+
return busApiClient
112+
}
113+
114+
/**
115+
* Post bus event.
116+
* @param {String} topic the event topic
117+
* @param {Object} payload the event payload
118+
*/
119+
async function postBusEvent (topic, payload) {
120+
const client = getBusApiClient()
121+
await client.postEvent({
122+
topic,
123+
originator: constants.EVENT_ORIGINATOR,
124+
timestamp: new Date().toISOString(),
125+
'mime-type': constants.EVENT_MIME_TYPE,
126+
payload
127+
})
128+
}
129+
93130
module.exports = {
94131
getKafkaOptions,
95132
getM2MToken,
96133
patchRequest,
97134
getRequest,
98135
putRequest,
99-
postRequest
136+
postRequest,
137+
postBusEvent
100138
}

src/services/ProcessorService.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,22 @@ async function processUpdate (message) {
286286

287287
const saveDraftContestDTO = await parsePayload(message.payload, m2mToken, false)
288288
logger.debug('Parsed Payload', saveDraftContestDTO)
289+
let challenge
289290
try {
290291
// ensure challenge existed
291-
const challenge = await getChallengeById(m2mToken, message.payload.legacyId)
292+
challenge = await getChallengeById(m2mToken, message.payload.legacyId)
293+
} catch (e) {
294+
// postponne kafka event
295+
logger.info('Challenge does not exist yet. Will post the same message back to the bus API')
296+
await new Promise((resolve) => {
297+
setTimeout(async () => {
298+
await helper.postBusEvent(config.UPDATE_CHALLENGE_TOPIC, message.payload)
299+
resolve()
300+
}, config.RETRY_TIMEOUT)
301+
})
302+
return
303+
}
304+
try {
292305
if (!challenge) {
293306
throw new Error(`Could not find challenge ${message.payload.legacyId}`)
294307
}

0 commit comments

Comments
 (0)