Skip to content

Commit 815e1d1

Browse files
eisbilirmaxceem
authored andcommitted
fix parallelism problem
1 parent fe9dd0b commit 815e1d1

File tree

2 files changed

+64
-191
lines changed

2 files changed

+64
-191
lines changed

src/services/WorkPeriodPaymentProcessorService.js

Lines changed: 29 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,46 @@
44

55
const Joi = require('@hapi/joi')
66
const config = require('config')
7-
const _ = require('lodash')
87
const logger = require('../common/logger')
98
const helper = require('../common/helper')
109
const constants = require('../common/constants')
1110

1211
const esClient = helper.getESClient()
1312

1413
/**
15-
* Process create entity message
16-
* @param {Object} message the kafka message
17-
* @param {String} transactionId
18-
*/
14+
* Process create entity message
15+
* @param {Object} message the kafka message
16+
* @param {String} transactionId
17+
*/
1918
async function processCreate (message, transactionId) {
20-
const data = message.payload
19+
const workPeriodPayment = message.payload
2120
// find related resourceBooking
22-
const result = await esClient.search({
21+
const resourceBooking = await esClient.search({
2322
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
2423
body: {
2524
query: {
2625
nested: {
2726
path: 'workPeriods',
2827
query: {
29-
match: { 'workPeriods.id': data.workPeriodId }
28+
match: { 'workPeriods.id': workPeriodPayment.workPeriodId }
3029
}
3130
}
3231
}
3332
}
3433
})
35-
if (!result.body.hits.total.value) {
36-
throw new Error(`id: ${data.workPeriodId} "WorkPeriod" not found`)
34+
if (!resourceBooking.body.hits.total.value) {
35+
throw new Error(`id: ${workPeriodPayment.workPeriodId} "WorkPeriod" not found`)
3736
}
38-
const resourceBooking = result.body.hits.hits[0]._source
39-
// find related workPeriod record
40-
const workPeriod = _.find(resourceBooking.workPeriods, ['id', data.workPeriodId])
41-
// Get workPeriod's existing payments
42-
const payments = _.isArray(workPeriod.payments) ? workPeriod.payments : []
43-
// Append new payment
44-
payments.push(data)
45-
// Assign new payments array to workPeriod
46-
workPeriod.payments = payments
47-
// Update ResourceBooking's workPeriods property
48-
await esClient.updateExtra({
37+
await esClient.update({
4938
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
50-
id: resourceBooking.id,
39+
id: resourceBooking.body.hits.hits[0]._id,
5140
transactionId,
5241
body: {
53-
doc: { workPeriods: resourceBooking.workPeriods }
42+
script: {
43+
lang: 'painless',
44+
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)',
45+
params: { workPeriodPayment }
46+
}
5447
},
5548
refresh: constants.esRefreshOption
5649
})
@@ -87,14 +80,14 @@ processCreate.schema = {
8780
}
8881

8982
/**
90-
* Process update entity message
91-
* @param {Object} message the kafka message
92-
* @param {String} transactionId
93-
*/
83+
* Process update entity message
84+
* @param {Object} message the kafka message
85+
* @param {String} transactionId
86+
*/
9487
async function processUpdate (message, transactionId) {
9588
const data = message.payload
9689
// find workPeriodPayment in it's parent ResourceBooking
97-
let result = await esClient.search({
90+
const resourceBooking = await esClient.search({
9891
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
9992
body: {
10093
query: {
@@ -107,89 +100,19 @@ async function processUpdate (message, transactionId) {
107100
}
108101
}
109102
})
110-
if (!result.body.hits.total.value) {
103+
if (!resourceBooking.body.hits.total.value) {
111104
throw new Error(`id: ${data.id} "WorkPeriodPayment" not found`)
112105
}
113-
const resourceBooking = _.cloneDeep(result.body.hits.hits[0]._source)
114-
let workPeriod = null
115-
let payment = null
116-
let paymentIndex = null
117-
// find workPeriod and workPeriodPayment records
118-
_.forEach(resourceBooking.workPeriods, wp => {
119-
_.forEach(wp.payments, (p, pi) => {
120-
if (p.id === data.id) {
121-
payment = p
122-
paymentIndex = pi
123-
return false
124-
}
125-
})
126-
if (payment) {
127-
workPeriod = wp
128-
return false
129-
}
130-
})
131-
let payments
132-
// if WorkPeriodPayment's workPeriodId changed then it must be deleted from the old WorkPeriod
133-
// and added to the new WorkPeriod
134-
if (payment.workPeriodId !== data.workPeriodId) {
135-
// remove payment from payments
136-
payments = _.filter(workPeriod.payments, p => p.id !== data.id)
137-
// assign payments to workPeriod record
138-
workPeriod.payments = payments
139-
// Update old ResourceBooking's workPeriods property
140-
await esClient.updateExtra({
141-
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
142-
id: resourceBooking.id,
143-
transactionId,
144-
body: {
145-
doc: { workPeriods: resourceBooking.workPeriods }
146-
},
147-
refresh: constants.esRefreshOption
148-
})
149-
// find workPeriodPayment's new parent WorkPeriod
150-
result = await esClient.search({
151-
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
152-
body: {
153-
query: {
154-
nested: {
155-
path: 'workPeriods',
156-
query: {
157-
match: { 'workPeriods.id': data.workPeriodId }
158-
}
159-
}
160-
}
161-
}
162-
})
163-
const newResourceBooking = result.body.hits.hits[0]._source
164-
// find WorkPeriod record in ResourceBooking
165-
const newWorkPeriod = _.find(newResourceBooking.workPeriods, ['id', data.workPeriodId])
166-
// Get WorkPeriod's existing payments
167-
const newPayments = _.isArray(newWorkPeriod.payments) ? newWorkPeriod.payments : []
168-
// Append new payment
169-
newPayments.push(data)
170-
// Assign new payments array to workPeriod
171-
newWorkPeriod.payments = newPayments
172-
// Update new ResourceBooking's workPeriods property
173-
await esClient.updateExtra({
174-
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
175-
id: newResourceBooking.id,
176-
transactionId,
177-
body: {
178-
doc: { workPeriods: newResourceBooking.workPeriods }
179-
},
180-
refresh: constants.esRefreshOption
181-
})
182-
return
183-
}
184-
// update payment record
185-
workPeriod.payments[paymentIndex] = data
186-
// Update ResourceBooking's workPeriods property
187-
await esClient.updateExtra({
106+
await esClient.update({
188107
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
189-
id: resourceBooking.id,
108+
id: resourceBooking.body.hits.hits[0]._id,
190109
transactionId,
191110
body: {
192-
doc: { workPeriods: resourceBooking.workPeriods }
111+
script: {
112+
lang: 'painless',
113+
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)',
114+
params: { data }
115+
}
193116
},
194117
refresh: constants.esRefreshOption
195118
})

src/services/WorkPeriodProcessorService.js

Lines changed: 35 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,15 @@ const logger = require('../common/logger')
77
const helper = require('../common/helper')
88
const constants = require('../common/constants')
99
const config = require('config')
10-
const _ = require('lodash')
1110
const esClient = helper.getESClient()
1211
const ActionProcessorService = require('../services/ActionProcessorService')
1312

1413
/**
15-
* Process create entity message
16-
* @param {Object} message the kafka message
17-
* @param {String} transactionId
18-
* @param {Object} options
19-
*/
14+
* Process create entity message
15+
* @param {Object} message the kafka message
16+
* @param {String} transactionId
17+
* @param {Object} options
18+
*/
2019
async function processCreate (message, transactionId, options) {
2120
const workPeriod = message.payload
2221
// Find related resourceBooking
@@ -44,20 +43,16 @@ async function processCreate (message, transactionId, options) {
4443
throw err
4544
}
4645
}
47-
48-
console.log(`[RB value-999] before update: ${JSON.stringify(resourceBooking)}`)
49-
// Get ResourceBooking's existing workPeriods
50-
const workPeriods = _.isArray(resourceBooking.body.workPeriods) ? resourceBooking.body.workPeriods : []
51-
// Append new workPeriod
52-
workPeriods.push(workPeriod)
53-
// Update ResourceBooking's workPeriods property
54-
console.log(`[WP value-999]: ${JSON.stringify(workPeriod)}`)
55-
await esClient.updateExtra({
46+
await esClient.update({
5647
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
57-
id: workPeriod.resourceBookingId,
48+
id: resourceBooking.body.id,
5849
transactionId,
5950
body: {
60-
doc: { workPeriods }
51+
script: {
52+
lang: 'painless',
53+
source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)',
54+
params: { workPeriod }
55+
}
6156
},
6257
refresh: constants.esRefreshOption
6358
})
@@ -96,14 +91,14 @@ processCreate.schema = {
9691
}
9792

9893
/**
99-
* Process update entity message
100-
* @param {Object} message the kafka message
101-
* @param {String} transactionId
102-
*/
94+
* Process update entity message
95+
* @param {Object} message the kafka message
96+
* @param {String} transactionId
97+
*/
10398
async function processUpdate (message, transactionId) {
10499
const data = message.payload
105100
// find workPeriod in it's parent ResourceBooking
106-
let resourceBooking = await esClient.search({
101+
const resourceBooking = await esClient.search({
107102
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
108103
transactionId,
109104
body: {
@@ -120,62 +115,16 @@ async function processUpdate (message, transactionId) {
120115
if (!resourceBooking.body.hits.total.value) {
121116
throw new Error(`id: ${data.id} "WorkPeriod" not found`)
122117
}
123-
let workPeriods
124-
// if WorkPeriod's resourceBookingId changed then it must be deleted from the old ResourceBooking
125-
// and added to the new ResourceBooking
126-
if (resourceBooking.body.hits.hits[0]._source.id !== data.resourceBookingId) {
127-
// find old workPeriod record, so we can keep it's existing nested payments field
128-
let oldWorkPeriod = _.find(resourceBooking.body.hits.hits[0]._source.workPeriods, ['id', data.id])
129-
// remove workPeriod from it's old parent
130-
workPeriods = _.filter(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => workPeriod.id !== data.id)
131-
// Update old ResourceBooking's workPeriods property
132-
await esClient.updateExtra({
133-
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
134-
id: resourceBooking.body.hits.hits[0]._source.id,
135-
transactionId,
136-
body: {
137-
doc: { workPeriods }
138-
},
139-
refresh: constants.esRefreshOption
140-
})
141-
// find workPeriod's new parent ResourceBooking
142-
resourceBooking = await esClient.getExtra({
143-
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
144-
transactionId,
145-
id: data.resourceBookingId
146-
})
147-
// Get ResourceBooking's existing workPeriods
148-
workPeriods = _.isArray(resourceBooking.body.workPeriods) ? resourceBooking.body.workPeriods : []
149-
// Update workPeriod record
150-
const newData = _.assign(oldWorkPeriod, data)
151-
// Append updated workPeriod to workPeriods
152-
workPeriods.push(newData)
153-
// Update new ResourceBooking's workPeriods property
154-
await esClient.updateExtra({
155-
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
156-
id: data.resourceBookingId,
157-
transactionId,
158-
body: {
159-
doc: { workPeriods }
160-
},
161-
refresh: constants.esRefreshOption
162-
})
163-
return
164-
}
165-
// Update workPeriod record
166-
workPeriods = _.map(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => {
167-
if (workPeriod.id === data.id) {
168-
return _.assign(workPeriod, data)
169-
}
170-
return workPeriod
171-
})
172-
// Update ResourceBooking's workPeriods property
173-
await esClient.updateExtra({
118+
await esClient.update({
174119
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
175-
id: data.resourceBookingId,
120+
id: resourceBooking.body.hits.hits[0]._id,
176121
transactionId,
177122
body: {
178-
doc: { workPeriods }
123+
script: {
124+
lang: 'painless',
125+
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)',
126+
params: { data }
127+
}
179128
},
180129
refresh: constants.esRefreshOption
181130
})
@@ -184,10 +133,10 @@ async function processUpdate (message, transactionId) {
184133
processUpdate.schema = processCreate.schema
185134

186135
/**
187-
* Process delete entity message
188-
* @param {Object} message the kafka message
189-
* @param {String} transactionId
190-
*/
136+
* Process delete entity message
137+
* @param {Object} message the kafka message
138+
* @param {String} transactionId
139+
*/
191140
async function processDelete (message, transactionId) {
192141
const data = message.payload
193142
// Find related ResourceBooking
@@ -208,15 +157,16 @@ async function processDelete (message, transactionId) {
208157
if (!resourceBooking.body.hits.total.value) {
209158
throw new Error(`id: ${data.id} "WorkPeriod" not found`)
210159
}
211-
// Remove workPeriod from workPeriods
212-
const workPeriods = _.filter(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => workPeriod.id !== data.id)
213-
// Update ResourceBooking's workPeriods property
214-
await esClient.updateExtra({
160+
await esClient.update({
215161
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
216-
id: resourceBooking.body.hits.hits[0]._source.id,
162+
id: resourceBooking.body.hits.hits[0]._id,
217163
transactionId,
218164
body: {
219-
doc: { workPeriods }
165+
script: {
166+
lang: 'painless',
167+
source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)',
168+
params: { data }
169+
}
220170
},
221171
refresh: constants.esRefreshOption
222172
})

0 commit comments

Comments
 (0)