diff --git a/.circleci/config.yml b/.circleci/config.yml index cb606a8a..19092136 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -68,8 +68,7 @@ workflows: branches: only: - dev - - change-validatations-in-job-jc - - feature/enriching-skills-data-with-api-2 + - feature/shapeup4-cqrs-update # Production builds are exectuted only on tagged commits to the # master branch. diff --git a/config/default.js b/config/default.js index e3dc3cf8..319720ab 100644 --- a/config/default.js +++ b/config/default.js @@ -95,6 +95,9 @@ module.exports = { KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting', // The originator value for the kafka messages KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-api', + + // topics for error + TAAS_ERROR_TOPIC: process.env.TAAS_ERROR_TOPIC || 'taas.action.error', // topics for job service // the create job entity Kafka message topic TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create', diff --git a/data/demo-data.json b/data/demo-data.json index 0e67b097..dddaaf13 100644 --- a/data/demo-data.json +++ b/data/demo-data.json @@ -19,7 +19,7 @@ "a2b4bc11-c641-4a19-9eb7-33980378f82e" ], "status": "in-review", - "isApplicationPageActive": false, + "isApplicationPageActive": true, "minSalary": 100, "maxSalary": 200, "hoursPerWeek": 20, @@ -51,7 +51,7 @@ "a2b4bc11-c641-4a19-9eb7-33980378f82e" ], "status": "in-review", - "isApplicationPageActive": false, + "isApplicationPageActive": true, "minSalary": 100, "maxSalary": 200, "hoursPerWeek": 80, @@ -83,7 +83,7 @@ "a2b4bc11-c641-4a19-9eb7-33980378f82e" ], "status": "in-review", - "isApplicationPageActive": false, + "isApplicationPageActive": true, "minSalary": 100, "maxSalary": 200, "hoursPerWeek": 90, @@ -115,7 +115,7 @@ "a2b4bc11-c641-4a19-9eb7-33980378f82e" ], "status": "in-review", - "isApplicationPageActive": false, + "isApplicationPageActive": true, "minSalary": 100, "maxSalary": 200, "hoursPerWeek": 20, @@ -148,7 +148,7 @@ "0b104b7c-0792-4118-8bc7-a274e9ee19e3" ], "status": "closed", - "isApplicationPageActive": false, + "isApplicationPageActive": true, "minSalary": null, "maxSalary": null, "hoursPerWeek": null, diff --git a/docs/swagger.yaml b/docs/swagger.yaml index dced7a41..a3dc43e4 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -118,6 +118,12 @@ paths: schema: type: integer description: The project id. + - in: query + name: isApplicationPageActive + required: false + schema: + type: boolean + description: Is application page active. 
- in: query name: projectIds required: false diff --git a/docs/taas-ER-diagram.png b/docs/taas-ER-diagram.png index 9733bbd6..e5726ee1 100644 Binary files a/docs/taas-ER-diagram.png and b/docs/taas-ER-diagram.png differ diff --git a/scripts/demo-email-notifications/index.js b/scripts/demo-email-notifications/index.js index 7e5b19c0..87ee16b8 100644 --- a/scripts/demo-email-notifications/index.js +++ b/scripts/demo-email-notifications/index.js @@ -31,22 +31,22 @@ async function resetNotificationRecords () { // reset completed interview records localLogger.info('reset completed interview records') const pastTime = moment.duration(config.INTERVIEW_COMPLETED_PAST_TIME) - const endTimestamp = moment().subtract(pastTime).add(config.INTERVIEW_COMPLETED_MATCH_WINDOW).toDate() + const completedStartTimestamp = moment().subtract(pastTime).add(config.INTERVIEW_COMPLETED_MATCH_WINDOW).toDate() const completedInterview = await Interview.findById('9efd72c3-1dc7-4ce2-9869-8cca81d0adeb') const duration = 30 - const completedStartTimestamp = moment().subtract(pastTime).subtract(30, 'm').toDate() - await completedInterview.update({ startTimestamp: completedStartTimestamp, duration, endTimestamp, status: Interviews.Status.Scheduled, guestNames: ['guest1', 'guest2'], hostName: 'hostName' }) + const completedEndTimestamp = moment(completedStartTimestamp).clone().add(30, 'm').toDate() + await completedInterview.update({ startTimestamp: completedStartTimestamp, duration, endTimestamp: completedEndTimestamp, status: Interviews.Status.Scheduled, guestNames: ['guest1', 'guest2'], hostName: 'hostName' }) + const completedInterview2 = await Interview.findById('3144fa65-ea1a-4bec-81b0-7cb1c8845826') + await completedInterview2.update({ startTimestamp: completedStartTimestamp, duration, endTimestamp: completedEndTimestamp, status: Interviews.Status.Scheduled, guestNames: ['guest1', 'guest2'], hostName: 'hostName' }) // reset post interview candidate action reminder records localLogger.info('reset post interview candidate action reminder records') - const jobCandidate = await JobCandidate.findById('881a19de-2b0c-4bb9-b36a-4cb5e223bdb5') + const jobCandidate = await JobCandidate.findById('827ee401-df04-42e1-abbe-7b97ce7937ff') await jobCandidate.update({ status: 'interview' }) - const c2Interview = await Interview.findById('077aa2ca-5b60-4ad9-a965-1b37e08a5046') - await c2Interview.update({ startTimestamp: moment().subtract(moment.duration(config.POST_INTERVIEW_ACTION_MATCH_WINDOW)).subtract(30, 'm').toDate(), duration, endTimestamp, guestNames: ['guest1', 'guest2'], hostName: 'hostName' }) - const jobCandidateWithinOneDay = await JobCandidate.findById('827ee401-df04-42e1-abbe-7b97ce7937ff') - await jobCandidateWithinOneDay.update({ status: 'interview' }) - const interviewWithinOneDay = await Interview.findById('3144fa65-ea1a-4bec-81b0-7cb1c8845826') - await interviewWithinOneDay.update({ startTimestamp: completedStartTimestamp, duration, endTimestamp, guestNames: ['guest1', 'guest2'], hostName: 'hostName' }) + const c2Interview = await Interview.findById('3144fa65-ea1a-4bec-81b0-7cb1c8845826') + await c2Interview.update({ startTimestamp: moment().subtract(moment.duration(config.POST_INTERVIEW_ACTION_MATCH_WINDOW)).subtract(30, 'm').toDate(), duration, endTimestamp: completedEndTimestamp, guestNames: ['guest1', 'guest2'], hostName: 'hostName' }) + const c2InterviewR2 = await Interview.findById('b1f7ba76-640f-47e2-9463-59e51b51ec60') + await c2InterviewR2.update({ status: 'Scheduled', startTimestamp:
moment().subtract(moment.duration(config.POST_INTERVIEW_ACTION_MATCH_WINDOW)).subtract(30, 'm').toDate(), duration, endTimestamp: completedEndTimestamp, guestNames: ['guest1', 'guest2'], hostName: 'hostName' }) // reset upcoming resource booking expiration records localLogger.info('reset upcoming resource booking expiration records') diff --git a/src/common/helper.js b/src/common/helper.js index 4d2f1dab..c96fbfb0 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -999,6 +999,26 @@ async function postEvent (topic, payload, options = {}) { await eventDispatcher.handleEvent(topic, { value: payload, options }) } +/** + * Send error event to Kafka + * @param {String} topic the topic name + * @param {Object} payload the payload + * @param {String} action the operation during which the error occurred + */ +async function postErrorEvent (topic, payload, action) { + _.set(payload, 'apiAction', action) + const client = getBusApiClient() + const message = { + topic, + originator: config.KAFKA_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload + } + logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`) + await client.postEvent(message) +} + /** * Test if an error is document missing exception * * @@ -1220,7 +1240,7 @@ async function getTopcoderSkills (criteria) { const token = await getM2MUbahnToken() try { const res = await request - .get(`${config.TC_BETA_API}/skills`) + .get(`${config.TC_API}/skills`) .query({ taxonomyId: config.TOPCODER_TAXONOMY_ID, ...criteria }) @@ -1272,7 +1292,7 @@ async function getAllTopcoderSkills (criteria) { async function getSkillById (skillId) { const token = await getM2MUbahnToken() const res = await request - .get(`${config.TC_BETA_API}/skills/${skillId}`) + .get(`${config.TC_API}/skills/${skillId}`) .set('Authorization', `Bearer ${token}`) .set('Content-Type', 'application/json') .set('Accept', 'application/json') @@ -2094,6 +2114,7 @@ module.exports = { getM2MToken, getM2MUbahnToken, postEvent, + postErrorEvent, getBusApiClient, isDocumentMissingException, getProjects, diff --git a/src/esProcessors/InterviewProcessor.js b/src/esProcessors/InterviewProcessor.js new file mode 100644 index 00000000..9334f04c --- /dev/null +++ b/src/esProcessors/InterviewProcessor.js @@ -0,0 +1,127 @@ +/** + * Interview Processor + */ + +const _ = require('lodash') +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Updates jobCandidate via a painless script + * + * @param {String} jobCandidateId job candidate id + * @param {String} script script definition + */ +async function updateJobCandidateViaScript (jobCandidateId, script) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: jobCandidateId, + body: { script }, + refresh: 'wait_for' + }) +} + +/** + * Process request interview entity. + * Creates an interview record under jobCandidate. + * + * @param {Object} interview interview object + */ +async function processRequestInterview (interview) { + // add interview in collection if there's already an existing collection + // or initiate a new one with this interview + const script = { + source: ` + ctx._source.containsKey("interviews") + ?
ctx._source.interviews.add(params.interview) + : ctx._source.interviews = [params.interview] + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script) +} + +/** + * Process update interview entity + * Updates the interview record under jobCandidate. + * + * @param {Object} interview interview object + */ +async function processUpdateInterview (interview) { + // if there's an interview with this id, + // update it + const script = { + source: ` + if (ctx._source.containsKey("interviews")) { + def target = ctx._source.interviews.find(i -> i.id == params.interview.id); + if (target != null) { + for (prop in params.interview.entrySet()) { + target[prop.getKey()] = prop.getValue() + } + } + } + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script) +} + +/** + * Process bulk (partially) update interviews entity. + * Currently supports status, updatedAt and updatedBy fields. + * Update Joi schema to allow more fields. + * (implementation should already handle new fields - just updating Joi schema should be enough) + * + * payload format: + * { + * "jobCandidateId": { + * "interviewId": { ...fields }, + * "interviewId2": { ...fields }, + * ... + * }, + * "jobCandidateId2": { // like above... }, + * ... + * } + * + * @param {Object} jobCandidates job candidates + */ +async function processBulkUpdateInterviews (jobCandidates) { + // script to update & params + const script = { + source: ` + def completedInterviews = params.jobCandidates[ctx._id]; + for (interview in completedInterviews.entrySet()) { + def interviewId = interview.getKey(); + def affectedFields = interview.getValue(); + def target = ctx._source.interviews.find(i -> i.id == interviewId); + if (target != null) { + for (field in affectedFields.entrySet()) { + target[field.getKey()] = field.getValue(); + } + } + } + `, + params: { jobCandidates } + } + // update interviews + await esClient.updateByQuery({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + body: { + script, + query: { + ids: { + values: _.keys(jobCandidates) + } + } + }, + refresh: true + }) +} + +module.exports = { + processRequestInterview, + processUpdateInterview, + processBulkUpdateInterviews +} diff --git a/src/esProcessors/JobCandidateProcessor.js b/src/esProcessors/JobCandidateProcessor.js new file mode 100644 index 00000000..8e82869b --- /dev/null +++ b/src/esProcessors/JobCandidateProcessor.js @@ -0,0 +1,54 @@ +/** + * Jobcandidate Processor + */ + +const config = require('config') +const helper = require('../common/helper') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/JobProcessor.js 
b/src/esProcessors/JobProcessor.js new file mode 100644 index 00000000..00db929c --- /dev/null +++ b/src/esProcessors/JobProcessor.js @@ -0,0 +1,54 @@ +/** + * Job Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/ResourceBookingProcessor.js b/src/esProcessors/ResourceBookingProcessor.js new file mode 100644 index 00000000..e81e3ccb --- /dev/null +++ b/src/esProcessors/ResourceBookingProcessor.js @@ -0,0 +1,54 @@ +/** + * ResourceBooking Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity message + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity message + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity message + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/RoleProcessor.js b/src/esProcessors/RoleProcessor.js new file mode 100644 index 00000000..22f819bf --- /dev/null +++ b/src/esProcessors/RoleProcessor.js @@ -0,0 +1,54 @@ +/** + * Role Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + refresh: 'wait_for' + }) +} + 
+module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/WorkPeriodPaymentProcessor.js b/src/esProcessors/WorkPeriodPaymentProcessor.js new file mode 100644 index 00000000..78e379c6 --- /dev/null +++ b/src/esProcessors/WorkPeriodPaymentProcessor.js @@ -0,0 +1,85 @@ +/** + * WorkPeriodPayment Processor + */ + +const config = require('config') +const helper = require('../common/helper') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + // find related resourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.workPeriodId } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.workPeriodId} "WorkPeriod" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)', + params: { workPeriodPayment: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + // find workPeriodPayment in it's parent ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods.payments', + query: { + match: { 'workPeriods.payments.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.id} "WorkPeriodPayment" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate +} diff --git a/src/esProcessors/WorkPeriodProcessor.js b/src/esProcessors/WorkPeriodProcessor.js new file mode 100644 index 00000000..2fc7261f --- /dev/null +++ b/src/esProcessors/WorkPeriodProcessor.js @@ -0,0 +1,132 @@ +/** + * WorkPeriod Processor + */ + +const helper = require('../common/helper') +const config = require('config') +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity, options) { + // Find related resourceBooking + const resourceBooking = await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.resourceBookingId + }) + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.id, + body: { + script: { + lang: 'painless', + source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)', + 
params: { workPeriod: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + // find workPeriod in it's parent ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + // Find related ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + const resourceBookingId = entity.key.replace('resourceBooking.id:', '') + if (resourceBookingId) { + try { + await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBookingId + }) + if (!resourceBooking) { + return + } + + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } catch (e) { + // if ResourceBooking is deleted, ignore + if (e.message === 'resource_not_found_exception') { + return + } + throw e + } + } + // if ResourceBooking is deleted, ignore, else throw error + if (resourceBooking) { + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/eventHandlers/WorkPeriodPaymentEventHandler.js b/src/eventHandlers/WorkPeriodPaymentEventHandler.js index 53ab7823..3587186b 100644 --- a/src/eventHandlers/WorkPeriodPaymentEventHandler.js +++ b/src/eventHandlers/WorkPeriodPaymentEventHandler.js @@ -9,7 +9,11 @@ const logger = require('../common/logger') const helper = require('../common/helper') const { ActiveWorkPeriodPaymentStatuses } = require('../../app-constants') const WorkPeriod = models.WorkPeriod +const { + processUpdate: processUpdateEs +} = require('../esProcessors/WorkPeriodProcessor') +const sequelize = models.sequelize /** * When a WorkPeriodPayment is updated or created, the workPeriod related to * that WorkPeriodPayment should be updated also. 
@@ -39,8 +43,25 @@ async function updateWorkPeriod (payload) { }) return } - const updated = await workPeriodModel.update(data) - await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, _.omit(updated.toJSON(), 'payments'), { oldValue: workPeriod, key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodModel.update(data, { transaction: t }) + entity = updated.toJSON() + + entity = _.omit(entity, ['payments']) + await processUpdateEs({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update') + } + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: workPeriod, key }) + logger.debug({ component: 'WorkPeriodPaymentEventHandler', context: 'updateWorkPeriod', diff --git a/src/services/InterviewService.js b/src/services/InterviewService.js index 3ddb1a3d..4ed04c13 100644 --- a/src/services/InterviewService.js +++ b/src/services/InterviewService.js @@ -13,7 +13,15 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') - +const { + processRequestInterview, + processUpdateInterview, + processBulkUpdateInterviews +} = require('../esProcessors/InterviewProcessor') +const { + processUpdate: jobCandidateProcessUpdate +} = require('../esProcessors/JobCandidateProcessor') +const sequelize = models.sequelize const Interview = models.Interview const esClient = helper.getESClient() @@ -245,25 +253,35 @@ async function requestInterview (currentUser, jobCandidateId, interview) { return (foundGuestMember !== undefined) ? 
`${foundGuestMember.firstName} ${foundGuestMember.lastName}` : guestEmail.split('@')[0] }) + let entity + let jobCandidateEntity try { - // create the interview - const created = await Interview.create(interview) - await helper.postEvent(config.TAAS_INTERVIEW_REQUEST_TOPIC, created.toJSON()) - // update jobCandidate.status to Interview - const [, affectedRows] = await models.JobCandidate.update( - { status: 'interview' }, - { where: { id: created.jobCandidateId }, returning: true } - ) - const updatedJobCandidate = _.omit(_.get(affectedRows, '0.dataValues'), 'deletedAt') - await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, updatedJobCandidate) - // return created interview - return created.dataValues + await sequelize.transaction(async (t) => { + // create the interview + const created = await Interview.create(interview, { transaction: t }) + entity = created.toJSON() + await processRequestInterview(entity) + // update jobCandidate.status to Interview + const [, affectedRows] = await models.JobCandidate.update( + { status: 'interview' }, + { where: { id: created.jobCandidateId }, returning: true, transaction: t } + ) + jobCandidateEntity = _.omit(_.get(affectedRows, '0.dataValues'), 'deletedAt') + await jobCandidateProcessUpdate(jobCandidateEntity) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.request') + } // gracefully handle if one of the common sequelize errors handleSequelizeError(err, jobCandidateId) // if reaches here, it's not one of the common errors handled in `handleSequelizeError` throw err } + await helper.postEvent(config.TAAS_INTERVIEW_REQUEST_TOPIC, entity) + await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, jobCandidateEntity) + // return created interview + return entity } requestInterview.schema = Joi.object().keys({ @@ -301,16 +319,24 @@ async function partiallyUpdateInterview (currentUser, interview, data) { } data.updatedBy = await helper.getUserId(currentUser.userId) + let entity try { - const updated = await interview.update(data) - await helper.postEvent(config.TAAS_INTERVIEW_UPDATE_TOPIC, updated.toJSON(), { oldValue: interview.toJSON() }) - return updated.dataValues + await sequelize.transaction(async (t) => { + const updated = await interview.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdateInterview(entity) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.update') + } // gracefully handle if one of the common sequelize errors handleSequelizeError(err, interview.jobCandidateId) // if reaches here, it's not one of the common errors handled in `handleSequelizeError` throw err } + await helper.postEvent(config.TAAS_INTERVIEW_UPDATE_TOPIC, entity, { oldValue: interview.toJSON() }) + return entity } /** @@ -571,38 +597,58 @@ searchInterviews.schema = Joi.object().keys({ async function updateCompletedInterviews () { logger.info({ component: 'InterviewService', context: 'updateCompletedInterviews', message: 'Running the scheduled job...' 
}) const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000) - const [affectedCount, updatedRows] = await Interview.update( - // '00000000-0000-0000-0000-000000000000' - to indicate it's updated by the system job - { status: InterviewConstants.Status.Completed, updatedBy: '00000000-0000-0000-0000-000000000000' }, - { - where: { - status: [InterviewConstants.Status.Scheduled, InterviewConstants.Status.Rescheduled], - startTimestamp: { - [Op.lte]: oneHourAgo + + let entity + let affectedCount + try { + await sequelize.transaction(async (t) => { + const updated = await Interview.update( + // '00000000-0000-0000-0000-000000000000' - to indicate it's updated by the system job + { status: InterviewConstants.Status.Completed, updatedBy: '00000000-0000-0000-0000-000000000000' }, + { + where: { + status: [InterviewConstants.Status.Scheduled, InterviewConstants.Status.Rescheduled], + startTimestamp: { + [Op.lte]: oneHourAgo + } + }, + returning: true, + transaction: t } - }, - returning: true - } - ) - - // post event if there are affected/updated interviews - if (affectedCount > 0) { - // payload format: - // { - // jobCandidateId: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, - // jobCandidateId2: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, - // ... - // } - const bulkUpdatePayload = {} - // construct payload - _.forEach(updatedRows, row => { - const interview = row.toJSON() - const affectedFields = _.pick(interview, ['status', 'updatedBy', 'updatedAt']) - _.set(bulkUpdatePayload, [interview.jobCandidateId, interview.id], affectedFields) + ) + let updatedRows + [affectedCount, updatedRows] = updated + + // post event if there are affected/updated interviews + if (affectedCount > 0) { + // payload format: + // { + // jobCandidateId: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, + // jobCandidateId2: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, + // ... + // } + const bulkUpdatePayload = {} + // construct payload + _.forEach(updatedRows, row => { + const interview = row.toJSON() + const affectedFields = _.pick(interview, ['status', 'updatedBy', 'updatedAt']) + _.set(bulkUpdatePayload, [interview.jobCandidateId, interview.id], affectedFields) + }) + entity = bulkUpdatePayload + await processBulkUpdateInterviews(bulkUpdatePayload) + } }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.bulkupdate') + } + throw e + } + if (affectedCount) { // post event - await helper.postEvent(config.TAAS_INTERVIEW_BULK_UPDATE_TOPIC, bulkUpdatePayload) + await helper.postEvent(config.TAAS_INTERVIEW_BULK_UPDATE_TOPIC, entity) } + logger.info({ component: 'InterviewService', context: 'updateCompletedInterviews', message: `Completed running. 
Updated ${affectedCount} interviews.` }) } diff --git a/src/services/JobCandidateService.js b/src/services/JobCandidateService.js index e142fbc4..59a2b4f5 100644 --- a/src/services/JobCandidateService.js +++ b/src/services/JobCandidateService.js @@ -14,6 +14,13 @@ const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') const JobService = require('./JobService') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/JobCandidateProcessor') + +const sequelize = models.sequelize const NotificationSchedulerService = require('./NotificationsSchedulerService') const JobCandidate = models.JobCandidate const esClient = helper.getESClient() @@ -118,9 +125,21 @@ async function createJobCandidate (currentUser, jobCandidate) { jobCandidate.id = uuid() jobCandidate.createdBy = await helper.getUserId(currentUser.userId) - const created = await JobCandidate.create(jobCandidate) - await helper.postEvent(config.TAAS_JOB_CANDIDATE_CREATE_TOPIC, created.toJSON()) - return created.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const created = await JobCandidate.create(jobCandidate, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'jobcandidate.create') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_CANDIDATE_CREATE_TOPIC, entity) + return entity } createJobCandidate.schema = Joi.object().keys({ @@ -155,8 +174,20 @@ async function updateJobCandidate (currentUser, id, data) { data.updatedBy = userId - const updated = await jobCandidate.update(data) - await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await jobCandidate.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'jobcandidate.update') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, entity, { oldValue: oldValue }) const result = _.assign(jobCandidate.dataValues, data) return result } @@ -227,7 +258,15 @@ async function deleteJobCandidate (currentUser, id) { } const jobCandidate = await JobCandidate.findById(id) - await jobCandidate.destroy() + try { + await sequelize.transaction(async (t) => { + await jobCandidate.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'jobcandidate.delete') + throw e + } await helper.postEvent(config.TAAS_JOB_CANDIDATE_DELETE_TOPIC, { id }) } @@ -374,11 +413,10 @@ async function downloadJobCandidateResume (currentUser, id) { try { const job = await models.Job.findById(jobCandidate.jobId) const { handle } = await helper.getUserById(jobCandidate.userId, true) - const { email } = await helper.getMemberDetailsByHandle(handle) await NotificationSchedulerService.sendNotification(currentUser, { template: 'taas.notification.job-candidate-resume-viewed', - recipients: [email], + recipients: [{ handle }], data: { jobCandidateUserHandle: handle, jobName: job.title, diff --git a/src/services/JobService.js b/src/services/JobService.js index 7e38227c..a84d56e6 100644 --- a/src/services/JobService.js +++ b/src/services/JobService.js @@ -12,7 +12,13 @@ const helper = require('../common/helper') 
const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/JobProcessor') +const sequelize = models.sequelize const Job = models.Job const esClient = helper.getESClient() @@ -32,7 +38,8 @@ async function _getJobCandidates (jobId) { value: jobId } } - } + }, + size: 10000 } }) @@ -182,9 +189,22 @@ async function createJob (currentUser, job, onTeamCreating) { job.id = uuid() job.createdBy = await helper.getUserId(currentUser.userId) - const created = await Job.create(job) - await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, created.toJSON(), { onTeamCreating }) - return created.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const created = await Job.create(job, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'job.create') + } + throw e + } + + await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, entity, { onTeamCreating }) + return entity } createJob.schema = Joi.object() @@ -253,8 +273,20 @@ async function updateJob (currentUser, id, data) { data.updatedBy = ubahnUserId - const updated = await job.update(data) - await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await job.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'job.update') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, entity, { oldValue: oldValue }) job = await Job.findById(id, true) job.dataValues.candidates = _.map(job.dataValues.candidates, (c) => c.dataValues) return job.dataValues @@ -351,7 +383,15 @@ async function deleteJob (currentUser, id) { } const job = await Job.findById(id) - await job.destroy() + try { + await sequelize.transaction(async (t) => { + await job.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'job.delete') + throw e + } await helper.postEvent(config.TAAS_JOB_DELETE_TOPIC, { id }) } @@ -414,6 +454,7 @@ async function searchJobs (currentUser, criteria, options = { returnAll: false } } _.each(_.pick(criteria, [ + 'isApplicationPageActive', 'projectId', 'externalId', 'description', @@ -493,6 +534,7 @@ async function searchJobs (currentUser, criteria, options = { returnAll: false } logger.info({ component: 'JobService', context: 'searchJobs', message: 'fallback to DB query' }) const filter = { [Op.and]: [] } _.each(_.pick(criteria, [ + 'isApplicationPageActive', 'projectId', 'externalId', 'startDate', @@ -556,6 +598,7 @@ searchJobs.schema = Joi.object().keys({ sortOrder: Joi.string().valid('desc', 'asc'), projectId: Joi.number().integer(), externalId: Joi.string(), + isApplicationPageActive: Joi.boolean(), description: Joi.string(), title: Joi.title(), startDate: Joi.date(), diff --git a/src/services/NotificationsSchedulerService.js b/src/services/NotificationsSchedulerService.js index 133553d1..90a6b58f 100644 --- a/src/services/NotificationsSchedulerService.js +++ b/src/services/NotificationsSchedulerService.js @@ -192,7 +192,7 @@ async function sendCandidatesAvailableNotifications () { */ async function 
sendInterviewComingUpNotifications () { localLogger.debug('[sendInterviewComingUpNotifications]: Looking for due records...') - const currentTime = moment.utc() + const currentTime = moment.utc().startOf('minute') const timestampFilter = { [Op.or]: [] } @@ -204,10 +204,10 @@ async function sendInterviewComingUpNotifications () { timestampFilter[Op.or].push({ [Op.and]: [ { - [Op.gt]: rangeStart + [Op.gte]: rangeStart }, { - [Op.lte]: rangeEnd + [Op.lt]: rangeEnd } ] }) @@ -216,7 +216,12 @@ async function sendInterviewComingUpNotifications () { const filter = { [Op.and]: [ { - status: { [Op.eq]: constants.Interviews.Status.Scheduled } + status: { + [Op.in]: [ + constants.Interviews.Status.Scheduled, + constants.Interviews.Status.Rescheduled + ] + } }, { startTimestamp: timestampFilter @@ -285,15 +290,21 @@ async function sendInterviewComingUpNotifications () { async function sendInterviewCompletedNotifications () { localLogger.debug('[sendInterviewCompletedNotifications]: Looking for due records...') const window = moment.duration(config.INTERVIEW_COMPLETED_MATCH_WINDOW) - const rangeStart = moment.utc().subtract(moment.duration(config.INTERVIEW_COMPLETED_PAST_TIME)) + const rangeStart = moment.utc().startOf('minute').subtract(moment.duration(config.INTERVIEW_COMPLETED_PAST_TIME)) const rangeEnd = rangeStart.clone().add(window) const filter = { [Op.and]: [ { - status: { [Op.eq]: constants.Interviews.Status.Scheduled } + status: { + [Op.in]: [ + constants.Interviews.Status.Scheduled, + constants.Interviews.Status.Rescheduled, + constants.Interviews.Status.Completed + ] + } }, { - endTimestamp: { + startTimestamp: { [Op.and]: [ { [Op.gte]: rangeStart @@ -307,10 +318,14 @@ async function sendInterviewCompletedNotifications () { ] } - const interviews = await Interview.findAll({ + let interviews = await Interview.findAll({ where: filter, raw: true }) + interviews = _.map(_.values(_.groupBy(interviews, 'jobCandidateId')), (interviews) => _.maxBy(interviews, 'round')) + + const jobCandidates = await JobCandidate.findAll({ where: { id: _.map(interviews, 'jobCandidateId') } }) + const jcMap = _.keyBy(jobCandidates, 'id') localLogger.debug(`[sendInterviewCompletedNotifications]: Found ${interviews.length} interviews which must be ended by now.`) @@ -320,8 +335,12 @@ async function sendInterviewCompletedNotifications () { localLogger.error(`Interview id: ${interview.id} host email not present`) continue } + if (!jcMap[interview.jobCandidateId] || jcMap[interview.jobCandidateId].status !== constants.JobCandidateStatus.INTERVIEW) { + localLogger.error(`Interview id: ${interview.id} job candidate status is not ${constants.JobCandidateStatus.INTERVIEW}`) + continue + } - const data = await getDataForInterview(interview) + const data = await getDataForInterview(interview, jcMap[interview.jobCandidateId]) if (!data) { continue } sendNotification({}, { @@ -357,7 +376,13 @@ async function sendPostInterviewActionNotifications () { as: 'interviews', required: true, where: { - status: constants.Interviews.Status.Completed, + status: { + [Op.in]: [ + constants.Interviews.Status.Scheduled, + constants.Interviews.Status.Rescheduled, + constants.Interviews.Status.Completed + ] + }, startTimestamp: { [Op.lte]: moment.utc().subtract(moment.duration(config.POST_INTERVIEW_ACTION_MATCH_WINDOW)) } @@ -394,28 +419,27 @@ async function sendPostInterviewActionNotifications () { const projectJcs = _.filter(completedJobCandidates, jc => jc.jobId === projectJob.id) numCandidates += projectJcs.length for (const projectJc of 
projectJcs) { - for (const interview of projectJc.interviews) { - const d = await getDataForInterview(interview, projectJc, projectJob) - if (!d) { continue } - d.jobUrl = `${config.TAAS_APP_URL}/${projectId}/positions/${projectJob.id}` - webNotifications.push({ - serviceId: 'web', - type: template, - details: { - recipients: projectTeamRecipients, - contents: { - jobTitle: d.jobTitle, - teamName: project.name, - projectId, - jobId: projectJob.id, - userHandle: d.handle - }, - version: 1 - } - }) + const interview = _.maxBy(projectJc.interviews, 'round') + const d = await getDataForInterview(interview, projectJc, projectJob) + if (!d) { continue } + d.jobUrl = `${config.TAAS_APP_URL}/${projectId}/positions/${projectJob.id}` + webNotifications.push({ + serviceId: 'web', + type: template, + details: { + recipients: projectTeamRecipients, + contents: { + jobTitle: d.jobTitle, + teamName: project.name, + projectId, + jobId: projectJob.id, + userHandle: d.handle + }, + version: 1 + } + }) - teamInterviews.push(d) - } + teamInterviews.push(d) } } diff --git a/src/services/PaymentSchedulerService.js b/src/services/PaymentSchedulerService.js index f38eab6f..e718c295 100644 --- a/src/services/PaymentSchedulerService.js +++ b/src/services/PaymentSchedulerService.js @@ -2,11 +2,16 @@ const _ = require('lodash') const config = require('config') const moment = require('moment') const models = require('../models') -const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent } = require('../common/helper') +const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent, postErrorEvent } = require('../common/helper') const logger = require('../common/logger') const { createChallenge, addResourceToChallenge, activateChallenge, closeChallenge } = require('./PaymentService') const { ChallengeStatus, PaymentSchedulerStatus, PaymentProcessingSwitch } = require('../../app-constants') +const { + processUpdate +} = require('../esProcessors/WorkPeriodPaymentProcessor') + +const sequelize = models.sequelize const WorkPeriodPayment = models.WorkPeriodPayment const WorkPeriod = models.WorkPeriod const PaymentScheduler = models.PaymentScheduler @@ -88,9 +93,22 @@ async function processPayment (workPeriodPayment) { } } else { const oldValue = workPeriodPayment.toJSON() - const updated = await workPeriodPayment.update({ status: 'in-progress' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let entity + let key + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodPayment.update({ status: 'in-progress' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) } // Check whether the number of processed records per minute exceeds the specified number, if it exceeds, wait for the next minute before processing await checkWait(PaymentSchedulerStatus.START_PROCESS) @@ -112,11 +130,24 @@ async function processPayment (workPeriodPayment) { } const oldValue = workPeriodPayment.toJSON() - // 5. 
update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". - const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let key + let entity + try { + await sequelize.transaction(async (t) => { + // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". + const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + // Update the modified status to es + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) await paymentScheduler.update({ step: PaymentSchedulerStatus.CLOSE_CHALLENGE, userId: paymentScheduler.userId, status: 'completed' }) localLogger.info(`Processed workPeriodPayment ${workPeriodPayment.id} successfully`, 'processPayment') @@ -125,10 +156,24 @@ logger.logFullError(err, { component: 'PaymentSchedulerService', context: 'processPayment' }) const statusDetails = { errorMessage: extractErrorMessage(err), errorCode: _.get(err, 'status', -1), retry: _.get(err, 'retry', -1), step: _.get(err, 'step'), challengeId: paymentScheduler ? paymentScheduler.challengeId : null } const oldValue = workPeriodPayment.toJSON() - // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. - const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + + let entity + let key + try { + await sequelize.transaction(async (t) => { + // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format.
+ const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) if (paymentScheduler) { await paymentScheduler.update({ step: _.get(err, 'step'), userId: paymentScheduler.userId, status: 'failed' }) diff --git a/src/services/ResourceBookingService.js b/src/services/ResourceBookingService.js index 6b155ce6..8ef7dba4 100644 --- a/src/services/ResourceBookingService.js +++ b/src/services/ResourceBookingService.js @@ -13,6 +13,11 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/ResourceBookingProcessor') const constants = require('../../app-constants') const moment = require('moment') @@ -22,6 +27,8 @@ const WorkPeriodPayment = models.WorkPeriodPayment const esClient = helper.getESClient() const cachedModelFields = _cacheModelFields() +const sequelize = models.sequelize + /** * Get the fields of the ResourceBooking model and the nested WorkPeriod model * @returns {Array} array of field names @@ -332,9 +339,21 @@ async function createResourceBooking (currentUser, resourceBooking) { resourceBooking.id = uuid() resourceBooking.createdBy = await helper.getUserId(currentUser.userId) - const created = await ResourceBooking.create(resourceBooking) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_CREATE_TOPIC, created.toJSON()) - return created.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const created = await ResourceBooking.create(resourceBooking, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'resourcebooking.create') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_CREATE_TOPIC, entity) + return entity } createResourceBooking.schema = Joi.object().keys({ @@ -385,9 +404,22 @@ async function updateResourceBooking (currentUser, id, data) { data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await resourceBooking.update(data) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) - return updated.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await resourceBooking.update(data, { transaction: t }) + + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'resourcebooking.update') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, entity, { oldValue: oldValue }) + return entity } /** @@ -477,7 +509,16 @@ async function deleteResourceBooking (currentUser, id) { // we can't delete workperiods with paymentStatus 'partially-completed' or 'completed'. 
await _ensurePaidWorkPeriodsNotDeleted(id) const resourceBooking = await ResourceBooking.findById(id) - await resourceBooking.destroy() + + try { + await sequelize.transaction(async (t) => { + await resourceBooking.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'resourcebooking.delete') + throw e + } await helper.postEvent(config.TAAS_RESOURCE_BOOKING_DELETE_TOPIC, { id }) } diff --git a/src/services/RoleService.js b/src/services/RoleService.js index ba128170..b5d68673 100644 --- a/src/services/RoleService.js +++ b/src/services/RoleService.js @@ -11,7 +11,13 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/RoleProcessor') +const sequelize = models.sequelize const Role = models.Role const esClient = helper.getESClient() @@ -118,10 +124,21 @@ async function createRole (currentUser, role) { role.id = uuid.v4() role.createdBy = await helper.getUserId(currentUser.userId) - const created = await Role.create(role) - - await helper.postEvent(config.TAAS_ROLE_CREATE_TOPIC, created.toJSON()) - return created.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const created = await Role.create(role, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'role.create') + } + throw e + } + await helper.postEvent(config.TAAS_ROLE_CREATE_TOPIC, entity) + return entity } createRole.schema = Joi.object().keys({ @@ -175,10 +192,22 @@ async function updateRole (currentUser, id, data) { } data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await role.update(data) - await helper.postEvent(config.TAAS_ROLE_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) - return updated.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await role.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'role.update') + } + throw e + } + await helper.postEvent(config.TAAS_ROLE_UPDATE_TOPIC, entity, { oldValue: oldValue }) + return entity } updateRole.schema = Joi.object().keys({ @@ -220,7 +249,16 @@ async function deleteRole (currentUser, id) { await _checkUserPermissionForWriteDeleteRole(currentUser) const role = await Role.findById(id) - await role.destroy() + + try { + await sequelize.transaction(async (t) => { + await role.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'role.delete') + throw e + } await helper.postEvent(config.TAAS_ROLE_DELETE_TOPIC, { id }) } diff --git a/src/services/TeamService.js b/src/services/TeamService.js index 0ec04a3f..f290c417 100644 --- a/src/services/TeamService.js +++ b/src/services/TeamService.js @@ -790,11 +790,17 @@ roleSearchRequest.schema = Joi.object() */ async function getRoleBySkills (skills) { // find all roles which includes any of the given skills + + logger.debug(`getRoleBySkills: ${JSON.stringify(skills)}`) + const queryCriteria = { where: { listOfSkills: { [Op.overlap]: skills } }, raw: true } let roles = await Role.findAll(queryCriteria) + +
logger.debug(`find roles: ${JSON.stringify(roles)}`) + roles = _.filter(roles, role => _.find(role.rates, r => r.global && r.rate20Global && r.rate30Global)) if (roles.length > 0) { let result = _.each(roles, role => { @@ -808,6 +814,9 @@ async function getRoleBySkills (skills) { }) // sort roles by skillsMatch, global rate and name result = _.orderBy(result, ['skillsMatch', 'maxGlobal', 'name'], ['desc', 'desc', 'asc']) + + logger.debug(`after sorting result: ${JSON.stringify(result)}`) + if (result[0].skillsMatch >= config.ROLE_MATCHING_RATE) { // return the 1st role return _.omit(result[0], ['maxGlobal']) @@ -815,6 +824,9 @@ async function getRoleBySkills (skills) { } // if no matching role found then return Custom role or empty object const customRole = await Role.findOne({ where: { name: { [Op.iLike]: 'Custom' } }, raw: true }) || {} + + logger.debug(`got custom role: ${JSON.stringify(customRole)}`) + customRole.rates[0].rate30Global = customRole.rates[0].global * 0.75 customRole.rates[0].rate20Global = customRole.rates[0].global * 0.5 return customRole diff --git a/src/services/WorkPeriodPaymentService.js b/src/services/WorkPeriodPaymentService.js index 81ca79e9..390db252 100644 --- a/src/services/WorkPeriodPaymentService.js +++ b/src/services/WorkPeriodPaymentService.js @@ -15,6 +15,12 @@ const errors = require('../common/errors') const models = require('../models') const { WorkPeriodPaymentStatus, ActiveWorkPeriodPaymentStatuses } = require('../../app-constants') const { searchResourceBookings } = require('./ResourceBookingService') +const { + processCreate, + processUpdate +} = require('../esProcessors/WorkPeriodPaymentProcessor') + +const sequelize = models.sequelize const WorkPeriodPayment = models.WorkPeriodPayment const WorkPeriod = models.WorkPeriod @@ -119,9 +125,23 @@ async function _createSingleWorkPeriodPaymentWithWorkPeriodAndResourceBooking (w workPeriodPayment.status = WorkPeriodPaymentStatus.SCHEDULED workPeriodPayment.createdBy = createdBy - const created = await WorkPeriodPayment.create(workPeriodPayment) - await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC, created.toJSON(), { key: `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` }) - return created.dataValues + const key = `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` + + let entity + try { + await sequelize.transaction(async (t) => { + const created = await WorkPeriodPayment.create(workPeriodPayment, { transaction: t }) + entity = created.toJSON() + await processCreate({ ...entity, key }) + }) + } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.create') + } + throw err + } + await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC, entity, { key }) + return entity } /** @@ -296,9 +316,23 @@ async function updateWorkPeriodPayment (id, data) { await _updateChallenge(workPeriodPayment.challengeId, data) } - const updated = await workPeriodPayment.update(data) - await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) - return updated.dataValues + const key = `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodPayment.update(data, { transaction: t }) + entity = updated.toJSON() + + await processUpdate({ ...entity, key }) + }) + } catch 
(e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) + return entity } /** diff --git a/src/services/WorkPeriodService.js b/src/services/WorkPeriodService.js index e48c6cb1..8c90eece 100644 --- a/src/services/WorkPeriodService.js +++ b/src/services/WorkPeriodService.js @@ -12,12 +12,19 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/WorkPeriodProcessor') const constants = require('../../app-constants') const moment = require('moment') const WorkPeriod = models.WorkPeriod const esClient = helper.getESClient() +const sequelize = models.sequelize + // "startDate" and "endDate" should always represent one week: // "startDate" should be always Monday and "endDate" should be always Sunday of the same week. // It should not include time or timezone, only date. @@ -221,19 +228,27 @@ async function createWorkPeriod (workPeriod) { workPeriod.id = uuid.v4() workPeriod.createdBy = config.m2m.M2M_AUDIT_USER_ID - let created = null + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + + let entity try { - created = await WorkPeriod.create(workPeriod) + await sequelize.transaction(async (t) => { + const created = await WorkPeriod.create(workPeriod, { transaction: t }) + entity = created.toJSON() + await processCreate({ ...entity, key }) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.create') + } if (!_.isUndefined(err.original)) { throw new errors.BadRequestError(err.original.detail) } else { throw err } } - - await helper.postEvent(config.TAAS_WORK_PERIOD_CREATE_TOPIC, created.toJSON(), { key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) - return created.dataValues + await helper.postEvent(config.TAAS_WORK_PERIOD_CREATE_TOPIC, entity, { key }) + return entity } createWorkPeriod.schema = Joi.object().keys({ @@ -278,11 +293,25 @@ async function updateWorkPeriod (currentUser, id, data) { if (!currentUser.isMachine) { data.updatedBy = await helper.getUserId(currentUser.userId) } - const updated = await workPeriod.update(data) - const updatedDataWithoutPayments = _.omit(updated.toJSON(), ['payments']) + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriod.update(data, { transaction: t }) + entity = updated.toJSON() + + entity = _.omit(entity, ['payments']) + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update') + } + throw e + } const oldValueWithoutPayments = _.omit(oldValue, ['payments']) - await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, updatedDataWithoutPayments, { oldValue: oldValueWithoutPayments, key: `resourceBooking.id:${updated.resourceBookingId}` }) - return updatedDataWithoutPayments + await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: oldValueWithoutPayments, key }) + return entity } /** @@ -318,13 +347,24 @@ async function deleteWorkPeriod (id) { if (_.some(workPeriod.payments, payment => constants.ActiveWorkPeriodPaymentStatuses.indexOf(payment.status) !== -1)) { throw new 
errors.BadRequestError(`Can't delete WorkPeriod as it has associated WorkPeriodsPayment with one of statuses ${constants.ActiveWorkPeriodPaymentStatuses.join(', ')}`) } - await models.WorkPeriodPayment.destroy({ - where: { - workPeriodId: id - } - }) - await workPeriod.destroy() - await helper.postEvent(config.TAAS_WORK_PERIOD_DELETE_TOPIC, { id }, { key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + try { + await sequelize.transaction(async (t) => { + await models.WorkPeriodPayment.destroy({ + where: { + workPeriodId: id + }, + transaction: t + }) + await workPeriod.destroy({ transaction: t }) + await processDelete({ id, key }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'workperiod.delete') + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_DELETE_TOPIC, { id }, { key }) } deleteWorkPeriod.schema = Joi.object().keys({