Skip to content

fix parallelism problem #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 29 additions & 106 deletions src/services/WorkPeriodPaymentProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,46 @@

const Joi = require('@hapi/joi')
const config = require('config')
const _ = require('lodash')
const logger = require('../common/logger')
const helper = require('../common/helper')
const constants = require('../common/constants')

const esClient = helper.getESClient()

/**
* Process create entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
* Process create entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
async function processCreate (message, transactionId) {
const data = message.payload
const workPeriodPayment = message.payload
// find related resourceBooking
const result = await esClient.search({
const resourceBooking = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
body: {
query: {
nested: {
path: 'workPeriods',
query: {
match: { 'workPeriods.id': data.workPeriodId }
match: { 'workPeriods.id': workPeriodPayment.workPeriodId }
}
}
}
}
})
if (!result.body.hits.total.value) {
throw new Error(`id: ${data.workPeriodId} "WorkPeriod" not found`)
if (!resourceBooking.body.hits.total.value) {
throw new Error(`id: ${workPeriodPayment.workPeriodId} "WorkPeriod" not found`)
}
const resourceBooking = result.body.hits.hits[0]._source
// find related workPeriod record
const workPeriod = _.find(resourceBooking.workPeriods, ['id', data.workPeriodId])
// Get workPeriod's existing payments
const payments = _.isArray(workPeriod.payments) ? workPeriod.payments : []
// Append new payment
payments.push(data)
// Assign new payments array to workPeriod
workPeriod.payments = payments
// Update ResourceBooking's workPeriods property
await esClient.updateExtra({
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.id,
id: resourceBooking.body.hits.hits[0]._id,
transactionId,
body: {
doc: { workPeriods: resourceBooking.workPeriods }
script: {
lang: 'painless',
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)',
params: { workPeriodPayment }
}
},
refresh: constants.esRefreshOption
})
Expand Down Expand Up @@ -87,14 +80,14 @@ processCreate.schema = {
}

/**
* Process update entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
* Process update entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
async function processUpdate (message, transactionId) {
const data = message.payload
// find workPeriodPayment in it's parent ResourceBooking
let result = await esClient.search({
const resourceBooking = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
body: {
query: {
Expand All @@ -107,89 +100,19 @@ async function processUpdate (message, transactionId) {
}
}
})
if (!result.body.hits.total.value) {
if (!resourceBooking.body.hits.total.value) {
throw new Error(`id: ${data.id} "WorkPeriodPayment" not found`)
}
const resourceBooking = _.cloneDeep(result.body.hits.hits[0]._source)
let workPeriod = null
let payment = null
let paymentIndex = null
// find workPeriod and workPeriodPayment records
_.forEach(resourceBooking.workPeriods, wp => {
_.forEach(wp.payments, (p, pi) => {
if (p.id === data.id) {
payment = p
paymentIndex = pi
return false
}
})
if (payment) {
workPeriod = wp
return false
}
})
let payments
// if WorkPeriodPayment's workPeriodId changed then it must be deleted from the old WorkPeriod
// and added to the new WorkPeriod
if (payment.workPeriodId !== data.workPeriodId) {
// remove payment from payments
payments = _.filter(workPeriod.payments, p => p.id !== data.id)
// assign payments to workPeriod record
workPeriod.payments = payments
// Update old ResourceBooking's workPeriods property
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.id,
transactionId,
body: {
doc: { workPeriods: resourceBooking.workPeriods }
},
refresh: constants.esRefreshOption
})
// find workPeriodPayment's new parent WorkPeriod
result = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
body: {
query: {
nested: {
path: 'workPeriods',
query: {
match: { 'workPeriods.id': data.workPeriodId }
}
}
}
}
})
const newResourceBooking = result.body.hits.hits[0]._source
// find WorkPeriod record in ResourceBooking
const newWorkPeriod = _.find(newResourceBooking.workPeriods, ['id', data.workPeriodId])
// Get WorkPeriod's existing payments
const newPayments = _.isArray(newWorkPeriod.payments) ? newWorkPeriod.payments : []
// Append new payment
newPayments.push(data)
// Assign new payments array to workPeriod
newWorkPeriod.payments = newPayments
// Update new ResourceBooking's workPeriods property
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: newResourceBooking.id,
transactionId,
body: {
doc: { workPeriods: newResourceBooking.workPeriods }
},
refresh: constants.esRefreshOption
})
return
}
// update payment record
workPeriod.payments[paymentIndex] = data
// Update ResourceBooking's workPeriods property
await esClient.updateExtra({
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.id,
id: resourceBooking.body.hits.hits[0]._id,
transactionId,
body: {
doc: { workPeriods: resourceBooking.workPeriods }
script: {
lang: 'painless',
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)',
params: { data }
}
},
refresh: constants.esRefreshOption
})
Expand Down
120 changes: 35 additions & 85 deletions src/services/WorkPeriodProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ const logger = require('../common/logger')
const helper = require('../common/helper')
const constants = require('../common/constants')
const config = require('config')
const _ = require('lodash')
const esClient = helper.getESClient()
const ActionProcessorService = require('../services/ActionProcessorService')

/**
* Process create entity message
* @param {Object} message the kafka message
* @param {String} transactionId
* @param {Object} options
*/
* Process create entity message
* @param {Object} message the kafka message
* @param {String} transactionId
* @param {Object} options
*/
async function processCreate (message, transactionId, options) {
const workPeriod = message.payload
// Find related resourceBooking
Expand Down Expand Up @@ -44,20 +43,16 @@ async function processCreate (message, transactionId, options) {
throw err
}
}

