From 815e1d1ca4464aa950b14334fb8651a8c9998c9c Mon Sep 17 00:00:00 2001 From: eisbilir Date: Mon, 14 Jun 2021 03:03:37 +0300 Subject: [PATCH] fix parallelism problem --- .../WorkPeriodPaymentProcessorService.js | 135 ++++-------------- src/services/WorkPeriodProcessorService.js | 120 +++++----------- 2 files changed, 64 insertions(+), 191 deletions(-) diff --git a/src/services/WorkPeriodPaymentProcessorService.js b/src/services/WorkPeriodPaymentProcessorService.js index bede8fd..50daad2 100644 --- a/src/services/WorkPeriodPaymentProcessorService.js +++ b/src/services/WorkPeriodPaymentProcessorService.js @@ -4,7 +4,6 @@ const Joi = require('@hapi/joi') const config = require('config') -const _ = require('lodash') const logger = require('../common/logger') const helper = require('../common/helper') const constants = require('../common/constants') @@ -12,45 +11,39 @@ const constants = require('../common/constants') const esClient = helper.getESClient() /** - * Process create entity message - * @param {Object} message the kafka message - * @param {String} transactionId - */ + * Process create entity message + * @param {Object} message the kafka message + * @param {String} transactionId + */ async function processCreate (message, transactionId) { - const data = message.payload + const workPeriodPayment = message.payload // find related resourceBooking - const result = await esClient.search({ + const resourceBooking = await esClient.search({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), body: { query: { nested: { path: 'workPeriods', query: { - match: { 'workPeriods.id': data.workPeriodId } + match: { 'workPeriods.id': workPeriodPayment.workPeriodId } } } } } }) - if (!result.body.hits.total.value) { - throw new Error(`id: ${data.workPeriodId} "WorkPeriod" not found`) + if (!resourceBooking.body.hits.total.value) { + throw new Error(`id: ${workPeriodPayment.workPeriodId} "WorkPeriod" not found`) } - const resourceBooking = result.body.hits.hits[0]._source - // find related workPeriod record - const workPeriod = _.find(resourceBooking.workPeriods, ['id', data.workPeriodId]) - // Get workPeriod's existing payments - const payments = _.isArray(workPeriod.payments) ? workPeriod.payments : [] - // Append new payment - payments.push(data) - // Assign new payments array to workPeriod - workPeriod.payments = payments - // Update ResourceBooking's workPeriods property - await esClient.updateExtra({ + await esClient.update({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: resourceBooking.id, + id: resourceBooking.body.hits.hits[0]._id, transactionId, body: { - doc: { workPeriods: resourceBooking.workPeriods } + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)', + params: { workPeriodPayment } + } }, refresh: constants.esRefreshOption }) @@ -87,14 +80,14 @@ processCreate.schema = { } /** - * Process update entity message - * @param {Object} message the kafka message - * @param {String} transactionId - */ + * Process update entity message + * @param {Object} message the kafka message + * @param {String} transactionId + */ async function processUpdate (message, transactionId) { const data = message.payload // find workPeriodPayment in it's parent ResourceBooking - let result = await esClient.search({ + const resourceBooking = await esClient.search({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), body: { query: { @@ -107,89 +100,19 @@ async function processUpdate (message, transactionId) { } } }) - if (!result.body.hits.total.value) { + if (!resourceBooking.body.hits.total.value) { throw new Error(`id: ${data.id} "WorkPeriodPayment" not found`) } - const resourceBooking = _.cloneDeep(result.body.hits.hits[0]._source) - let workPeriod = null - let payment = null - let paymentIndex = null - // find workPeriod and workPeriodPayment records - _.forEach(resourceBooking.workPeriods, wp => { - _.forEach(wp.payments, (p, pi) => { - if (p.id === data.id) { - payment = p - paymentIndex = pi - return false - } - }) - if (payment) { - workPeriod = wp - return false - } - }) - let payments - // if WorkPeriodPayment's workPeriodId changed then it must be deleted from the old WorkPeriod - // and added to the new WorkPeriod - if (payment.workPeriodId !== data.workPeriodId) { - // remove payment from payments - payments = _.filter(workPeriod.payments, p => p.id !== data.id) - // assign payments to workPeriod record - workPeriod.payments = payments - // Update old ResourceBooking's workPeriods property - await esClient.updateExtra({ - index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: resourceBooking.id, - transactionId, - body: { - doc: { workPeriods: resourceBooking.workPeriods } - }, - refresh: constants.esRefreshOption - }) - // find workPeriodPayment's new parent WorkPeriod - result = await esClient.search({ - index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - body: { - query: { - nested: { - path: 'workPeriods', - query: { - match: { 'workPeriods.id': data.workPeriodId } - } - } - } - } - }) - const newResourceBooking = result.body.hits.hits[0]._source - // find WorkPeriod record in ResourceBooking - const newWorkPeriod = _.find(newResourceBooking.workPeriods, ['id', data.workPeriodId]) - // Get WorkPeriod's existing payments - const newPayments = _.isArray(newWorkPeriod.payments) ? newWorkPeriod.payments : [] - // Append new payment - newPayments.push(data) - // Assign new payments array to workPeriod - newWorkPeriod.payments = newPayments - // Update new ResourceBooking's workPeriods property - await esClient.updateExtra({ - index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: newResourceBooking.id, - transactionId, - body: { - doc: { workPeriods: newResourceBooking.workPeriods } - }, - refresh: constants.esRefreshOption - }) - return - } - // update payment record - workPeriod.payments[paymentIndex] = data - // Update ResourceBooking's workPeriods property - await esClient.updateExtra({ + await esClient.update({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: resourceBooking.id, + id: resourceBooking.body.hits.hits[0]._id, transactionId, body: { - doc: { workPeriods: resourceBooking.workPeriods } + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)', + params: { data } + } }, refresh: constants.esRefreshOption }) diff --git a/src/services/WorkPeriodProcessorService.js b/src/services/WorkPeriodProcessorService.js index 4ea661f..9ad4d7f 100644 --- a/src/services/WorkPeriodProcessorService.js +++ b/src/services/WorkPeriodProcessorService.js @@ -7,16 +7,15 @@ const logger = require('../common/logger') const helper = require('../common/helper') const constants = require('../common/constants') const config = require('config') -const _ = require('lodash') const esClient = helper.getESClient() const ActionProcessorService = require('../services/ActionProcessorService') /** - * Process create entity message - * @param {Object} message the kafka message - * @param {String} transactionId - * @param {Object} options - */ + * Process create entity message + * @param {Object} message the kafka message + * @param {String} transactionId + * @param {Object} options + */ async function processCreate (message, transactionId, options) { const workPeriod = message.payload // Find related resourceBooking @@ -44,20 +43,16 @@ async function processCreate (message, transactionId, options) { throw err } } - - console.log(`[RB value-999] before update: ${JSON.stringify(resourceBooking)}`) - // Get ResourceBooking's existing workPeriods - const workPeriods = _.isArray(resourceBooking.body.workPeriods) ? resourceBooking.body.workPeriods : [] - // Append new workPeriod - workPeriods.push(workPeriod) - // Update ResourceBooking's workPeriods property - console.log(`[WP value-999]: ${JSON.stringify(workPeriod)}`) - await esClient.updateExtra({ + await esClient.update({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: workPeriod.resourceBookingId, + id: resourceBooking.body.id, transactionId, body: { - doc: { workPeriods } + script: { + lang: 'painless', + source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)', + params: { workPeriod } + } }, refresh: constants.esRefreshOption }) @@ -96,14 +91,14 @@ processCreate.schema = { } /** - * Process update entity message - * @param {Object} message the kafka message - * @param {String} transactionId - */ + * Process update entity message + * @param {Object} message the kafka message + * @param {String} transactionId + */ async function processUpdate (message, transactionId) { const data = message.payload // find workPeriod in it's parent ResourceBooking - let resourceBooking = await esClient.search({ + const resourceBooking = await esClient.search({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), transactionId, body: { @@ -120,62 +115,16 @@ async function processUpdate (message, transactionId) { if (!resourceBooking.body.hits.total.value) { throw new Error(`id: ${data.id} "WorkPeriod" not found`) } - let workPeriods - // if WorkPeriod's resourceBookingId changed then it must be deleted from the old ResourceBooking - // and added to the new ResourceBooking - if (resourceBooking.body.hits.hits[0]._source.id !== data.resourceBookingId) { - // find old workPeriod record, so we can keep it's existing nested payments field - let oldWorkPeriod = _.find(resourceBooking.body.hits.hits[0]._source.workPeriods, ['id', data.id]) - // remove workPeriod from it's old parent - workPeriods = _.filter(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => workPeriod.id !== data.id) - // Update old ResourceBooking's workPeriods property - await esClient.updateExtra({ - index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: resourceBooking.body.hits.hits[0]._source.id, - transactionId, - body: { - doc: { workPeriods } - }, - refresh: constants.esRefreshOption - }) - // find workPeriod's new parent ResourceBooking - resourceBooking = await esClient.getExtra({ - index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - transactionId, - id: data.resourceBookingId - }) - // Get ResourceBooking's existing workPeriods - workPeriods = _.isArray(resourceBooking.body.workPeriods) ? resourceBooking.body.workPeriods : [] - // Update workPeriod record - const newData = _.assign(oldWorkPeriod, data) - // Append updated workPeriod to workPeriods - workPeriods.push(newData) - // Update new ResourceBooking's workPeriods property - await esClient.updateExtra({ - index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: data.resourceBookingId, - transactionId, - body: { - doc: { workPeriods } - }, - refresh: constants.esRefreshOption - }) - return - } - // Update workPeriod record - workPeriods = _.map(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => { - if (workPeriod.id === data.id) { - return _.assign(workPeriod, data) - } - return workPeriod - }) - // Update ResourceBooking's workPeriods property - await esClient.updateExtra({ + await esClient.update({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: data.resourceBookingId, + id: resourceBooking.body.hits.hits[0]._id, transactionId, body: { - doc: { workPeriods } + script: { + lang: 'painless', + source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)', + params: { data } + } }, refresh: constants.esRefreshOption }) @@ -184,10 +133,10 @@ async function processUpdate (message, transactionId) { processUpdate.schema = processCreate.schema /** - * Process delete entity message - * @param {Object} message the kafka message - * @param {String} transactionId - */ + * Process delete entity message + * @param {Object} message the kafka message + * @param {String} transactionId + */ async function processDelete (message, transactionId) { const data = message.payload // Find related ResourceBooking @@ -208,15 +157,16 @@ async function processDelete (message, transactionId) { if (!resourceBooking.body.hits.total.value) { throw new Error(`id: ${data.id} "WorkPeriod" not found`) } - // Remove workPeriod from workPeriods - const workPeriods = _.filter(resourceBooking.body.hits.hits[0]._source.workPeriods, (workPeriod) => workPeriod.id !== data.id) - // Update ResourceBooking's workPeriods property - await esClient.updateExtra({ + await esClient.update({ index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'), - id: resourceBooking.body.hits.hits[0]._source.id, + id: resourceBooking.body.hits.hits[0]._id, transactionId, body: { - doc: { workPeriods } + script: { + lang: 'painless', + source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)', + params: { data } + } }, refresh: constants.esRefreshOption })