diff --git a/.circleci/config.yml b/.circleci/config.yml index cb606a8a..19092136 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -68,8 +68,7 @@ workflows: branches: only: - dev - - change-validatations-in-job-jc - - feature/enriching-skills-data-with-api-2 + - feature/shapeup4-cqrs-update # Production builds are exectuted only on tagged commits to the # master branch. diff --git a/config/default.js b/config/default.js index e3dc3cf8..319720ab 100644 --- a/config/default.js +++ b/config/default.js @@ -95,6 +95,9 @@ module.exports = { KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting', // The originator value for the kafka messages KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-api', + + // topics for error + TAAS_ERROR_TOPIC: process.env.TAAS_ERROR_TOPIC || 'taas.action.error', // topics for job service // the create job entity Kafka message topic TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create', diff --git a/docs/taas-ER-diagram.png b/docs/taas-ER-diagram.png index 9733bbd6..e5726ee1 100644 Binary files a/docs/taas-ER-diagram.png and b/docs/taas-ER-diagram.png differ diff --git a/src/common/helper.js b/src/common/helper.js index 4d2f1dab..a7cb1660 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -999,6 +999,26 @@ async function postEvent (topic, payload, options = {}) { await eventDispatcher.handleEvent(topic, { value: payload, options }) } +/** + * Send error event to Kafka + * @params {String} topic the topic name + * @params {Object} payload the payload + * @params {String} action for which operation error occurred + */ +async function postErrorEvent (topic, payload, action) { + _.set(payload, 'apiAction', action) + const client = getBusApiClient() + const message = { + topic, + originator: config.KAFKA_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload + } + logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`) + await client.postEvent(message) +} + /** * Test if an error is document missing exception * @@ -2094,6 +2114,7 @@ module.exports = { getM2MToken, getM2MUbahnToken, postEvent, + postErrorEvent, getBusApiClient, isDocumentMissingException, getProjects, diff --git a/src/esProcessors/InterviewProcessor.js b/src/esProcessors/InterviewProcessor.js new file mode 100644 index 00000000..9334f04c --- /dev/null +++ b/src/esProcessors/InterviewProcessor.js @@ -0,0 +1,127 @@ +/** + * Interview Processor + */ + +const _ = require('lodash') +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Updates jobCandidate via a painless script + * + * @param {String} jobCandidateId job candidate id + * @param {String} script script definition + */ +async function updateJobCandidateViaScript (jobCandidateId, script) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: jobCandidateId, + body: { script }, + refresh: 'wait_for' + }) +} + +/** + * Process request interview entity. + * Creates an interview record under jobCandidate. + * + * @param {Object} interview interview object + */ +async function processRequestInterview (interview) { + // add interview in collection if there's already an existing collection + // or initiate a new one with this interview + const script = { + source: ` + ctx._source.containsKey("interviews") + ? ctx._source.interviews.add(params.interview) + : ctx._source.interviews = [params.interview] + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script) +} + +/** + * Process update interview entity + * Updates the interview record under jobCandidate. + * + * @param {Object} interview interview object + */ +async function processUpdateInterview (interview) { + // if there's an interview with this id, + // update it + const script = { + source: ` + if (ctx._source.containsKey("interviews")) { + def target = ctx._source.interviews.find(i -> i.id == params.interview.id); + if (target != null) { + for (prop in params.interview.entrySet()) { + target[prop.getKey()] = prop.getValue() + } + } + } + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script) +} + +/** + * Process bulk (partially) update interviews entity. + * Currently supports status, updatedAt and updatedBy fields. + * Update Joi schema to allow more fields. + * (implementation should already handle new fields - just updating Joi schema should be enough) + * + * payload format: + * { + * "jobCandidateId": { + * "interviewId": { ...fields }, + * "interviewId2": { ...fields }, + * ... + * }, + * "jobCandidateId2": { // like above... }, + * ... + * } + * + * @param {Object} jobCandidates job candidates + */ +async function processBulkUpdateInterviews (jobCandidates) { + // script to update & params + const script = { + source: ` + def completedInterviews = params.jobCandidates[ctx._id]; + for (interview in completedInterviews.entrySet()) { + def interviewId = interview.getKey(); + def affectedFields = interview.getValue(); + def target = ctx._source.interviews.find(i -> i.id == interviewId); + if (target != null) { + for (field in affectedFields.entrySet()) { + target[field.getKey()] = field.getValue(); + } + } + } + `, + params: { jobCandidates } + } + // update interviews + await esClient.updateByQuery({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + body: { + script, + query: { + ids: { + values: _.keys(jobCandidates) + } + } + }, + refresh: true + }) +} + +module.exports = { + processRequestInterview, + processUpdateInterview, + processBulkUpdateInterviews +} diff --git a/src/esProcessors/JobCandidateProcessor.js b/src/esProcessors/JobCandidateProcessor.js new file mode 100644 index 00000000..8e82869b --- /dev/null +++ b/src/esProcessors/JobCandidateProcessor.js @@ -0,0 +1,54 @@ +/** + * Jobcandidate Processor + */ + +const config = require('config') +const helper = require('../common/helper') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/JobProcessor.js b/src/esProcessors/JobProcessor.js new file mode 100644 index 00000000..00db929c --- /dev/null +++ b/src/esProcessors/JobProcessor.js @@ -0,0 +1,54 @@ +/** + * Job Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/ResourceBookingProcessor.js b/src/esProcessors/ResourceBookingProcessor.js new file mode 100644 index 00000000..e81e3ccb --- /dev/null +++ b/src/esProcessors/ResourceBookingProcessor.js @@ -0,0 +1,54 @@ +/** + * ResourceBooking Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity message + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity message + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity message + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/RoleProcessor.js b/src/esProcessors/RoleProcessor.js new file mode 100644 index 00000000..22f819bf --- /dev/null +++ b/src/esProcessors/RoleProcessor.js @@ -0,0 +1,54 @@ +/** + * Role Processor + */ + +const helper = require('../common/helper') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + await esClient.create({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + await esClient.update({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + body: { + doc: entity + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + await esClient.delete({ + index: config.get('esConfig.ES_INDEX_ROLE'), + id: entity.id, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/esProcessors/WorkPeriodPaymentProcessor.js b/src/esProcessors/WorkPeriodPaymentProcessor.js new file mode 100644 index 00000000..78e379c6 --- /dev/null +++ b/src/esProcessors/WorkPeriodPaymentProcessor.js @@ -0,0 +1,85 @@ +/** + * WorkPeriodPayment Processor + */ + +const config = require('config') +const helper = require('../common/helper') + +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity) { + // find related resourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.workPeriodId } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.workPeriodId} "WorkPeriod" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)', + params: { workPeriodPayment: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + // find workPeriodPayment in it's parent ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods.payments', + query: { + match: { 'workPeriods.payments.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.id} "WorkPeriodPayment" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate +} diff --git a/src/esProcessors/WorkPeriodProcessor.js b/src/esProcessors/WorkPeriodProcessor.js new file mode 100644 index 00000000..2fc7261f --- /dev/null +++ b/src/esProcessors/WorkPeriodProcessor.js @@ -0,0 +1,132 @@ +/** + * WorkPeriod Processor + */ + +const helper = require('../common/helper') +const config = require('config') +const esClient = helper.getESClient() + +/** + * Process create entity + * @param {Object} entity entity object + */ +async function processCreate (entity, options) { + // Find related resourceBooking + const resourceBooking = await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: entity.resourceBookingId + }) + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.id, + body: { + script: { + lang: 'painless', + source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)', + params: { workPeriod: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process update entity + * @param {Object} entity entity object + */ +async function processUpdate (entity) { + // find workPeriod in it's parent ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {Object} entity entity object + */ +async function processDelete (entity) { + // Find related ResourceBooking + const resourceBooking = await esClient.search({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + body: { + query: { + nested: { + path: 'workPeriods', + query: { + match: { 'workPeriods.id': entity.id } + } + } + } + } + }) + if (!resourceBooking.body.hits.total.value) { + const resourceBookingId = entity.key.replace('resourceBooking.id:', '') + if (resourceBookingId) { + try { + await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBookingId + }) + if (!resourceBooking) { + return + } + + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } catch (e) { + // if ResourceBooking is deleted, ignore + if (e.message === 'resource_not_found_exception') { + return + } + throw e + } + } + // if ResourceBooking is deleted, ignore, else throw error + if (resourceBooking) { + throw new Error(`id: ${entity.id} "WorkPeriod" not found`) + } + } + await esClient.update({ + index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), + id: resourceBooking.body.hits.hits[0]._id, + body: { + script: { + lang: 'painless', + source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)', + params: { data: entity } + } + }, + refresh: 'wait_for' + }) +} + +module.exports = { + processCreate, + processUpdate, + processDelete +} diff --git a/src/eventHandlers/WorkPeriodPaymentEventHandler.js b/src/eventHandlers/WorkPeriodPaymentEventHandler.js index 53ab7823..3587186b 100644 --- a/src/eventHandlers/WorkPeriodPaymentEventHandler.js +++ b/src/eventHandlers/WorkPeriodPaymentEventHandler.js @@ -9,7 +9,11 @@ const logger = require('../common/logger') const helper = require('../common/helper') const { ActiveWorkPeriodPaymentStatuses } = require('../../app-constants') const WorkPeriod = models.WorkPeriod +const { + processUpdate: processUpdateEs +} = require('../esProcessors/WorkPeriodProcessor') +const sequelize = models.sequelize /** * When a WorkPeriodPayment is updated or created, the workPeriod related to * that WorkPeriodPayment should be updated also. @@ -39,8 +43,25 @@ async function updateWorkPeriod (payload) { }) return } - const updated = await workPeriodModel.update(data) - await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, _.omit(updated.toJSON(), 'payments'), { oldValue: workPeriod, key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodModel.update(data, { transaction: t }) + entity = updated.toJSON() + + entity = _.omit(entity, ['payments']) + await processUpdateEs({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update') + } + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: workPeriod, key }) + logger.debug({ component: 'WorkPeriodPaymentEventHandler', context: 'updateWorkPeriod', diff --git a/src/services/InterviewService.js b/src/services/InterviewService.js index 3ddb1a3d..4ed04c13 100644 --- a/src/services/InterviewService.js +++ b/src/services/InterviewService.js @@ -13,7 +13,15 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') - +const { + processRequestInterview, + processUpdateInterview, + processBulkUpdateInterviews +} = require('../esProcessors/InterviewProcessor') +const { + processUpdate: jobCandidateProcessUpdate +} = require('../esProcessors/JobCandidateProcessor') +const sequelize = models.sequelize const Interview = models.Interview const esClient = helper.getESClient() @@ -245,25 +253,35 @@ async function requestInterview (currentUser, jobCandidateId, interview) { return (foundGuestMember !== undefined) ? `${foundGuestMember.firstName} ${foundGuestMember.lastName}` : guestEmail.split('@')[0] }) + let entity + let jobCandidateEntity try { - // create the interview - const created = await Interview.create(interview) - await helper.postEvent(config.TAAS_INTERVIEW_REQUEST_TOPIC, created.toJSON()) - // update jobCandidate.status to Interview - const [, affectedRows] = await models.JobCandidate.update( - { status: 'interview' }, - { where: { id: created.jobCandidateId }, returning: true } - ) - const updatedJobCandidate = _.omit(_.get(affectedRows, '0.dataValues'), 'deletedAt') - await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, updatedJobCandidate) - // return created interview - return created.dataValues + await sequelize.transaction(async (t) => { + // create the interview + const created = await Interview.create(interview, { transaction: t }) + entity = created.toJSON() + await processRequestInterview(entity) + // update jobCandidate.status to Interview + const [, affectedRows] = await models.JobCandidate.update( + { status: 'interview' }, + { where: { id: created.jobCandidateId }, returning: true, transaction: t } + ) + jobCandidateEntity = _.omit(_.get(affectedRows, '0.dataValues'), 'deletedAt') + await jobCandidateProcessUpdate(jobCandidateEntity) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.request') + } // gracefully handle if one of the common sequelize errors handleSequelizeError(err, jobCandidateId) // if reaches here, it's not one of the common errors handled in `handleSequelizeError` throw err } + await helper.postEvent(config.TAAS_INTERVIEW_REQUEST_TOPIC, entity) + await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, jobCandidateEntity) + // return created interview + return entity } requestInterview.schema = Joi.object().keys({ @@ -301,16 +319,24 @@ async function partiallyUpdateInterview (currentUser, interview, data) { } data.updatedBy = await helper.getUserId(currentUser.userId) + let entity try { - const updated = await interview.update(data) - await helper.postEvent(config.TAAS_INTERVIEW_UPDATE_TOPIC, updated.toJSON(), { oldValue: interview.toJSON() }) - return updated.dataValues + await sequelize.transaction(async (t) => { + const updated = await interview.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdateInterview(entity) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.update') + } // gracefully handle if one of the common sequelize errors handleSequelizeError(err, interview.jobCandidateId) // if reaches here, it's not one of the common errors handled in `handleSequelizeError` throw err } + await helper.postEvent(config.TAAS_INTERVIEW_UPDATE_TOPIC, entity, { oldValue: interview.toJSON() }) + return entity } /** @@ -571,38 +597,58 @@ searchInterviews.schema = Joi.object().keys({ async function updateCompletedInterviews () { logger.info({ component: 'InterviewService', context: 'updateCompletedInterviews', message: 'Running the scheduled job...' }) const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000) - const [affectedCount, updatedRows] = await Interview.update( - // '00000000-0000-0000-0000-000000000000' - to indicate it's updated by the system job - { status: InterviewConstants.Status.Completed, updatedBy: '00000000-0000-0000-0000-000000000000' }, - { - where: { - status: [InterviewConstants.Status.Scheduled, InterviewConstants.Status.Rescheduled], - startTimestamp: { - [Op.lte]: oneHourAgo + + let entity + let affectedCount + try { + await sequelize.transaction(async (t) => { + const updated = await Interview.update( + // '00000000-0000-0000-0000-000000000000' - to indicate it's updated by the system job + { status: InterviewConstants.Status.Completed, updatedBy: '00000000-0000-0000-0000-000000000000' }, + { + where: { + status: [InterviewConstants.Status.Scheduled, InterviewConstants.Status.Rescheduled], + startTimestamp: { + [Op.lte]: oneHourAgo + } + }, + returning: true, + transaction: t } - }, - returning: true - } - ) - - // post event if there are affected/updated interviews - if (affectedCount > 0) { - // payload format: - // { - // jobCandidateId: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, - // jobCandidateId2: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, - // ... - // } - const bulkUpdatePayload = {} - // construct payload - _.forEach(updatedRows, row => { - const interview = row.toJSON() - const affectedFields = _.pick(interview, ['status', 'updatedBy', 'updatedAt']) - _.set(bulkUpdatePayload, [interview.jobCandidateId, interview.id], affectedFields) + ) + let updatedRows + [affectedCount, updatedRows] = updated + + // post event if there are affected/updated interviews + if (affectedCount > 0) { + // payload format: + // { + // jobCandidateId: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, + // jobCandidateId2: { interviewId: { affectedFields }, interviewId2: { affectedFields }, ... }, + // ... + // } + const bulkUpdatePayload = {} + // construct payload + _.forEach(updatedRows, row => { + const interview = row.toJSON() + const affectedFields = _.pick(interview, ['status', 'updatedBy', 'updatedAt']) + _.set(bulkUpdatePayload, [interview.jobCandidateId, interview.id], affectedFields) + }) + entity = bulkUpdatePayload + await processBulkUpdateInterviews(bulkUpdatePayload) + } }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'interview.bulkupdate') + } + throw e + } + if (affectedCount) { // post event - await helper.postEvent(config.TAAS_INTERVIEW_BULK_UPDATE_TOPIC, bulkUpdatePayload) + await helper.postEvent(config.TAAS_INTERVIEW_BULK_UPDATE_TOPIC, entity) } + logger.info({ component: 'InterviewService', context: 'updateCompletedInterviews', message: `Completed running. Updated ${affectedCount} interviews.` }) } diff --git a/src/services/JobCandidateService.js b/src/services/JobCandidateService.js index 29f28d88..59a2b4f5 100644 --- a/src/services/JobCandidateService.js +++ b/src/services/JobCandidateService.js @@ -14,6 +14,13 @@ const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') const JobService = require('./JobService') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/JobCandidateProcessor') + +const sequelize = models.sequelize const NotificationSchedulerService = require('./NotificationsSchedulerService') const JobCandidate = models.JobCandidate const esClient = helper.getESClient() @@ -118,9 +125,21 @@ async function createJobCandidate (currentUser, jobCandidate) { jobCandidate.id = uuid() jobCandidate.createdBy = await helper.getUserId(currentUser.userId) - const created = await JobCandidate.create(jobCandidate) - await helper.postEvent(config.TAAS_JOB_CANDIDATE_CREATE_TOPIC, created.toJSON()) - return created.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const created = await JobCandidate.create(jobCandidate, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'jobcandidate.create') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_CANDIDATE_CREATE_TOPIC, entity) + return entity } createJobCandidate.schema = Joi.object().keys({ @@ -155,8 +174,20 @@ async function updateJobCandidate (currentUser, id, data) { data.updatedBy = userId - const updated = await jobCandidate.update(data) - await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await jobCandidate.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'jobcandidate.update') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_CANDIDATE_UPDATE_TOPIC, entity, { oldValue: oldValue }) const result = _.assign(jobCandidate.dataValues, data) return result } @@ -227,7 +258,15 @@ async function deleteJobCandidate (currentUser, id) { } const jobCandidate = await JobCandidate.findById(id) - await jobCandidate.destroy() + try { + await sequelize.transaction(async (t) => { + await jobCandidate.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'jobcandidate.delete') + throw e + } await helper.postEvent(config.TAAS_JOB_CANDIDATE_DELETE_TOPIC, { id }) } diff --git a/src/services/JobService.js b/src/services/JobService.js index 7e38227c..8057c478 100644 --- a/src/services/JobService.js +++ b/src/services/JobService.js @@ -12,7 +12,13 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/JobProcessor') +const sequelize = models.sequelize const Job = models.Job const esClient = helper.getESClient() @@ -182,9 +188,22 @@ async function createJob (currentUser, job, onTeamCreating) { job.id = uuid() job.createdBy = await helper.getUserId(currentUser.userId) - const created = await Job.create(job) - await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, created.toJSON(), { onTeamCreating }) - return created.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const created = await Job.create(job, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'job.create') + } + throw e + } + + await helper.postEvent(config.TAAS_JOB_CREATE_TOPIC, entity, { onTeamCreating }) + return entity } createJob.schema = Joi.object() @@ -253,8 +272,20 @@ async function updateJob (currentUser, id, data) { data.updatedBy = ubahnUserId - const updated = await job.update(data) - await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await job.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'job.update') + } + throw e + } + await helper.postEvent(config.TAAS_JOB_UPDATE_TOPIC, entity, { oldValue: oldValue }) job = await Job.findById(id, true) job.dataValues.candidates = _.map(job.dataValues.candidates, (c) => c.dataValues) return job.dataValues @@ -351,7 +382,15 @@ async function deleteJob (currentUser, id) { } const job = await Job.findById(id) - await job.destroy() + try { + await sequelize.transaction(async (t) => { + await job.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'job.delete') + throw e + } await helper.postEvent(config.TAAS_JOB_DELETE_TOPIC, { id }) } diff --git a/src/services/PaymentSchedulerService.js b/src/services/PaymentSchedulerService.js index f38eab6f..e718c295 100644 --- a/src/services/PaymentSchedulerService.js +++ b/src/services/PaymentSchedulerService.js @@ -2,11 +2,16 @@ const _ = require('lodash') const config = require('config') const moment = require('moment') const models = require('../models') -const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent } = require('../common/helper') +const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent, postErrorEvent } = require('../common/helper') const logger = require('../common/logger') const { createChallenge, addResourceToChallenge, activateChallenge, closeChallenge } = require('./PaymentService') const { ChallengeStatus, PaymentSchedulerStatus, PaymentProcessingSwitch } = require('../../app-constants') +const { + processUpdate +} = require('../esProcessors/WorkPeriodPaymentProcessor') + +const sequelize = models.sequelize const WorkPeriodPayment = models.WorkPeriodPayment const WorkPeriod = models.WorkPeriod const PaymentScheduler = models.PaymentScheduler @@ -88,9 +93,22 @@ async function processPayment (workPeriodPayment) { } } else { const oldValue = workPeriodPayment.toJSON() - const updated = await workPeriodPayment.update({ status: 'in-progress' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let entity + let key + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodPayment.update({ status: 'in-progress' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) } // Check whether the number of processed records per minute exceeds the specified number, if it exceeds, wait for the next minute before processing await checkWait(PaymentSchedulerStatus.START_PROCESS) @@ -112,11 +130,24 @@ async function processPayment (workPeriodPayment) { } const oldValue = workPeriodPayment.toJSON() - // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". - const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let key + let entity + try { + await sequelize.transaction(async (t) => { + // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". + const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }, { transaction: t }) + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + // Update the modified status to es + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) await paymentScheduler.update({ step: PaymentSchedulerStatus.CLOSE_CHALLENGE, userId: paymentScheduler.userId, status: 'completed' }) localLogger.info(`Processed workPeriodPayment ${workPeriodPayment.id} successfully`, 'processPayment') @@ -125,10 +156,24 @@ async function processPayment (workPeriodPayment) { logger.logFullError(err, { component: 'PaymentSchedulerService', context: 'processPayment' }) const statusDetails = { errorMessage: extractErrorMessage(err), errorCode: _.get(err, 'status', -1), retry: _.get(err, 'retry', -1), step: _.get(err, 'step'), challengeId: paymentScheduler ? paymentScheduler.challengeId : null } const oldValue = workPeriodPayment.toJSON() - // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. - const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + + let entity + let key + try { + await sequelize.transaction(async (t) => { + // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. + const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) if (paymentScheduler) { await paymentScheduler.update({ step: _.get(err, 'step'), userId: paymentScheduler.userId, status: 'failed' }) diff --git a/src/services/ResourceBookingService.js b/src/services/ResourceBookingService.js index 6b155ce6..8ef7dba4 100644 --- a/src/services/ResourceBookingService.js +++ b/src/services/ResourceBookingService.js @@ -13,6 +13,11 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/ResourceBookingProcessor') const constants = require('../../app-constants') const moment = require('moment') @@ -22,6 +27,8 @@ const WorkPeriodPayment = models.WorkPeriodPayment const esClient = helper.getESClient() const cachedModelFields = _cacheModelFields() +const sequelize = models.sequelize + /** * Get the fields of the ResourceBooking model and the nested WorkPeriod model * @returns {Array} array of field names @@ -332,9 +339,21 @@ async function createResourceBooking (currentUser, resourceBooking) { resourceBooking.id = uuid() resourceBooking.createdBy = await helper.getUserId(currentUser.userId) - const created = await ResourceBooking.create(resourceBooking) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_CREATE_TOPIC, created.toJSON()) - return created.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const created = await ResourceBooking.create(resourceBooking, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'resourcebooking.create') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_CREATE_TOPIC, entity) + return entity } createResourceBooking.schema = Joi.object().keys({ @@ -385,9 +404,22 @@ async function updateResourceBooking (currentUser, id, data) { data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await resourceBooking.update(data) - await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) - return updated.dataValues + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await resourceBooking.update(data, { transaction: t }) + + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'resourcebooking.update') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, entity, { oldValue: oldValue }) + return entity } /** @@ -477,7 +509,16 @@ async function deleteResourceBooking (currentUser, id) { // we can't delete workperiods with paymentStatus 'partially-completed' or 'completed'. await _ensurePaidWorkPeriodsNotDeleted(id) const resourceBooking = await ResourceBooking.findById(id) - await resourceBooking.destroy() + + try { + await sequelize.transaction(async (t) => { + await resourceBooking.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'resourcebooking.delete') + throw e + } await helper.postEvent(config.TAAS_RESOURCE_BOOKING_DELETE_TOPIC, { id }) } diff --git a/src/services/RoleService.js b/src/services/RoleService.js index ba128170..b5d68673 100644 --- a/src/services/RoleService.js +++ b/src/services/RoleService.js @@ -11,7 +11,13 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/RoleProcessor') +const sequelize = models.sequelize const Role = models.Role const esClient = helper.getESClient() @@ -118,10 +124,21 @@ async function createRole (currentUser, role) { role.id = uuid.v4() role.createdBy = await helper.getUserId(currentUser.userId) - const created = await Role.create(role) - - await helper.postEvent(config.TAAS_ROLE_CREATE_TOPIC, created.toJSON()) - return created.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const created = await Role.create(role, { transaction: t }) + entity = created.toJSON() + await processCreate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'role.create') + } + throw e + } + await helper.postEvent(config.TAAS_ROLE_CREATE_TOPIC, entity) + return entity } createRole.schema = Joi.object().keys({ @@ -175,10 +192,22 @@ async function updateRole (currentUser, id, data) { } data.updatedBy = await helper.getUserId(currentUser.userId) - const updated = await role.update(data) - await helper.postEvent(config.TAAS_ROLE_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue }) - return updated.toJSON() + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await role.update(data, { transaction: t }) + entity = updated.toJSON() + await processUpdate(entity) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'role.update') + } + throw e + } + await helper.postEvent(config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC, entity, { oldValue: oldValue }) + return entity } updateRole.schema = Joi.object().keys({ @@ -220,7 +249,16 @@ async function deleteRole (currentUser, id) { await _checkUserPermissionForWriteDeleteRole(currentUser) const role = await Role.findById(id) - await role.destroy() + + try { + await sequelize.transaction(async (t) => { + await role.destroy({ transaction: t }) + await processDelete({ id }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'role.delete') + throw e + } await helper.postEvent(config.TAAS_ROLE_DELETE_TOPIC, { id }) } diff --git a/src/services/TeamService.js b/src/services/TeamService.js index 0ec04a3f..f290c417 100644 --- a/src/services/TeamService.js +++ b/src/services/TeamService.js @@ -790,11 +790,17 @@ roleSearchRequest.schema = Joi.object() */ async function getRoleBySkills (skills) { // find all roles which includes any of the given skills + + logger.debug(`getRoleBySkills: ${JSON.stringify(skills)}`) + const queryCriteria = { where: { listOfSkills: { [Op.overlap]: skills } }, raw: true } let roles = await Role.findAll(queryCriteria) + + logger.debug(`find roles: ${JSON.stringify(roles)}`) + roles = _.filter(roles, role => _.find(role.rates, r => r.global && r.rate20Global && r.rate30Global)) if (roles.length > 0) { let result = _.each(roles, role => { @@ -808,6 +814,9 @@ async function getRoleBySkills (skills) { }) // sort roles by skillsMatch, global rate and name result = _.orderBy(result, ['skillsMatch', 'maxGlobal', 'name'], ['desc', 'desc', 'asc']) + + logger.debug(`after sorting result: ${JSON.stringify(result)}`) + if (result[0].skillsMatch >= config.ROLE_MATCHING_RATE) { // return the 1st role return _.omit(result[0], ['maxGlobal']) @@ -815,6 +824,9 @@ async function getRoleBySkills (skills) { } // if no matching role found then return Custom role or empty object const customRole = await Role.findOne({ where: { name: { [Op.iLike]: 'Custom' } }, raw: true }) || {} + + logger.debug(`got custom role: ${JSON.stringify(customRole)}`) + customRole.rates[0].rate30Global = customRole.rates[0].global * 0.75 customRole.rates[0].rate20Global = customRole.rates[0].global * 0.5 return customRole diff --git a/src/services/WorkPeriodPaymentService.js b/src/services/WorkPeriodPaymentService.js index 81ca79e9..390db252 100644 --- a/src/services/WorkPeriodPaymentService.js +++ b/src/services/WorkPeriodPaymentService.js @@ -15,6 +15,12 @@ const errors = require('../common/errors') const models = require('../models') const { WorkPeriodPaymentStatus, ActiveWorkPeriodPaymentStatuses } = require('../../app-constants') const { searchResourceBookings } = require('./ResourceBookingService') +const { + processCreate, + processUpdate +} = require('../esProcessors/WorkPeriodPaymentProcessor') + +const sequelize = models.sequelize const WorkPeriodPayment = models.WorkPeriodPayment const WorkPeriod = models.WorkPeriod @@ -119,9 +125,23 @@ async function _createSingleWorkPeriodPaymentWithWorkPeriodAndResourceBooking (w workPeriodPayment.status = WorkPeriodPaymentStatus.SCHEDULED workPeriodPayment.createdBy = createdBy - const created = await WorkPeriodPayment.create(workPeriodPayment) - await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC, created.toJSON(), { key: `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` }) - return created.dataValues + const key = `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` + + let entity + try { + await sequelize.transaction(async (t) => { + const created = await WorkPeriodPayment.create(workPeriodPayment, { transaction: t }) + entity = created.toJSON() + await processCreate({ ...entity, key }) + }) + } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.create') + } + throw err + } + await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC, entity, { key }) + return entity } /** @@ -296,9 +316,23 @@ async function updateWorkPeriodPayment (id, data) { await _updateChallenge(workPeriodPayment.challengeId, data) } - const updated = await workPeriodPayment.update(data) - await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue: oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) - return updated.dataValues + const key = `workPeriodPayment.billingAccountId:${workPeriodPayment.billingAccountId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodPayment.update(data, { transaction: t }) + entity = updated.toJSON() + + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) + return entity } /** diff --git a/src/services/WorkPeriodService.js b/src/services/WorkPeriodService.js index e48c6cb1..8c90eece 100644 --- a/src/services/WorkPeriodService.js +++ b/src/services/WorkPeriodService.js @@ -12,12 +12,19 @@ const helper = require('../common/helper') const logger = require('../common/logger') const errors = require('../common/errors') const models = require('../models') +const { + processCreate, + processUpdate, + processDelete +} = require('../esProcessors/WorkPeriodProcessor') const constants = require('../../app-constants') const moment = require('moment') const WorkPeriod = models.WorkPeriod const esClient = helper.getESClient() +const sequelize = models.sequelize + // "startDate" and "endDate" should always represent one week: // "startDate" should be always Monday and "endDate" should be always Sunday of the same week. // It should not include time or timezone, only date. @@ -221,19 +228,27 @@ async function createWorkPeriod (workPeriod) { workPeriod.id = uuid.v4() workPeriod.createdBy = config.m2m.M2M_AUDIT_USER_ID - let created = null + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + + let entity try { - created = await WorkPeriod.create(workPeriod) + await sequelize.transaction(async (t) => { + const created = await WorkPeriod.create(workPeriod, { transaction: t }) + entity = created.toJSON() + await processCreate({ ...entity, key }) + }) } catch (err) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.create') + } if (!_.isUndefined(err.original)) { throw new errors.BadRequestError(err.original.detail) } else { throw err } } - - await helper.postEvent(config.TAAS_WORK_PERIOD_CREATE_TOPIC, created.toJSON(), { key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) - return created.dataValues + await helper.postEvent(config.TAAS_WORK_PERIOD_CREATE_TOPIC, entity, { key }) + return entity } createWorkPeriod.schema = Joi.object().keys({ @@ -278,11 +293,25 @@ async function updateWorkPeriod (currentUser, id, data) { if (!currentUser.isMachine) { data.updatedBy = await helper.getUserId(currentUser.userId) } - const updated = await workPeriod.update(data) - const updatedDataWithoutPayments = _.omit(updated.toJSON(), ['payments']) + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriod.update(data, { transaction: t }) + entity = updated.toJSON() + + entity = _.omit(entity, ['payments']) + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update') + } + throw e + } const oldValueWithoutPayments = _.omit(oldValue, ['payments']) - await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, updatedDataWithoutPayments, { oldValue: oldValueWithoutPayments, key: `resourceBooking.id:${updated.resourceBookingId}` }) - return updatedDataWithoutPayments + await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: oldValueWithoutPayments, key }) + return entity } /** @@ -318,13 +347,24 @@ async function deleteWorkPeriod (id) { if (_.some(workPeriod.payments, payment => constants.ActiveWorkPeriodPaymentStatuses.indexOf(payment.status) !== -1)) { throw new errors.BadRequestError(`Can't delete WorkPeriod as it has associated WorkPeriodsPayment with one of statuses ${constants.ActiveWorkPeriodPaymentStatuses.join(', ')}`) } - await models.WorkPeriodPayment.destroy({ - where: { - workPeriodId: id - } - }) - await workPeriod.destroy() - await helper.postEvent(config.TAAS_WORK_PERIOD_DELETE_TOPIC, { id }, { key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + try { + await sequelize.transaction(async (t) => { + await models.WorkPeriodPayment.destroy({ + where: { + workPeriodId: id + }, + transaction: t + }) + await workPeriod.destroy({ transaction: t }) + await processDelete({ id, key }) + }) + } catch (e) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, { id }, 'workperiod.delete') + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_DELETE_TOPIC, { id }, { key }) } deleteWorkPeriod.schema = Joi.object().keys({