Skip to content

Dry Release v0.2 #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ The following parameters can be set in config files or in env variables:

- `zapier.ZAPIER_COMPANYID_SLUG`: your company id in zapier; numeric value
- `zapier.ZAPIER_CONTACTID_SLUG`: your contact id in zapier; numeric value
- `zapier.ZAPIER_SWITCH`: decides whether posting message to zapier or not; possible values are `ON` and `OFF`, default is `OFF`
- `zapier.ZAPIER_WEBHOOK`: the remote zapier zap webhook url for posting message
- `zapier.ZAPIER_SWITCH`: decides whether posting job related message to zapier or not; possible values are `ON` and `OFF`, default is `OFF`
- `zapier.ZAPIER_WEBHOOK`: the remote zapier zap webhook url for posting job related message
- `zapier.ZAPIER_JOB_CANDIDATE_SWITCH`: decides whether posting job candidate related message to zapier or not; possible values are `ON` and `OFF`, default is `OFF`
- `zapier.ZAPIER_JOB_CANDIDATE_WEBHOOK`: the remote zapier zap webhook url for posting job candidate related message

## Local Kafka and ElasticSearch setup

Expand Down
2 changes: 2 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ module.exports = {
ZAPIER_CONTACTID_SLUG: process.env.ZAPIER_CONTACTID_SLUG,
ZAPIER_SWITCH: process.env.ZAPIER_SWITCH || 'OFF',
ZAPIER_WEBHOOK: process.env.ZAPIER_WEBHOOK,
ZAPIER_JOB_CANDIDATE_SWITCH: process.env.ZAPIER_JOB_CANDIDATE_SWITCH || 'OFF',
ZAPIER_JOB_CANDIDATE_WEBHOOK: process.env.ZAPIER_JOB_CANDIDATE_WEBHOOK,
TOPCODER_API_URL: process.env.TOPCODER_API_URL || 'http://api.topcoder-dev.com/v5'
}
}
10 changes: 5 additions & 5 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ const eventEmitter = new events.EventEmitter()
process.env.PORT = config.PORT

const localLogger = {
'info': (message) => logger.info({ component: 'app', message }),
'debug': (message) => logger.debug({ component: 'app', message }),
'error': (message) => logger.error({ component: 'app', message })
info: (message) => logger.info({ component: 'app', message }),
debug: (message) => logger.debug({ component: 'app', message }),
error: (message) => logger.error({ component: 'app', message })
}

const topicServiceMapping = {
Expand All @@ -47,7 +47,7 @@ localLogger.info('Starting kafka consumer')
const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())

let count = 0
let mutex = new Mutex()
const mutex = new Mutex()