console.log(`[RB value-999] before update: ${JSON.stringify(resourceBooking)}`)
// Get ResourceBooking's existing workPeriods
const workPeriods = _.isArray(resourceBooking.body.workPeriods) ? resourceBooking.body.workPeriods : []
// Append new workPeriod
workPeriods.push(workPeriod)
// Update ResourceBooking's workPeriods property
console.log(`[WP value-999]: ${JSON.stringify(workPeriod)}`)
await esClient.updateExtra({
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: workPeriod.resourceBookingId,
id: resourceBooking.body.id,
transactionId,
body: {
doc: { workPeriods }
script: {
lang: 'painless',
source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)',
params: { workPeriod }
}
},
refresh: constants.esRefreshOption
})
Expand Down Expand Up @@ -96,14 +91,14 @@ processCreate.schema = {
}

/**
* Process update entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
* Process update entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
async function processUpdate (message, transactionId) {
const data = message.payload
// find workPeriod in it's parent ResourceBooking
let resourceBooking = await esClient.search({
const resourceBooking = await esClient.search({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
transactionId,
body: {
Expand All @@ -120,62 +115,16 @@ async function processUpdate (message, transactionId) {
if (!resourceBooking.body.hits.total.value) {
throw new Error(`id: ${data.id} "WorkPeriod" not found`)
}
let workPeriods
// if WorkPeriod's resourceBookingId changed then it must be deleted from the old ResourceBooking
// and added to the new ResourceBooking
if (resourceBooking.body.hits.hits[0]._source.id !== data.resourceBookingId) {
// find old workPeriod record, so we can keep it's existing nested payments field
let oldWorkPeriod = _.find(resourceBooking.body.hits.hits[0]._source.workPeriods, ['id', data.id])
// remove workPeriod from it's old parent
workPeriods = _.filter(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => workPeriod.id !== data.id)
// Update old ResourceBooking's workPeriods property
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.body.hits.hits[0]._source.id,
transactionId,
body: {
doc: { workPeriods }
},
refresh: constants.esRefreshOption
})
// find workPeriod's new parent ResourceBooking
resourceBooking = await esClient.getExtra({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
transactionId,
id: data.resourceBookingId
})
// Get ResourceBooking's existing workPeriods
workPeriods = _.isArray(resourceBooking.body.workPeriods) ? resourceBooking.body.workPeriods : []
// Update workPeriod record
const newData = _.assign(oldWorkPeriod, data)
// Append updated workPeriod to workPeriods
workPeriods.push(newData)
// Update new ResourceBooking's workPeriods property
await esClient.updateExtra({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: data.resourceBookingId,
transactionId,
body: {
doc: { workPeriods }
},
refresh: constants.esRefreshOption
})
return
}
// Update workPeriod record
workPeriods = _.map(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => {
if (workPeriod.id === data.id) {
return _.assign(workPeriod, data)
}
return workPeriod
})
// Update ResourceBooking's workPeriods property
await esClient.updateExtra({
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: data.resourceBookingId,
id: resourceBooking.body.hits.hits[0]._id,
transactionId,
body: {
doc: { workPeriods }
script: {
lang: 'painless',
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)',
params: { data }
}
},
refresh: constants.esRefreshOption
})
Expand All @@ -184,10 +133,10 @@ async function processUpdate (message, transactionId) {
processUpdate.schema = processCreate.schema

/**
* Process delete entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
* Process delete entity message
* @param {Object} message the kafka message
* @param {String} transactionId
*/
async function processDelete (message, transactionId) {
const data = message.payload
// Find related ResourceBooking
Expand All @@ -208,15 +157,16 @@ async function processDelete (message, transactionId) {
if (!resourceBooking.body.hits.total.value) {
throw new Error(`id: ${data.id} "WorkPeriod" not found`)
}
// Remove workPeriod from workPeriods
const workPeriods = _.filter(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => workPeriod.id !== data.id)
// Update ResourceBooking's workPeriods property
await esClient.updateExtra({
await esClient.update({
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
id: resourceBooking.body.hits.hits[0]._source.id,
id: resourceBooking.body.hits.hits[0]._id,
transactionId,
body: {
doc: { workPeriods }
script: {
lang: 'painless',
source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)',
params: { data }
}
},
refresh: constants.esRefreshOption
})
Expand Down