diff --git a/README.md b/README.md index 30ac169b..eddfead3 100644 --- a/README.md +++ b/README.md @@ -207,6 +207,7 @@ To be able to change and test `taas-es-processor` locally you can follow the nex | `npm run cov` | Code Coverage Report. | | `npm run migrate` | Run any migration files which haven't run yet. | | `npm run migrate:undo` | Revert most recent migration. | +| `npm run demo-payment-scheduler` | Create 1000 Work Periods Payment records in with status "scheduled" and various "amount" | ## Import and Export data diff --git a/app-constants.js b/app-constants.js index d28f773c..2c232275 100644 --- a/app-constants.js +++ b/app-constants.js @@ -86,8 +86,24 @@ const ChallengeStatus = { const WorkPeriodPaymentStatus = { COMPLETED: 'completed', - CANCELLED: 'cancelled', - SCHEDULED: 'scheduled' + SCHEDULED: 'scheduled', + IN_PROGRESS: 'in-progress', + FAILED: 'failed', + CANCELLED: 'cancelled' +} + +const PaymentProcessingSwitch = { + ON: 'ON', + OFF: 'OFF' +} + +const PaymentSchedulerStatus = { + START_PROCESS: 'start-process', + CREATE_CHALLENGE: 'create-challenge', + ASSIGN_MEMBER: 'assign-member', + ACTIVATE_CHALLENGE: 'activate-challenge', + GET_USER_ID: 'get-userId', + CLOSE_CHALLENGE: 'close-challenge' } module.exports = { @@ -96,5 +112,7 @@ module.exports = { Scopes, Interviews, ChallengeStatus, - WorkPeriodPaymentStatus + WorkPeriodPaymentStatus, + PaymentSchedulerStatus, + PaymentProcessingSwitch } diff --git a/app.js b/app.js index 7f3d7d85..e6d79c69 100644 --- a/app.js +++ b/app.js @@ -13,6 +13,7 @@ const schedule = require('node-schedule') const logger = require('./src/common/logger') const eventHandlers = require('./src/eventHandlers') const interviewService = require('./src/services/InterviewService') +const { processScheduler } = require('./src/services/PaymentSchedulerService') // setup express app const app = express() @@ -97,6 +98,9 @@ const server = app.listen(app.get('port'), () => { eventHandlers.init() // schedule updateCompletedInterviews to run every hour schedule.scheduleJob('0 0 * * * *', interviewService.updateCompletedInterviews) + + // schedule payment processing + schedule.scheduleJob(config.PAYMENT_PROCESSING.CRON, processScheduler) }) if (process.env.NODE_ENV === 'test') { diff --git a/config/default.js b/config/default.js index 2f0f8b15..d90ae6c0 100644 --- a/config/default.js +++ b/config/default.js @@ -175,5 +175,40 @@ module.exports = { // the minimum matching rate when searching roles by skills ROLE_MATCHING_RATE: process.env.ROLE_MATCHING_RATE || 0.70, // member groups representing Wipro or TopCoder employee - INTERNAL_MEMBER_GROUPS: process.env.INTERNAL_MEMBER_GROUPS || ['20000000', '20000001', '20000003', '20000010', '20000015'] + INTERNAL_MEMBER_GROUPS: process.env.INTERNAL_MEMBER_GROUPS || ['20000000', '20000001', '20000003', '20000010', '20000015'], + // payment scheduler config + PAYMENT_PROCESSING: { + // switch off actual API calls in Payment Scheduler + SWITCH: process.env.PAYMENT_PROCESSING_SWITCH || 'OFF', + // the payment scheduler cron config + CRON: process.env.PAYMENT_PROCESSING_CRON || '0 */5 * * * *', + // the number of records processed by one time + BATCH_SIZE: parseInt(process.env.PAYMENT_PROCESSING_BATCH_SIZE || 50), + // in-progress expired to determine whether a record has been processed abnormally, moment duration format + IN_PROGRESS_EXPIRED: process.env.IN_PROGRESS_EXPIRED || 'PT1H', + // the number of max retry config + MAX_RETRY_COUNT: parseInt(process.env.PAYMENT_PROCESSING_MAX_RETRY_COUNT || 10), + // the time of retry base delay, unit: ms + RETRY_BASE_DELAY: parseInt(process.env.PAYMENT_PROCESSING_RETRY_BASE_DELAY || 100), + // the time of retry max delay, unit: ms + RETRY_MAX_DELAY: parseInt(process.env.PAYMENT_PROCESSING_RETRY_MAX_DELAY || 10000), + // the max time of one request, unit: ms + PER_REQUEST_MAX_TIME: parseInt(process.env.PAYMENT_PROCESSING_PER_REQUEST_MAX_TIME || 30000), + // the max time of one payment record, unit: ms + PER_PAYMENT_MAX_TIME: parseInt(process.env.PAYMENT_PROCESSING_PER_PAYMENT_MAX_TIME || 60000), + // the max records of payment of a minute + PER_MINUTE_PAYMENT_MAX_COUNT: parseInt(process.env.PAYMENT_PROCESSING_PER_MINUTE_PAYMENT_MAX_COUNT || 12), + // the max requests of challenge of a minute + PER_MINUTE_CHALLENGE_REQUEST_MAX_COUNT: parseInt(process.env.PAYMENT_PROCESSING_PER_MINUTE_CHALLENGE_REQUEST_MAX_COUNT || 60), + // the max requests of resource of a minute + PER_MINUTE_RESOURCE_REQUEST_MAX_COUNT: parseInt(process.env.PAYMENT_PROCESSING_PER_MINUTE_CHALLENGE_REQUEST_MAX_COUNT || 20), + // the default step fix delay, unit: ms + FIX_DELAY_STEP: parseInt(process.env.PAYMENT_PROCESSING_FIX_DELAY_STEP || 500), + // the fix delay after step of create challenge, unit: ms + FIX_DELAY_STEP_CREATE_CHALLENGE: parseInt(process.env.PAYMENT_PROCESSING_FIX_DELAY_STEP_CREATE_CHALLENGE || process.env.PAYMENT_PROCESSING_FIX_DELAY_STEP || 500), + // the fix delay after step of assign member, unit: ms + FIX_DELAY_STEP_ASSIGN_MEMBER: parseInt(process.env.PAYMENT_PROCESSING_FIX_DELAY_STEP_ASSIGN_MEMBER || process.env.PAYMENT_PROCESSING_FIX_DELAY_STEP || 500), + // the fix delay after step of activate challenge, unit: ms + FIX_DELAY_STEP_ACTIVATE_CHALLENGE: parseInt(process.env.PAYMENT_PROCESSING_FIX_DELAY_STEP_ACTIVATE_CHALLENGE || process.env.PAYMENT_PROCESSING_FIX_DELAY_STEP || 500) + } } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 6e2508c1..94c147b0 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -2403,7 +2403,7 @@ paths: required: false schema: type: string - enum: ["completed", "scheduled", "cancelled"] + enum: ["completed", "scheduled", "in-progress", "failed", "cancelled"] description: The payment status. responses: "200": @@ -2519,7 +2519,7 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" - + /work-period-payments/{id}: get: tags: @@ -4830,8 +4830,22 @@ components: description: "The amount to be paid." status: type: string - enum: ["completed", "scheduled", "cancelled"] + enum: ["completed", "scheduled", "in-progress", "failed", "cancelled"] description: "The payment status." + statusDetails: + type: object + properties: + errorMessage: + type: string + errorCode: + type: integer + retry: + type: integer + step: + type: string + challengeId: + type: string + format: uuid billingAccountId: type: integer example: 80000071 @@ -4888,7 +4902,7 @@ components: description: "The amount to be paid." status: type: string - enum: ["completed", "scheduled", "cancelled"] + enum: ["completed", "scheduled", "in-progress", "failed", "cancelled"] description: "The payment status." WorkPeriodPaymentCreateRequestBody: required: @@ -4979,7 +4993,7 @@ components: description: "The amount to be paid." status: type: string - enum: ["completed", "scheduled", "cancelled"] + enum: ["completed", "scheduled", "in-progress", "failed", "cancelled"] description: "The payment status." CheckRun: type: object diff --git a/migrations/2021-05-29-create-payment-scheduler-table-add-status-details-to-payment.js b/migrations/2021-05-29-create-payment-scheduler-table-add-status-details-to-payment.js new file mode 100644 index 00000000..5eb2232d --- /dev/null +++ b/migrations/2021-05-29-create-payment-scheduler-table-add-status-details-to-payment.js @@ -0,0 +1,112 @@ +'use strict'; + +const config = require('config') +const _ = require('lodash') +const { PaymentSchedulerStatus } = require('../app-constants') + +/** + * Create `payment_schedulers` table & relations. + */ +module.exports = { + up: async (queryInterface, Sequelize) => { + const transaction = await queryInterface.sequelize.transaction() + try { + await queryInterface.createTable('payment_schedulers', { + id: { + type: Sequelize.UUID, + primaryKey: true, + allowNull: false, + defaultValue: Sequelize.UUIDV4 + }, + challengeId: { + field: 'challenge_id', + type: Sequelize.UUID, + allowNull: false + }, + workPeriodPaymentId: { + field: 'work_period_payment_id', + type: Sequelize.UUID, + allowNull: false, + references: { + model: { + tableName: 'work_period_payments', + schema: config.DB_SCHEMA_NAME + }, + key: 'id' + } + }, + step: { + type: Sequelize.ENUM(_.values(PaymentSchedulerStatus)), + allowNull: false + }, + status: { + type: Sequelize.ENUM( + 'in-progress', + 'completed', + 'failed' + ), + allowNull: false + }, + userId: { + field: 'user_id', + type: Sequelize.BIGINT + }, + userHandle: { + field: 'user_handle', + type: Sequelize.STRING, + allowNull: false + }, + createdAt: { + field: 'created_at', + type: Sequelize.DATE + }, + updatedAt: { + field: 'updated_at', + type: Sequelize.DATE + }, + deletedAt: { + field: 'deleted_at', + type: Sequelize.DATE + } + }, { schema: config.DB_SCHEMA_NAME, transaction }) + await queryInterface.addColumn({ tableName: 'work_period_payments', schema: config.DB_SCHEMA_NAME }, 'status_details', + { type: Sequelize.JSONB }, + { transaction }) + await queryInterface.changeColumn({ tableName: 'work_period_payments', schema: config.DB_SCHEMA_NAME }, 'challenge_id', + { type: Sequelize.UUID }, + { transaction }) + await queryInterface.sequelize.query(`ALTER TYPE ${config.DB_SCHEMA_NAME}.enum_work_period_payments_status ADD VALUE 'scheduled'`) + await queryInterface.sequelize.query(`ALTER TYPE ${config.DB_SCHEMA_NAME}.enum_work_period_payments_status ADD VALUE 'in-progress'`) + await queryInterface.sequelize.query(`ALTER TYPE ${config.DB_SCHEMA_NAME}.enum_work_period_payments_status ADD VALUE 'failed'`) + await transaction.commit() + } catch (err) { + await transaction.rollback() + throw err + } + }, + + down: async (queryInterface, Sequelize) => { + const table = { schema: config.DB_SCHEMA_NAME, tableName: 'payment_schedulers' } + const statusTypeName = `${table.schema}.enum_${table.tableName}_status` + const stepTypeName = `${table.schema}.enum_${table.tableName}_step` + const transaction = await queryInterface.sequelize.transaction() + try { + await queryInterface.dropTable(table, { transaction }) + // drop enum type for status and step column + await queryInterface.sequelize.query(`DROP TYPE ${statusTypeName}`, { transaction }) + await queryInterface.sequelize.query(`DROP TYPE ${stepTypeName}`, { transaction }) + + await queryInterface.changeColumn({ tableName: 'work_period_payments', schema: config.DB_SCHEMA_NAME }, 'challenge_id', + { type: Sequelize.UUID, allowNull: false }, + { transaction }) + await queryInterface.removeColumn({ tableName: 'work_period_payments', schema: config.DB_SCHEMA_NAME }, 'status_details', + { transaction }) + await queryInterface.sequelize.query(`DELETE FROM pg_enum WHERE enumlabel in ('scheduled', 'in-progress', 'failed') AND enumtypid = (SELECT oid FROM pg_type WHERE typname = 'enum_work_period_payments_status')`, + { transaction }) + await transaction.commit() + } catch (err) { + await transaction.rollback() + throw err + } + } +}; diff --git a/package.json b/package.json index 510504f1..3e2e7b18 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,7 @@ "local:init": "npm run local:reset && npm run data:import -- --force", "local:reset": "npm run delete-index -- --force || true && npm run create-index -- --force && npm run init-db force", "cov": "nyc --reporter=html --reporter=text npm run test", + "demo-payment-scheduler": "node scripts/demo-payment-scheduler/index.js && npm run index:all -- --force", "demo-payment": "node scripts/demo-payment" }, "keywords": [], diff --git a/scripts/demo-payment-scheduler/data.json b/scripts/demo-payment-scheduler/data.json new file mode 100644 index 00000000..5023842c --- /dev/null +++ b/scripts/demo-payment-scheduler/data.json @@ -0,0 +1,103 @@ +{ + "Job":{ + "id":"43d695d4-e926-41d5-ad42-a899612b5246", + "projectId":17234, + "title":"Dummy title - at most 64 characters", + "numPositions":13, + "skills":[ + "23e00d92-207a-4b5b-b3c9-4c5662644941", + "7d076384-ccf6-4e43-a45d-1b24b1e624aa", + "cbac57a3-7180-4316-8769-73af64893158", + "a2b4bc11-c641-4a19-9eb7-33980378f82e" + ], + "status":"in-review", + "isApplicationPageActive":false, + "createdBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c", + "updatedBy":"00000000-0000-0000-0000-000000000000", + "createdAt":"2021-05-09T21:21:10.394Z", + "updatedAt":"2021-05-09T21:21:14.010Z" + }, + "ResourceBooking":{ + "id":"41671764-0ded-46fd-b7de-2af5d5e4f3fc", + "projectId":17234, + "userId":"05e988b7-7d54-4c10-ada1-1a04870a88a8", + "jobId":"43d695d4-e926-41d5-ad42-a899612b5246", + "status":"placed", + "startDate":"2020-09-27", + "endDate":"2020-10-27", + "memberRate":13.23, + "customerRate":13, + "rateType":"hourly", + "billingAccountId":80000069, + "createdBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c", + "updatedBy":null, + "createdAt":"2021-05-09T21:25:46.728Z", + "updatedAt":"2021-05-09T21:25:46.728Z" + }, + "WorkPeriods":[ + { + "id":"4baae2cf-fd70-4ab3-9959-e826257b7e0f", + "resourceBookingId":"41671764-0ded-46fd-b7de-2af5d5e4f3fc", + "userHandle":"pshah_manager", + "projectId":17234, + "startDate":"2020-09-27", + "endDate":"2020-10-03", + "daysWorked":4, + "memberRate":27.06, + "customerRate":13.13, + "paymentStatus":"partially-completed", + "createdBy":"00000000-0000-0000-0000-000000000000", + "updatedBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c", + "createdAt":"2021-05-09T21:25:47.813Z", + "updatedAt":"2021-05-09T21:45:32.659Z" + }, + { + "id":"9918e1b7-acbc-41ae-baa6-fdcb2386681d", + "resourceBookingId":"41671764-0ded-46fd-b7de-2af5d5e4f3fc", + "userHandle":"Shuchikr", + "projectId":17234, + "startDate":"2020-10-18", + "endDate":"2020-10-24", + "daysWorked":4, + "memberRate":4.08, + "customerRate":3.89, + "paymentStatus":"cancelled", + "createdBy":"00000000-0000-0000-0000-000000000000", + "updatedBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c", + "createdAt":"2021-05-09T21:25:47.834Z", + "updatedAt":"2021-05-09T21:45:37.647Z" + }, + { + "id":"42e990c9-b14c-4496-9977-c3024aa90024", + "resourceBookingId":"41671764-0ded-46fd-b7de-2af5d5e4f3fc", + "userHandle":"vkumars", + "projectId":17234, + "startDate":"2020-10-25", + "endDate":"2020-10-31", + "daysWorked":3, + "memberRate":15.61, + "customerRate":9.76, + "paymentStatus":"pending", + "createdBy":"00000000-0000-0000-0000-000000000000", + "updatedBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c", + "createdAt":"2021-05-09T21:25:47.824Z", + "updatedAt":"2021-05-09T21:45:48.727Z" + }, + { + "id":"8bf64481-ae7b-4e51-b48c-000cd90c87d1", + "resourceBookingId":"41671764-0ded-46fd-b7de-2af5d5e4f3fc", + "userHandle":"chandanant", + "projectId":17234, + "startDate":"2020-10-11", + "endDate":"2020-10-17", + "daysWorked":4, + "memberRate":10.82, + "customerRate":30.71, + "paymentStatus":"pending", + "createdBy":"00000000-0000-0000-0000-000000000000", + "updatedBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c", + "createdAt":"2021-05-09T21:25:47.815Z", + "updatedAt":"2021-05-09T21:45:41.810Z" + } + ] + } \ No newline at end of file diff --git a/scripts/demo-payment-scheduler/index.js b/scripts/demo-payment-scheduler/index.js new file mode 100644 index 00000000..8f76a814 --- /dev/null +++ b/scripts/demo-payment-scheduler/index.js @@ -0,0 +1,81 @@ +const { v4: uuid } = require('uuid') +const config = require('config') +const _ = require('lodash') +const data = require('./data.json') +const model = require('../../src/models') +const logger = require('../../src/common/logger') + +const payments = [] +for (let i = 0; i < 1000; i++) { + payments.push({ + id: uuid(), + workPeriodId: data.WorkPeriods[_.random(3)].id, + amount: _.round(_.random(1000, true), 2), + status: 'scheduled', + billingAccountId: data.ResourceBooking.billingAccountId, + createdBy: '57646ff9-1cd3-4d3c-88ba-eb09a395366c', + updatedBy: null, + createdAt: `2021-05-19T21:3${i % 10}:46.507Z`, + updatedAt: '2021-05-19T21:33:46.507Z' + }) +} + +/** + * Clear old demo data + */ +async function clearData () { + const workPeriodIds = _.join(_.map(data.WorkPeriods, w => `'${w.id}'`), ',') + await model.PaymentScheduler.destroy({ + where: { + workPeriodPaymentId: { + [model.Sequelize.Op.in]: [ + model.sequelize.literal(`select id from ${config.DB_SCHEMA_NAME}.work_period_payments where work_period_id in (${workPeriodIds})`) + ] + } + }, + force: true + }) + await model.WorkPeriodPayment.destroy({ + where: { + workPeriodId: _.map(data.WorkPeriods, 'id') + }, + force: true + }) + await model.WorkPeriod.destroy({ + where: { + id: _.map(data.WorkPeriods, 'id') + }, + force: true + }) + await model.ResourceBooking.destroy({ + where: { + id: data.ResourceBooking.id + }, + force: true + }) + await model.Job.destroy({ + where: { + id: data.Job.id + }, + force: true + }) +} + +/** + * Insert payment scheduler demo data + */ +async function insertPaymentSchedulerDemoData () { + logger.info({ component: 'payment-scheduler-demo-data', context: 'insertPaymentSchedulerDemoData', message: 'Starting to remove demo data if exists' }) + await clearData() + logger.info({ component: 'payment-scheduler-demo-data', context: 'insertPaymentSchedulerDemoData', message: 'Data cleared' }) + await model.Job.create(data.Job) + logger.info({ component: 'payment-scheduler-demo-data', context: 'insertPaymentSchedulerDemoData', message: `Job ${data.Job.id} created` }) + await model.ResourceBooking.create(data.ResourceBooking) + logger.info({ component: 'payment-scheduler-demo-data', context: 'insertPaymentSchedulerDemoData', message: `ResourceBooking: ${data.ResourceBooking.id} create` }) + await model.WorkPeriod.bulkCreate(data.WorkPeriods) + logger.info({ component: 'payment-scheduler-demo-data', context: 'insertPaymentSchedulerDemoData', message: `WorkPeriods: ${_.map(data.WorkPeriods, 'id')} created` }) + await model.WorkPeriodPayment.bulkCreate(payments) + logger.info({ component: 'payment-scheduler-demo-data', context: 'insertPaymentSchedulerDemoData', message: `${payments.length} of WorkPeriodPayments scheduled` }) +} + +insertPaymentSchedulerDemoData() diff --git a/src/bootstrap.js b/src/bootstrap.js index 259c4a2c..2af51ecd 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -1,8 +1,9 @@ const fs = require('fs') const Joi = require('joi') +const config = require('config') const path = require('path') const _ = require('lodash') -const { Interviews, WorkPeriodPaymentStatus } = require('../app-constants') +const { Interviews, WorkPeriodPaymentStatus, PaymentProcessingSwitch } = require('../app-constants') const logger = require('./common/logger') const allowedInterviewStatuses = _.values(Interviews.Status) @@ -44,3 +45,14 @@ function buildServices (dir) { } buildServices(path.join(__dirname, 'services')) + +// validate some configurable parameters for the app +const paymentProcessingSwitchSchema = Joi.string().label('PAYMENT_PROCESSING_SWITCH').valid( + ...Object.values(PaymentProcessingSwitch) +) +try { + Joi.attempt(config.PAYMENT_PROCESSING.SWITCH, paymentProcessingSwitchSchema) +} catch (err) { + console.error(err.message) + process.exit(1) +} diff --git a/src/common/helper.js b/src/common/helper.js index 60f398c2..7dbab23d 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -202,6 +202,16 @@ esIndexPropertyMapping[config.get('esConfig.ES_INDEX_RESOURCE_BOOKING')] = { challengeId: { type: 'keyword' }, amount: { type: 'float' }, status: { type: 'keyword' }, + statusDetails: { + type: 'nested', + properties: { + errorMessage: { type: 'text' }, + errorCode: { type: 'integer' }, + retry: { type: 'integer' }, + step: { type: 'keyword' }, + challengeId: { type: 'keyword' } + } + }, billingAccountId: { type: 'integer' }, createdAt: { type: 'date' }, createdBy: { type: 'keyword' }, @@ -286,6 +296,16 @@ async function promptUser (promptQuery, cb) { }) } +/** + * Sleep for a given number of milliseconds. + * + * @param {Number} milliseconds the sleep time + * @returns {undefined} + */ +async function sleep (milliseconds) { + return new Promise((resolve) => setTimeout(resolve, milliseconds)) +} + /** * Create index in elasticsearch * @param {Object} index the index name @@ -1618,6 +1638,26 @@ async function createChallenge (data, token) { return challenge } +/** + * Get a challenge + * + * @param {Object} data challenge data + * @returns {Object} the challenge + */ +async function getChallenge (challengeId) { + const token = await getM2MToken() + const url = `${config.TC_API}/challenges/${challengeId}` + localLogger.debug({ context: 'getChallenge', message: `EndPoint: GET ${url}` }) + const { body: challenge, status: httpStatus } = await request + .get(url) + .set('Authorization', `Bearer ${token}`) + .set('Content-Type', 'application/json') + .set('Accept', 'application/json') + localLogger.debug({ context: 'getChallenge', message: `Status Code: ${httpStatus}` }) + localLogger.debug({ context: 'getChallenge', message: `Response Body: ${JSON.stringify(challenge)}` }) + return challenge +} + /** * Update a challenge * @@ -1693,6 +1733,35 @@ async function createChallengeResource (data, token) { return resource } +/** + * + * @param {String} challengeId the challenge id + * @param {String} memberHandle the member handle + * @param {String} roleId the role id + * @returns {Object} the resource + */ +async function getChallengeResource (challengeId, memberHandle, roleId) { + const token = await getM2MToken() + const url = `${config.TC_API}/resources?challengeId=${challengeId}&memberHandle=${memberHandle}&roleId=${roleId}` + localLogger.debug({ context: 'createChallengeResource', message: `EndPoint: POST ${url}` }) + try { + const { body: resource, status: httpStatus } = await request + .get(url) + .set('Authorization', `Bearer ${token}`) + .set('Content-Type', 'application/json') + .set('Accept', 'application/json') + localLogger.debug({ context: 'getChallengeResource', message: `Status Code: ${httpStatus}` }) + localLogger.debug({ context: 'getChallengeResource', message: `Response Body: ${JSON.stringify(resource)}` }) + return resource[0] + } catch (err) { + if (err.status === 404) { + localLogger.debug({ context: 'getChallengeResource', message: `Status Code: ${err.status}` }) + } else { + throw err + } + } +} + /** * Populates workPeriods from start and end date of resource booking * @param {Date} start start date of the resource booking @@ -1836,6 +1905,7 @@ async function getMemberGroups (userId) { module.exports = { getParamFromCliArgs, promptUser, + sleep, createIndex, deleteIndex, indexBulkDataToES, @@ -1882,8 +1952,10 @@ module.exports = { deleteProjectMember, getUserAttributeValue, createChallenge, + getChallenge, updateChallenge, createChallengeResource, + getChallengeResource, extractWorkPeriods, getUserByHandle, substituteStringByObject, diff --git a/src/models/PaymentScheduler.js b/src/models/PaymentScheduler.js new file mode 100644 index 00000000..f8f64dfa --- /dev/null +++ b/src/models/PaymentScheduler.js @@ -0,0 +1,109 @@ +const { Sequelize, Model } = require('sequelize') +const config = require('config') +const _ = require('lodash') +const errors = require('../common/errors') +const { PaymentSchedulerStatus } = require('../../app-constants') + +module.exports = (sequelize) => { + class PaymentScheduler extends Model { + /** + * Create association between models + * @param {Object} models the database models + */ + static associate (models) { + PaymentScheduler.belongsTo(models.WorkPeriodPayment, { foreignKey: 'workPeriodPaymentId' }) + } + + /** + * Get payment scheduler by id + * @param {String} id the payment scheduler id + * @returns {PaymentScheduler} the payment scheduler instance + */ + static async findById (id) { + const paymentScheduler = await PaymentScheduler.findOne({ + where: { + id + } + }) + if (!paymentScheduler) { + throw new errors.NotFoundError(`id: ${id} "paymentScheduler" doesn't exists`) + } + return paymentScheduler + } + } + PaymentScheduler.init( + { + id: { + type: Sequelize.UUID, + primaryKey: true, + allowNull: false, + defaultValue: Sequelize.UUIDV4 + }, + challengeId: { + field: 'challenge_id', + type: Sequelize.UUID, + allowNull: false + }, + workPeriodPaymentId: { + field: 'work_period_payment_id', + type: Sequelize.UUID, + allowNull: false + }, + step: { + type: Sequelize.ENUM(_.values(PaymentSchedulerStatus)), + allowNull: false + }, + status: { + type: Sequelize.ENUM( + 'in-progress', + 'completed', + 'failed' + ), + allowNull: false + }, + userId: { + field: 'user_id', + type: Sequelize.BIGINT + }, + userHandle: { + field: 'user_handle', + type: Sequelize.STRING, + allowNull: false + }, + createdAt: { + field: 'created_at', + type: Sequelize.DATE + }, + updatedAt: { + field: 'updated_at', + type: Sequelize.DATE + }, + deletedAt: { + field: 'deleted_at', + type: Sequelize.DATE + } + }, + { + schema: config.DB_SCHEMA_NAME, + sequelize, + tableName: 'payment_schedulers', + paranoid: true, + deletedAt: 'deletedAt', + createdAt: 'createdAt', + updatedAt: 'updatedAt', + timestamps: true, + defaultScope: { + attributes: { + exclude: ['deletedAt'] + } + }, + hooks: { + afterCreate: (paymentScheduler) => { + delete paymentScheduler.dataValues.deletedAt + } + } + } + ) + + return PaymentScheduler +} diff --git a/src/models/WorkPeriodPayment.js b/src/models/WorkPeriodPayment.js index 7db484ea..8d23487a 100644 --- a/src/models/WorkPeriodPayment.js +++ b/src/models/WorkPeriodPayment.js @@ -55,6 +55,10 @@ module.exports = (sequelize) => { type: Sequelize.ENUM(_.values(WorkPeriodPaymentStatus)), allowNull: false }, + statusDetails: { + field: 'status_details', + type: Sequelize.JSONB + }, billingAccountId: { field: 'billing_account_id', type: Sequelize.BIGINT diff --git a/src/services/PaymentSchedulerService.js b/src/services/PaymentSchedulerService.js new file mode 100644 index 00000000..20ebf249 --- /dev/null +++ b/src/services/PaymentSchedulerService.js @@ -0,0 +1,337 @@ +const _ = require('lodash') +const config = require('config') +const moment = require('moment') +const models = require('../models') +const { getMemberDetailsByHandle, getChallenge, getChallengeResource, sleep, postEvent } = require('../common/helper') +const logger = require('../common/logger') +const { createChallenge, addResourceToChallenge, activateChallenge, closeChallenge } = require('./PaymentService') +const { ChallengeStatus, PaymentSchedulerStatus, PaymentProcessingSwitch } = require('../../app-constants') + +const WorkPeriodPayment = models.WorkPeriodPayment +const WorkPeriod = models.WorkPeriod +const PaymentScheduler = models.PaymentScheduler +const { + SWITCH, BATCH_SIZE, IN_PROGRESS_EXPIRED, MAX_RETRY_COUNT, RETRY_BASE_DELAY, RETRY_MAX_DELAY, PER_REQUEST_MAX_TIME, PER_PAYMENT_MAX_TIME, + PER_MINUTE_PAYMENT_MAX_COUNT, PER_MINUTE_CHALLENGE_REQUEST_MAX_COUNT, PER_MINUTE_RESOURCE_REQUEST_MAX_COUNT, + FIX_DELAY_STEP_CREATE_CHALLENGE, FIX_DELAY_STEP_ASSIGN_MEMBER, FIX_DELAY_STEP_ACTIVATE_CHALLENGE +} = config.PAYMENT_PROCESSING +const processStatus = { + perMin: { + minute: '0:0', + paymentsProcessed: 0, + challengeRequested: 0, + resourceRequested: 0 + }, + perMinThreshold: { + paymentsProcessed: PER_MINUTE_PAYMENT_MAX_COUNT, + challengeRequested: PER_MINUTE_CHALLENGE_REQUEST_MAX_COUNT, + resourceRequested: PER_MINUTE_RESOURCE_REQUEST_MAX_COUNT + }, + paymentStartTime: 0, + requestStartTime: 0 +} +const processResult = { + SUCCESS: 'success', + FAIL: 'fail', + SKIP: 'skip' +} + +const localLogger = { + debug: (message, context) => logger.debug({ component: 'PaymentSchedulerService', context, message }), + error: (message, context) => logger.error({ component: 'PaymentSchedulerService', context, message }), + info: (message, context) => logger.info({ component: 'PaymentSchedulerService', context, message }) +} + +/** + * Scheduler process entrance + */ +async function processScheduler () { + // Get the oldest Work Periods Payment records in status "scheduled" and "in-progress", + // the in progress state may be caused by an abnormal shutdown, + // or it may be a normal record that is still being processed + const workPeriodPaymentList = await WorkPeriodPayment.findAll({ where: { status: ['in-progress', 'scheduled'] }, order: [['status', 'desc'], ['createdAt']], limit: BATCH_SIZE }) + localLogger.info(`start processing ${workPeriodPaymentList.length} of payments`, 'processScheduler') + const failIds = [] + const skipIds = [] + for (const workPeriodPayment of workPeriodPaymentList) { + const result = await processPayment(workPeriodPayment) + if (result === processResult.FAIL) { + failIds.push(workPeriodPayment.id) + } else if (result === processResult.SKIP) { + skipIds.push(workPeriodPayment.id) + } + } + localLogger.info(`process end. ${workPeriodPaymentList.length - failIds.length - skipIds.length} of payments processed successfully`, 'processScheduler') + if (!_.isEmpty(skipIds)) { + localLogger.info(`payments: ${_.join(skipIds, ',')} are processing by other processor`, 'processScheduler') + } + if (!_.isEmpty(failIds)) { + localLogger.error(`payments: ${_.join(failIds, ',')} are processed failed`, 'processScheduler') + } +} + +/** + * Process a record of payment + * @param {Object} workPeriodPayment the work period payment + * @returns {String} process result + */ +async function processPayment (workPeriodPayment) { + processStatus.paymentStartTime = Date.now() + let paymentScheduler + if (workPeriodPayment.status === 'in-progress') { + paymentScheduler = await PaymentScheduler.findOne({ where: { workPeriodPaymentId: workPeriodPayment.id, status: 'in-progress' } }) + + // If the in-progress record has not expired, it is considered to be being processed by other processes + if (paymentScheduler && moment(paymentScheduler.updatedAt).add(moment.duration(IN_PROGRESS_EXPIRED)).isAfter(moment())) { + localLogger.info(`workPeriodPayment: ${workPeriodPayment.id} is being processed by other processor`, 'processPayment') + return processResult.SKIP + } + } else { + const oldValue = workPeriodPayment.toJSON() + const updated = await workPeriodPayment.update({ status: 'in-progress' }) + // Update the modified status to es + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue }) + } + // Check whether the number of processed records per minute exceeds the specified number, if it exceeds, wait for the next minute before processing + await checkWait(PaymentSchedulerStatus.START_PROCESS) + localLogger.info(`Processing workPeriodPayment ${workPeriodPayment.id}`, 'processPayment') + + const workPeriod = await WorkPeriod.findById(workPeriodPayment.workPeriodId) + try { + if (!paymentScheduler) { + // 1. create challenge + const challengeId = await withRetry(createChallenge, [getCreateChallengeParam(workPeriod, workPeriodPayment)], validateError, PaymentSchedulerStatus.CREATE_CHALLENGE) + paymentScheduler = await PaymentScheduler.create({ challengeId, step: PaymentSchedulerStatus.CREATE_CHALLENGE, workPeriodPaymentId: workPeriodPayment.id, userHandle: workPeriod.userHandle, status: 'in-progress' }) + } else { + // If the paymentScheduler already exists, it means that this is a record caused by an abnormal shutdown + await setPaymentSchedulerStep(paymentScheduler) + } + // Start from unprocessed step, perform the process step by step + while (paymentScheduler.step !== PaymentSchedulerStatus.CLOSE_CHALLENGE) { + await processStep(paymentScheduler) + } + + const oldValue = workPeriodPayment.toJSON() + // 5. update wp and save it should only update already existent Work Period Payment record with created "challengeId" and "status=completed". + const updated = await workPeriodPayment.update({ challengeId: paymentScheduler.challengeId, status: 'completed' }) + // Update the modified status to es + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue }) + + await paymentScheduler.update({ step: PaymentSchedulerStatus.CLOSE_CHALLENGE, userId: paymentScheduler.userId, status: 'completed' }) + + localLogger.info(`Processed workPeriodPayment ${workPeriodPayment.id} successfully`, 'processPayment') + return processResult.SUCCESS + } catch (err) { + logger.logFullError(err, { component: 'PaymentSchedulerService', context: 'processPayment' }) + const statusDetails = { errorMessage: err.message, errorCode: _.get(err, 'status', -1), retry: _.get(err, 'retry', -1), step: _.get(err, 'step'), challengeId: paymentScheduler ? paymentScheduler.challengeId : null } + const oldValue = workPeriodPayment.toJSON() + // If payment processing failed Work Periods Payment "status" should be changed to "failed" and populate "statusDetails" field with error details in JSON format. + const updated = await workPeriodPayment.update({ statusDetails, status: 'failed' }) + // Update the modified status to es + await postEvent(config.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC, updated.toJSON(), { oldValue }) + + if (paymentScheduler) { + await paymentScheduler.update({ step: PaymentSchedulerStatus.CLOSE_CHALLENGE, userId: paymentScheduler.userId, status: 'failed' }) + } + localLogger.error(`Processed workPeriodPayment ${workPeriodPayment.id} failed`, 'processPayment') + return processResult.FAIL + } +} + +/** + * Perform a specific step in the process + * @param {Object} paymentScheduler the payment scheduler + */ +async function processStep (paymentScheduler) { + if (paymentScheduler.step === PaymentSchedulerStatus.CREATE_CHALLENGE) { + // 2. assign member to the challenge + await withRetry(addResourceToChallenge, [paymentScheduler.challengeId, paymentScheduler.userHandle], validateError, PaymentSchedulerStatus.ASSIGN_MEMBER) + paymentScheduler.step = PaymentSchedulerStatus.ASSIGN_MEMBER + } else if (paymentScheduler.step === PaymentSchedulerStatus.ASSIGN_MEMBER) { + // 3. active the challenge + await withRetry(activateChallenge, [paymentScheduler.challengeId], validateError, PaymentSchedulerStatus.ACTIVATE_CHALLENGE) + paymentScheduler.step = PaymentSchedulerStatus.ACTIVATE_CHALLENGE + } else if (paymentScheduler.step === PaymentSchedulerStatus.ACTIVATE_CHALLENGE) { + // 4.1. get user id + const { userId } = await withRetry(getMemberDetailsByHandle, [paymentScheduler.userHandle], validateError, PaymentSchedulerStatus.GET_USER_ID) + paymentScheduler.userId = userId + paymentScheduler.step = PaymentSchedulerStatus.GET_USER_ID + } else if (paymentScheduler.step === PaymentSchedulerStatus.GET_USER_ID) { + // 4.2. close the challenge + await withRetry(closeChallenge, [paymentScheduler.challengeId, paymentScheduler.userId, paymentScheduler.userHandle], validateError, PaymentSchedulerStatus.CLOSE_CHALLENGE) + paymentScheduler.step = PaymentSchedulerStatus.CLOSE_CHALLENGE + } +} + +/** + * Set the scheduler actual step + * @param {Object} paymentScheduler the scheduler object + */ +async function setPaymentSchedulerStep (paymentScheduler) { + const challenge = await getChallenge(paymentScheduler.challengeId) + if (SWITCH === PaymentProcessingSwitch.OFF) { + paymentScheduler.step = PaymentSchedulerStatus.CLOSE_CHALLENGE + } else if (challenge.status === ChallengeStatus.COMPLETED) { + paymentScheduler.step = PaymentSchedulerStatus.CLOSE_CHALLENGE + } else if (challenge.status === ChallengeStatus.ACTIVE) { + paymentScheduler.step = PaymentSchedulerStatus.ACTIVATE_CHALLENGE + } else { + const resource = await getChallengeResource(paymentScheduler.challengeId, paymentScheduler.userHandle, config.ROLE_ID_SUBMITTER) + if (resource) { + paymentScheduler.step = PaymentSchedulerStatus.ASSIGN_MEMBER + } else { + paymentScheduler.step = PaymentSchedulerStatus.CREATE_CHALLENGE + } + } + // The main purpose is updating the updatedAt of payment scheduler to avoid simultaneous processing + await paymentScheduler.update({ step: paymentScheduler.step }) +} + +/** + * Generate the create challenge parameter + * @param {Object} workPeriod the work period + * @param {Object} workPeriodPayment the work period payment + * @returns {Object} the create challenge parameter + */ +function getCreateChallengeParam (workPeriod, workPeriodPayment) { + return { + projectId: workPeriod.projectId, + userHandle: workPeriod.userHandle, + amount: workPeriodPayment.amount, + name: `TaaS Payment - ${workPeriod.userHandle} - Week Ending ${moment(workPeriod.endDate).format('D/M/YYYY')}`, + description: `TaaS Payment - ${workPeriod.userHandle} - Week Ending ${moment(workPeriod.endDate).format('D/M/YYYY')}`, + billingAccountId: workPeriodPayment.billingAccountId + } +} + +/** + * Before each step is processed, wait for the corresponding time + * @param {String} step the step name + * @param {Number} tryCount the try count + */ +async function checkWait (step, tryCount) { + // When calculating the retry time later, we need to subtract the time that has been waited before + let lapse = 0 + if (step === PaymentSchedulerStatus.START_PROCESS) { + lapse += await checkPerMinThreshold('paymentsProcessed') + } else if (step === PaymentSchedulerStatus.CREATE_CHALLENGE) { + await checkPerMinThreshold('challengeRequested') + } else if (step === PaymentSchedulerStatus.ASSIGN_MEMBER) { + // Only when tryCount = 0, it comes from the previous step, and it is necessary to wait for a fixed time + if (FIX_DELAY_STEP_CREATE_CHALLENGE > 0 && tryCount === 0) { + await sleep(FIX_DELAY_STEP_CREATE_CHALLENGE) + } + lapse += await checkPerMinThreshold('resourceRequested') + } else if (step === PaymentSchedulerStatus.ACTIVATE_CHALLENGE) { + // Only when tryCount = 0, it comes from the previous step, and it is necessary to wait for a fixed time + if (FIX_DELAY_STEP_ASSIGN_MEMBER > 0 && tryCount === 0) { + await sleep(FIX_DELAY_STEP_ASSIGN_MEMBER) + } + lapse += await checkPerMinThreshold('challengeRequested') + } else if (step === PaymentSchedulerStatus.CLOSE_CHALLENGE) { + // Only when tryCount = 0, it comes from the previous step, and it is necessary to wait for a fixed time + if (FIX_DELAY_STEP_ACTIVATE_CHALLENGE > 0 && tryCount === 0) { + await sleep(FIX_DELAY_STEP_ACTIVATE_CHALLENGE) + } + lapse += await checkPerMinThreshold('challengeRequested') + } + + if (tryCount > 0) { + // exponential backoff and do not exceed the maximum retry delay + const retryDelay = Math.min(RETRY_BASE_DELAY * Math.pow(2, tryCount), RETRY_MAX_DELAY) + await sleep(retryDelay - lapse) + } +} + +/** + * Determine whether the number of records processed every minute exceeds the specified number, if it exceeds, wait for the next minute + * @param {String} key the min threshold key + * @returns {Number} wait time + */ +async function checkPerMinThreshold (key) { + const mt = moment() + const min = mt.format('h:m') + let waitMs = 0 + if (processStatus.perMin.minute === min) { + if (processStatus.perMin[key] >= processStatus.perMinThreshold[key]) { + waitMs = (60 - mt.seconds()) * 1000 + localLogger.info(`The number of records of ${key} processed per minute reaches ${processStatus.perMinThreshold[key]}, and it need to wait for ${60 - mt.seconds()} seconds until the next minute`) + await sleep(waitMs) + processStatus.perMin = { + minute: moment().format('h:m'), + paymentsProcessed: 0, + challengeRequested: 0, + resourceRequested: 0 + } + } + } else { + processStatus.perMin = { + minute: min, + paymentsProcessed: 0, + challengeRequested: 0, + resourceRequested: 0 + } + } + processStatus.perMin[key]++ + return waitMs +} + +/** + * Determine whether it can try again + * @param {Object} err the process error + * @returns {Boolean} + */ +function validateError (err) { + return !err.status || err.status >= 500 +} + +/** + * Execute the function, if an exception occurs, retry according to the conditions + * @param {Function} func the main function + * @param {Array} argArr the args of main function + * @param {Function} predictFunc the determine error function + * @param {String} step the step name + * @returns the result of main function + */ +async function withRetry (func, argArr, predictFunc, step) { + let tryCount = 0 + processStatus.requestStartTime = Date.now() + while (true) { + await checkWait(step, tryCount) + tryCount++ + try { + // mock code + if (SWITCH === PaymentProcessingSwitch.OFF) { + // without actual API calls by adding delay (for example 1 second for each step), to simulate the act + sleep(1000) + if (step === PaymentSchedulerStatus.CREATE_CHALLENGE) { + return '00000000-0000-0000-0000-000000000000' + } else if (step === PaymentSchedulerStatus.GET_USER_ID) { + return { userId: 100001 } + } + return + } else { + // Execute the main function + const result = await func(...argArr) + return result + } + } catch (err) { + const now = Date.now() + // The following is the case of not retrying: + // 1. The number of retries exceeds the configured number + // 2. The thrown error does not match the retry conditions + // 3. The request execution time exceeds the configured time + // 4. The processing time of the payment record exceeds the configured time + if (tryCount > MAX_RETRY_COUNT || !predictFunc(err) || now - processStatus.requestStartTime > PER_REQUEST_MAX_TIME || now - processStatus.paymentStartTime > PER_PAYMENT_MAX_TIME) { + err.retry = tryCount + err.step = step + throw err + } + localLogger.info(`execute ${step} with error: ${err.message}, retry...`, 'withRetry') + } + } +} + +module.exports = { + processScheduler +} diff --git a/src/services/PaymentService.js b/src/services/PaymentService.js index e61a6c34..93d04e41 100644 --- a/src/services/PaymentService.js +++ b/src/services/PaymentService.js @@ -39,7 +39,8 @@ async function createPayment (options) { const challengeId = await createChallenge(options, token) await addResourceToChallenge(challengeId, options.userHandle, token) await activateChallenge(challengeId, token) - const completedChallenge = await closeChallenge(challengeId, options.userHandle, token) + const { userId } = await helper.getV3MemberDetailsByHandle(options.userHandle) + const completedChallenge = await closeChallenge(challengeId, userId, options.userHandle, token) return completedChallenge } @@ -117,6 +118,13 @@ async function addResourceToChallenge (id, handle, token) { await helper.createChallengeResource(body, token) localLogger.info({ context: 'addResourceToChallenge', message: `${handle} added to challenge ${id}` }) } catch (err) { + if (err.status === 409) { + const resource = await helper.getChallengeResource(id, handle, config.ROLE_ID_SUBMITTER) + if (resource) { + localLogger.info({ context: 'addResourceToChallenge', message: `${handle} exists in challenge ${id}` }) + return + } + } localLogger.error({ context: 'addResourceToChallenge', message: `Status Code: ${err.status}` }) localLogger.error({ context: 'addResourceToChallenge', message: err.response.text }) throw err @@ -137,6 +145,13 @@ async function activateChallenge (id, token) { await helper.updateChallenge(id, body, token) localLogger.info({ context: 'activateChallenge', message: `Challenge ${id} is activated successfully.` }) } catch (err) { + if (err.status >= 500) { + const challenge = await helper.getChallenge(id) + if (_.includes([constants.ChallengeStatus.ACTIVE, constants.ChallengeStatus.COMPLETED], challenge.status)) { + localLogger.info({ context: 'activateChallenge', message: `the status of Challenge ${id} had been ${challenge.status}.` }) + return + } + } localLogger.error({ context: 'activateChallenge', message: `Status Code: ${err.status}` }) localLogger.error({ context: 'activateChallenge', message: err.response.text }) throw err @@ -146,14 +161,14 @@ async function activateChallenge (id, token) { /** * closes the topcoder challenge * @param {String} id the challenge id + * @param {String} userId the user id * @param {String} userHandle the user handle * @param {String} token m2m token * @returns {Object} the closed challenge */ -async function closeChallenge (id, userHandle, token) { +async function closeChallenge (id, userId, userHandle, token) { localLogger.info({ context: 'closeChallenge', message: `Closing challenge ${id}` }) try { - const { userId } = await helper.getMemberDetailsByHandle(userHandle) const body = { status: constants.ChallengeStatus.COMPLETED, winners: [{ @@ -166,6 +181,13 @@ async function closeChallenge (id, userHandle, token) { localLogger.info({ context: 'closeChallenge', message: `Challenge ${id} is closed successfully.` }) return response } catch (err) { + if (err.status >= 500) { + const challenge = await helper.getChallenge(id) + if (constants.ChallengeStatus.COMPLETED === challenge.status) { + localLogger.info({ context: 'activateChallenge', message: `the status of Challenge ${id} had been ${challenge.status}.` }) + return challenge + } + } localLogger.error({ context: 'closeChallenge', message: `Status Code: ${err.status}` }) localLogger.error({ context: 'closeChallenge', message: err.response.text }) throw err @@ -173,5 +195,9 @@ async function closeChallenge (id, userHandle, token) { } module.exports = { - createPayment + createPayment, + createChallenge, + addResourceToChallenge, + activateChallenge, + closeChallenge }