From 17fff0274762695fb8a31c14dccb9fe98d4a6cfa Mon Sep 17 00:00:00 2001 From: yoution Date: Tue, 31 Aug 2021 13:39:14 +0800 Subject: [PATCH 1/2] fix: issue #523 --- .../WorkPeriodPaymentEventHandler.js | 25 ++++++- src/services/PaymentSchedulerService.js | 69 +++++++++++++++---- 2 files changed, 80 insertions(+), 14 deletions(-) diff --git a/src/eventHandlers/WorkPeriodPaymentEventHandler.js b/src/eventHandlers/WorkPeriodPaymentEventHandler.js index 53ab7823..3587186b 100644 --- a/src/eventHandlers/WorkPeriodPaymentEventHandler.js +++ b/src/eventHandlers/WorkPeriodPaymentEventHandler.js @@ -9,7 +9,11 @@ const logger = require('../common/logger') const helper = require('../common/helper') const { ActiveWorkPeriodPaymentStatuses } = require('../../app-constants') const WorkPeriod = models.WorkPeriod +const { + processUpdate: processUpdateEs +} = require('../esProcessors/WorkPeriodProcessor') +const sequelize = models.sequelize /** * When a WorkPeriodPayment is updated or created, the workPeriod related to * that WorkPeriodPayment should be updated also. @@ -39,8 +43,25 @@ async function updateWorkPeriod (payload) { }) return } - const updated = await workPeriodModel.update(data) - await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, _.omit(updated.toJSON(), 'payments'), { oldValue: workPeriod, key: `resourceBooking.id:${workPeriod.resourceBookingId}` }) + + const key = `resourceBooking.id:${workPeriod.resourceBookingId}` + let entity + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodModel.update(data, { transaction: t }) + entity = updated.toJSON() + + entity = _.omit(entity, ['payments']) + await processUpdateEs({ ...entity, key }) + }) + } catch (e) { + if (entity) { + helper.postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiod.update') + } + throw e + } + await helper.postEvent(config.TAAS_WORK_PERIOD_UPDATE_TOPIC, entity, { oldValue: workPeriod, key }) + logger.debug({ component: 'WorkPeriodPaymentEventHandler', context: 'updateWorkPeriod', diff --git a/src/services/PaymentSchedulerService.js b/src/services/PaymentSchedulerService.js index f38eab6f..e718c295 100644 --- a/src/services/PaymentSchedulerService.js +++ b/src/services/PaymentSchedulerService.js @@ -2,11 +2,16 @@ const _ = require('lodash') const config = require('config') const moment = require('moment') const models = require('../models') -const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent } = require('../common/helper') +const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent, postErrorEvent } = require('../common/helper') const logger = require('../common/logger') const { createChallenge, addResourceToChallenge, activateChallenge, closeChallenge } = require('./PaymentService') const { ChallengeStatus, PaymentSchedulerStatus, PaymentProcessingSwitch } = require('../../app-constants') +const { + processUpdate +} = require('../esProcessors/WorkPeriodPaymentProcessor') + +const sequelize = models.sequelize const WorkPeriodPayment = models.WorkPeriodPayment const WorkPeriod = models.WorkPeriod const PaymentScheduler = models.PaymentScheduler @@ -88,9 +93,22 @@ async function processPayment (workPeriodPayment) { } } else { const oldValue = workPeriodPayment.toJSON() - const updated = await workPeriodPayment.update({ status: 'in-progress' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let entity + let key + try { + await sequelize.transaction(async (t) => { + const updated = await workPeriodPayment.update({ status: 'in-progress' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) } // Check whether the number of processed records per minute exceeds the specified number, if it exceeds, wait for the next minute before processing await checkWait(PaymentSchedulerStatus.START_PROCESS) @@ -112,11 +130,24 @@ async function processPayment (workPeriodPayment) { } const oldValue = workPeriodPayment.toJSON() - // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". - const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + let key + let entity + try { + await sequelize.transaction(async (t) => { + // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". + const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }, { transaction: t }) + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + // Update the modified status to es + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) await paymentScheduler.update({ step: PaymentSchedulerStatus.CLOSE_CHALLENGE, userId: paymentScheduler.userId, status: 'completed' }) localLogger.info(`Processed workPeriodPayment ${workPeriodPayment.id} successfully`, 'processPayment') @@ -125,10 +156,24 @@ async function processPayment (workPeriodPayment) { logger.logFullError(err, { component: 'PaymentSchedulerService', context: 'processPayment' }) const statusDetails = { errorMessage: extractErrorMessage(err), errorCode: _.get(err, 'status', -1), retry: _.get(err, 'retry', -1), step: _.get(err, 'step'), challengeId: paymentScheduler ? paymentScheduler.challengeId : null } const oldValue = workPeriodPayment.toJSON() - // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. - const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }) - // Update the modified status to es - await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue, key: `workPeriodPayment.billingAccountId:${updated.billingAccountId}` }) + + let entity + let key + try { + await sequelize.transaction(async (t) => { + // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. + const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }, { transaction: t }) + key = `workPeriodPayment.billingAccountId:${updated.billingAccountId}` + entity = updated.toJSON() + await processUpdate({ ...entity, key }) + }) + } catch (e) { + if (entity) { + postErrorEvent(config.TAAS_ERROR_TOPIC, entity, 'workperiodpayment.update') + } + throw e + } + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, entity, { oldValue: oldValue, key }) if (paymentScheduler) { await paymentScheduler.update({ step: _.get(err, 'step'), userId: paymentScheduler.userId, status: 'failed' }) From 6b4c2c1e7ca7ff9baf6f6d3e9fbe0cacb0fc40fd Mon Sep 17 00:00:00 2001 From: sachin-maheshwari Date: Tue, 31 Aug 2021 17:22:23 +0530 Subject: [PATCH 2/2] Update default.js typo --- config/default.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default.js b/config/default.js index 42f53a2c..319720ab 100644 --- a/config/default.js +++ b/config/default.js @@ -97,7 +97,7 @@ module.exports = { KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-api', // topics for error - TAAS_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'taas.action.error', + TAAS_ERROR_TOPIC: process.env.TAAS_ERROR_TOPIC || 'taas.action.error', // topics for job service // the create job entity Kafka message topic TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create',