Skip to content

DEV- Shapeup4 cqrs update #521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ workflows:
branches:
only:
- dev
- change-validatations-in-job-jc
- feature/enriching-skills-data-with-api-2
- feature/shapeup4-cqrs-update

# Production builds are exectuted only on tagged commits to the
# master branch.
Expand Down
3 changes: 3 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ module.exports = {
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
// The originator value for the kafka messages
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-api',

// topics for error
TAAS_ERROR_TOPIC: process.env.TAAS_ERROR_TOPIC || 'taas.action.error',
// topics for job service
// the create job entity Kafka message topic
TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create',
Expand Down
Binary file modified docs/taas-ER-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
21 changes: 21 additions & 0 deletions src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,26 @@ async function postEvent (topic, payload, options = {}) {
await eventDispatcher.handleEvent(topic, { value: payload, options })
}

/**
* Send error event to Kafka
* @params {String} topic the topic name
* @params {Object} payload the payload
* @params {String} action for which operation error occurred
*/
async function postErrorEvent (topic, payload, action) {
_.set(payload, 'apiAction', action)
const client = getBusApiClient()
const message = {
topic,
originator: config.KAFKA_MESSAGE_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': 'application/json',
payload
}
logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
await client.postEvent(message)
}

/**
* Test if an error is document missing exception
*
Expand Down Expand Up @@ -2094,6 +2114,7 @@ module.exports = {
getM2MToken,
getM2MUbahnToken,
postEvent,
postErrorEvent,
getBusApiClient,
isDocumentMissingException,
getProjects,
Expand Down
127 changes: 127 additions & 0 deletions src/esProcessors/InterviewProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* Interview Processor
*/

const _ = require('lodash')
const helper = require('../common/helper')
const config = require('config')

const esClient = helper.getESClient()

/**
* Updates jobCandidate via a painless script
*
* @param {String} jobCandidateId job candidate id
* @param {String} script script definition
*/
async function updateJobCandidateViaScript (jobCandidateId, script) {
await esClient.update({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: jobCandidateId,
body: { script },
refresh: 'wait_for'
})
}

/**
* Process request interview entity.
* Creates an interview record under jobCandidate.
*
* @param {Object} interview interview object
*/
async function processRequestInterview (interview) {
// add interview in collection if there's already an existing collection
// or initiate a new one with this interview
const script = {
source: `
ctx._source.containsKey("interviews")
? ctx._source.interviews.add(params.interview)
: ctx._source.interviews = [params.interview]
`,
params: { interview }
}
await updateJobCandidateViaScript(interview.jobCandidateId, script)
}

/**
* Process update interview entity
* Updates the interview record under jobCandidate.
*
* @param {Object} interview interview object
*/
async function processUpdateInterview (interview) {
// if there's an interview with this id,
// update it
const script = {
source: `
if (ctx._source.containsKey("interviews")) {
def target = ctx._source.interviews.find(i -> i.id == params.interview.id);
if (target != null) {
for (prop in params.interview.entrySet()) {
target[prop.getKey()] = prop.getValue()
}
}
}
`,
params: { interview }
}
await updateJobCandidateViaScript(interview.jobCandidateId, script)
}

/**
* Process bulk (partially) update interviews entity.
* Currently supports status, updatedAt and updatedBy fields.
* Update Joi schema to allow more fields.
* (implementation should already handle new fields - just updating Joi schema should be enough)
*
* payload format:
* {
* "jobCandidateId": {
* "interviewId": { ...fields },
* "interviewId2": { ...fields },
* ...
* },
* "jobCandidateId2": { // like above... },
* ...
* }
*
* @param {Object} jobCandidates job candidates
*/
async function processBulkUpdateInterviews (jobCandidates) {
// script to update & params
const script = {
source: `
def completedInterviews = params.jobCandidates[ctx._id];
for (interview in completedInterviews.entrySet()) {
def interviewId = interview.getKey();
def affectedFields = interview.getValue();
def target = ctx._source.interviews.find(i -> i.id == interviewId);
if (target != null) {
for (field in affectedFields.entrySet()) {
target[field.getKey()] = field.getValue();
}
}
}
`,
params: { jobCandidates }
}
// update interviews
await esClient.updateByQuery({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
body: {
script,
query: {
ids: {
values: _.keys(jobCandidates)
}
}
},
refresh: true
})
}

module.exports = {
processRequestInterview,
processUpdateInterview,
processBulkUpdateInterviews
}
54 changes: 54 additions & 0 deletions src/esProcessors/JobCandidateProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Jobcandidate Processor
*/

const config = require('config')
const helper = require('../common/helper')

const esClient = helper.getESClient()

/**
* Process create entity
* @param {Object} entity entity object
*/
async function processCreate (entity) {
await esClient.create({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: entity.id,
body: entity,
refresh: 'wait_for'
})
}

/**
* Process update entity
* @param {Object} entity entity object
*/
async function processUpdate (entity) {
await esClient.update({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: entity.id,
body: {
doc: entity
},
refresh: 'wait_for'
})
}

/**
* Process delete entity
* @param {Object} entity entity object
*/
async function processDelete (entity) {
await esClient.delete({
index: config.get('esConfig.ES_INDEX_JOB_CANDIDATE'),
id: entity.id,
refresh: 'wait_for'
})
}

module.exports = {
processCreate,
processUpdate,
processDelete
}
54 changes: 54 additions & 0 deletions src/esProcessors/JobProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Job Processor
*/

const helper = require('../common/helper')
const config = require('config')

const esClient = helper.getESClient()

/**
* Process create entity
* @param {Object} entity entity object
*/
async function processCreate (entity) {
await esClient.create({
index: config.get('esConfig.ES_INDEX_JOB'),
id: entity.id,
body: entity,
refresh: 'wait_for'
})
}

/**
* Process update entity
* @param {Object} entity entity object
*/
async function processUpdate (entity) {
await esClient.update({
index: config.get('esConfig.ES_INDEX_JOB'),
id: entity.id,
body: {
doc: entity
},
refresh: 'wait_for'
})
}

/**
* Process delete entity
* @param {Object} entity entity object
*/
async function processDelete (entity) {
await esClient.delete({
index: config.get('esConfig.ES_INDEX_JOB'),
id: entity.id,
refresh: 'wait_for'
})
}

module.exports = {
processCreate,
processUpdate,
processDelete
}
54 changes: 54 additions & 0 deletions src/esProcessors/ResourceBookingProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* ResourceBooking Processor
*/

const helper = require('../common/helper')
const config = require('config')

const esClient = helper.getESClient()

/**
* Process create entity message
* @param {Object} entity entity object
*/
async function processCreate (entity) {
await esClient.create({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: entity.id,
body: entity,
refresh: 'wait_for'
})
}

/**
* Process update entity message
* @param {Object} entity entity object
*/
async function processUpdate (entity) {
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: entity.id,
body: {
doc: entity
},
refresh: 'wait_for'
})
}

/**
* Process delete entity message
* @param {Object} entity entity object
*/
async function processDelete (entity) {
await esClient.delete({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: entity.id,
refresh: 'wait_for'
})
}

module.exports = {
processCreate,
processUpdate,
processDelete
}
Loading