Skip to content

Commit 3e66df6

Browse files
author
sachin-maheshwari
authored
Merge pull request #96 from yoution/feature/shapeup4-cqrs-update
Feature/shapeup4 cqrs update
2 parents 7e3a208 + ab7be1f commit 3e66df6

File tree

3 files changed

+53
-55
lines changed

3 files changed

+53
-55
lines changed

src/app.js

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ const logger = require('./common/logger')
1111
const helper = require('./common/helper')
1212
const JobProcessorService = require('./services/JobProcessorService')
1313
const JobCandidateProcessorService = require('./services/JobCandidateProcessorService')
14-
const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
15-
const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
16-
const InterviewProcessorService = require('./services/InterviewProcessorService')
17-
const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
18-
const RoleProcessorService = require('./services/RoleProcessorService')
14+
// const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
15+
// const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
16+
// const InterviewProcessorService = require('./services/InterviewProcessorService')
17+
// const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
18+
// const RoleProcessorService = require('./services/RoleProcessorService')
1919
const ActionProcessorService = require('./services/ActionProcessorService')
2020
const Mutex = require('async-mutex').Mutex
2121
const events = require('events')
@@ -34,30 +34,30 @@ const topicServiceMapping = {
3434
// job
3535
[config.topics.TAAS_JOB_CREATE_TOPIC]: JobProcessorService.processCreate,
3636
[config.topics.TAAS_JOB_UPDATE_TOPIC]: JobProcessorService.processUpdate,
37-
[config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete,
37+
// [config.topics.TAAS_JOB_DELETE_TOPIC]: JobProcessorService.processDelete,
3838
// job candidate
39-
[config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate,
39+
// [config.topics.TAAS_JOB_CANDIDATE_CREATE_TOPIC]: JobCandidateProcessorService.processCreate,
4040
[config.topics.TAAS_JOB_CANDIDATE_UPDATE_TOPIC]: JobCandidateProcessorService.processUpdate,
41-
[config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete,
41+
// [config.topics.TAAS_JOB_CANDIDATE_DELETE_TOPIC]: JobCandidateProcessorService.processDelete,
4242
// resource booking
43-
[config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate,
44-
[config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate,
45-
[config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete,
43+
// [config.topics.TAAS_RESOURCE_BOOKING_CREATE_TOPIC]: ResourceBookingProcessorService.processCreate,
44+
// [config.topics.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingProcessorService.processUpdate,
45+
// [config.topics.TAAS_RESOURCE_BOOKING_DELETE_TOPIC]: ResourceBookingProcessorService.processDelete,
4646
// work period
47-
[config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate,
48-
[config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate,
49-
[config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
47+
// [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate,
48+
// [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate,
49+
// [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
5050
// work period payment
51-
[config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
52-
[config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate,
51+
// [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
52+
// [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate,
5353
// interview
54-
[config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview,
55-
[config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview,
56-
[config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews,
54+
// [config.topics.TAAS_INTERVIEW_REQUEST_TOPIC]: InterviewProcessorService.processRequestInterview,
55+
// [config.topics.TAAS_INTERVIEW_UPDATE_TOPIC]: InterviewProcessorService.processUpdateInterview,
56+
// [config.topics.TAAS_INTERVIEW_BULK_UPDATE_TOPIC]: InterviewProcessorService.processBulkUpdateInterviews,
5757
// role
58-
[config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate,
59-
[config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate,
60-
[config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete,
58+
// [config.topics.TAAS_ROLE_CREATE_TOPIC]: RoleProcessorService.processCreate,
59+
// [config.topics.TAAS_ROLE_UPDATE_TOPIC]: RoleProcessorService.processUpdate,
60+
// [config.topics.TAAS_ROLE_DELETE_TOPIC]: RoleProcessorService.processDelete,
6161
// action
6262
[config.topics.TAAS_ACTION_RETRY_TOPIC]: ActionProcessorService.processRetry
6363
}
@@ -117,12 +117,10 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
117117
}
118118
const transactionId = _.uniqueId('transaction_')
119119
try {
120-
if (!topicServiceMapping[topic]) {
121-
throw new Error(`Unknown topic: ${topic}`) // normally it never reaches this line
120+
if (topicServiceMapping[topic]) {
121+
await topicServiceMapping[topic](messageJSON, transactionId)
122+
localLogger.debug(`Successfully processed message with count ${messageCount}`)
122123
}
123-
await topicServiceMapping[topic](messageJSON, transactionId)
124-
125-
localLogger.debug(`Successfully processed message with count ${messageCount}`)
126124
} catch (err) {
127125
logger.logFullError(err, { component: 'app' })
128126
} finally {

src/services/JobCandidateProcessorService.js

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ async function processUpdate (message, transactionId) {
127127
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
128128
id: data.id
129129
})
130-
await esClient.updateExtra({
131-
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
132-
id: data.id,
133-
transactionId,
134-
body: {
135-
doc: data
136-
},
137-
refresh: constants.esRefreshOption
138-
})
130+
// await esClient.updateExtra({
131+
// index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
132+
// id: data.id,
133+
// transactionId,
134+
// body: {
135+
// doc: data
136+
// },
137+
// refresh: constants.esRefreshOption
138+
// })
139139
await postMessageToZapier({
140140
type: constants.Zapier.MessageType.JobCandidateUpdate,
141141
payload: data,
@@ -175,9 +175,9 @@ processDelete.schema = {
175175
}
176176

177177
module.exports = {
178-
processCreate,
179-
processUpdate,
180-
processDelete
178+
// processCreate,
179+
processUpdate
180+
// processDelete
181181
}
182182

183183
logger.buildService(module.exports, 'JobCandidateProcessorService')

src/services/JobProcessorService.js

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ async function postMessageToZapier ({ type, payload }) {
4646
*/
4747
async function processCreate (message, transactionId) {
4848
const job = message.payload
49-
await esClient.createExtra({
50-
index: config.get('esConfig.ES_INDEX_JOB'),
51-
id: job.id,
52-
transactionId,
53-
body: job,
54-
refresh: constants.esRefreshOption
55-
})
49+
// await esClient.createExtra({
50+
// index: config.get('esConfig.ES_INDEX_JOB'),
51+
// id: job.id,
52+
// transactionId,
53+
// body: job,
54+
// refresh: constants.esRefreshOption
55+
// })
5656
await postMessageToZapier({
5757
type: constants.Zapier.MessageType.JobCreate,
5858
payload: job
@@ -110,15 +110,15 @@ processCreate.schema = {
110110
*/
111111
async function processUpdate (message, transactionId) {
112112
const data = message.payload
113-
await esClient.updateExtra({
114-
index: config.get('esConfig.ES_INDEX_JOB'),
115-
id: data.id,
116-
transactionId,
117-
body: {
118-
doc: data
119-
},
120-
refresh: constants.esRefreshOption
121-
})
113+
// await esClient.updateExtra({
114+
// index: config.get('esConfig.ES_INDEX_JOB'),
115+
// id: data.id,
116+
// transactionId,
117+
// body: {
118+
// doc: data
119+
// },
120+
// refresh: constants.esRefreshOption
121+
// })
122122
await postMessageToZapier({
123123
type: constants.Zapier.MessageType.JobUpdate,
124124
payload: data

0 commit comments

Comments
 (0)