Skip to content

Commit b00b8b1

Browse files
authored
Merge pull request #41 from topcoder-platform/plat-1143-OR-validation-issue
updated the logic to remove the members who are not part of all the r…
2 parents 6def634 + de1c75a commit b00b8b1

File tree

6 files changed

+204
-45
lines changed

6 files changed

+204
-45
lines changed

config/default.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ module.exports = {
2626
CHALLENGE_UPDATE_TOPIC: process.env.CHALLENGE_UPDATE_TOPIC || 'challenge.notification.update',
2727
PROJECT_MEMBER_ADDED_TOPIC: process.env.PROJECT_MEMBER_ADDED_TOPIC || 'connect.notification.project.member.joined',
2828
PROJECT_MEMBER_REMOVED_TOPIC: process.env.PROJECT_MEMBER_REMOVED_TOPIC || 'connect.notification.project.member.removed',
29+
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'resource-processor',
30+
RESOURCE_DELETE_TOPIC: process.env.RESOURCE_DELETE_TOPIC || 'challenge.action.resource.delete',
31+
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
2932

3033
// superagent request timeout in milliseconds
3134
REQUEST_TIMEOUT: process.env.REQUEST_TIMEOUT ? Number(process.env.REQUEST_TIMEOUT) : 20000,
@@ -37,10 +40,13 @@ module.exports = {
3740
SEARCH_MEMBERS_API_BASE: process.env.SEARCH_MEMBERS_API_BASE || 'https://api.topcoder-dev.com/v3/members/_search',
3841
RESOURCES_API: process.env.RESOURCES_API || 'http://localhost:4000/v5/resources',
3942
CHALLENGE_API: process.env.CHALLENGE_API || 'http://localhost:4000/v5/challenges',
43+
V4_RESOURCES_API: process.env.V4_RESOURCES_API || 'http://localhost:4000/v4/challenges/',
4044

4145
IGNORED_ORIGINATORS: process.env.IGNORED_ORIGINATORS ? process.env.IGNORED_ORIGINATORS.split(',') : ['legacy-migration-script'],
4246

4347
GROUPS_TO_IGNORE: process.env.GROUPS_TO_IGNORE ? process.env.GROUPS_TO_IGNORE.split(',') : ['72a0b8a0-aa45-44f7-86c2-bf9de6321e5b'],
44-
GROUPS_API_URL: process.env.GROUPS_API_URL || 'http://localhost:4000/v5/groups'
48+
GROUPS_API_URL: process.env.GROUPS_API_URL || 'http://localhost:4000/v5/groups',
49+
50+
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5'
4551

4652
}

package-lock.json

Lines changed: 98 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
"lodash": "^4.17.19",
2929
"no-kafka": "^3.4.3",
3030
"superagent": "^5.1.0",
31+
"tc-bus-api-wrapper": "topcoder-platform/tc-bus-api-wrapper.git",
3132
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v2.6.4",
3233
"topcoder-healthcheck-dropin": "^1.0.2",
34+
"uuid": "^8.3.2",
3335
"winston": "^3.1.0"
3436
},
3537
"engines": {

src/app.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* The application entry point
33
*/
44

5-
global.Promise = require('bluebird')
5+
const QPromise = require('bluebird')
66
const config = require('config')
77
const Kafka = require('no-kafka')
88
const _ = require('lodash')
@@ -15,7 +15,7 @@ const ProcessorService = require('./services/ProcessorService')
1515
const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
1616

1717
// data handler
18-
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
18+
const dataHandler = (messageSet, topic, partition) => QPromise.each(messageSet, (m) => {
1919
const message = m.message.value.toString('utf8')
2020
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
2121
m.offset}; Message: ${message}.`)

src/common/helper.js

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,15 @@
44

55
const _ = require('lodash')
66
const config = require('config')
7+
const busApi = require('tc-bus-api-wrapper')
8+
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
9+
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
710
const m2mAuth = require('tc-core-library-js').auth.m2m
811
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
912
const superagent = require('superagent')
13+
const { v4: uuid } = require('uuid')
14+
15+
const logger = require('./logger')
1016

1117
/**
1218
* Get Kafka options
@@ -116,6 +122,24 @@ async function getChallengeResources (challengeId, roleId) {
116122
return allResources
117123
}
118124

125+
/**
126+
* Get challenge resources using v4 API
127+
* @param {String} legacyId the legacy challenge ID
128+
* @param {String} challengeId the challenge ID
129+
* @return {Array} list of observers
130+
*/
131+
async function getChallengeResourcesV4 (legacyId, challengeId) {
132+
const token = await getM2MToken()
133+
const url = `${config.V4_RESOURCES_API}` + legacyId + '/resources'
134+
const res = await superagent.get(url).set('Authorization', `Bearer ${token}`)
135+
if (res.status !== 200) {
136+
throw new Error(`Failed to get resources for challenge id ${challengeId}: ${JSON.stringify(_.get(res.body, 'result.content'))}`)
137+
}
138+
139+
// TODO: Make generic and not hardcoded `OBSERVER`
140+
return _.filter(_.get(res.body, 'result.content'), { role: 'Observer' })
141+
}
142+
119143
/**
120144
* Search members of given member ids
121145
* @param {Array} memberIds the member ids
@@ -198,7 +222,7 @@ async function filterMemberForGroups (memberIds, groupIds) {
198222
const memberList = []
199223

200224
for (const memberId of memberIds) {
201-
const res = await Promise.all(groupIds.map(groupId => memberGroupsCall(groupId, memberId)))
225+
const res = await Promise.allSettled(groupIds.map(groupId => checkMemberGroup(groupId, memberId)))
202226
const memberGroups = _.compact(_.flattenDeep(_.map(res, 'value')))
203227

204228
if (memberGroups.length !== groupIds.length) memberList.push(memberId)
@@ -208,24 +232,61 @@ async function filterMemberForGroups (memberIds, groupIds) {
208232
}
209233

210234
/**
211-
* Return the memberId if member is part of the groups
235+
* Check for membership against the provided group
212236
* @param {String} groupId
213237
* @param {String} memberId
214238
* @returns {String} memberId in case of member of group
215239
*/
216-
async function memberGroupsCall (groupId, memberId) {
240+
async function checkMemberGroup (groupId, memberId) {
217241
// M2M token is cached by 'tc-core-library-js' lib
218242
const token = await getM2MToken()
219243
const url = `${config.GROUPS_API_URL}/${groupId}/members/${memberId}`
220244

221-
try {
222-
return superagent
223-
.get(url)
224-
.set('Authorization', `Bearer ${token}`)
225-
.timeout(config.REQUEST_TIMEOUT)
226-
} catch (error) {
227-
return []
245+
return superagent
246+
.get(url)
247+
.set('Authorization', `Bearer ${token}`)
248+
.timeout(config.REQUEST_TIMEOUT)
249+
}
250+
251+
/**
252+
* Remove resources using V4 API
253+
* @param {String} groupId
254+
* @param {String} memberId
255+
* @returns {String} memberId in case of member of group
256+
*/
257+
async function deleteResourcesV4 (challengeId, resources) {
258+
const payloads = []
259+
260+
resources.map(resource => {
261+
payloads.push({
262+
id: uuid(),
263+
challengeId,
264+
memberId: resource['properties']['External Reference ID'],
265+
memberHandle: resource['properties']['Handle'],
266+
roleId: config.RESOURCE_ROLE_ID,
267+
created: resource['properties']['Handle'],
268+
createdBy: new Date().toUTCString()
269+
})
270+
})
271+
272+
await Promise.allSettled(payloads.map(payload => postEvent(config.RESOURCE_DELETE_TOPIC, payload)))
273+
}
274+
275+
/**
276+
* Send Kafka event message
277+
* @params {String} topic the topic name
278+
* @params {Object} payload the payload
279+
*/
280+
async function postEvent (topic, payload) {
281+
logger.info(`Publish event to Kafka topic ${topic}`)
282+
const message = {
283+
topic,
284+
originator: config.KAFKA_MESSAGE_ORIGINATOR,
285+
timestamp: new Date().toISOString(),
286+
'mime-type': 'application/json',
287+
payload
228288
}
289+
await busApiClient.postEvent(message)
229290
}
230291

231292
module.exports = {
@@ -236,5 +297,7 @@ module.exports = {
236297
deleteResource,
237298
getProjectChallenges,
238299
getChallengeResources,
239-
filterMemberForGroups
300+
filterMemberForGroups,
301+
getChallengeResourcesV4,
302+
deleteResourcesV4
240303
}

src/services/ProcessorService.js

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
const _ = require('lodash')
66
const Joi = require('joi')
77
const config = require('config')
8+
89
const logger = require('../common/logger')
910
const helper = require('../common/helper')
1011

@@ -24,19 +25,7 @@ async function handleChallengeCreate (message) {
2425
logger.info(`Found member ids [${memberIds.join(', ')}] of project id ${projectId}`)
2526

2627
// search members
27-
let members = await helper.searchMembers(memberIds)
28-
29-
// fetch all members of groups
30-
const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE)
31-
32-
// filter members who are NOT part of all the groups
33-
if (groupIds.length > 0 && members.length > 0) {
34-
const memberIds = members.map(m => m.id)
35-
const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds)
36-
37-
// remove the members who are not part of all the groups
38-
members = members.filter(m => !filteredMemberIds.includes(m.id))
39-
}
28+
const members = await helper.searchMembers(memberIds)
4029

4130
// create resource for each member
4231
for (const member of members) {
@@ -64,29 +53,33 @@ handleChallengeCreate.schema = {
6453
* @param {Object} message the challenge update message
6554
*/
6655
async function handleChallengeUpdate (message) {
56+
const legacyId = message.payload.legacyId
6757
const challengeId = message.payload.id
6858
const projectId = message.payload.projectId
69-
logger.info(`Process message of challenge id ${challengeId} and project id ${projectId}`)
7059

71-
// get challenge resources (all observers for the challenge)
72-
let challengeResources = await helper.getChallengeResources(challengeId, config.RESOURCE_ROLE_ID)
60+
if (!legacyId) {
61+
logger.info(`Skipping update message of challenge id ${challengeId} and project id ${projectId} as legacyId not present`)
62+
} else {
63+
logger.info(`Process update message of challenge id ${challengeId} and project id ${projectId}`)
7364

74-
// fetch all members of groups
75-
const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE)
65+
// get challenge resources (all observers for the challenge)
66+
const challengeResources = await helper.getChallengeResourcesV4(legacyId, challengeId)
7667

77-
// filter members who are NOT part of all the groups
78-
if (groupIds.length > 0 && challengeResources.length > 0) {
79-
const memberIds = challengeResources.map(cr => cr.memberId)
80-
const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds)
68+
// get all challenge groups
69+
const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE)
8170

82-
// filter the members who are not part of all the groups
83-
challengeResources = challengeResources.filter(member => filteredMemberIds.includes(member.memberId))
71+
// filter members who are NOT part of all the groups
72+
if (groupIds.length > 0 && challengeResources.length > 0) {
73+
let memberIds = challengeResources.map(resource => resource['properties']['External Reference ID'])
8474

85-
// remove members from resources who are not part of all the groups
86-
await Promise.all(challengeResources.map(member => helper.deleteResource(challengeId, member.memberHandle, config.RESOURCE_ROLE_ID)))
87-
}
75+
const filteredResources = await helper.filterMemberForGroups(memberIds, groupIds)
76+
const resourcesToDelete = challengeResources.filter(resource => filteredResources.includes(resource['properties']['External Reference ID']))
8877

89-
logger.info(`Successfully processed message of challenge id ${challengeId} and project id ${projectId}`)
78+
await helper.deleteResourcesV4(challengeId, resourcesToDelete)
79+
}
80+
81+
logger.info(`Successfully processed update message of challenge id ${challengeId} and project id ${projectId}`)
82+
}
9083
}
9184

9285
handleChallengeUpdate.schema = {

0 commit comments

Comments
 (0)