diff --git a/README.md b/README.md index d8409ab..045684f 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ The following parameters can be set in config files or in env variables: - `topics.TAAS_WORK_PERIOD_CREATE_TOPIC`: the create work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_UPDATE_TOPIC`: the update work period entity Kafka message topic - `topics.TAAS_WORK_PERIOD_DELETE_TOPIC`: the delete work period entity Kafka message topic +- `topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC`: the create work period payment entity Kafka message topic +- `topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC`: the update work period payment entity Kafka message topic - `esConfig.HOST`: Elasticsearch host - `esConfig.AWS_REGION`: The Amazon region to use when using AWS Elasticsearch service - `esConfig.ELASTICCLOUD.id`: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this diff --git a/VERIFICATION.md b/VERIFICATION.md index c6930b9..5410bcf 100644 --- a/VERIFICATION.md +++ b/VERIFICATION.md @@ -2,7 +2,7 @@ ## Create documents in ES -- Run the following commands to create `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod` documents in ES. +- Run the following commands to create `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES. ``` bash # for Job @@ -13,12 +13,14 @@ docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.create < test/messages/taas.resourcebooking.create.event.json # for WorkPeriod docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiod.create < test/messages/taas.workperiod.create.event.json + # for WorkPeriodPayment + docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiodpayment.create < test/messages/taas.workperiodpayment.create.event.json ``` - Run `npm run view-data ` to see if documents were created. ## Update documents in ES -- Run the following commands to update `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod` documents in ES. +- Run the following commands to update `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES. ``` bash # for Job @@ -29,6 +31,8 @@ docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.update < test/messages/taas.resourcebooking.update.event.json # for WorkPeriod docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiod.update < test/messages/taas.workperiod.update.event.json + # for WorkPeriodPayment + docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiodpayment.update < test/messages/taas.workperiodpayment.update.event.json ``` - Run `npm run view-data ` to see if documents were updated. diff --git a/config/default.js b/config/default.js index 446ab4e..7a8dbf8 100644 --- a/config/default.js +++ b/config/default.js @@ -31,7 +31,10 @@ module.exports = { // topics for work period service TAAS_WORK_PERIOD_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_CREATE_TOPIC || 'taas.workperiod.create', TAAS_WORK_PERIOD_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_UPDATE_TOPIC || 'taas.workperiod.update', - TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete' + TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete', + // topics for work period payment service + TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC || 'taas.workperiodpayment.create', + TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC || 'taas.workperiodpayment.update' }, esConfig: { diff --git a/local/docker-compose.yml b/local/docker-compose.yml index 5d2d803..35e9486 100644 --- a/local/docker-compose.yml +++ b/local/docker-compose.yml @@ -12,7 +12,7 @@ services: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost - KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1" + KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 esearch: image: elasticsearch:7.7.1 diff --git a/src/app.js b/src/app.js index 1d3ae47..c38c03d 100644 --- a/src/app.js +++ b/src/app.js @@ -13,6 +13,7 @@ const JobProcessorService = require('./services/JobProcessorService') const JobCandidateProcessorService = require('./services/JobCandidateProcessorService') const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService') const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService') +const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService') const Mutex = require('async-mutex').Mutex const events = require('events') @@ -43,7 +44,10 @@ const topicServiceMapping = { // work period [config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate, [config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate, - [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete + [config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete, + // work period payment + [config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate, + [config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate } // Start kafka consumer diff --git a/src/bootstrap.js b/src/bootstrap.js index 0f58277..6c9c001 100644 --- a/src/bootstrap.js +++ b/src/bootstrap.js @@ -10,6 +10,7 @@ Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist Joi.workload = () => Joi.string().valid('full-time', 'fractional') Joi.title = () => Joi.string().max(128) Joi.paymentStatus = () => Joi.string().valid('pending', 'partially-completed', 'completed', 'cancelled') +Joi.workPeriodPaymentStatus = () => Joi.string().valid('completed', 'cancelled') // Empty string is not allowed by Joi by default and must be enabled with allow(''). // See https://joi.dev/api/?v=17.3.0#string fro details why it's like this. // In many cases we would like to allow empty string to make it easier to create UI for editing data. diff --git a/src/scripts/createIndex.js b/src/scripts/createIndex.js index 095633e..fad96d3 100644 --- a/src/scripts/createIndex.js +++ b/src/scripts/createIndex.js @@ -91,6 +91,19 @@ async function createIndex () { memberRate: { type: 'float' }, customerRate: { type: 'float' }, paymentStatus: { type: 'keyword' }, + payments: { + type: 'nested', + properties: { + workPeriodId: { type: 'keyword' }, + challengeId: { type: 'keyword' }, + amount: { type: 'float' }, + status: { type: 'keyword' }, + createdAt: { type: 'date' }, + createdBy: { type: 'keyword' }, + updatedAt: { type: 'date' }, + updatedBy: { type: 'keyword' } + } + }, createdAt: { type: 'date' }, createdBy: { type: 'keyword' }, updatedAt: { type: 'date' }, diff --git a/src/services/WorkPeriodPaymentProcessorService.js b/src/services/WorkPeriodPaymentProcessorService.js new file mode 100644 index 0000000..45fd6c5 --- /dev/null +++ b/src/services/WorkPeriodPaymentProcessorService.js @@ -0,0 +1,134 @@ +/** + * WorkPeriodPayment Processor Service + */ + +const Joi = require('@hapi/joi') +const config = require('config') +const _ = require('lodash') +const logger = require('../common/logger') +const helper = require('../common/helper') +const constants = require('../common/constants') + +const esClient = helper.getESClient() + +/** + * Process create entity message + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processCreate (message, transactionId) { + const data = message.payload + const workPeriod = await esClient.get({ + index: config.get('esConfig.ES_INDEX_WORK_PERIOD'), + id: data.workPeriodId + }) + const payments = _.isArray(workPeriod.body._source.payments) ? workPeriod.body._source.payments : [] + payments.push(data) + + return esClient.update({ + index: config.get('esConfig.ES_INDEX_WORK_PERIOD'), + id: data.workPeriodId, + transactionId, + body: { + doc: _.assign(workPeriod.body._source, { payments }) + }, + refresh: constants.esRefreshOption + }) +} + +processCreate.schema = { + message: Joi.object().keys({ + topic: Joi.string().required(), + originator: Joi.string().required(), + timestamp: Joi.date().required(), + 'mime-type': Joi.string().required(), + payload: Joi.object().keys({ + id: Joi.string().uuid().required(), + workPeriodId: Joi.string().uuid().required(), + challengeId: Joi.string().uuid().required(), + amount: Joi.number().greater(0).allow(null), + status: Joi.workPeriodPaymentStatus().required(), + createdAt: Joi.date().required(), + createdBy: Joi.string().uuid().required(), + updatedAt: Joi.date().allow(null), + updatedBy: Joi.string().uuid().allow(null) + }).required() + }).required(), + transactionId: Joi.string().required() +} + +/** + * Process update entity message + * @param {Object} message the kafka message + * @param {String} transactionId + */ +async function processUpdate (message, transactionId) { + const data = message.payload + let workPeriod = await esClient.search({ + index: config.get('esConfig.ES_INDEX_WORK_PERIOD'), + body: { + query: { + nested: { + path: 'payments', + query: { + match: { 'payments.id': data.id } + } + } + } + } + }) + let payments + // if WorkPeriodPayment's workPeriodId changed then it must be deleted from the old WorkPeriod + // and added to the new WorkPeriod + if (workPeriod.body.hits.hits[0]._source.id !== data.workPeriodId) { + payments = _.filter(workPeriod.body.hits.hits[0]._source.payments, (payment) => payment.id !== data.id) + await esClient.update({ + index: config.get('esConfig.ES_INDEX_WORK_PERIOD'), + id: workPeriod.body.hits.hits[0]._source.id, + transactionId, + body: { + doc: _.assign(workPeriod.body.hits.hits[0]._source, { payments }) + } + }) + workPeriod = await esClient.get({ + index: config.get('esConfig.ES_INDEX_WORK_PERIOD'), + id: data.workPeriodId + }) + payments = _.isArray(workPeriod.body._source.payments) ? workPeriod.body._source.payments : [] + payments.push(data) + return esClient.update({ + index: config.get('esConfig.ES_INDEX_WORK_PERIOD'), + id: data.workPeriodId, + transactionId, + body: { + doc: _.assign(workPeriod.body._source, { payments }) + } + }) + } + + payments = _.map(workPeriod.body.hits.hits[0]._source.payments, (payment) => { + if (payment.id === data.id) { + return _.assign(payment, data) + } + return payment + }) + + return esClient.update({ + index: config.get('esConfig.ES_INDEX_WORK_PERIOD'), + id: data.workPeriodId, + transactionId, + body: { + doc: _.assign(workPeriod.body.hits.hits[0]._source, { payments }) + }, + refresh: constants.esRefreshOption + }) +} + +processUpdate.schema = processCreate.schema + +module.exports = { + processCreate, + processUpdate +} + +logger.buildService(module.exports, 'WorkPeriodPaymentProcessorService') diff --git a/test/messages/taas.workperiodpayment.create.event.json b/test/messages/taas.workperiodpayment.create.event.json new file mode 100644 index 0000000..25ab9cc --- /dev/null +++ b/test/messages/taas.workperiodpayment.create.event.json @@ -0,0 +1 @@ +{"topic":"taas.workperiodpayment.create","originator":"taas-api","timestamp":"2021-04-09T20:10:33.770Z","mime-type":"application/json","payload":{"challengeId":"00000000-0000-0000-0000-000000000000","workPeriodId":"140b7407-540d-40c3-ad23-905d932aa9c8","amount":600,"status":"completed","id":"09c80ee6-21be-45a4-9c3c-7ec4c75ece79","createdBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c","updatedAt":"2021-04-09T20:10:33.755Z","createdAt":"2021-04-09T20:10:33.755Z","updatedBy":null}} \ No newline at end of file diff --git a/test/messages/taas.workperiodpayment.update.event.json b/test/messages/taas.workperiodpayment.update.event.json new file mode 100644 index 0000000..66e5bce --- /dev/null +++ b/test/messages/taas.workperiodpayment.update.event.json @@ -0,0 +1 @@ +{"topic":"taas.workperiodpayment.update","originator":"taas-api","timestamp":"2021-04-09T20:12:26.994Z","mime-type":"application/json","payload":{"id":"09c80ee6-21be-45a4-9c3c-7ec4c75ece79","workPeriodId":"140b7407-540d-40c3-ad23-905d932aa9c8","challengeId":"00000000-0000-0000-0000-000000000000","amount":1600,"status":"completed","createdBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c","updatedBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c","createdAt":"2021-04-09T20:10:33.755Z","updatedAt":"2021-04-09T20:12:26.966Z"}} \ No newline at end of file