diff --git a/README.md b/README.md index 045684f..b33bd0c 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,9 @@ The following parameters can be set in config files or in env variables: - `topics.TAAS_WORK_PERIOD_DELETE_TOPIC`: the delete work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC`: the create work period payment entity Kafka message topic - `topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC`: the update work period payment entity Kafka message topic +- `topics.TAAS_INTERVIEW_REQUEST_TOPIC`: the request interview entity Kafka message topic +- `topics.TAAS_INTERVIEW_UPDATE_TOPIC`: the update interview entity Kafka message topic +- `topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC`: the bulk update interview entity Kafka message topic - `esConfig.HOST`: Elasticsearch host - `esConfig.AWS_REGION`: The Amazon region to use when using AWS Elasticsearch service - `esConfig.ELASTICCLOUD.id`: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this diff --git a/VERIFICATION.md b/VERIFICATION.md index 5410bcf..ef9046d 100644 --- a/VERIFICATION.md +++ b/VERIFICATION.md @@ -2,13 +2,15 @@ ## Create documents in ES -- Run the following commands to create `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES. +- Run the following commands to create `Job`, `JobCandidate`, `Interview`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES. 
``` bash # for Job docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.job.create < test/messages/taas.job.create.event.json # for JobCandidate docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.jobcandidate.create < test/messages/taas.jobcandidate.create.event.json + # for Interview + docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.interview.requested < test/messages/taas.interview.requested.event.json # for ResourceBooking docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.create < test/messages/taas.resourcebooking.create.event.json # for WorkPeriod @@ -20,13 +22,15 @@ - Run `npm run view-data ` to see if documents were created. ## Update documents in ES -- Run the following commands to update `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES. +- Run the following commands to update `Job`, `JobCandidate`, `Interview`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES. 
``` bash # for Job docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.job.update < test/messages/taas.job.update.event.json # for JobCandidate docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.jobcandidate.update < test/messages/taas.jobcandidate.update.event.json + # for Interview + docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.interview.update < test/messages/taas.interview.update.event.json # for ResourceBooking docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.update < test/messages/taas.resourcebooking.update.event.json # for WorkPeriod diff --git a/config/default.js b/config/default.js index 7a8dbf8..5476e65 100644 --- a/config/default.js +++ b/config/default.js @@ -34,7 +34,11 @@ module.exports = { TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete', // topics for work period payment service TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC || 'taas.workperiodpayment.create', - TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC || 'taas.workperiodpayment.update' + TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC || 'taas.workperiodpayment.update', + // topics for interview service + TAAS_INTERVIEW_REQUEST_TOPIC: process.env.TAAS_INTERVIEW_REQUEST_TOPIC || 'taas.interview.requested', + TAAS_INTERVIEW_UPDATE_TOPIC: process.env.TAAS_INTERVIEW_UPDATE_TOPIC || 'taas.interview.update', + TAAS_INTERVIEW_BULK_UPDATE_TOPIC: process.env.TAAS_INTERVIEW_BULK_UPDATE_TOPIC || 'taas.interview.bulkUpdate' }, esConfig: { diff --git a/local/docker-compose.yml b/local/docker-compose.yml index 35e9486..936f378 
100644 --- a/local/docker-compose.yml +++ b/local/docker-compose.yml @@ -12,7 +12,7 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1" + KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.interview.requested:1:1,taas.resourcebooking.create:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.interview.update:1:1,taas.interview.bulkUpdate:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 esearch: image: elasticsearch:7.7.1 diff --git a/src/app.js b/src/app.js index c38c03d..5ebb63c 100644 --- a/src/app.js +++ b/src/app.js @@ -13,6 +13,7 @@ const JobProcessorService = require('./services/JobProcessorService') const JobCandidateProcessorService = require('./services/JobCandidateProcessorService') const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService') const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService') +const InterviewProcessorService = require('./services/InterviewProcessorService') const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService') const Mutex = require('async-mutex').Mutex const events = require('events') @@ -47,7 +48,11 @@ const topicServiceMapping = { [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: 
WorkPeriodProcessorService.processDelete, // work period payment [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate, - [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate + [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate, + // interview + [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview, + [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview, + [config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews } // Start kafka consumer diff --git a/src/bootstrap.js b/src/bootstrap.js index 794ab83..0be6ce3 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -1,7 +1,12 @@ const Joi = require('@hapi/joi') const config = require('config') +const _ = require('lodash') +const { Interview } = require('./common/constants') const constants = require('./common/constants') +const allowedXAITemplates = _.values(Interview.XaiTemplate) +const allowedInterviewStatuses = _.values(Interview.Status) + global.Promise = require('bluebird') Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly') @@ -11,6 +16,8 @@ Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist Joi.workload = () => Joi.string().valid('full-time', 'fractional') Joi.title = () => Joi.string().max(128) Joi.paymentStatus = () => Joi.string().valid('pending', 'partially-completed', 'completed', 'cancelled') +Joi.xaiTemplate = () => Joi.string().valid(...allowedXAITemplates) +Joi.interviewStatus = () => Joi.string().valid(...allowedInterviewStatuses) Joi.workPeriodPaymentStatus = () => Joi.string().valid('completed', 'cancelled') // Empty string is not allowed by Joi by default and must be enabled with allow('').
// See https://joi.dev/api/?v=17.3.0#string fro details why it's like this. diff --git a/src/common/constants.js b/src/common/constants.js index 62d8720..e3b917d 100644 --- a/src/common/constants.js +++ b/src/common/constants.js @@ -16,5 +16,19 @@ module.exports = { JobCandidateCreate: 'jobcandidate:create', JobCandidateUpdate: 'jobcandidate:update' } + }, + Interview: { + Status: { + Scheduling: 'Scheduling', + Scheduled: 'Scheduled', + RequestedForReschedule: 'Requested for reschedule', + Rescheduled: 'Rescheduled', + Completed: 'Completed', + Cancelled: 'Cancelled' + }, + XaiTemplate: { + '30MinInterview': '30-min-interview', + '60MinInterview': '60-min-interview' + } } } diff --git a/src/scripts/createIndex.js b/src/scripts/createIndex.js index fad96d3..3c0cb98 100644 --- a/src/scripts/createIndex.js +++ b/src/scripts/createIndex.js @@ -46,6 +46,24 @@ status: { type: 'keyword' }, externalId: { type: 'keyword' }, resume: { type: 'text' }, + interviews: { + type: 'nested', + properties: { + id: { type: 'keyword' }, + jobCandidateId: { type: 'keyword' }, + googleCalendarId: { type: 'keyword' }, + customMessage: { type: 'text' }, + xaiTemplate: { type: 'keyword' }, + round: { type: 'integer' }, + startTimestamp: { type: 'date' }, + attendeesList: { type: 'keyword' }, + status: { type: 'keyword' }, + createdAt: { type: 'date' }, + createdBy: { type: 'keyword' }, + updatedAt: { type: 'date' }, + updatedBy: { type: 'keyword' } + } + }, createdAt: { type: 'date' }, createdBy: { type: 'keyword' }, updatedAt: { type: 'date' }, diff --git a/src/services/InterviewProcessorService.js new file mode 100644 index 0000000..2c3d595 --- /dev/null +++ b/src/services/InterviewProcessorService.js @@ -0,0 +1,189 @@ +/** + * Interview Processor Service + */ + +const Joi = require('@hapi/joi') +const _ = require('lodash') +const logger = require('../common/logger') +const helper = require('../common/helper') +const constants =
require('../common/constants') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Updates jobCandidate via a painless script + * + * @param {String} jobCandidateId job candidate id + * @param {String} script script definition + * @param {String} transactionId transaction id + */ +async function updateJobCandidateViaScript (jobCandidateId, script, transactionId) { + await esClient.updateExtra({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: jobCandidateId, + transactionId, + body: { script }, + refresh: constants.esRefreshOption + }) +} + +/** + * Process request interview entity message. + * Creates an interview record under jobCandidate. + * + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processRequestInterview (message, transactionId) { + const interview = message.payload + // add interview in collection if there's already an existing collection + // or initiate a new one with this interview + const script = { + source: ` + ctx._source.containsKey("interviews") + ? 
ctx._source.interviews.add(params.interview) + : ctx._source.interviews = [params.interview] + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script, transactionId) +} + +processRequestInterview.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().uuid().required(), + jobCandidateId: Joi.string().uuid().required(), + googleCalendarId: Joi.string().allow(null), + customMessage: Joi.string().allow(null), + xaiTemplate: Joi.xaiTemplate().required(), + round: Joi.number().integer().positive().required(), + startTimestamp: Joi.date().allow(null), + attendeesList: Joi.array().items(Joi.string().email()).allow(null), + status: Joi.interviewStatus().required(), + createdAt: Joi.date().required(), + createdBy: Joi.string().uuid().required(), + updatedAt: Joi.date().allow(null), + updatedBy: Joi.string().uuid().allow(null) + }).required() + }).required(), + transactionId: Joi.string().required() +} + +/** + * Process update interview entity message + * Updates the interview record under jobCandidate. 
+ * + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processUpdateInterview (message, transactionId) { + const interview = message.payload + // if there's an interview with this id, + // update it with the payload + const script = { + source: ` + if (ctx._source.containsKey("interviews")) { + def target = ctx._source.interviews.find(i -> i.id == params.interview.id); + if (target != null) { + for (prop in params.interview.entrySet()) { + target[prop.getKey()] = prop.getValue() + } + } + } + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script, transactionId) +} + +processUpdateInterview.schema = processRequestInterview.schema + +/** + * Process bulk (partially) update interviews entity message. + * Currently supports status, updatedAt and updatedBy fields. + * Update Joi schema to allow more fields. + * (implementation should already handle new fields - just updating Joi schema should be enough) + * + * payload format: + * { + * "jobCandidateId": { + * "interviewId": { ...fields }, + * "interviewId2": { ...fields }, + * ... + * }, + * "jobCandidateId2": { // like above... }, + * ... 
+ * } + * + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processBulkUpdateInterviews (message, transactionId) { + const jobCandidates = message.payload + // script to update & params + const script = { + source: ` + def completedInterviews = params.jobCandidates[ctx._id]; + for (interview in completedInterviews.entrySet()) { + def interviewId = interview.getKey(); + def affectedFields = interview.getValue(); + def target = ctx._source.interviews.find(i -> i.id == interviewId); + if (target != null) { + for (field in affectedFields.entrySet()) { + target[field.getKey()] = field.getValue(); + } + } + } + `, + params: { jobCandidates } + } + // update interviews + await esClient.updateByQuery({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + transactionId, + body: { + script, + query: { + ids: { + values: _.keys(jobCandidates) + } + } + }, + refresh: true + }) +} + +processBulkUpdateInterviews.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().pattern( + Joi.string().uuid(), // key - jobCandidateId + Joi.object().pattern( + Joi.string().uuid(), // inner key - interviewId + Joi.object().keys({ + status: Joi.interviewStatus(), + updatedAt: Joi.date(), + updatedBy: Joi.string().uuid() + }) // inner value - affected fields of interview + ) // value - object containing interviews + ).min(1) // at least one key - i.e. don't allow empty object + }).required(), + transactionId: Joi.string().required() +} + +module.exports = { + processRequestInterview, + processUpdateInterview, + processBulkUpdateInterviews +} + +logger.buildService(module.exports, 'InterviewProcessorService')