diff --git a/README.md b/README.md index 1a5c557..f672ef5 100755 --- a/README.md +++ b/README.md @@ -45,12 +45,16 @@ The following parameters can be set in config files or in env variables: - KAFKA_CLIENT_CERT_KEY: Kafka connection private key, optional, default value is undefined; if not provided, then SSL connection is not used, direct insecure connection is used; if provided, it can be either path to private key file or private key content -- KAFKA_TOPIC: Kafka topic to listen, default value is 'challenge.notification.create' +- CHALLENGE_CREATE_TOPIC: Kafka topic to listen, default value is 'challenge.notification.create' +- PROJECT_MEMBER_ADDED_TOPIC: Kafka topic to listen when a member is added to a project, default value: connect.notification.project.member.joined +- PROJECT_MEMBER_REMOVED_TOPIC: Kafka topic to listen when a member is removed to a project, default value: connect.notification.project.member.removed - REQUEST_TIMEOUT: superagent request timeout in milliseconds, default value is 20000 - RESOURCE_ROLE_ID: the challenge member resource role id +- MANAGER_RESOURCE_ROLE_ID: the challenge manager resource role ID - GET_PROJECT_API_BASE: get project API base URL, default value is mock API 'http://localhost:4000/v5/projects' - SEARCH_MEMBERS_API_BASE: search members API base URL, default value is 'https://api.topcoder.com/v3/members/_search' -- CREATE_RESOURCE_API: create resource API URL, default value is mock API 'http://localhost:4000/v5/resources' +- RESOURCES_API: create resource API URL, default value is mock API 'http://localhost:4000/v5/resources' +- CHALLENGE_API: the challennge API URL, default value is http://localhost:4000/v5/challenges Set the following environment variables so that the app can get TC M2M token (use 'set' insted of 'export' for Windows OS): diff --git a/config/default.js b/config/default.js index 3efe39d..7c83144 100755 --- a/config/default.js +++ b/config/default.js @@ -21,14 +21,19 @@ module.exports = { // for the local Kafka, they are not needed KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT, KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY, - KAFKA_TOPIC: process.env.KAFKA_TOPIC || 'challenge.notification.create', + // Topics + CHALLENGE_CREATE_TOPIC: process.env.CHALLENGE_CREATE_TOPIC || 'challenge.notification.create', + PROJECT_MEMBER_ADDED_TOPIC: process.env.PROJECT_MEMBER_ADDED_TOPIC || 'connect.notification.project.member.joined', + PROJECT_MEMBER_REMOVED_TOPIC: process.env.PROJECT_MEMBER_REMOVED_TOPIC || 'connect.notification.project.member.removed', // superagent request timeout in milliseconds REQUEST_TIMEOUT: process.env.REQUEST_TIMEOUT ? Number(process.env.REQUEST_TIMEOUT) : 20000, RESOURCE_ROLE_ID: process.env.RESOURCE_ROLE_ID || '2a4dc376-a31c-4d00-b173-13934d89e286', + MANAGER_RESOURCE_ROLE_ID: process.env.MANAGER_RESOURCE_ROLE_ID || '0e9c6879-39e4-4eb6-b8df-92407890faf1', GET_PROJECT_API_BASE: process.env.GET_PROJECT_API_BASE || 'http://localhost:4000/v5/projects', SEARCH_MEMBERS_API_BASE: process.env.SEARCH_MEMBERS_API_BASE || 'https://api.topcoder-dev.com/v3/members/_search', - CREATE_RESOURCE_API: process.env.CREATE_RESOURCE_API || 'http://localhost:4000/v5/resources' + RESOURCES_API: process.env.RESOURCES_API || 'http://localhost:4000/v5/resources', + CHALLENGE_API: process.env.CHALLENGE_API || 'http://localhost:4000/v5/challenges' } diff --git a/docs/dev.env b/docs/dev.env index a16b1ff..fa06688 100644 --- a/docs/dev.env +++ b/docs/dev.env @@ -23,5 +23,5 @@ RESOURCE_ROLE_ID: '', GET_PROJECT_API_BASE: '', SEARCH_MEMBERS_API_BASE: '', - CREATE_RESOURCE_API: '' + RESOURCES_API: '' } \ No newline at end of file diff --git a/docs/prod.env b/docs/prod.env index 65435b0..8559358 100644 --- a/docs/prod.env +++ b/docs/prod.env @@ -23,5 +23,5 @@ RESOURCE_ROLE_ID: '', GET_PROJECT_API_BASE: '', SEARCH_MEMBERS_API_BASE: '', - CREATE_RESOURCE_API: '' + RESOURCES_API: '' } diff --git a/src/app.js b/src/app.js index c6a5137..2249983 100755 --- a/src/app.js +++ b/src/app.js @@ -34,7 +34,14 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, ( } return (async () => { - await ProcessorService.processMessage(messageJSON) + if (topic === config.CHALLENGE_CREATE_TOPIC) { + await ProcessorService.handleChallengeCreate(messageJSON) + } else if (topic === config.PROJECT_MEMBER_ADDED_TOPIC) { + await ProcessorService.handleMemberAdded(messageJSON) + } else if (topic === config.PROJECT_MEMBER_REMOVED_TOPIC) { + await ProcessorService.handleMemberRemoved(messageJSON) + } + logger.debug('Successfully processed message') })() // commit offset .then(() => consumer.commitOffset({ topic, partition, offset: m.offset })) @@ -57,7 +64,11 @@ function check () { logger.info('Starting kafka consumer') consumer .init([{ - subscriptions: [config.KAFKA_TOPIC], + subscriptions: [ + config.CHALLENGE_CREATE_TOPIC, + config.PROJECT_MEMBER_ADDED_TOPIC, + config.PROJECT_MEMBER_REMOVED_TOPIC + ], handler: dataHandler }]) .then(() => { diff --git a/src/common/helper.js b/src/common/helper.js index 6a1368e..d0a938b 100755 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -47,6 +47,75 @@ async function getProject (projectId) { return res.body } +/** + * Get all challenges for a specific project + * @param {Number} projectId the project ID + */ +async function getProjectChallenges (projectId) { + const token = await getM2MToken() + const url = `${config.CHALLENGE_API}` + let allChallenges = [] + let page = 1 + while (true) { + const res = await superagent + .get(url) + .query({ + projectId, + isLightweight: true, + perPage: 100 + }) + .set('Authorization', `Bearer ${token}`) + if (res.status !== 200) { + throw new Error(`Failed to get project details of id ${projectId}: ${_.get(res.body, 'message')}`) + } + const challenges = res.body || [] + if (challenges.length === 0) { + break + } + allChallenges = allChallenges.concat(_.map(challenges, c => _.pick(c, ['id']))) + page += 1 + if (res.headers['x-total-pages'] && page > Number(res.headers['x-total-pages'])) { + break + } + } + return allChallenges +} + +/** + * Get challenge resources + * @param {String} challengeId the challenge ID + * @param {String} roleId the role ID + */ +async function getChallengeResources (challengeId, roleId) { + const token = await getM2MToken() + const url = `${config.RESOURCES_API}` + let allResources = [] + let page = 1 + while (true) { + const res = await superagent + .get(url) + .query({ + challengeId, + roleId: roleId || config.RESOURCE_ROLE_ID, + perPage: 100 + }) + .set('Authorization', `Bearer ${token}`) + if (res.status !== 200) { + throw new Error(`Failed to get resources for challenge id ${challengeId}: ${_.get(res.body, 'message')}`) + } + const resources = res.body || [] + if (resources.length === 0) { + break + } + allResources = allResources.concat(resources) + page += 1 + if (res.headers['x-total-pages'] && page > Number(res.headers['x-total-pages'])) { + break + } + } + return allResources +} + /** * Search members of given member ids * @param {Array} memberIds the member ids @@ -79,18 +148,41 @@ async function searchMembers (memberIds) { * Create resource. * @param {String} challengeId the challenge id * @param {String} memberHandle the member handle + * @param {String} roleId the role ID + * @return {Object} the created resource + */ +async function createResource (challengeId, memberHandle, roleId) { + // M2M token is cached by 'tc-core-library-js' lib + const token = await getM2MToken() + const res = await superagent + .post(config.RESOURCES_API) + .set('Authorization', `Bearer ${token}`) + .send({ + challengeId, + memberHandle, + roleId: roleId || config.RESOURCE_ROLE_ID + }) + .timeout(config.REQUEST_TIMEOUT) + return res.body +} + +/** + * Delete resource. + * @param {String} challengeId the challenge id + * @param {String} memberHandle the member handle + * @param {String} roleId the role ID * @return {Object} the created resource */ -async function createResource (challengeId, memberHandle) { +async function deleteResource (challengeId, memberHandle, roleId) { // M2M token is cached by 'tc-core-library-js' lib const token = await getM2MToken() const res = await superagent - .post(config.CREATE_RESOURCE_API) + .delete(config.RESOURCES_API) .set('Authorization', `Bearer ${token}`) .send({ challengeId, memberHandle, - roleId: config.RESOURCE_ROLE_ID + roleId: roleId || config.RESOURCE_ROLE_ID }) .timeout(config.REQUEST_TIMEOUT) return res.body @@ -100,5 +192,8 @@ module.exports = { getKafkaOptions, getProject, searchMembers, - createResource + createResource, + deleteResource, + getProjectChallenges, + getChallengeResources } diff --git a/src/services/ProcessorService.js b/src/services/ProcessorService.js index 64d075d..11aa7e1 100755 --- a/src/services/ProcessorService.js +++ b/src/services/ProcessorService.js @@ -4,6 +4,7 @@ const _ = require('lodash') const Joi = require('joi') +const config = require('config') const logger = require('../common/logger') const helper = require('../common/helper') @@ -11,7 +12,7 @@ const helper = require('../common/helper') * Process Kafka message of challenge created * @param {Object} message the challenge created message */ -async function processMessage (message) { +async function handleChallengeCreate (message) { const challengeId = message.payload.id const projectId = message.payload.projectId logger.info(`Process message of challenge id ${challengeId} and project id ${projectId}`) @@ -32,7 +33,7 @@ async function processMessage (message) { logger.info(`Successfully processed message of challenge id ${challengeId} and project id ${projectId}`) } -processMessage.schema = { +handleChallengeCreate.schema = { message: Joi.object().keys({ topic: Joi.string().required(), originator: Joi.string().required(), @@ -45,9 +46,82 @@ processMessage.schema = { }).required() } +/** + * Handle project member changes + * @param {Number} projectId the project ID + * @param {Number} userId the user ID + * @param {Boolean} isDeleted flag to indicate that a member has been deleted + */ +async function handleProjectMemberChange (projectId, userId, isDeleted) { + // verify project exists + await helper.getProject(projectId) + // get project challenges + const challenges = await helper.getProjectChallenges(projectId) + // get member handle + const [memberDetails] = await helper.searchMembers([userId]) + const { handle } = memberDetails + for (const challenge of challenges) { + const challenngeResources = await helper.getChallengeResources(challenge.id, config.MANAGER_RESOURCE_ROLE_ID) + const existing = _.find(challenngeResources, r => _.toString(r.memberId) === _.toString(userId)) + if (isDeleted) { + if (existing) { + await helper.deleteResource(challenge.id, handle, config.MANAGER_RESOURCE_ROLE_ID) + } + } else { + if (!existing) { + await helper.createResource(challenge.id, handle, config.MANAGER_RESOURCE_ROLE_ID) + } + } + } +} + +/** + * Process kafka message of member added to a project + * @param {Object} message the member added message + */ +async function handleMemberAdded (message) { + await handleProjectMemberChange(message.payload.projectId, message.payload.userId) +} + +handleMemberAdded.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + projectId: Joi.number().integer().positive().required(), + userId: Joi.number().integer().positive().required() + }).unknown(true).required() + }).required() +} + +/** + * Process kafka message of member removed to a project + * @param {Object} message the member added message + */ +async function handleMemberRemoved (message) { + await handleProjectMemberChange(message.payload.projectId, message.payload.userId, true) +} + +handleMemberRemoved.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + projectId: Joi.number().integer().positive().required(), + userId: Joi.number().integer().positive().required() + }).unknown(true).required() + }).required() +} + // Exports module.exports = { - processMessage + handleChallengeCreate, + handleMemberAdded, + handleMemberRemoved } logger.buildService(module.exports) diff --git a/test/unit/test.js b/test/unit/test.js index 5f34cd0..ab7714c 100755 --- a/test/unit/test.js +++ b/test/unit/test.js @@ -75,7 +75,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { let message = _.cloneDeep(testMessage) message = _.omit(message, requiredField) try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { assertValidationError(err, `"${_.last(requiredField.split('.'))}" is required`) return @@ -89,7 +89,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { const message = _.cloneDeep(testMessage) _.set(message, stringField, 123) try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { assertValidationError(err, `"${_.last(stringField.split('.'))}" must be a string`) return @@ -101,7 +101,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { const message = _.cloneDeep(testMessage) _.set(message, stringField, '') try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { assertValidationError(err, `"${_.last(stringField.split('.'))}" is not allowed to be empty`) return @@ -115,7 +115,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { const message = _.cloneDeep(testMessage) _.set(message, guidField, '12345') try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { assertValidationError(err, `"${_.last(guidField.split('.'))}" must be a valid GUID`) return @@ -130,7 +130,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { const message = _.cloneDeep(testMessage) _.set(message, positiveIntegerField, -123) try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { assertValidationError(err, `"${_.last(positiveIntegerField.split('.'))}" must be a positive number`) return @@ -144,7 +144,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { const message = _.cloneDeep(testMessage) _.set(message, dateField, 'abc') try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { assertValidationError(err, `"${_.last(dateField.split('.'))}" must be a number of milliseconds or valid date string`) @@ -158,7 +158,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { const message = _.cloneDeep(testMessage) message.payload.projectId = notFoundProjectId try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { should.equal(err.message, `Failed to get project details of id ${notFoundProjectId}: it is not found`) return @@ -170,7 +170,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { const message = _.cloneDeep(testMessage) message.payload.id = createResourceFailedChallengeId try { - await service.processMessage(message) + await service.handleChallengeCreate(message) } catch (err) { should.equal(err.message, 'Internal Server Error') return @@ -179,7 +179,7 @@ describe('Topcoder Challenge Resources Processor Service Unit Tests', () => { }) it('test process message successfully', async () => { - await service.processMessage(testMessage) + await service.handleChallengeCreate(testMessage) assertInfoMessage(`Process message of challenge id ${ testMessage.payload.id} and project id ${testMessage.payload.projectId}`) assertInfoMessage(`Successfully processed message of challenge id ${