diff --git a/README.md b/README.md index 045684f..9cf88c6 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ The following parameters can be set in config files or in env variables: - `topics.TAAS_WORK_PERIOD_CREATE_TOPIC`: the create work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_UPDATE_TOPIC`: the update work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_DELETE_TOPIC`: the delete work period entity Kafka message topic +- `topics.TAAS_INTERVIEW_REQUEST_TOPIC`: the request interview entity Kafka message topic +- `topics.TAAS_INTERVIEW_UPDATE_TOPIC`: the update interview entity Kafka message topic - `topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC`: the create work period payment entity Kafka message topic - `topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC`: the update work period payment entity Kafka message topic - `esConfig.HOST`: Elasticsearch host diff --git a/config/default.js b/config/default.js index 7a8dbf8..d1f2fae 100644 --- a/config/default.js +++ b/config/default.js @@ -32,6 +32,9 @@ module.exports = { TAAS_WORK_PERIOD_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_CREATE_TOPIC || 'taas.workperiod.create', TAAS_WORK_PERIOD_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_UPDATE_TOPIC || 'taas.workperiod.update', TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete', + // topics for interview service + TAAS_INTERVIEW_REQUEST_TOPIC: process.env.TAAS_INTERVIEW_REQUEST_TOPIC || 'taas.interview.requested', + TAAS_INTERVIEW_UPDATE_TOPIC: process.env.TAAS_INTERVIEW_UPDATE_TOPIC || 'taas.interview.update', // topics for work period payment service TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC || 'taas.workperiodpayment.create', TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC || 'taas.workperiodpayment.update' diff --git a/local/docker-compose.yml b/local/docker-compose.yml index 35e9486..3d01ede 100644 --- a/local/docker-compose.yml +++ b/local/docker-compose.yml @@ -12,7 +12,7 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1" + KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.interview.requested:1:1,taas.interview.update:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 esearch: image: elasticsearch:7.7.1 diff --git a/src/app.js b/src/app.js index c38c03d..e0f5a22 100644 --- a/src/app.js +++ b/src/app.js @@ -13,6 +13,7 @@ const JobProcessorService = require('./services/JobProcessorService') const JobCandidateProcessorService = require('./services/JobCandidateProcessorService') const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService') const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService') +const InterviewProcessorService = require('./services/InterviewProcessorService') const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService') const Mutex = require('async-mutex').Mutex const events = require('events') @@ -45,6 +46,9 @@ const topicServiceMapping = { [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate, [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate, [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete, + // interview + [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview, + [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview, // work period payment [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate, [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate diff --git a/src/bootstrap.js b/src/bootstrap.js index 794ab83..8f23fe3 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -1,7 +1,11 @@ const Joi = require('@hapi/joi') const config = require('config') +const _ = require('lodash') +const { Interview } = require('../src/common/constants') const constants = require('./common/constants') +const allowedInterviewStatuses = _.values(Interview.Status) + global.Promise = require('bluebird') Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly') @@ -11,6 +15,7 @@ Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist Joi.workload = () => Joi.string().valid('full-time', 'fractional') Joi.title = () => Joi.string().max(128) Joi.paymentStatus = () => Joi.string().valid('pending', 'partially-completed', 'completed', 'cancelled') +Joi.interviewStatus = () => Joi.string().valid(...allowedInterviewStatuses) Joi.workPeriodPaymentStatus = () => Joi.string().valid('completed', 'cancelled') // Empty string is not allowed by Joi by default and must be enabled with allow(''). // See https://joi.dev/api/?v=17.3.0#string fro details why it's like this. diff --git a/src/common/constants.js b/src/common/constants.js index 62d8720..d7f16ec 100644 --- a/src/common/constants.js +++ b/src/common/constants.js @@ -16,5 +16,15 @@ module.exports = { JobCandidateCreate: 'jobcandidate:create', JobCandidateUpdate: 'jobcandidate:update' } + }, + Interview: { + Status: { + Scheduling: 'Scheduling', + Scheduled: 'Scheduled', + RequestedForReschedule: 'Requested for reschedule', + Rescheduled: 'Rescheduled', + Completed: 'Completed', + Cancelled: 'Cancelled' + } } } diff --git a/src/scripts/createIndex.js b/src/scripts/createIndex.js index fad96d3..7142f03 100644 --- a/src/scripts/createIndex.js +++ b/src/scripts/createIndex.js @@ -46,6 +46,24 @@ async function createIndex () { status: { type: 'keyword' }, externalId: { type: 'keyword' }, resume: { type: 'text' }, + interviews: { + type: 'nested', + properties: { + id: { type: 'keyword' }, + jobCandidateId: { type: 'keyword' }, + googleCalendarId: { type: 'keyword' }, + startTimestamp: { type: 'date' }, + attendeesList: { type: 'keyword' }, + customMessage: { type: 'text' }, + xaiTemplate: { type: 'keyword' }, + round: { type: 'integer' }, + status: { type: 'keyword' }, + createdAt: { type: 'date' }, + createdBy: { type: 'keyword' }, + updatedAt: { type: 'date' }, + updatedBy: { type: 'keyword' } + } + }, createdAt: { type: 'date' }, createdBy: { type: 'keyword' }, updatedAt: { type: 'date' }, diff --git a/src/services/InterviewProcessorService.js b/src/services/InterviewProcessorService.js new file mode 100644 index 0000000..15af5ef --- /dev/null +++ b/src/services/InterviewProcessorService.js @@ -0,0 +1,140 @@ +/** + * Interview Processor Service + */ + +const _ = require('lodash') +const Joi = require('@hapi/joi') +const logger = require('../common/logger') +const helper = require('../common/helper') +const constants = require('../common/constants') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Updates jobCandidate via a painless script + * + * @param {String} jobCandidateId job candidate id + * @param {String} script script definition + * @param {String} transactionId transaction id + */ +async function updateJobCandidateViaScript (jobCandidateId, script, transactionId) { + await esClient.updateExtra({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: jobCandidateId, + transactionId, + body: { script }, + refresh: constants.esRefreshOption + }) +} + +/** + * Process request interview entity message. + * Creates an interview record under jobCandidate. + * + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processRequestInterview (message, transactionId) { + const interview = message.payload + // add interview in collection if there's already an existing collection + // or initiate a new one with this interview + const script = { + source: ` + ctx._source.containsKey("interviews") + ? ctx._source.interviews.add(params.interview) + : ctx._source.interviews = [params.interview] + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script, transactionId) +} + +processRequestInterview.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().uuid().required(), + jobCandidateId: Joi.string().uuid().required(), + googleCalendarId: Joi.string().allow(null), + customMessage: Joi.string().allow(null), + xaiTemplate: Joi.string().required(), + round: Joi.number().integer().positive().required(), + status: Joi.interviewStatus().required(), + createdAt: Joi.date().required(), + createdBy: Joi.string().uuid().required(), + updatedAt: Joi.date().allow(null), + updatedBy: Joi.string().uuid().allow(null), + attendeesList: Joi.array().items(Joi.string().email()).allow(null), + startTimestamp: Joi.date().allow(null) + }).required() + }).required(), + transactionId: Joi.string().required() +} + +/** + * Process update interview entity message. + * Update an interview record under jobCandidate. + * + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processUpdateInterview (message, transactionId) { + const data = message.payload + const { body: jobCandidate } = await esClient.getExtra({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: data.jobCandidateId + }) + const interviews = jobCandidate.interviews || [] + const index = _.findIndex(interviews, ['id', data.id]) + if (index === -1) { + interviews.push(data) + } else { + interviews.splice(index, 1, data) + } + jobCandidate.interviews = interviews + await esClient.updateExtra({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: data.jobCandidateId, + transactionId, + body: { + doc: jobCandidate + }, + refresh: constants.esRefreshOption + }) +} + +processUpdateInterview.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().uuid().required(), + jobCandidateId: Joi.string().uuid().required(), + googleCalendarId: Joi.string().allow(null), + customMessage: Joi.string().allow(null), + xaiTemplate: Joi.string().required(), + round: Joi.number().integer().positive().required(), + status: Joi.interviewStatus().required(), + createdAt: Joi.date().required(), + createdBy: Joi.string().uuid().required(), + updatedAt: Joi.date().required(), + updatedBy: Joi.string().uuid().required(), + attendeesList: Joi.array().items(Joi.string().email()).allow(null), + startTimestamp: Joi.date().allow(null) + }).required() + }).required(), + transactionId: Joi.string().required() +} + +module.exports = { + processRequestInterview, + processUpdateInterview +} + +logger.buildService(module.exports, 'InterviewProcessorService')