Skip to content

Feature/shapeup4 cqrs update #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ const logger = require('./common/logger')
const helper = require('./common/helper')
const JobProcessorService = require('./services/JobProcessorService')
const JobCandidateProcessorService = require('./services/JobCandidateProcessorService')
const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
const InterviewProcessorService = require('./services/InterviewProcessorService')
const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
const RoleProcessorService = require('./services/RoleProcessorService')
// const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
// const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
// const InterviewProcessorService = require('./services/InterviewProcessorService')
// const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
// const RoleProcessorService = require('./services/RoleProcessorService')
const ActionProcessorService = require('./services/ActionProcessorService')
const Mutex = require('async-mutex').Mutex
const events = require('events')
Expand All @@ -34,30 +34,30 @@ const topicServiceMapping = {
// job
[config.topics.TAAS_JOB_CREATE_TOPIC]: JobProcessorService.processCreate,
[config.topics.TAAS_JOB_UPDATE_TOPIC]: JobProcessorService.processUpdate,
[config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete,
// [config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete,
// job candidate
[config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate,
// [config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate,
[config.topics.TAAS_JOB_CANDIDATE_UPDATE_TOPIC]: JobCandidateProcessorService.processUpdate,
[config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete,
// [config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete,
// resource booking
[config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate,
[config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate,
[config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete,
// [config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate,
// [config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate,
// [config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete,
// work period
[config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate,
[config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate,
[config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
// [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate,
// [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate,
// [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
// work period payment
[config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
[config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate,
// [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
// [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate,
// interview
[config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview,
[config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview,
[config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews,
// [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview,
// [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview,
// [config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews,
// role
[config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate,
[config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate,
[config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete,
// [config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate,
// [config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate,
// [config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete,
// action
[config.topics.TAAS_ACTION_RETRY_TOPIC]: ActionProcessorService.processRetry
}
Expand Down Expand Up @@ -117,12 +117,10 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
}
const transactionId = _.uniqueId('transaction_')
try {
if (!topicServiceMapping[topic]) {
throw new Error(`Unknown topic: ${topic}`) // normally it never reaches this line
if (topicServiceMapping[topic]) {
await topicServiceMapping[topic](messageJSON, transactionId)
localLogger.debug(`Successfully processed message with count ${messageCount}`)
}
await topicServiceMapping[topic](messageJSON, transactionId)

localLogger.debug(`Successfully processed message with count ${messageCount}`)
} catch (err) {
logger.logFullError(err, { component: 'app' })
} finally {
Expand Down
32 changes: 16 additions & 16 deletions src/services/JobCandidateProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ const localLogger = {
* @returns {undefined}
*/
async function updateCandidateStatus ({ type, payload, previousData }) {
if (previousData.status === payload.status) {
localLogger.debug({ context: 'updateCandidateStatus', message: `jobCandidate is already in status: ${payload.status}` })
return
}
// if (previousData.status === payload.status) {
// localLogger.debug({ context: 'updateCandidateStatus', message: `jobCandidate is already in status: ${payload.status}` })
// return
// }
// if (!['rejected', 'shortlist',].includes(payload.status)) {
if (!['client rejected - screening', 'client rejected - interview', 'interview', 'selected', 'withdrawn', 'withdrawn-prescreen'].includes(payload.status)) {
localLogger.debug({ context: 'updateCandidateStatus', message: `not interested status: ${payload.status}` })
Expand Down Expand Up @@ -127,15 +127,15 @@ async function processUpdate (message, transactionId) {
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: data.id
})
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: data.id,
transactionId,
body: {
doc: data
},
refresh: constants.esRefreshOption
})
// await esClient.updateExtra({
// index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
// id: data.id,
// transactionId,
// body: {
// doc: data
// },
// refresh: constants.esRefreshOption
// })
await postMessageToZapier({
type: constants.Zapier.MessageType.JobCandidateUpdate,
payload: data,
Expand Down Expand Up @@ -175,9 +175,9 @@ processDelete.schema = {
}

module.exports = {
processCreate,
processUpdate,
processDelete
// processCreate,
processUpdate
// processDelete
}

logger.buildService(module.exports, 'JobCandidateProcessorService')
32 changes: 16 additions & 16 deletions src/services/JobProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ async function postMessageToZapier ({ type, payload }) {
*/
async function processCreate (message, transactionId) {
const job = message.payload
await esClient.createExtra({
index: config.get('esConfig.ES_INDEX_JOB'),
id: job.id,
transactionId,
body: job,
refresh: constants.esRefreshOption
})
// await esClient.createExtra({
// index: config.get('esConfig.ES_INDEX_JOB'),
// id: job.id,
// transactionId,
// body: job,
// refresh: constants.esRefreshOption
// })
await postMessageToZapier({
type: constants.Zapier.MessageType.JobCreate,
payload: job
Expand Down Expand Up @@ -110,15 +110,15 @@ processCreate.schema = {
*/
async function processUpdate (message, transactionId) {
const data = message.payload
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_JOB'),
id: data.id,
transactionId,
body: {
doc: data
},
refresh: constants.esRefreshOption
})
// await esClient.updateExtra({
// index: config.get('esConfig.ES_INDEX_JOB'),
// id: data.id,
// transactionId,
// body: {
// doc: data
// },
// refresh: constants.esRefreshOption
// })
await postMessageToZapier({
type: constants.Zapier.MessageType.JobUpdate,
payload: data
Expand Down