[Kafka Consumer] Create TaaS Jobs during project creation #596

Merged · 5 commits · Dec 8, 2020
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -114,7 +114,7 @@ workflows:
             - test
           filters:
             branches:
-              only: ['develop']
+              only: ['develop', 'feature/create-taas-jobs']
       - deployProd:
           context : org-global
           requires:
3 changes: 2 additions & 1 deletion config/default.json
@@ -75,5 +75,6 @@
     "EMBED_REPORTS_MAPPING": "{\"mock\": \"/embed/looks/2\"}",
     "ALLOWED_USERS": "[]"
   },
-  "DEFAULT_M2M_USERID": -101
+  "DEFAULT_M2M_USERID": -101,
+  "taasJobApiUrl": "https://api.topcoder.com/v5/jobs"
 }
3 changes: 2 additions & 1 deletion config/development.json
@@ -6,5 +6,6 @@
   "fileServiceEndpoint": "https://api.topcoder-dev.com/v3/files/",
   "connectProjectsUrl": "https://connect.topcoder-dev.com/projects/",
   "memberServiceEndpoint": "https://api.topcoder-dev.com/v3/members",
-  "identityServiceEndpoint": "https://api.topcoder-dev.com/v3/"
+  "identityServiceEndpoint": "https://api.topcoder-dev.com/v3/",
+  "taasJobApiUrl": "https://api.topcoder-dev.com/v5/jobs"
 }
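
Both config files add the same `taasJobApiUrl` key, pointing at the production and dev TaaS Jobs endpoints respectively. A minimal sketch of how the key is read, assuming the standard node-config setup this repository already uses for `config/*.json` (the new handler code below reads it as `config.taasJobApiUrl`):

```js
import config from 'config'; // node-config merges default.json with the per-environment file

// property access, as used by the new handler code ...
const taasJobApiUrl = config.taasJobApiUrl;
// ... or the explicit getter, which throws if the key is missing
const sameUrl = config.get('taasJobApiUrl');
```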
94 changes: 70 additions & 24 deletions src/events/kafkaHandlers.js
@@ -1,18 +1,22 @@
 /**
  * BUS Event Handlers
  */
-import { CONNECT_NOTIFICATION_EVENT, BUS_API_EVENT, RESOURCES } from '../constants';
 import {
-  projectCreatedKafkaHandler,
-  projectUpdatedKafkaHandler } from './projects';
-import { projectPhaseAddedKafkaHandler, projectPhaseRemovedKafkaHandler,
-  projectPhaseUpdatedKafkaHandler } from './projectPhases';
+  CONNECT_NOTIFICATION_EVENT,
+  BUS_API_EVENT,
+  RESOURCES,
+} from '../constants';
 import {
-  timelineAdjustedKafkaHandler,
-} from './timelines';
+  projectCreatedKafkaHandler,
+  projectUpdatedKafkaHandler,
+} from './projects';
 import {
-  milestoneUpdatedKafkaHandler,
-} from './milestones';
+  projectPhaseAddedKafkaHandler,
+  projectPhaseRemovedKafkaHandler,
+  projectPhaseUpdatedKafkaHandler,
+} from './projectPhases';
+import { timelineAdjustedKafkaHandler } from './timelines';
+import { milestoneUpdatedKafkaHandler } from './milestones';
 
 const kafkaHandlers = {
   /**
@@ -33,22 +37,64 @@ const kafkaHandlers = {
   // Events coming from timeline/milestones (considering it as a separate module/service in future)
   [CONNECT_NOTIFICATION_EVENT.MILESTONE_TRANSITION_COMPLETED]: milestoneUpdatedKafkaHandler,
   [CONNECT_NOTIFICATION_EVENT.TIMELINE_ADJUSTED]: timelineAdjustedKafkaHandler,
-
-  /**
-   * New Unified Bus Events
-   */
-  [BUS_API_EVENT.PROJECT_CREATED]: {
-    [RESOURCES.PROJECT]: projectCreatedKafkaHandler,
-  },
-  [BUS_API_EVENT.PROJECT_PHASE_CREATED]: {
-    [RESOURCES.PHASE]: projectPhaseAddedKafkaHandler,
-  },
-  [BUS_API_EVENT.PROJECT_PHASE_UPDATED]: {
-    [RESOURCES.PHASE]: projectPhaseUpdatedKafkaHandler,
-  },
-  [BUS_API_EVENT.PROJECT_PHASE_DELETED]: {
-    [RESOURCES.PHASE]: projectPhaseRemovedKafkaHandler,
-  },
 };
 
+/**
+ * Register New Unified Bus Event Handlers
+ *
+ * We need this special method so it would properly merge topics with the same names
+ * but different resources.
+ *
+ * @param {String} topic Kafka topic name
+ * @param {String} resource resource name
+ * @param {Function} handler handler method
+ *
+ * @returns {void}
+ */
+const registerKafkaHandler = (topic, resource, handler) => {
+  let topicConfig = kafkaHandlers[topic];
+
+  // if config for topic is not yet initialized, create it
+  if (!topicConfig) {
+    topicConfig = {};
+    kafkaHandlers[topic] = topicConfig;
+  }
+
+  if (typeof topicConfig !== 'object') {
+    throw new Error(
+      `Topic "${topic}" should be defined as object with resource names as keys.`,
+    );
+  }
+
+  if (topicConfig[resource]) {
+    throw new Error(
+      `Handler for topic "${topic}" with resource ${resource} has been already registered.`,
+    );
+  }
+
+  topicConfig[resource] = handler;
+};
+
+registerKafkaHandler(
+  BUS_API_EVENT.PROJECT_CREATED,
+  RESOURCES.PROJECT,
+  projectCreatedKafkaHandler,
+);
+registerKafkaHandler(
+  BUS_API_EVENT.PROJECT_PHASE_CREATED,
+  RESOURCES.PHASE,
+  projectPhaseAddedKafkaHandler,
+);
+registerKafkaHandler(
+  BUS_API_EVENT.PROJECT_PHASE_UPDATED,
+  RESOURCES.PHASE,
+  projectPhaseUpdatedKafkaHandler,
+);
+registerKafkaHandler(
+  BUS_API_EVENT.PROJECT_PHASE_DELETED,
+  RESOURCES.PHASE,
+  projectPhaseRemovedKafkaHandler,
+);
+
 
 export default kafkaHandlers;
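
For orientation, a minimal sketch of how a consumer could dispatch messages against the merged `kafkaHandlers` map: the deprecated `connect.notification.*` topics still map straight to a handler function, while topics registered through `registerKafkaHandler` map to an object keyed by resource name, so the dispatcher also has to look at the message's `resource` field. The dispatcher below is an illustrative assumption, not the service's actual consumer code:

```js
import kafkaHandlers from './events/kafkaHandlers';

// Hypothetical dispatcher, for illustration only.
// Handlers follow the (app, topic, payload) signature used in this PR.
async function dispatch(app, topic, payload) {
  const topicConfig = kafkaHandlers[topic];
  if (!topicConfig) return; // nothing registered for this topic

  if (typeof topicConfig === 'function') {
    // deprecated "connect.notification.*" topics: one handler per topic
    await topicConfig(app, topic, payload);
    return;
  }

  // unified bus events: pick the handler registered for the payload's resource
  const handler = topicConfig[payload.resource];
  if (handler) {
    await handler(app, topic, payload);
  }
}
```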
66 changes: 66 additions & 0 deletions src/events/projects/index.js
@@ -5,6 +5,8 @@ import _ from 'lodash';
 import Joi from 'joi';
 import Promise from 'bluebird';
 import config from 'config';
+import axios from 'axios';
+import moment from 'moment';
 import util from '../../util';
 import models from '../../models';
 import { createPhaseTopic } from '../projectPhases';
@@ -14,6 +16,30 @@ const ES_PROJECT_INDEX = config.get('elasticsearchConfig.indexName');
 const ES_PROJECT_TYPE = config.get('elasticsearchConfig.docType');
 const eClient = util.getElasticSearchClient();
 
+/**
+ * creates taas job
+ * @param {Object} data the job data
+ * @return {Object} the job created
+ */
+const createTaasJob = async (data) => {
+  // TODO uncomment when TaaS API supports M2M tokens
+  // see https://github.com/topcoder-platform/taas-apis/issues/40
+  // const token = await util.getM2MToken();
+  const token = process.env.TAAS_API_TOKEN;
+  const headers = {
+    'Content-Type': 'application/json',
+    Authorization: `Bearer ${token}`,
+  };
+  const res = await axios
+    .post(config.taasJobApiUrl, data, { headers })
+    .catch((err) => {
+      const error = new Error();
+      error.message = _.get(err, 'response.data.message', error.message);
+      throw error;
+    });
+  return res.data;
+};
+
 /**
  * Payload for deprecated BUS events like `connect.notification.project.updated`.
  */
@@ -164,6 +190,46 @@ async function projectCreatedKafkaHandler(app, topic, payload) {
     await Promise.all(topicPromises);
     app.logger.debug('Topics for phases are successfully created.');
   }
+  if (project.type === 'talent-as-a-service') {
+    const specialists = _.get(project, 'details.taasDefinition.specialists');
+    if (!specialists || !specialists.length) {
+      app.logger.debug(`no specialists found in the project ${project.id}`);
+      return;
+    }
+    const targetSpecialists = _.filter(specialists, specialist => Number(specialist.people) > 0); // at least one person must be requested
+    await Promise.all(
+      _.map(
+        targetSpecialists,
+        (specialist) => {
+          const startDate = new Date();
+          const endDate = moment(startDate).add(Number(specialist.duration), 'M'); // duration is expressed in months
+          // make sure that skills would be unique in the list
+          const skills = _.uniq(
+            // use both, required and additional skills for jobs
+            specialist.skills.concat(specialist.additionalSkills)
+              // only include skills with `skillId` and ignore custom skills in jobs
+              .filter(skill => skill.skillId).map(skill => skill.skillId),
+          );
+          return createTaasJob({
+            projectId: project.id,
+            externalId: '0', // hardcoded for now
+            description: specialist.roleTitle,
+            startDate,
+            endDate,
+            skills,
+            numPositions: Number(specialist.people),
+            resourceType: specialist.role,
+            rateType: 'hourly', // hardcoded for now
+            workload: _.get(specialist, 'workLoad.title', '').toLowerCase(),
+          }).then((job) => {
+            app.logger.debug(`jobId: ${job.id} job created for roleTitle ${specialist.roleTitle}`);
+          }).catch((err) => {
+            app.logger.error(`Unable to create job for ${specialist.roleTitle}: ${err.message}`);
+          });
+        },
+      ),
+    );
+  }
 }
 
 module.exports = {
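
To make the new project-creation path concrete, here is a worked example of the mapping performed by the handler above, with made-up values; the specialist field names follow the handler code, and the real Connect payload may carry additional fields. One specialist entry asking for two people for six months would produce roughly the following TaaS job payload, which `createTaasJob` then POSTs to `config.taasJobApiUrl`:

```js
import moment from 'moment';

// Illustrative input only — values are invented, field names follow the handler above.
const specialist = {
  roleTitle: 'Senior Java Developer',
  role: 'java-developer',
  people: '2', // number of requested positions
  duration: '6', // in months
  workLoad: { title: 'Full-Time' },
  skills: [{ skillId: 'skill-1' }, { skillId: 'skill-2' }],
  additionalSkills: [{ skillId: 'skill-2' }, { name: 'custom skill without a skillId' }],
};

// After filtering out entries with zero people, de-duplicating skill IDs and
// dropping custom skills without a `skillId`, the handler would build roughly:
const startDate = new Date();
const jobPayload = {
  projectId: 123, // project.id of the created project
  externalId: '0', // hardcoded for now
  description: specialist.roleTitle, // 'Senior Java Developer'
  startDate,
  endDate: moment(startDate).add(Number(specialist.duration), 'M'),
  skills: ['skill-1', 'skill-2'],
  numPositions: Number(specialist.people), // 2
  resourceType: specialist.role, // 'java-developer'
  rateType: 'hourly', // hardcoded for now
  workload: specialist.workLoad.title.toLowerCase(), // 'full-time'
};
```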