async function getLatestCount () {
const release = await mutex.acquire()
Expand All @@ -71,7 +71,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
localLogger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
m.offset}; Message: ${message}.`)
let messageJSON
let messageCount = await getLatestCount()
const messageCount = await getLatestCount()

localLogger.debug(`Current message count: ${messageCount}`)
try {
Expand Down
4 changes: 3 additions & 1 deletion src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ global.Promise = require('bluebird')

Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly')
Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled')
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected')
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled')
Joi.workload = () => Joi.string().valid('full-time', 'fractional')
Joi.title = () => Joi.string().max(64)

const zapierSwitch = Joi.string().label('ZAPIER_SWITCH').valid(...Object.values(constants.Zapier.Switch))

// validate configuration
try {
Joi.attempt(config.zapier.ZAPIER_SWITCH, zapierSwitch)
Joi.attempt(config.zapier.ZAPIER_JOB_CANDIDATE_SWITCH, zapierSwitch)
} catch (err) {
console.error(err.message)
process.exit(1)
Expand Down
4 changes: 3 additions & 1 deletion src/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ module.exports = {
},
MessageType: {
JobCreate: 'job:create',
JobUpdate: 'job:update'
JobUpdate: 'job:update',
JobCandidateCreate: 'jobcandidate:create',
JobCandidateUpdate: 'jobcandidate:update'
}
}
}
29 changes: 7 additions & 22 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const elasticsearch = require('@elastic/elasticsearch')
const _ = require('lodash')
const { Mutex } = require('async-mutex')
const m2mAuth = require('tc-core-library-js').auth.m2m
const constants = require('./constants')

AWS.config.region = config.esConfig.AWS_REGION

Expand Down Expand Up @@ -154,33 +153,19 @@ async function getM2MToken () {
/**
* Post message to zapier via webhook url.
*
* @param {Object} message the message object
* @param {String} webhook the webhook url
* @param {Object} message the message data
* @returns {undefined}
*/
async function postMessageToZapier ({ type, payload }) {
if (config.zapier.ZAPIER_SWITCH === constants.Zapier.Switch.OFF) {
logger.debug({ component: 'helper', context: 'postMessageToZapier', message: 'Zapier Switch off via config, no messages sent' })
return
}
const requestBody = {
type,
payload,
companySlug: config.zapier.ZAPIER_COMPANYID_SLUG,
contactSlug: config.zapier.ZAPIER_CONTACTID_SLUG
}
if (type === constants.Zapier.MessageType.JobCreate) {
const token = await getM2MToken()
requestBody.authToken = token
requestBody.topcoderApiUrl = config.zapier.TOPCODER_API_URL
}
logger.debug({ component: 'helper', context: 'postMessageToZapier', message: `request body: ${JSON.stringify(requestBody)}` })
await request.post(config.zapier.ZAPIER_WEBHOOK)
.send(requestBody)
async function postMessageViaWebhook (webhook, message) {
logger.debug({ component: 'helper', context: 'postMessageToZapier', message: `message: ${JSON.stringify(message)}` })
await request.post(webhook).send(message)
}

module.exports = {
getKafkaOptions,
getESClient,
checkEsMutexRelease,
postMessageToZapier
getM2MToken,
postMessageViaWebhook
}
3 changes: 3 additions & 0 deletions src/scripts/createIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async function createIndex () {
projectId: { type: 'integer' },
externalId: { type: 'keyword' },
description: { type: 'text' },
title: { type: 'text' },
startDate: { type: 'date' },
endDate: { type: 'date' },
numPositions: { type: 'integer' },
Expand All @@ -42,6 +43,8 @@ async function createIndex () {
jobId: { type: 'keyword' },
userId: { type: 'keyword' },
status: { type: 'keyword' },
externalId: { type: 'keyword' },
resume: { type: 'text' },
createdAt: { type: 'date' },
createdBy: { type: 'keyword' },
updatedAt: { type: 'date' },
Expand Down
68 changes: 67 additions & 1 deletion src/services/JobCandidateProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,64 @@ const config = require('config')

const esClient = helper.getESClient()

const localLogger = {
debug: ({ context, message }) => logger.debug({ component: 'JobCandidateProcessorService', context, message })
}

/**
* Update job candidate status in recruit CRM.
*
* @param {Object} message the message object
* @returns {undefined}
*/
async function updateCandidateStatus ({ type, payload }) {
if (!payload.status) {
localLogger.debug({ context: 'updateCandidateStatus', message: 'status not updated' })
return
}
if (!['rejected', 'shortlist'].includes(payload.status)) {
localLogger.debug({ context: 'updateCandidateStatus', message: `not interested status: ${payload.status}` })
return
}
const { body: jobCandidate } = await esClient.getSource({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: payload.id
})
if (!jobCandidate.externalId) {
localLogger.debug({ context: 'updateCandidateStatus', message: `id: ${jobCandidate.id} candidate without externalId - ignored` })
return
}
const { body: job } = await esClient.getSource({
index: config.get('esConfig.ES_INDEX_JOB'),
id: jobCandidate.jobId
})
const message = {
type,
status: jobCandidate.status,
jobCandidateSlug: jobCandidate.externalId,
jobSlug: job.externalId
}
await helper.postMessageViaWebhook(config.zapier.ZAPIER_JOB_CANDIDATE_WEBHOOK, message)
}

/**
* Post message to zapier for JobCandidate.
*
* @param {Object} message the message object
* @returns {undefined}
*/
async function postMessageToZapier ({ type, payload }) {
if (config.zapier.ZAPIER_JOB_CANDIDATE_SWITCH === constants.Zapier.Switch.OFF) {
localLogger.debug({ context: 'postMessageToZapier', message: 'Zapier Switch off via config, no messages sent' })
return
}
if (type === constants.Zapier.MessageType.JobCandidateUpdate) {
await updateCandidateStatus({ type, payload })
return
}
throw new Error(`unrecognized message type: ${type}`)
}

/**
* Process create entity message
* @param {Object} message the kafka message
Expand Down Expand Up @@ -39,7 +97,9 @@ processCreate.schema = {
userId: Joi.string().uuid().required(),
createdAt: Joi.date().required(),
createdBy: Joi.string().uuid().required(),
status: Joi.jobCandidateStatus().required()
status: Joi.jobCandidateStatus().required(),
externalId: Joi.string(),
resume: Joi.string().uri()
}).required()
}).required(),
transactionId: Joi.string().required()
Expand All @@ -61,6 +121,10 @@ async function processUpdate (message, transactionId) {
},
refresh: constants.esRefreshOption
})
await postMessageToZapier({
type: constants.Zapier.MessageType.JobCandidateUpdate,
payload: data
})
}

processUpdate.schema = {
Expand All @@ -74,6 +138,8 @@ processUpdate.schema = {
jobId: Joi.string().uuid(),
userId: Joi.string().uuid(),
status: Joi.jobCandidateStatus(),
externalId: Joi.string(),
resume: Joi.string().uri(),
updatedAt: Joi.date(),
updatedBy: Joi.string().uuid()
}).required()
Expand Down
45 changes: 38 additions & 7 deletions src/services/JobProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,35 @@ const config = require('config')

const esClient = helper.getESClient()

const localLogger = {
debug: ({ context, message }) => logger.debug({ component: 'JobProcessorService', context, message })
}

/**
* Post message to zapier for Job.
*
* @param {Object} message the message object
* @returns {undefined}
*/
async function postMessageToZapier ({ type, payload }) {
if (config.zapier.ZAPIER_SWITCH === constants.Zapier.Switch.OFF) {
localLogger.debug({ context: 'postMessageToZapier', message: 'Zapier Switch off via config, no messages sent' })
return
}
const message = {
type,
payload,
companySlug: config.zapier.ZAPIER_COMPANYID_SLUG,
contactSlug: config.zapier.ZAPIER_CONTACTID_SLUG
}
if (type === constants.Zapier.MessageType.JobCreate) {
const token = await helper.getM2MToken()
message.authToken = token
message.topcoderApiUrl = config.zapier.TOPCODER_API_URL
}
await helper.postMessageViaWebhook(config.zapier.ZAPIER_WEBHOOK, message)
}

/**
* Process create entity message
* @param {Object} message the kafka message
Expand All @@ -25,7 +54,7 @@ async function processCreate (message, transactionId) {
body: _.omit(job, 'id'),
refresh: constants.esRefreshOption
})
await helper.postMessageToZapier({
await postMessageToZapier({
type: constants.Zapier.MessageType.JobCreate,
payload: job
})
Expand All @@ -40,12 +69,13 @@ processCreate.schema = {
payload: Joi.object().keys({
id: Joi.string().uuid().required(),
projectId: Joi.number().integer().required(),
externalId: Joi.string().required(),
description: Joi.string().required(),
startDate: Joi.date().required(),
endDate: Joi.date().required(),
externalId: Joi.string(),
description: Joi.string(),
title: Joi.title().required(),
startDate: Joi.date(),
endDate: Joi.date(),
numPositions: Joi.number().integer().min(1).required(),
resourceType: Joi.string().required(),
resourceType: Joi.string(),
rateType: Joi.rateType(),
workload: Joi.workload(),
skills: Joi.array().items(Joi.string().uuid()).required(),
Expand Down Expand Up @@ -73,7 +103,7 @@ async function processUpdate (message, transactionId) {
},
refresh: constants.esRefreshOption
})
await helper.postMessageToZapier({
await postMessageToZapier({
type: constants.Zapier.MessageType.JobUpdate,
payload: data
})
Expand All @@ -90,6 +120,7 @@ processUpdate.schema = {
projectId: Joi.number().integer(),
externalId: Joi.string(),
description: Joi.string(),
title: Joi.title(),
startDate: Joi.date(),
endDate: Joi.date(),
numPositions: Joi.number().integer().min(1),
Expand Down
8 changes: 4 additions & 4 deletions src/services/ResourceBookingProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ processCreate.schema = {
projectId: Joi.number().integer().required(),
userId: Joi.string().uuid().required(),
jobId: Joi.string().uuid(),
startDate: Joi.date().required(),
endDate: Joi.date().required(),
memberRate: Joi.number().required(),
customerRate: Joi.number().required(),
startDate: Joi.date(),
endDate: Joi.date(),
memberRate: Joi.number(),
customerRate: Joi.number(),
rateType: Joi.rateType().required(),
createdAt: Joi.date().required(),
createdBy: Joi.string().uuid().required(),
Expand Down