Skip to content

updated the logic to remove the members who are not part of all the r… #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ module.exports = {
CHALLENGE_UPDATE_TOPIC: process.env.CHALLENGE_UPDATE_TOPIC || 'challenge.notification.update',
PROJECT_MEMBER_ADDED_TOPIC: process.env.PROJECT_MEMBER_ADDED_TOPIC || 'connect.notification.project.member.joined',
PROJECT_MEMBER_REMOVED_TOPIC: process.env.PROJECT_MEMBER_REMOVED_TOPIC || 'connect.notification.project.member.removed',
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'resource-processor',
RESOURCE_DELETE_TOPIC: process.env.RESOURCE_DELETE_TOPIC || 'challenge.action.resource.delete',
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',

// superagent request timeout in milliseconds
REQUEST_TIMEOUT: process.env.REQUEST_TIMEOUT ? Number(process.env.REQUEST_TIMEOUT) : 20000,
Expand All @@ -37,10 +40,13 @@ module.exports = {
SEARCH_MEMBERS_API_BASE: process.env.SEARCH_MEMBERS_API_BASE || 'https://api.topcoder-dev.com/v3/members/_search',
RESOURCES_API: process.env.RESOURCES_API || 'http://localhost:4000/v5/resources',
CHALLENGE_API: process.env.CHALLENGE_API || 'http://localhost:4000/v5/challenges',
V4_RESOURCES_API: process.env.V4_RESOURCES_API || 'http://localhost:4000/v4/challenges/',

IGNORED_ORIGINATORS: process.env.IGNORED_ORIGINATORS ? process.env.IGNORED_ORIGINATORS.split(',') : ['legacy-migration-script'],

GROUPS_TO_IGNORE: process.env.GROUPS_TO_IGNORE ? process.env.GROUPS_TO_IGNORE.split(',') : ['72a0b8a0-aa45-44f7-86c2-bf9de6321e5b'],
GROUPS_API_URL: process.env.GROUPS_API_URL || 'http://localhost:4000/v5/groups'
GROUPS_API_URL: process.env.GROUPS_API_URL || 'http://localhost:4000/v5/groups',

BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5'

}
101 changes: 98 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
"lodash": "^4.17.19",
"no-kafka": "^3.4.3",
"superagent": "^5.1.0",
"tc-bus-api-wrapper": "topcoder-platform/tc-bus-api-wrapper.git",
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v2.6.4",
"topcoder-healthcheck-dropin": "^1.0.2",
"uuid": "^8.3.2",
"winston": "^3.1.0"
},
"engines": {
Expand Down
4 changes: 2 additions & 2 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* The application entry point
*/

global.Promise = require('bluebird')
const QPromise = require('bluebird')
const config = require('config')
const Kafka = require('no-kafka')
const _ = require('lodash')
Expand All @@ -15,7 +15,7 @@ const ProcessorService = require('./services/ProcessorService')
const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())

