diff --git a/README.md b/README.md index e22cea7..0501dec 100644 --- a/README.md +++ b/README.md @@ -47,8 +47,10 @@ The following parameters can be set in config files or in env variables: - `zapier.ZAPIER_COMPANYID_SLUG`: your company id in zapier; numeric value - `zapier.ZAPIER_CONTACTID_SLUG`: your contact id in zapier; numeric value -- `zapier.ZAPIER_SWITCH`: decides whether posting message to zapier or not; possible values are `ON` and `OFF`, default is `OFF` -- `zapier.ZAPIER_WEBHOOK`: the remote zapier zap webhook url for posting message +- `zapier.ZAPIER_SWITCH`: decides whether posting job related message to zapier or not; possible values are `ON` and `OFF`, default is `OFF` +- `zapier.ZAPIER_WEBHOOK`: the remote zapier zap webhook url for posting job related message +- `zapier.ZAPIER_JOB_CANDIDATE_SWITCH`: decides whether posting job candidate related message to zapier or not; possible values are `ON` and `OFF`, default is `OFF` +- `zapier.ZAPIER_JOB_CANDIDATE_WEBHOOK`: the remote zapier zap webhook url for posting job candidate related message ## Local Kafka and ElasticSearch setup diff --git a/config/default.js b/config/default.js index f018a2d..c09c3e5 100644 --- a/config/default.js +++ b/config/default.js @@ -58,6 +58,8 @@ module.exports = { ZAPIER_CONTACTID_SLUG: process.env.ZAPIER_CONTACTID_SLUG, ZAPIER_SWITCH: process.env.ZAPIER_SWITCH || 'OFF', ZAPIER_WEBHOOK: process.env.ZAPIER_WEBHOOK, + ZAPIER_JOB_CANDIDATE_SWITCH: process.env.ZAPIER_JOB_CANDIDATE_SWITCH || 'OFF', + ZAPIER_JOB_CANDIDATE_WEBHOOK: process.env.ZAPIER_JOB_CANDIDATE_WEBHOOK, TOPCODER_API_URL: process.env.TOPCODER_API_URL || 'http://api.topcoder-dev.com/v5' } } diff --git a/src/app.js b/src/app.js index 4d384f4..7414dc2 100644 --- a/src/app.js +++ b/src/app.js @@ -21,9 +21,9 @@ const eventEmitter = new events.EventEmitter() process.env.PORT = config.PORT const localLogger = { - 'info': (message) => logger.info({ component: 'app', message }), - 'debug': (message) => logger.debug({ component: 'app', message }), - 'error': (message) => logger.error({ component: 'app', message }) + info: (message) => logger.info({ component: 'app', message }), + debug: (message) => logger.debug({ component: 'app', message }), + error: (message) => logger.error({ component: 'app', message }) } const topicServiceMapping = { @@ -47,7 +47,7 @@ localLogger.info('Starting kafka consumer') const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions()) let count = 0 -let mutex = new Mutex() +const mutex = new Mutex() async function getLatestCount () { const release = await mutex.acquire() @@ -71,7 +71,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a localLogger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${ m.offset}; Message: ${message}.`) let messageJSON - let messageCount = await getLatestCount() + const messageCount = await getLatestCount() localLogger.debug(`Current message count: ${messageCount}`) try { diff --git a/src/bootstrap.js b/src/bootstrap.js index 4efacee..e304834 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -6,14 +6,16 @@ global.Promise = require('bluebird') Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly') Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled') -Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected') +Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled') Joi.workload = () => Joi.string().valid('full-time', 'fractional') +Joi.title = () => Joi.string().max(64) const zapierSwitch = Joi.string().label('ZAPIER_SWITCH').valid(...Object.values(constants.Zapier.Switch)) // validate configuration try { Joi.attempt(config.zapier.ZAPIER_SWITCH, zapierSwitch) + Joi.attempt(config.zapier.ZAPIER_JOB_CANDIDATE_SWITCH, zapierSwitch) } catch (err) { console.error(err.message) process.exit(1) diff --git a/src/common/constants.js b/src/common/constants.js index f48aec4..62d8720 100644 --- a/src/common/constants.js +++ b/src/common/constants.js @@ -12,7 +12,9 @@ module.exports = { }, MessageType: { JobCreate: 'job:create', - JobUpdate: 'job:update' + JobUpdate: 'job:update', + JobCandidateCreate: 'jobcandidate:create', + JobCandidateUpdate: 'jobcandidate:update' } } } diff --git a/src/common/helper.js b/src/common/helper.js index fe30d7b..d06b1a1 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -10,7 +10,6 @@ const elasticsearch = require('@elastic/elasticsearch') const _ = require('lodash') const { Mutex } = require('async-mutex') const m2mAuth = require('tc-core-library-js').auth.m2m -const constants = require('./constants') AWS.config.region = config.esConfig.AWS_REGION @@ -154,33 +153,19 @@ async function getM2MToken () { /** * Post message to zapier via webhook url. * - * @param {Object} message the message object + * @param {String} webhook the webhook url + * @param {Object} message the message data * @returns {undefined} */ -async function postMessageToZapier ({ type, payload }) { - if (config.zapier.ZAPIER_SWITCH === constants.Zapier.Switch.OFF) { - logger.debug({ component: 'helper', context: 'postMessageToZapier', message: 'Zapier Switch off via config, no messages sent' }) - return - } - const requestBody = { - type, - payload, - companySlug: config.zapier.ZAPIER_COMPANYID_SLUG, - contactSlug: config.zapier.ZAPIER_CONTACTID_SLUG - } - if (type === constants.Zapier.MessageType.JobCreate) { - const token = await getM2MToken() - requestBody.authToken = token - requestBody.topcoderApiUrl = config.zapier.TOPCODER_API_URL - } - logger.debug({ component: 'helper', context: 'postMessageToZapier', message: `request body: ${JSON.stringify(requestBody)}` }) - await request.post(config.zapier.ZAPIER_WEBHOOK) - .send(requestBody) +async function postMessageViaWebhook (webhook, message) { + logger.debug({ component: 'helper', context: 'postMessageToZapier', message: `message: ${JSON.stringify(message)}` }) + await request.post(webhook).send(message) } module.exports = { getKafkaOptions, getESClient, checkEsMutexRelease, - postMessageToZapier + getM2MToken, + postMessageViaWebhook } diff --git a/src/scripts/createIndex.js b/src/scripts/createIndex.js index ce3ede3..4edce30 100644 --- a/src/scripts/createIndex.js +++ b/src/scripts/createIndex.js @@ -18,6 +18,7 @@ async function createIndex () { projectId: { type: 'integer' }, externalId: { type: 'keyword' }, description: { type: 'text' }, + title: { type: 'text' }, startDate: { type: 'date' }, endDate: { type: 'date' }, numPositions: { type: 'integer' }, @@ -42,6 +43,8 @@ async function createIndex () { jobId: { type: 'keyword' }, userId: { type: 'keyword' }, status: { type: 'keyword' }, + externalId: { type: 'keyword' }, + resume: { type: 'text' }, createdAt: { type: 'date' }, createdBy: { type: 'keyword' }, updatedAt: { type: 'date' }, diff --git a/src/services/JobCandidateProcessorService.js b/src/services/JobCandidateProcessorService.js index 4543001..0936815 100644 --- a/src/services/JobCandidateProcessorService.js +++ b/src/services/JobCandidateProcessorService.js @@ -11,6 +11,64 @@ const config = require('config') const esClient = helper.getESClient() +const localLogger = { + debug: ({ context, message }) => logger.debug({ component: 'JobCandidateProcessorService', context, message }) +} + +/** + * Update job candidate status in recruit CRM. + * + * @param {Object} message the message object + * @returns {undefined} + */ +async function updateCandidateStatus ({ type, payload }) { + if (!payload.status) { + localLogger.debug({ context: 'updateCandidateStatus', message: 'status not updated' }) + return + } + if (!['rejected', 'shortlist'].includes(payload.status)) { + localLogger.debug({ context: 'updateCandidateStatus', message: `not interested status: ${payload.status}` }) + return + } + const { body: jobCandidate } = await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + id: payload.id + }) + if (!jobCandidate.externalId) { + localLogger.debug({ context: 'updateCandidateStatus', message: `id: ${jobCandidate.id} candidate without externalId - ignored` }) + return + } + const { body: job } = await esClient.getSource({ + index: config.get('esConfig.ES_INDEX_JOB'), + id: jobCandidate.jobId + }) + const message = { + type, + status: jobCandidate.status, + jobCandidateSlug: jobCandidate.externalId, + jobSlug: job.externalId + } + await helper.postMessageViaWebhook(config.zapier.ZAPIER_JOB_CANDIDATE_WEBHOOK, message) +} + +/** + * Post message to zapier for JobCandidate. + * + * @param {Object} message the message object + * @returns {undefined} + */ +async function postMessageToZapier ({ type, payload }) { + if (config.zapier.ZAPIER_JOB_CANDIDATE_SWITCH === constants.Zapier.Switch.OFF) { + localLogger.debug({ context: 'postMessageToZapier', message: 'Zapier Switch off via config, no messages sent' }) + return + } + if (type === constants.Zapier.MessageType.JobCandidateUpdate) { + await updateCandidateStatus({ type, payload }) + return + } + throw new Error(`unrecognized message type: ${type}`) +} + /** * Process create entity message * @param {Object} message the kafka message @@ -39,7 +97,9 @@ processCreate.schema = { userId: Joi.string().uuid().required(), createdAt: Joi.date().required(), createdBy: Joi.string().uuid().required(), - status: Joi.jobCandidateStatus().required() + status: Joi.jobCandidateStatus().required(), + externalId: Joi.string(), + resume: Joi.string().uri() }).required() }).required(), transactionId: Joi.string().required() @@ -61,6 +121,10 @@ async function processUpdate (message, transactionId) { }, refresh: constants.esRefreshOption }) + await postMessageToZapier({ + type: constants.Zapier.MessageType.JobCandidateUpdate, + payload: data + }) } processUpdate.schema = { @@ -74,6 +138,8 @@ processUpdate.schema = { jobId: Joi.string().uuid(), userId: Joi.string().uuid(), status: Joi.jobCandidateStatus(), + externalId: Joi.string(), + resume: Joi.string().uri(), updatedAt: Joi.date(), updatedBy: Joi.string().uuid() }).required() diff --git a/src/services/JobProcessorService.js b/src/services/JobProcessorService.js index 2a0a7e3..72131ed 100644 --- a/src/services/JobProcessorService.js +++ b/src/services/JobProcessorService.js @@ -11,6 +11,35 @@ const config = require('config') const esClient = helper.getESClient() +const localLogger = { + debug: ({ context, message }) => logger.debug({ component: 'JobProcessorService', context, message }) +} + +/** + * Post message to zapier for Job. + * + * @param {Object} message the message object + * @returns {undefined} + */ +async function postMessageToZapier ({ type, payload }) { + if (config.zapier.ZAPIER_SWITCH === constants.Zapier.Switch.OFF) { + localLogger.debug({ context: 'postMessageToZapier', message: 'Zapier Switch off via config, no messages sent' }) + return + } + const message = { + type, + payload, + companySlug: config.zapier.ZAPIER_COMPANYID_SLUG, + contactSlug: config.zapier.ZAPIER_CONTACTID_SLUG + } + if (type === constants.Zapier.MessageType.JobCreate) { + const token = await helper.getM2MToken() + message.authToken = token + message.topcoderApiUrl = config.zapier.TOPCODER_API_URL + } + await helper.postMessageViaWebhook(config.zapier.ZAPIER_WEBHOOK, message) +} + /** * Process create entity message * @param {Object} message the kafka message @@ -25,7 +54,7 @@ async function processCreate (message, transactionId) { body: _.omit(job, 'id'), refresh: constants.esRefreshOption }) - await helper.postMessageToZapier({ + await postMessageToZapier({ type: constants.Zapier.MessageType.JobCreate, payload: job }) @@ -40,12 +69,13 @@ processCreate.schema = { payload: Joi.object().keys({ id: Joi.string().uuid().required(), projectId: Joi.number().integer().required(), - externalId: Joi.string().required(), - description: Joi.string().required(), - startDate: Joi.date().required(), - endDate: Joi.date().required(), + externalId: Joi.string(), + description: Joi.string(), + title: Joi.title().required(), + startDate: Joi.date(), + endDate: Joi.date(), numPositions: Joi.number().integer().min(1).required(), - resourceType: Joi.string().required(), + resourceType: Joi.string(), rateType: Joi.rateType(), workload: Joi.workload(), skills: Joi.array().items(Joi.string().uuid()).required(), @@ -73,7 +103,7 @@ async function processUpdate (message, transactionId) { }, refresh: constants.esRefreshOption }) - await helper.postMessageToZapier({ + await postMessageToZapier({ type: constants.Zapier.MessageType.JobUpdate, payload: data }) @@ -90,6 +120,7 @@ processUpdate.schema = { projectId: Joi.number().integer(), externalId: Joi.string(), description: Joi.string(), + title: Joi.title(), startDate: Joi.date(), endDate: Joi.date(), numPositions: Joi.number().integer().min(1), diff --git a/src/services/ResourceBookingProcessorService.js b/src/services/ResourceBookingProcessorService.js index 55271bf..23b1de3 100644 --- a/src/services/ResourceBookingProcessorService.js +++ b/src/services/ResourceBookingProcessorService.js @@ -38,10 +38,10 @@ processCreate.schema = { projectId: Joi.number().integer().required(), userId: Joi.string().uuid().required(), jobId: Joi.string().uuid(), - startDate: Joi.date().required(), - endDate: Joi.date().required(), - memberRate: Joi.number().required(), - customerRate: Joi.number().required(), + startDate: Joi.date(), + endDate: Joi.date(), + memberRate: Joi.number(), + customerRate: Joi.number(), rateType: Joi.rateType().required(), createdAt: Joi.date().required(), createdBy: Joi.string().uuid().required(),