diff --git a/src/app.js b/src/app.js index bc6bb2a..534b155 100644 --- a/src/app.js +++ b/src/app.js @@ -11,11 +11,11 @@ const logger = require('./common/logger') const helper = require('./common/helper') const JobProcessorService = require('./services/JobProcessorService') const JobCandidateProcessorService = require('./services/JobCandidateProcessorService') -const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService') -const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService') -const InterviewProcessorService = require('./services/InterviewProcessorService') -const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService') -const RoleProcessorService = require('./services/RoleProcessorService') +// const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService') +// const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService') +// const InterviewProcessorService = require('./services/InterviewProcessorService') +// const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService') +// const RoleProcessorService = require('./services/RoleProcessorService') const ActionProcessorService = require('./services/ActionProcessorService') const Mutex = require('async-mutex').Mutex const events = require('events') @@ -34,30 +34,30 @@ const topicServiceMapping = { // job [config.topics.TAAS_JOB_CREATE_TOPIC]: JobProcessorService.processCreate, [config.topics.TAAS_JOB_UPDATE_TOPIC]: JobProcessorService.processUpdate, - [config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete, + // [config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete, // job candidate - [config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate, + // [config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate, [config.topics.TAAS_JOB_CANDIDATE_UPDATE_TOPIC]: JobCandidateProcessorService.processUpdate, - [config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete, + // [config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete, // resource booking - [config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate, - [config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate, - [config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete, + // [config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate, + // [config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate, + // [config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete, // work period - [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate, - [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate, - [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete, + // [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate, + // [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate, + // [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete, // work period payment - [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate, - [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate, + // [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate, + // [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate, // interview - [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview, - [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview, - [config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews, + // [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview, + // [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview, + // [config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews, // role - [config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate, - [config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate, - [config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete, + // [config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate, + // [config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate, + // [config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete, // action [config.topics.TAAS_ACTION_RETRY_TOPIC]: ActionProcessorService.processRetry } @@ -117,12 +117,10 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a } const transactionId = _.uniqueId('transaction_') try { - if (!topicServiceMapping[topic]) { - throw new Error(`Unknown topic: ${topic}`) // normally it never reaches this line + if (topicServiceMapping[topic]) { + await topicServiceMapping[topic](messageJSON, transactionId) + localLogger.debug(`Successfully processed message with count ${messageCount}`) } - await topicServiceMapping[topic](messageJSON, transactionId) - - localLogger.debug(`Successfully processed message with count ${messageCount}`) } catch (err) { logger.logFullError(err, { component: 'app' }) } finally { diff --git a/src/services/JobCandidateProcessorService.js b/src/services/JobCandidateProcessorService.js index d245c68..58137ed 100644 --- a/src/services/JobCandidateProcessorService.js +++ b/src/services/JobCandidateProcessorService.js @@ -126,15 +126,15 @@ async function processUpdate (message, transactionId) { index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), id: data.id }) - await esClient.updateExtra({ - index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), - id: data.id, - transactionId, - body: { - doc: data - }, - refresh: constants.esRefreshOption - }) + // await esClient.updateExtra({ + // index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'), + // id: data.id, + // transactionId, + // body: { + // doc: data + // }, + // refresh: constants.esRefreshOption + // }) await postMessageToZapier({ type: constants.Zapier.MessageType.JobCandidateUpdate, payload: data, @@ -174,9 +174,9 @@ processDelete.schema = { } module.exports = { - processCreate, - processUpdate, - processDelete + // processCreate, + processUpdate + // processDelete } logger.buildService(module.exports, 'JobCandidateProcessorService') diff --git a/src/services/JobProcessorService.js b/src/services/JobProcessorService.js index 9e9cfd0..a1e6388 100644 --- a/src/services/JobProcessorService.js +++ b/src/services/JobProcessorService.js @@ -46,13 +46,13 @@ async function postMessageToZapier ({ type, payload }) { */ async function processCreate (message, transactionId) { const job = message.payload - await esClient.createExtra({ - index: config.get('esConfig.ES_INDEX_JOB'), - id: job.id, - transactionId, - body: job, - refresh: constants.esRefreshOption - }) + // await esClient.createExtra({ + // index: config.get('esConfig.ES_INDEX_JOB'), + // id: job.id, + // transactionId, + // body: job, + // refresh: constants.esRefreshOption + // }) await postMessageToZapier({ type: constants.Zapier.MessageType.JobCreate, payload: job @@ -110,15 +110,15 @@ processCreate.schema = { */ async function processUpdate (message, transactionId) { const data = message.payload - await esClient.updateExtra({ - index: config.get('esConfig.ES_INDEX_JOB'), - id: data.id, - transactionId, - body: { - doc: data - }, - refresh: constants.esRefreshOption - }) + // await esClient.updateExtra({ + // index: config.get('esConfig.ES_INDEX_JOB'), + // id: data.id, + // transactionId, + // body: { + // doc: data + // }, + // refresh: constants.esRefreshOption + // }) await postMessageToZapier({ type: constants.Zapier.MessageType.JobUpdate, payload: data