// data handler
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => {
const dataHandler = (messageSet, topic, partition) => QPromise.each(messageSet, (m) => {
const message = m.message.value.toString('utf8')
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
m.offset}; Message: ${message}.`)
Expand Down
85 changes: 74 additions & 11 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@

const _ = require('lodash')
const config = require('config')
const busApi = require('tc-bus-api-wrapper')
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
const m2mAuth = require('tc-core-library-js').auth.m2m
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
const superagent = require('superagent')
const { v4: uuid } = require('uuid')

const logger = require('./logger')

/**
* Get Kafka options
Expand Down Expand Up @@ -116,6 +122,24 @@ async function getChallengeResources (challengeId, roleId) {
return allResources
}

/**
* Get challenge resources using v4 API
* @param {String} legacyId the legacy challenge ID
* @param {String} challengeId the challenge ID
* @return {Array} list of observers
*/
async function getChallengeResourcesV4 (legacyId, challengeId) {
const token = await getM2MToken()
const url = `${config.V4_RESOURCES_API}` + legacyId + '/resources'
const res = await superagent.get(url).set('Authorization', `Bearer ${token}`)
if (res.status !== 200) {
throw new Error(`Failed to get resources for challenge id ${challengeId}: ${JSON.stringify(_.get(res.body, 'result.content'))}`)
}

// TODO: Make generic and not hardcoded `OBSERVER`
return _.filter(_.get(res.body, 'result.content'), { role: 'Observer' })
}

/**
* Search members of given member ids
* @param {Array} memberIds the member ids
Expand Down Expand Up @@ -198,7 +222,7 @@ async function filterMemberForGroups (memberIds, groupIds) {
const memberList = []

for (const memberId of memberIds) {
const res = await Promise.all(groupIds.map(groupId => memberGroupsCall(groupId, memberId)))
const res = await Promise.allSettled(groupIds.map(groupId => checkMemberGroup(groupId, memberId)))
const memberGroups = _.compact(_.flattenDeep(_.map(res, 'value')))

if (memberGroups.length !== groupIds.length) memberList.push(memberId)
Expand All @@ -208,24 +232,61 @@ async function filterMemberForGroups (memberIds, groupIds) {
}

/**
* Return the memberId if member is part of the groups
* Check for membership against the provided group
* @param {String} groupId
* @param {String} memberId
* @returns {String} memberId in case of member of group
*/
async function memberGroupsCall (groupId, memberId) {
async function checkMemberGroup (groupId, memberId) {
// M2M token is cached by 'tc-core-library-js' lib
const token = await getM2MToken()
const url = `${config.GROUPS_API_URL}/${groupId}/members/${memberId}`

try {
return superagent
.get(url)
.set('Authorization', `Bearer ${token}`)
.timeout(config.REQUEST_TIMEOUT)
} catch (error) {
return []
return superagent
.get(url)
.set('Authorization', `Bearer ${token}`)
.timeout(config.REQUEST_TIMEOUT)
}

/**
* Remove resources using V4 API
* @param {String} groupId
* @param {String} memberId
* @returns {String} memberId in case of member of group
*/
async function deleteResourcesV4 (challengeId, resources) {
const payloads = []

resources.map(resource => {
payloads.push({
id: uuid(),
challengeId,
memberId: resource['properties']['External Reference ID'],
memberHandle: resource['properties']['Handle'],
roleId: config.RESOURCE_ROLE_ID,
created: resource['properties']['Handle'],
createdBy: new Date().toUTCString()
})
})

await Promise.allSettled(payloads.map(payload => postEvent(config.RESOURCE_DELETE_TOPIC, payload)))
}

/**
* Send Kafka event message
* @params {String} topic the topic name
* @params {Object} payload the payload
*/
async function postEvent (topic, payload) {
logger.info(`Publish event to Kafka topic ${topic}`)
const message = {
topic,
originator: config.KAFKA_MESSAGE_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': 'application/json',
payload
}
await busApiClient.postEvent(message)
}

module.exports = {
Expand All @@ -236,5 +297,7 @@ module.exports = {
deleteResource,
getProjectChallenges,
getChallengeResources,
filterMemberForGroups
filterMemberForGroups,
getChallengeResourcesV4,
deleteResourcesV4
}
49 changes: 21 additions & 28 deletions src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const _ = require('lodash')
const Joi = require('joi')
const config = require('config')

const logger = require('../common/logger')
const helper = require('../common/helper')

Expand All @@ -24,19 +25,7 @@ async function handleChallengeCreate (message) {
logger.info(`Found member ids [${memberIds.join(', ')}] of project id ${projectId}`)

// search members
let members = await helper.searchMembers(memberIds)

// fetch all members of groups
const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE)

// filter members who are NOT part of all the groups
if (groupIds.length > 0 && members.length > 0) {
const memberIds = members.map(m => m.id)
const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds)

// remove the members who are not part of all the groups
members = members.filter(m => !filteredMemberIds.includes(m.id))
}
const members = await helper.searchMembers(memberIds)

// create resource for each member
for (const member of members) {
Expand Down Expand Up @@ -64,29 +53,33 @@ handleChallengeCreate.schema = {
* @param {Object} message the challenge update message
*/
async function handleChallengeUpdate (message) {
const legacyId = message.payload.legacyId
const challengeId = message.payload.id
const projectId = message.payload.projectId
logger.info(`Process message of challenge id ${challengeId} and project id ${projectId}`)

// get challenge resources (all observers for the challenge)
let challengeResources = await helper.getChallengeResources(challengeId, config.RESOURCE_ROLE_ID)
if (!legacyId) {
logger.info(`Skipping update message of challenge id ${challengeId} and project id ${projectId} as legacyId not present`)
} else {
logger.info(`Process update message of challenge id ${challengeId} and project id ${projectId}`)

// fetch all members of groups
const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE)
// get challenge resources (all observers for the challenge)
const challengeResources = await helper.getChallengeResourcesV4(legacyId, challengeId)

// filter members who are NOT part of all the groups
if (groupIds.length > 0 && challengeResources.length > 0) {
const memberIds = challengeResources.map(cr => cr.memberId)
const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds)
// get all challenge groups
const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE)

// filter the members who are not part of all the groups
challengeResources = challengeResources.filter(member => filteredMemberIds.includes(member.memberId))
// filter members who are NOT part of all the groups
if (groupIds.length > 0 && challengeResources.length > 0) {
let memberIds = challengeResources.map(resource => resource['properties']['External Reference ID'])

// remove members from resources who are not part of all the groups
await Promise.all(challengeResources.map(member => helper.deleteResource(challengeId, member.memberHandle, config.RESOURCE_ROLE_ID)))
}
const filteredResources = await helper.filterMemberForGroups(memberIds, groupIds)
const resourcesToDelete = challengeResources.filter(resource => filteredResources.includes(resource['properties']['External Reference ID']))

logger.info(`Successfully processed message of challenge id ${challengeId} and project id ${projectId}`)
await helper.deleteResourcesV4(challengeId, resourcesToDelete)
}

logger.info(`Successfully processed update message of challenge id ${challengeId} and project id ${projectId}`)
}
}

handleChallengeUpdate.schema = {
Expand Down