Skip to content

Commit c208671

Browse files
committed
implement event handling
1 parent 421d2a2 commit c208671

File tree

9 files changed

+303
-38
lines changed

9 files changed

+303
-38
lines changed

app.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const cors = require('cors')
1010
const HttpStatus = require('http-status-codes')
1111
const interceptor = require('express-interceptor')
1212
const logger = require('./src/common/logger')
13+
const eventHandlers = require('./src/eventHandlers')
1314

1415
// setup express app
1516
const app = express()
@@ -91,6 +92,7 @@ app.use((err, req, res, next) => {
9192

9293
const server = app.listen(app.get('port'), () => {
9394
logger.info({ component: 'app', message: `Express server listening on port ${app.get('port')}` })
95+
eventHandlers.init()
9496
})
9597

9698
if (process.env.NODE_ENV === 'test') {

docs/swagger.yaml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -575,8 +575,8 @@ paths:
575575
required: false
576576
schema:
577577
type: string
578-
enum: ['open', 'selected', 'shortlist', 'rejected']
579-
description: The user id.
578+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
579+
description: The job candidate status.
580580
responses:
581581
'200':
582582
description: OK
@@ -1686,8 +1686,8 @@ components:
16861686
description: "The user id."
16871687
status:
16881688
type: string
1689-
enum: ['open', 'selected', 'shortlist', 'rejected']
1690-
description: "The job status."
1689+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
1690+
description: "The job candidate status."
16911691
createdAt:
16921692
type: string
16931693
format: date-time
@@ -1722,7 +1722,7 @@ components:
17221722
properties:
17231723
status:
17241724
type: string
1725-
enum: ['open', 'selected', 'shortlist', 'rejected']
1725+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
17261726
JobPatchRequestBody:
17271727
properties:
17281728
status:
@@ -2130,8 +2130,8 @@ components:
21302130
description: 'The link for the resume that can be downloaded'
21312131
status:
21322132
type: string
2133-
enum: ['open', 'selected', 'shortlist', 'rejected']
2134-
description: "The job status."
2133+
enum: ['open', 'selected', 'shortlist', 'rejected', 'cancelled']
2134+
description: "The job candidate status."
21352135
skills:
21362136
type: array
21372137
items:

src/bootstrap.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Joi.perPage = () => Joi.number().integer().min(1).default(20)
88
Joi.rateType = () => Joi.string().valid('hourly', 'daily', 'weekly', 'monthly')
99
Joi.jobStatus = () => Joi.string().valid('sourcing', 'in-review', 'assigned', 'closed', 'cancelled')
1010
Joi.workload = () => Joi.string().valid('full-time', 'fractional')
11-
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected')
11+
Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist', 'rejected', 'cancelled')
1212

1313
function buildServices (dir) {
1414
const files = fs.readdirSync(dir)

src/common/eventDispatcher.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Implement an event dispatcher that handles events synchronously.
3+
*/
4+
5+
const handlers = []
6+
7+
/**
8+
* Handle event.
9+
*
10+
* @param {String} topic the topic name
11+
* @param {Object} payload the message payload
12+
* @returns {undefined}
13+
*/
14+
async function handleEvent (topic, payload) {
15+
for (const handler of handlers) {
16+
await handler.handleEvent(topic, payload)
17+
}
18+
}
19+
20+
/**
21+
* Register to the dispatcher.
22+
*
23+
* @param {Object} handler the handler containing the `handleEvent` function
24+
* @returns {undefined}
25+
*/
26+
function register (handler) {
27+
handlers.push(handler)
28+
}
29+
30+
module.exports = {
31+
handleEvent,
32+
register
33+
}

src/common/helper.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const elasticsearch = require('@elastic/elasticsearch')
1212
const errors = require('../common/errors')
1313
const logger = require('./logger')
1414
const models = require('../models')
15+
const eventDispatcher = require('./eventDispatcher')
1516
const busApi = require('@topcoder-platform/topcoder-bus-api-wrapper')
1617

1718
const localLogger = {
@@ -313,6 +314,7 @@ async function postEvent (topic, payload) {
313314
payload
314315
}
315316
await client.postEvent(message)
317+
await eventDispatcher.handleEvent(topic, payload)
316318
}
317319

318320
/**
@@ -589,6 +591,15 @@ async function ensureUserById (userId) {
589591
}
590592
}
591593

594+
/**
595+
* Generate M2M auth user.
596+
*
597+
* @returns {Object} the M2M auth user
598+
*/
599+
function getAuditM2Muser () {
600+
return { isMachine: true, userId: config.m2m.M2M_AUDIT_USER_ID, handle: config.m2m.M2M_AUDIT_HANDLE }
601+
}
602+
592603
module.exports = {
593604
checkIfExists,
594605
autoWrapExpress,
@@ -615,5 +626,6 @@ module.exports = {
615626
getSkillById,
616627
getUserSkill,
617628
ensureJobById,
618-
ensureUserById
629+
ensureUserById,
630+
getAuditM2Muser
619631
}

src/eventHandlers/JobEventHandler.js

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Handle events for Job.
3+
*/
4+
5+
const { Op } = require('sequelize')
6+
const models = require('../models')
7+
const logger = require('../common/logger')
8+
const helper = require('../common/helper')
9+
const JobCandidateService = require('../services/JobCandidateService')
10+
const ResourceBookingService = require('../services/ResourceBookingService')
11+
12+
/**
13+
* Cancel all related resource bookings and all related candidates when a job is cancelled.
14+
*
15+
* @param {Object} payload the event payload
16+
* @returns {undefined}
17+
*/
18+
async function cancelJob (payload) {
19+
if (payload.status !== 'cancelled') {
20+
logger.info({
21+
component: 'JobEventHandler',
22+
context: 'cancelJob',
23+
message: `not interested job - status: ${payload.status}`
24+
})
25+
return
26+
}
27+
// pull data from db instead of directly extract data from the payload
28+
// since the payload may not contain all fields when it is from partically update operation.
29+
const job = await models.Job.findById(payload.id)
30+
const candidates = await models.JobCandidate.findAll({
31+
where: {
32+
jobId: job.id,
33+
status: {
34+
[Op.not]: 'cancelled'
35+
},
36+
deletedAt: null
37+
}
38+
})
39+
const resourceBookings = await models.ResourceBooking.findAll({
40+
where: {
41+
projectId: job.projectId,
42+
status: {
43+
[Op.not]: 'cancelled'
44+
},
45+
deletedAt: null
46+
}
47+
})
48+
await Promise.all([
49+
...candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
50+
helper.getAuditM2Muser(),
51+
candidate.id,
52+
{ status: 'cancelled' }
53+
).then(result => {
54+
logger.info({
55+
component: 'JobEventHandler',
56+
context: 'cancelJob',
57+
message: `id: ${result.id} candidate got cancelled.`
58+
})
59+
})),
60+
...resourceBookings.map(resource => ResourceBookingService.partiallyUpdateResourceBooking(
61+
helper.getAuditM2Muser(),
62+
resource.id,
63+
{ status: 'cancelled' }
64+
).then(result => {
65+
logger.info({
66+
component: 'JobEventHandler',
67+
context: 'cancelJob',
68+
message: `id: ${result.id} resource booking got cancelled.`
69+
})
70+
}))
71+
])
72+
}
73+
74+
/**
75+
* Process job update event.
76+
*
77+
* @param {Object} payload the event payload
78+
* @returns {undefined}
79+
*/
80+
async function processUpdate (payload) {
81+
await cancelJob(payload)
82+
}
83+
84+
module.exports = {
85+
processUpdate
86+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Handle events for ResourceBooking.
3+
*/
4+
5+
const { Op } = require('sequelize')
6+
const models = require('../models')
7+
const logger = require('../common/logger')
8+
const helper = require('../common/helper')
9+
const JobService = require('../services/JobService')
10+
const JobCandidateService = require('../services/JobCandidateService')
11+
12+
/**
13+
* When ResourceBooking's status is changed to `assigned`
14+
* the corresponding JobCandidate record (with the same userId and jobId)
15+
* should be updated with status `selected`
16+
*
17+
* @param {String} jobId the job id
18+
* @param {String} userId the user id
19+
* @returns {undefined}
20+
*/
21+
async function selectJobCandidate (jobId, userId) {
22+
const candidates = await models.JobCandidate.findAll({
23+
where: {
24+
jobId,
25+
userId,
26+
status: {
27+
[Op.not]: 'selected'
28+
},
29+
deletedAt: null
30+
}
31+
})
32+
await Promise.all(candidates.map(candidate => JobCandidateService.partiallyUpdateJobCandidate(
33+
helper.getAuditM2Muser(),
34+
candidate.id,
35+
{ status: 'selected' }
36+
).then(result => {
37+
logger.info({
38+
component: 'ResourceBookingEventHandler',
39+
context: 'selectJobCandidate',
40+
message: `id: ${result.id} candidate got selected.`
41+
})
42+
})))
43+
}
44+
45+
/**
46+
* Update the status of the Job to assigned when it positions requirement is fullfilled.
47+
*
48+
* @param {Object} job the job data
49+
* @returns {undefined}
50+
*/
51+
async function assignJob (job) {
52+
if (job.status === 'assigned') {
53+
logger.info({
54+
component: 'ResourceBookingEventHandler',
55+
context: 'assignJob',
56+
message: `job with projectId ${job.projectId} is already assigned`
57+
})
58+
return
59+
}
60+
const resourceBookings = await models.ResourceBooking.findAll({
61+
where: {
62+
status: 'assigned',
63+
deletedAt: null
64+
}
65+
})
66+
logger.debug({
67+
component: 'ResourceBookingEventHandler',
68+
context: 'assignJob',
69+
message: `the number of assigned resource bookings is ${resourceBookings.length} - the numPositions of the job is ${job.numPositions}`
70+
})
71+
if (job.numPositions === resourceBookings.length) {
72+
await JobService.partiallyUpdateJob(helper.getAuditM2Muser(), job.id, { status: 'assigned' })
73+
logger.info({ component: 'ResourceBookingEventHandler', context: 'assignJob', message: `job with projectId ${job.projectId} is assigned` })
74+
}
75+
}
76+
77+
/**
78+
* Process resource booking update event.
79+
*
80+
* @param {Object} payload the event payload
81+
* @returns {undefined}
82+
*/
83+
async function processUpdate (payload) {
84+
if (payload.status !== 'assigned') {
85+
logger.info({
86+
component: 'ResourceBookingEventHandler',
87+
context: 'selectJobCandidate',
88+
message: `not interested resource booking - status: ${payload.status}`
89+
})
90+
return
91+
}
92+
const resourceBooking = await models.ResourceBooking.findById(payload.id)
93+
const job = await models.Job.findOne({
94+
where: {
95+
projectId: resourceBooking.projectId,
96+
deletedAt: null
97+
}
98+
})
99+
await selectJobCandidate(job.id, resourceBooking.userId)
100+
await assignJob(job)
101+
}
102+
103+
module.exports = {
104+
processUpdate
105+
}

src/eventHandlers/index.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* The entry of event handlers.
3+
*/
4+
5+
const config = require('config')
6+
const eventDispatcher = require('../common/eventDispatcher')
7+
const JobEventHandler = require('./JobEventHandler')
8+
const ResourceBookingEventHandler = require('./ResourceBookingEventHandler')
9+
const logger = require('../common/logger')
10+
11+
const TopicOperationMapping = {
12+
[config.TAAS_JOB_UPDATE_TOPIC]: JobEventHandler.processUpdate,
13+
[config.TAAS_RESOURCE_BOOKING_UPDATE_TOPIC]: ResourceBookingEventHandler.processUpdate
14+
}
15+
16+
/**
17+
* Handle event.
18+
*
19+
* @param {String} topic the topic name
20+
* @param {Object} payload the message payload
21+
* @returns {undefined}
22+
*/
23+
async function handleEvent (topic, payload) {
24+
if (!TopicOperationMapping[topic]) {
25+
logger.info({ component: 'eventHanders', context: 'handleEvent', message: `not interested event - topic: ${topic}` })
26+
return
27+
}
28+
logger.debug({ component: 'eventHanders', context: 'handleEvent', message: `handling event - topic: ${topic} - payload: ${JSON.stringify(payload)}` })
29+
try {
30+
await TopicOperationMapping[topic](payload)
31+
} catch (err) {
32+
logger.error({ component: 'eventHanders', context: 'handleEvent', message: 'failed to handle event' })
33+
// throw error so that it can be handled by the app
34+
throw err
35+
}
36+
logger.info({ component: 'eventHanders', context: 'handleEvent', message: 'event successfully handled' })
37+
}
38+
39+
/**
40+
* Attach the handlers to the event dispatcher.
41+
*
42+
* @returns {undefined}
43+
*/
44+
function init () {
45+
eventDispatcher.register({
46+
handleEvent
47+
})
48+
}
49+
50+
module.exports = {
51+
init
52+
}

0 commit comments

Comments
 (0)