From de1c75ac123c9e9d941dd50e2725f69048585a1f Mon Sep 17 00:00:00 2001 From: bountyCoder Date: Fri, 19 Aug 2022 14:42:03 +0000 Subject: [PATCH] updated the logic to remove the members who are not part of all the relavent groups --- config/default.js | 8 ++- package-lock.json | 101 ++++++++++++++++++++++++++++++- package.json | 2 + src/app.js | 4 +- src/common/helper.js | 85 ++++++++++++++++++++++---- src/services/ProcessorService.js | 49 +++++++-------- 6 files changed, 204 insertions(+), 45 deletions(-) diff --git a/config/default.js b/config/default.js index 43e7b1b..7c4c02c 100755 --- a/config/default.js +++ b/config/default.js @@ -26,6 +26,9 @@ module.exports = { CHALLENGE_UPDATE_TOPIC: process.env.CHALLENGE_UPDATE_TOPIC || 'challenge.notification.update', PROJECT_MEMBER_ADDED_TOPIC: process.env.PROJECT_MEMBER_ADDED_TOPIC || 'connect.notification.project.member.joined', PROJECT_MEMBER_REMOVED_TOPIC: process.env.PROJECT_MEMBER_REMOVED_TOPIC || 'connect.notification.project.member.removed', + KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'resource-processor', + RESOURCE_DELETE_TOPIC: process.env.RESOURCE_DELETE_TOPIC || 'challenge.action.resource.delete', + KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting', // superagent request timeout in milliseconds REQUEST_TIMEOUT: process.env.REQUEST_TIMEOUT ? Number(process.env.REQUEST_TIMEOUT) : 20000, @@ -37,10 +40,13 @@ module.exports = { SEARCH_MEMBERS_API_BASE: process.env.SEARCH_MEMBERS_API_BASE || 'https://api.topcoder-dev.com/v3/members/_search', RESOURCES_API: process.env.RESOURCES_API || 'http://localhost:4000/v5/resources', CHALLENGE_API: process.env.CHALLENGE_API || 'http://localhost:4000/v5/challenges', + V4_RESOURCES_API: process.env.V4_RESOURCES_API || 'http://localhost:4000/v4/challenges/', IGNORED_ORIGINATORS: process.env.IGNORED_ORIGINATORS ? process.env.IGNORED_ORIGINATORS.split(',') : ['legacy-migration-script'], GROUPS_TO_IGNORE: process.env.GROUPS_TO_IGNORE ? process.env.GROUPS_TO_IGNORE.split(',') : ['72a0b8a0-aa45-44f7-86c2-bf9de6321e5b'], - GROUPS_API_URL: process.env.GROUPS_API_URL || 'http://localhost:4000/v5/groups' + GROUPS_API_URL: process.env.GROUPS_API_URL || 'http://localhost:4000/v5/groups', + + BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5' } diff --git a/package-lock.json b/package-lock.json index c5023e0..89e5c21 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3088,6 +3088,12 @@ "requires": { "glob": "^7.1.3" } + }, + "uuid": { + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz", + "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A==", + "dev": true } } }, @@ -3644,6 +3650,11 @@ "version": "6.5.2", "resolved": "https://registry.npmjs.org/qs/-/qs-6.5.2.tgz", "integrity": "sha512-N5ZAX4/LxJmF+7wN74pUD6qAh9/wnvdQcjq9TZjevvXzSUo7bfmw91saqMjzGS2xq91/odN2dW/WOl7qQHNDGA==" + }, + "uuid": { + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz", + "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A==" } } }, @@ -4152,6 +4163,90 @@ "string-width": "^2.1.1" } }, + "tc-bus-api-wrapper": { + "version": "github:topcoder-platform/tc-bus-api-wrapper#f8cbd335a0e0b4d6edd7cae859473593271fd97f", + "from": "github:topcoder-platform/tc-bus-api-wrapper", + "requires": { + "joi": "^13.4.0", + "lodash": "^4.17.15", + "superagent": "^3.8.3", + "tc-core-library-js": "github:appirio-tech/tc-core-library-js#v2.6.4" + }, + "dependencies": { + "debug": { + "version": "3.2.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz", + "integrity": "sha512-CFjzYYAi4ThfiQvizrFQevTTXHtnCqWfe7x1AhgEscTz6ZbLbfoLRLPugTQyBth6f8ZERVUSyWHFD/7Wu4t1XQ==", + "requires": { + "ms": "^2.1.1" + } + }, + "hoek": { + "version": "5.0.4", + "resolved": "https://registry.npmjs.org/hoek/-/hoek-5.0.4.tgz", + "integrity": "sha512-Alr4ZQgoMlnere5FZJsIyfIjORBqZll5POhDsF4q64dPuJR6rNxXdDxtHSQq8OXRurhmx+PWYEE8bXRROY8h0w==" + }, + "joi": { + "version": "13.7.0", + "resolved": "https://registry.npmjs.org/joi/-/joi-13.7.0.tgz", + "integrity": "sha512-xuY5VkHfeOYK3Hdi91ulocfuFopwgbSORmIwzcwHKESQhC7w1kD5jaVSPnqDxS2I8t3RZ9omCKAxNwXN5zG1/Q==", + "requires": { + "hoek": "5.x.x", + "isemail": "3.x.x", + "topo": "3.x.x" + } + }, + "mime": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz", + "integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==" + }, + "readable-stream": { + "version": "2.3.7", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.7.tgz", + "integrity": "sha512-Ebho8K4jIbHAxnuxi7o42OrZgF/ZTNcsZj6nRKyUmkhLFq8CHItp/fy6hQZuZmP/n3yZ9VBUbp4zz/mX8hmYPw==", + "requires": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.3", + "isarray": "~1.0.0", + "process-nextick-args": "~2.0.0", + "safe-buffer": "~5.1.1", + "string_decoder": "~1.1.1", + "util-deprecate": "~1.0.1" + } + }, + "safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, + "string_decoder": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", + "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", + "requires": { + "safe-buffer": "~5.1.0" + } + }, + "superagent": { + "version": "3.8.3", + "resolved": "https://registry.npmjs.org/superagent/-/superagent-3.8.3.tgz", + "integrity": "sha512-GLQtLMCoEIK4eDv6OGtkOoSMt3D+oq0y3dsxMuYuDvaNUvuT8eFBuLmfR0iYYzHC1e8hpzC6ZsxbuP6DIalMFA==", + "requires": { + "component-emitter": "^1.2.0", + "cookiejar": "^2.1.0", + "debug": "^3.1.0", + "extend": "^3.0.0", + "form-data": "^2.3.1", + "formidable": "^1.2.0", + "methods": "^1.1.1", + "mime": "^1.4.1", + "qs": "^6.5.1", + "readable-stream": "^2.3.5" + } + } + } + }, "tc-core-library-js": { "version": "github:appirio-tech/tc-core-library-js#df0b36c51cf80918194cbff777214b3c0cf5a151", "from": "github:appirio-tech/tc-core-library-js#v2.6.4", @@ -4359,9 +4454,9 @@ "integrity": "sha1-n5VxD1CiZ5R7LMwSR0HBAoQn5xM=" }, "uuid": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.3.3.tgz", - "integrity": "sha512-pW0No1RGHgzlpHJO1nsVrHKpOEIxkGg1xB+v0ZmdNH5OAeAwzAVrCnI2/6Mtx+Uys6iaylxa+D3g4j63IKKjSQ==" + "version": "8.3.2", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", + "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==" }, "validate-npm-package-license": { "version": "3.0.4", diff --git a/package.json b/package.json index 2974883..5cc8361 100755 --- a/package.json +++ b/package.json @@ -28,8 +28,10 @@ "lodash": "^4.17.19", "no-kafka": "^3.4.3", "superagent": "^5.1.0", + "tc-bus-api-wrapper": "topcoder-platform/tc-bus-api-wrapper.git", "tc-core-library-js": "appirio-tech/tc-core-library-js.git#v2.6.4", "topcoder-healthcheck-dropin": "^1.0.2", + "uuid": "^8.3.2", "winston": "^3.1.0" }, "engines": { diff --git a/src/app.js b/src/app.js index 4ae8ad6..f6bb602 100755 --- a/src/app.js +++ b/src/app.js @@ -2,7 +2,7 @@ * The application entry point */ -global.Promise = require('bluebird') +const QPromise = require('bluebird') const config = require('config') const Kafka = require('no-kafka') const _ = require('lodash') @@ -15,7 +15,7 @@ const ProcessorService = require('./services/ProcessorService') const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions()) // data handler -const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, (m) => { +const dataHandler = (messageSet, topic, partition) => QPromise.each(messageSet, (m) => { const message = m.message.value.toString('utf8') logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${ m.offset}; Message: ${message}.`) diff --git a/src/common/helper.js b/src/common/helper.js index b5cbdc6..46a2a52 100755 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -4,9 +4,15 @@ const _ = require('lodash') const config = require('config') +const busApi = require('tc-bus-api-wrapper') +const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID', + 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL'])) const m2mAuth = require('tc-core-library-js').auth.m2m const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL'])) const superagent = require('superagent') +const { v4: uuid } = require('uuid') + +const logger = require('./logger') /** * Get Kafka options @@ -116,6 +122,24 @@ async function getChallengeResources (challengeId, roleId) { return allResources } +/** + * Get challenge resources using v4 API + * @param {String} legacyId the legacy challenge ID + * @param {String} challengeId the challenge ID + * @return {Array} list of observers + */ +async function getChallengeResourcesV4 (legacyId, challengeId) { + const token = await getM2MToken() + const url = `${config.V4_RESOURCES_API}` + legacyId + '/resources' + const res = await superagent.get(url).set('Authorization', `Bearer ${token}`) + if (res.status !== 200) { + throw new Error(`Failed to get resources for challenge id ${challengeId}: ${JSON.stringify(_.get(res.body, 'result.content'))}`) + } + + // TODO: Make generic and not hardcoded `OBSERVER` + return _.filter(_.get(res.body, 'result.content'), { role: 'Observer' }) +} + /** * Search members of given member ids * @param {Array} memberIds the member ids @@ -198,7 +222,7 @@ async function filterMemberForGroups (memberIds, groupIds) { const memberList = [] for (const memberId of memberIds) { - const res = await Promise.all(groupIds.map(groupId => memberGroupsCall(groupId, memberId))) + const res = await Promise.allSettled(groupIds.map(groupId => checkMemberGroup(groupId, memberId))) const memberGroups = _.compact(_.flattenDeep(_.map(res, 'value'))) if (memberGroups.length !== groupIds.length) memberList.push(memberId) @@ -208,24 +232,61 @@ async function filterMemberForGroups (memberIds, groupIds) { } /** - * Return the memberId if member is part of the groups + * Check for membership against the provided group * @param {String} groupId * @param {String} memberId * @returns {String} memberId in case of member of group */ -async function memberGroupsCall (groupId, memberId) { +async function checkMemberGroup (groupId, memberId) { // M2M token is cached by 'tc-core-library-js' lib const token = await getM2MToken() const url = `${config.GROUPS_API_URL}/${groupId}/members/${memberId}` - try { - return superagent - .get(url) - .set('Authorization', `Bearer ${token}`) - .timeout(config.REQUEST_TIMEOUT) - } catch (error) { - return [] + return superagent + .get(url) + .set('Authorization', `Bearer ${token}`) + .timeout(config.REQUEST_TIMEOUT) +} + +/** + * Remove resources using V4 API + * @param {String} groupId + * @param {String} memberId + * @returns {String} memberId in case of member of group + */ +async function deleteResourcesV4 (challengeId, resources) { + const payloads = [] + + resources.map(resource => { + payloads.push({ + id: uuid(), + challengeId, + memberId: resource['properties']['External Reference ID'], + memberHandle: resource['properties']['Handle'], + roleId: config.RESOURCE_ROLE_ID, + created: resource['properties']['Handle'], + createdBy: new Date().toUTCString() + }) + }) + + await Promise.allSettled(payloads.map(payload => postEvent(config.RESOURCE_DELETE_TOPIC, payload))) +} + +/** + * Send Kafka event message + * @params {String} topic the topic name + * @params {Object} payload the payload + */ +async function postEvent (topic, payload) { + logger.info(`Publish event to Kafka topic ${topic}`) + const message = { + topic, + originator: config.KAFKA_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload } + await busApiClient.postEvent(message) } module.exports = { @@ -236,5 +297,7 @@ module.exports = { deleteResource, getProjectChallenges, getChallengeResources, - filterMemberForGroups + filterMemberForGroups, + getChallengeResourcesV4, + deleteResourcesV4 } diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 1298f46..61071b5 100755 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -5,6 +5,7 @@ const _ = require('lodash') const Joi = require('joi') const config = require('config') + const logger = require('../common/logger') const helper = require('../common/helper') @@ -24,19 +25,7 @@ async function handleChallengeCreate (message) { logger.info(`Found member ids [${memberIds.join(', ')}] of project id ${projectId}`) // search members - let members = await helper.searchMembers(memberIds) - - // fetch all members of groups - const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE) - - // filter members who are NOT part of all the groups - if (groupIds.length > 0 && members.length > 0) { - const memberIds = members.map(m => m.id) - const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds) - - // remove the members who are not part of all the groups - members = members.filter(m => !filteredMemberIds.includes(m.id)) - } + const members = await helper.searchMembers(memberIds) // create resource for each member for (const member of members) { @@ -64,29 +53,33 @@ handleChallengeCreate.schema = { * @param {Object} message the challenge update message */ async function handleChallengeUpdate (message) { + const legacyId = message.payload.legacyId const challengeId = message.payload.id const projectId = message.payload.projectId - logger.info(`Process message of challenge id ${challengeId} and project id ${projectId}`) - // get challenge resources (all observers for the challenge) - let challengeResources = await helper.getChallengeResources(challengeId, config.RESOURCE_ROLE_ID) + if (!legacyId) { + logger.info(`Skipping update message of challenge id ${challengeId} and project id ${projectId} as legacyId not present`) + } else { + logger.info(`Process update message of challenge id ${challengeId} and project id ${projectId}`) - // fetch all members of groups - const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE) + // get challenge resources (all observers for the challenge) + const challengeResources = await helper.getChallengeResourcesV4(legacyId, challengeId) - // filter members who are NOT part of all the groups - if (groupIds.length > 0 && challengeResources.length > 0) { - const memberIds = challengeResources.map(cr => cr.memberId) - const filteredMemberIds = await helper.filterMemberForGroups(memberIds, groupIds) + // get all challenge groups + const groupIds = _.difference(message.payload.groups, config.GROUPS_TO_IGNORE) - // filter the members who are not part of all the groups - challengeResources = challengeResources.filter(member => filteredMemberIds.includes(member.memberId)) + // filter members who are NOT part of all the groups + if (groupIds.length > 0 && challengeResources.length > 0) { + let memberIds = challengeResources.map(resource => resource['properties']['External Reference ID']) - // remove members from resources who are not part of all the groups - await Promise.all(challengeResources.map(member => helper.deleteResource(challengeId, member.memberHandle, config.RESOURCE_ROLE_ID))) - } + const filteredResources = await helper.filterMemberForGroups(memberIds, groupIds) + const resourcesToDelete = challengeResources.filter(resource => filteredResources.includes(resource['properties']['External Reference ID'])) - logger.info(`Successfully processed message of challenge id ${challengeId} and project id ${projectId}`) + await helper.deleteResourcesV4(challengeId, resourcesToDelete) + } + + logger.info(`Successfully processed update message of challenge id ${challengeId} and project id ${projectId}`) + } } handleChallengeUpdate.schema = {