diff --git a/config/default.js b/config/default.js index aa252c3..15a8e52 100755 --- a/config/default.js +++ b/config/default.js @@ -23,6 +23,7 @@ module.exports = { KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY, // Topics CHALLENGE_CREATE_TOPIC: process.env.CHALLENGE_CREATE_TOPIC || 'challenge.notification.create', + CHALLENGE_UPDATE_TOPIC: process.env.CHALLENGE_UPDATE_TOPIC || 'challenge.notification.update', PROJECT_MEMBER_ADDED_TOPIC: process.env.PROJECT_MEMBER_ADDED_TOPIC || 'connect.notification.project.member.joined', PROJECT_MEMBER_REMOVED_TOPIC: process.env.PROJECT_MEMBER_REMOVED_TOPIC || 'connect.notification.project.member.removed', @@ -37,5 +38,9 @@ module.exports = { RESOURCES_API: process.env.RESOURCES_API || 'http://localhost:4000/v5/resources', CHALLENGE_API: process.env.CHALLENGE_API || 'http://localhost:4000/v5/challenges', - IGNORED_ORIGINATORS: process.env.IGNORED_ORIGINATORS ? process.env.IGNORED_ORIGINATORS.split(',') : ['legacy-migration-script'] + IGNORED_ORIGINATORS: process.env.IGNORED_ORIGINATORS ? process.env.IGNORED_ORIGINATORS.split(',') : ['legacy-migration-script'], + + GROUPS_TO_IGNORE: process.env.GROUPS_TO_IGNORE ? process.env.GROUPS_TO_IGNORE.split(',') : ['72a0b8a0-aa45-44f7-86c2-bf9de6321e5b'], + GROUPS_API_URL: process.env.GROUPS_API_URL || 'http://localhost:4000/v5/groups', + } diff --git a/src/app.js b/src/app.js index 4a4423e..4ae8ad6 100755 --- a/src/app.js +++ b/src/app.js @@ -40,6 +40,8 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, ( } else { if (topic === config.CHALLENGE_CREATE_TOPIC) { await ProcessorService.handleChallengeCreate(messageJSON) + } else if (topic === config.CHALLENGE_UPDATE_TOPIC) { + await ProcessorService.handleChallengeUpdate(messageJSON) } else if (topic === config.PROJECT_MEMBER_ADDED_TOPIC) { await ProcessorService.handleMemberAdded(messageJSON) } else if (topic === config.PROJECT_MEMBER_REMOVED_TOPIC) { @@ -71,6 +73,7 @@ consumer .init([{ subscriptions: [ config.CHALLENGE_CREATE_TOPIC, + config.CHALLENGE_UPDATE_TOPIC, config.PROJECT_MEMBER_ADDED_TOPIC, config.PROJECT_MEMBER_REMOVED_TOPIC ], diff --git a/src/common/helper.js b/src/common/helper.js index d0a938b..4761b0f 100755 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -188,6 +188,41 @@ async function deleteResource (challengeId, memberHandle, roleId) { return res.body } +/** + * Search members of the given group ids + * @param {Array} members + * @param {Array} groupIds + * @return {Array} filtered members + */ +async function filterMemberForGroups(memberIds, groupIds) { + for (const memberId of memberIds) { + const res = await Promise.allSettled(groupIds.map(groupId => memberGroupsCall(groupId, memberId))); + const memberGroups =_.compact(_.flattenDeep(_.map(res, 'value'))) + + if (memberGroups.length != groupIds.length) memberList.push(memberId) + } +} + +/** + * Return the memberId if member is part of the groups + * @param {String} groupId + * @param {String} memberId + * @returns {String} memberId in case of member of group + */ +async function memberGroupsCall(groupId, memberId) { + // M2M token is cached by 'tc-core-library-js' lib + const token = await getM2MToken() + + const url = `${config.GROUPS_API_URL}/${groupId}/members/${memberId}` + const res = await superagent + .get(url) + .set('Authorization', `Bearer ${token}`) + .timeout(config.REQUEST_TIMEOUT) + + return memberId +} + + module.exports = { getKafkaOptions, getProject, @@ -195,5 +230,6 @@ module.exports = { createResource, deleteResource, getProjectChallenges, - getChallengeResources + getChallengeResources, + filterMemberForGroups } diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 11aa7e1..b255d1f 100755 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -24,7 +24,20 @@ async function handleChallengeCreate (message) { logger.info(`Found member ids [${memberIds.join(', ')}] of project id ${projectId}`) // search members - const members = await helper.searchMembers(memberIds) + let members = await helper.searchMembers(memberIds) + + // fetch all members of groups + const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE) + + // filter members who are NOT part of all the groups + if (groupIds.length > 0 && members.length > 0) { + const memberIds = members.map(m => m.id) + const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds) + + // remove the members who are not part of all the groups + members = members.filter(m => !filteredMemberIds.includes(m.id)) + } + // create resource for each member for (const member of members) { const resource = await helper.createResource(challengeId, member.handle) @@ -46,6 +59,49 @@ handleChallengeCreate.schema = { }).required() } +/** + * Process Kafka message of challenge updated + * @param {Object} message the challenge update message + */ + async function handleChallengeUpdate (message) { + const challengeId = message.payload.id + const projectId = message.payload.projectId + logger.info(`Process message of challenge id ${challengeId} and project id ${projectId}`) + + // get challenge resources (all observers for the challenge) + let challengeResources = await helper.getChallengeResources(challengeId, config.RESOURCE_ROLE_ID) + + // fetch all members of groups + const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE) + + // filter members who are NOT part of all the groups + if (groupIds.length > 0 && challengeResources.length > 0) { + const memberIds = challengeResources.map(cr => cr.memberId) + const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds) + + // filter the members who are not part of all the groups + challengeResources = challengeResources.filter(member => filteredMemberIds.includes(member.memberId) ) + + // remove members from resources who are not part of all the groups + await Promise.allSettled(challengeResources.map(member => helper.deleteResource(challengeId, member.memberHandle, config.RESOURCE_ROLE_ID))); + } + + logger.info(`Successfully processed message of challenge id ${challengeId} and project id ${projectId}`) +} + +handleChallengeUpdate.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().uuid().required(), // challenge id + projectId: Joi.number().integer().positive().required() + }).unknown(true).required() + }).required() +} + /** * Handle project member changes * @param {Number} projectId the project ID @@ -61,8 +117,8 @@ async function handleProjectMemberChange (projectId, userId, isDeleted) { const [memberDetails] = await helper.searchMembers([userId]) const { handle } = memberDetails for (const challenge of challenges) { - const challenngeResources = await helper.getChallengeResources(challenge.id, config.MANAGER_RESOURCE_ROLE_ID) - const existing = _.find(challenngeResources, r => _.toString(r.memberId) === _.toString(userId)) + const challengeResources = await helper.getChallengeResources(challenge.id, config.MANAGER_RESOURCE_ROLE_ID) + const existing = _.find(challengeResources, r => _.toString(r.memberId) === _.toString(userId)) if (isDeleted) { if (existing) { await helper.deleteResource(challenge.id, handle, config.MANAGER_RESOURCE_ROLE_ID)