From 130d37aac484fa1d0f8a67c5e0d6d12d7a795574 Mon Sep 17 00:00:00 2001 From: xxcxy Date: Wed, 21 Apr 2021 13:59:43 +0800 Subject: [PATCH 1/2] part1 --- README.md | 1 + config/default.js | 4 +- local/docker-compose.yml | 2 +- src/app.js | 5 +- src/bootstrap.js | 5 ++ src/common/constants.js | 5 ++ src/scripts/createIndex.js | 16 +++++ src/services/InterviewProcessorService.js | 79 +++++++++++++++++++++++ 8 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 src/services/InterviewProcessorService.js diff --git a/README.md b/README.md index d8409ab..07e97c6 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ The following parameters can be set in config files or in env variables: - `topics.TAAS_WORK_PERIOD_CREATE_TOPIC`: the create work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_UPDATE_TOPIC`: the update work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_DELETE_TOPIC`: the delete work period entity Kafka message topic +- `topics.TAAS_INTERVIEW_REQUEST_TOPIC`: the request interview entity Kafka message topic - `esConfig.HOST`: Elasticsearch host - `esConfig.AWS_REGION`: The Amazon region to use when using AWS Elasticsearch service - `esConfig.ELASTICCLOUD.id`: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this diff --git a/config/default.js b/config/default.js index 446ab4e..478c719 100644 --- a/config/default.js +++ b/config/default.js @@ -31,7 +31,9 @@ module.exports = { // topics for work period service TAAS_WORK_PERIOD_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_CREATE_TOPIC || 'taas.workperiod.create', TAAS_WORK_PERIOD_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_UPDATE_TOPIC || 'taas.workperiod.update', - TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete' + TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete', + // topics for interview service + TAAS_INTERVIEW_REQUEST_TOPIC: process.env.TAAS_INTERVIEW_REQUEST_TOPIC || 'taas.interview.request' }, esConfig: { diff --git a/local/docker-compose.yml b/local/docker-compose.yml index 5d2d803..5f355f3 100644 --- a/local/docker-compose.yml +++ b/local/docker-compose.yml @@ -12,7 +12,7 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1" + KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.interview.request:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 esearch: image: elasticsearch:7.7.1 diff --git a/src/app.js b/src/app.js index 1d3ae47..b4ee7bb 100644 --- a/src/app.js +++ b/src/app.js @@ -13,6 +13,7 @@ const JobProcessorService = require('./services/JobProcessorService') const JobCandidateProcessorService = require('./services/JobCandidateProcessorService') const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService') const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService') +const InterviewProcessorService = require('./services/InterviewProcessorService') const Mutex = require('async-mutex').Mutex const events = require('events') @@ -43,7 +44,9 @@ const topicServiceMapping = { // work period [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate, [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate, - [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete + [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete, + // interview + [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview } // Start kafka consumer diff --git a/src/bootstrap.js b/src/bootstrap.js index d25de1a..bb0761a 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -1,7 +1,11 @@ const Joi = require('@hapi/joi') const config = require('config') +const _ = require('lodash') +const { Interview } = require('../src/common/constants') const constants = require('./common/constants') +const allowedInterviewStatuses = _.values(Interview.Status) + global.Promise = require('bluebird') Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly') @@ -10,6 +14,7 @@ Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist Joi.workload = () => Joi.string().valid('full-time', 'fractional') Joi.title = () => Joi.string().max(128) Joi.paymentStatus = () => Joi.string().valid('pending', 'partially-completed', 'completed', 'cancelled') +Joi.interviewStatus = () => Joi.string().valid(...allowedInterviewStatuses) // Empty string is not allowed by Joi by default and must be enabled with allow(''). // See https://joi.dev/api/?v=17.3.0#string fro details why it's like this. // In many cases we would like to allow empty string to make it easier to create UI for editing data. diff --git a/src/common/constants.js b/src/common/constants.js index 62d8720..0d47cb4 100644 --- a/src/common/constants.js +++ b/src/common/constants.js @@ -16,5 +16,10 @@ module.exports = { JobCandidateCreate: 'jobcandidate:create', JobCandidateUpdate: 'jobcandidate:update' } + }, + Interview: { + Status: { + Requested: 'Requested' + } } } diff --git a/src/scripts/createIndex.js b/src/scripts/createIndex.js index 095633e..6ba13e0 100644 --- a/src/scripts/createIndex.js +++ b/src/scripts/createIndex.js @@ -46,6 +46,22 @@ async function createIndex () { status: { type: 'keyword' }, externalId: { type: 'keyword' }, resume: { type: 'text' }, + interviews: { + type: 'nested', + properties: { + id: { type: 'keyword' }, + jobCandidateId: { type: 'keyword' }, + googleCalendarId: { type: 'keyword' }, + customMessage: { type: 'text' }, + xaiTemplate: { type: 'keyword' }, + round: { type: 'integer' }, + status: { type: 'keyword' }, + createdAt: { type: 'date' }, + createdBy: { type: 'keyword' }, + updatedAt: { type: 'date' }, + updatedBy: { type: 'keyword' } + } + }, createdAt: { type: 'date' }, createdBy: { type: 'keyword' }, updatedAt: { type: 'date' }, diff --git a/src/services/InterviewProcessorService.js b/src/services/InterviewProcessorService.js new file mode 100644 index 0000000..d02fe07 --- /dev/null +++ b/src/services/InterviewProcessorService.js @@ -0,0 +1,79 @@ +/** + * Interview Processor Service + */ + +const Joi = require('@hapi/joi') +const logger = require('../common/logger') +const helper = require('../common/helper') +const constants = require('../common/constants') +const config = require('config') + +const esClient = helper.getESClient() + +/** + * Updates jobCandidate via a painless script + * + * @param {String} jobCandidateId job candidate id + * @param {String} script script definition + * @param {String} transactionId transaction id + */ +async function updateJobCandidateViaScript (jobCandidateId, script, transactionId) { + await esClient.updateExtra({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: jobCandidateId, + transactionId, + body: { script }, + refresh: constants.esRefreshOption + }) +} + +/** + * Process request interview entity message. + * Creates an interview record under jobCandidate. + * + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processRequestInterview (message, transactionId) { + const interview = message.payload + // add interview in collection if there's already an existing collection + // or initiate a new one with this interview + const script = { + source: ` + ctx._source.containsKey("interviews") + ? ctx._source.interviews.add(params.interview) + : ctx._source.interviews = [params.interview] + `, + params: { interview } + } + await updateJobCandidateViaScript(interview.jobCandidateId, script, transactionId) +} + +processRequestInterview.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().uuid().required(), + jobCandidateId: Joi.string().uuid().required(), + googleCalendarId: Joi.string().allow(null), + customMessage: Joi.string().allow(null), + xaiTemplate: Joi.string().required(), + round: Joi.number().integer().positive().required(), + status: Joi.interviewStatus().required(), + createdAt: Joi.date().required(), + createdBy: Joi.string().uuid().required(), + updatedAt: Joi.date().allow(null), + updatedBy: Joi.string().uuid().allow(null) + }).required() + }).required(), + transactionId: Joi.string().required() +} + +module.exports = { + processRequestInterview +} + +logger.buildService(module.exports, 'InterviewProcessorService') From 47ae415d081b8e08c320d8ab078e67dc7aa0adab Mon Sep 17 00:00:00 2001 From: xxcxy Date: Wed, 21 Apr 2021 14:00:37 +0800 Subject: [PATCH 2/2] part2 --- README.md | 1 + config/default.js | 3 +- local/docker-compose.yml | 2 +- src/app.js | 3 +- src/common/constants.js | 7 ++- src/scripts/createIndex.js | 2 + src/services/InterviewProcessorService.js | 65 ++++++++++++++++++++++- 7 files changed, 77 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 07e97c6..519f60c 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ The following parameters can be set in config files or in env variables: - `topics.TAAS_WORK_PERIOD_UPDATE_TOPIC`: the update work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_DELETE_TOPIC`: the delete work period entity Kafka message topic - `topics.TAAS_INTERVIEW_REQUEST_TOPIC`: the request interview entity Kafka message topic +- `topics.TAAS_INTERVIEW_UPDATE_TOPIC`: the update interview entity Kafka message topic - `esConfig.HOST`: Elasticsearch host - `esConfig.AWS_REGION`: The Amazon region to use when using AWS Elasticsearch service - `esConfig.ELASTICCLOUD.id`: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this diff --git a/config/default.js b/config/default.js index 478c719..9e53a7e 100644 --- a/config/default.js +++ b/config/default.js @@ -33,7 +33,8 @@ module.exports = { TAAS_WORK_PERIOD_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_UPDATE_TOPIC || 'taas.workperiod.update', TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete', // topics for interview service - TAAS_INTERVIEW_REQUEST_TOPIC: process.env.TAAS_INTERVIEW_REQUEST_TOPIC || 'taas.interview.request' + TAAS_INTERVIEW_REQUEST_TOPIC: process.env.TAAS_INTERVIEW_REQUEST_TOPIC || 'taas.interview.requested', + TAAS_INTERVIEW_UPDATE_TOPIC: process.env.TAAS_INTERVIEW_UPDATE_TOPIC || 'taas.interview.update' }, esConfig: { diff --git a/local/docker-compose.yml b/local/docker-compose.yml index 5f355f3..d5c65ac 100644 --- a/local/docker-compose.yml +++ b/local/docker-compose.yml @@ -12,7 +12,7 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.interview.request:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1" + KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.interview.requested:1:1,taas.interview.update:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 esearch: image: elasticsearch:7.7.1 diff --git a/src/app.js b/src/app.js index b4ee7bb..471acb5 100644 --- a/src/app.js +++ b/src/app.js @@ -46,7 +46,8 @@ const topicServiceMapping = { [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate, [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete, // interview - [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview + [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview, + [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview } // Start kafka consumer diff --git a/src/common/constants.js b/src/common/constants.js index 0d47cb4..d7f16ec 100644 --- a/src/common/constants.js +++ b/src/common/constants.js @@ -19,7 +19,12 @@ module.exports = { }, Interview: { Status: { - Requested: 'Requested' + Scheduling: 'Scheduling', + Scheduled: 'Scheduled', + RequestedForReschedule: 'Requested for reschedule', + Rescheduled: 'Rescheduled', + Completed: 'Completed', + Cancelled: 'Cancelled' } } } diff --git a/src/scripts/createIndex.js b/src/scripts/createIndex.js index 6ba13e0..8303f3f 100644 --- a/src/scripts/createIndex.js +++ b/src/scripts/createIndex.js @@ -52,6 +52,8 @@ async function createIndex () { id: { type: 'keyword' }, jobCandidateId: { type: 'keyword' }, googleCalendarId: { type: 'keyword' }, + startTimestamp: { type: 'date' }, + attendeesList: { type: 'keyword' }, customMessage: { type: 'text' }, xaiTemplate: { type: 'keyword' }, round: { type: 'integer' }, diff --git a/src/services/InterviewProcessorService.js b/src/services/InterviewProcessorService.js index d02fe07..15af5ef 100644 --- a/src/services/InterviewProcessorService.js +++ b/src/services/InterviewProcessorService.js @@ -2,6 +2,7 @@ * Interview Processor Service */ +const _ = require('lodash') const Joi = require('@hapi/joi') const logger = require('../common/logger') const helper = require('../common/helper') @@ -66,14 +67,74 @@ processRequestInterview.schema = { createdAt: Joi.date().required(), createdBy: Joi.string().uuid().required(), updatedAt: Joi.date().allow(null), - updatedBy: Joi.string().uuid().allow(null) + updatedBy: Joi.string().uuid().allow(null), + attendeesList: Joi.array().items(Joi.string().email()).allow(null), + startTimestamp: Joi.date().allow(null) + }).required() + }).required(), + transactionId: Joi.string().required() +} + +/** + * Process update interview entity message. + * Update an interview record under jobCandidate. + * + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processUpdateInterview (message, transactionId) { + const data = message.payload + const { body: jobCandidate } = await esClient.getExtra({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: data.jobCandidateId + }) + const interviews = jobCandidate.interviews || [] + const index = _.findIndex(interviews, ['id', data.id]) + if (index === -1) { + interviews.push(data) + } else { + interviews.splice(index, 1, data) + } + jobCandidate.interviews = interviews + await esClient.updateExtra({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: data.jobCandidateId, + transactionId, + body: { + doc: jobCandidate + }, + refresh: constants.esRefreshOption + }) +} + +processUpdateInterview.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().uuid().required(), + jobCandidateId: Joi.string().uuid().required(), + googleCalendarId: Joi.string().allow(null), + customMessage: Joi.string().allow(null), + xaiTemplate: Joi.string().required(), + round: Joi.number().integer().positive().required(), + status: Joi.interviewStatus().required(), + createdAt: Joi.date().required(), + createdBy: Joi.string().uuid().required(), + updatedAt: Joi.date().required(), + updatedBy: Joi.string().uuid().required(), + attendeesList: Joi.array().items(Joi.string().email()).allow(null), + startTimestamp: Joi.date().allow(null) }).required() }).required(), transactionId: Joi.string().required() } module.exports = { - processRequestInterview + processRequestInterview, + processUpdateInterview } logger.buildService(module.exports, 'InterviewProcessorService')