Skip to content

Commit 4c5d45c

Browse files
authored
Merge pull request #39 from narekcat/feature/work-periods
Add work period payments.
2 parents 830de0e + 9a42597 commit 4c5d45c

File tree

10 files changed

+168
-5
lines changed

10 files changed

+168
-5
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ The following parameters can be set in config files or in env variables:
3333
- `topics.TAAS_WORK_PERIOD_CREATE_TOPIC`: the create work period entity Kafka message topic
3434
- `topics.TAAS_WORK_PERIOD_UPDATE_TOPIC`: the update work period entity Kafka message topic
3535
- `topics.TAAS_WORK_PERIOD_DELETE_TOPIC`: the delete work period entity Kafka message topic
36+
- `topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC`: the create work period payment entity Kafka message topic
37+
- `topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC`: the update work period payment entity Kafka message topic
3638
- `esConfig.HOST`: Elasticsearch host
3739
- `esConfig.AWS_REGION`: The Amazon region to use when using AWS Elasticsearch service
3840
- `esConfig.ELASTICCLOUD.id`: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this

VERIFICATION.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## Create documents in ES
44

5-
- Run the following commands to create `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod` documents in ES.
5+
- Run the following commands to create `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES.
66

77
``` bash
88
# for Job
@@ -13,12 +13,14 @@
1313
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.create < test/messages/taas.resourcebooking.create.event.json
1414
# for WorkPeriod
1515
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiod.create < test/messages/taas.workperiod.create.event.json
16+
# for WorkPeriodPayment
17+
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiodpayment.create < test/messages/taas.workperiodpayment.create.event.json
1618
```
1719

1820
- Run `npm run view-data <model-name-here>` to see if documents were created.
1921

2022
## Update documents in ES
21-
- Run the following commands to update `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod` documents in ES.
23+
- Run the following commands to update `Job`, `JobCandidate`, `ResourceBooking`, `WorkPeriod`, `WorkPeriodPayment` documents in ES.
2224

2325
``` bash
2426
# for Job
@@ -29,6 +31,8 @@
2931
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.resourcebooking.update < test/messages/taas.resourcebooking.update.event.json
3032
# for WorkPeriod
3133
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiod.update < test/messages/taas.workperiod.update.event.json
34+
# for WorkPeriodPayment
35+
docker exec -i taas-es-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic taas.workperiodpayment.update < test/messages/taas.workperiodpayment.update.event.json
3236
```
3337

3438
- Run `npm run view-data <model-name-here>` to see if documents were updated.

config/default.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ module.exports = {
3131
// topics for work period service
3232
TAAS_WORK_PERIOD_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_CREATE_TOPIC || 'taas.workperiod.create',
3333
TAAS_WORK_PERIOD_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_UPDATE_TOPIC || 'taas.workperiod.update',
34-
TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete'
34+
TAAS_WORK_PERIOD_DELETE_TOPIC: process.env.TAAS_WORK_PERIOD_DELETE_TOPIC || 'taas.workperiod.delete',
35+
// topics for work period payment service
36+
TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC || 'taas.workperiodpayment.create',
37+
TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC: process.env.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC || 'taas.workperiodpayment.update'
3538
},
3639

3740
esConfig: {

local/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ services:
1212
- "9092:9092"
1313
environment:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
15-
KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1"
15+
KAFKA_CREATE_TOPICS: "taas.job.create:1:1,taas.jobcandidate.create:1:1,taas.resourcebooking.create:1:1,taas.workperiod.create:1:1,taas.workperiodpayment.create:1:1,taas.job.update:1:1,taas.jobcandidate.update:1:1,taas.resourcebooking.update:1:1,taas.workperiod.update:1:1,taas.workperiodpayment.update:1:1,taas.job.delete:1:1,taas.jobcandidate.delete:1:1,taas.resourcebooking.delete:1:1,taas.workperiod.delete:1:1"
1616
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
1717
esearch:
1818
image: elasticsearch:7.7.1

src/app.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const JobProcessorService = require('./services/JobProcessorService')
1313
const JobCandidateProcessorService = require('./services/JobCandidateProcessorService')
1414
const ResourceBookingProcessorService = require('./services/ResourceBookingProcessorService')
1515
const WorkPeriodProcessorService = require('./services/WorkPeriodProcessorService')
16+
const WorkPeriodPaymentProcessorService = require('./services/WorkPeriodPaymentProcessorService')
1617
const Mutex = require('async-mutex').Mutex
1718
const events = require('events')
1819

@@ -43,7 +44,10 @@ const topicServiceMapping = {
4344
// work period
4445
[config.topics.TAAS_WORK_PERIOD_CREATE_TOPIC]: WorkPeriodProcessorService.processCreate,
4546
[config.topics.TAAS_WORK_PERIOD_UPDATE_TOPIC]: WorkPeriodProcessorService.processUpdate,
46-
[config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete
47+
[config.topics.TAAS_WORK_PERIOD_DELETE_TOPIC]: WorkPeriodProcessorService.processDelete,
48+
// work period payment
49+
[config.topics.TAAS_WORK_PERIOD_PAYMENT_CREATE_TOPIC]: WorkPeriodPaymentProcessorService.processCreate,
50+
[config.topics.TAAS_WORK_PERIOD_PAYMENT_UPDATE_TOPIC]: WorkPeriodPaymentProcessorService.processUpdate
4751
}
4852

4953
// Start kafka consumer

src/bootstrap.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Joi.jobCandidateStatus = () => Joi.string().valid('open', 'selected', 'shortlist
1010
Joi.workload = () => Joi.string().valid('full-time', 'fractional')
1111
Joi.title = () => Joi.string().max(128)
1212
Joi.paymentStatus = () => Joi.string().valid('pending', 'partially-completed', 'completed', 'cancelled')
13+
Joi.workPeriodPaymentStatus = () => Joi.string().valid('completed', 'cancelled')
1314
// Empty string is not allowed by Joi by default and must be enabled with allow('').
1415
// See https://joi.dev/api/?v=17.3.0#string fro details why it's like this.
1516
// In many cases we would like to allow empty string to make it easier to create UI for editing data.

src/scripts/createIndex.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,19 @@ async function createIndex () {
9191
memberRate: { type: 'float' },
9292
customerRate: { type: 'float' },
9393
paymentStatus: { type: 'keyword' },
94+
payments: {
95+
type: 'nested',
96+
properties: {
97+
workPeriodId: { type: 'keyword' },
98+
challengeId: { type: 'keyword' },
99+
amount: { type: 'float' },
100+
status: { type: 'keyword' },
101+
createdAt: { type: 'date' },
102+
createdBy: { type: 'keyword' },
103+
updatedAt: { type: 'date' },
104+
updatedBy: { type: 'keyword' }
105+
}
106+
},
94107
createdAt: { type: 'date' },
95108
createdBy: { type: 'keyword' },
96109
updatedAt: { type: 'date' },
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/**
2+
* WorkPeriodPayment Processor Service
3+
*/
4+
5+
const Joi = require('@hapi/joi')
6+
const config = require('config')
7+
const _ = require('lodash')
8+
const logger = require('../common/logger')
9+
const helper = require('../common/helper')
10+
const constants = require('../common/constants')
11+
12+
const esClient = helper.getESClient()
13+
14+
/**
15+
* Process create entity message
16+
* @param {Object} message the kafka message
17+
* @param {String} transactionId
18+
*/
19+
async function processCreate (message, transactionId) {
20+
const data = message.payload
21+
const workPeriod = await esClient.get({
22+
index: config.get('esConfig.ES_INDEX_WORK_PERIOD'),
23+
id: data.workPeriodId
24+
})
25+
const payments = _.isArray(workPeriod.body._source.payments) ? workPeriod.body._source.payments : []
26+
payments.push(data)
27+
28+
return esClient.update({
29+
index: config.get('esConfig.ES_INDEX_WORK_PERIOD'),
30+
id: data.workPeriodId,
31+
transactionId,
32+
body: {
33+
doc: _.assign(workPeriod.body._source, { payments })
34+
},
35+
refresh: constants.esRefreshOption
36+
})
37+
}
38+
39+
processCreate.schema = {
40+
message: Joi.object().keys({
41+
topic: Joi.string().required(),
42+
originator: Joi.string().required(),
43+
timestamp: Joi.date().required(),
44+
'mime-type': Joi.string().required(),
45+
payload: Joi.object().keys({
46+
id: Joi.string().uuid().required(),
47+
workPeriodId: Joi.string().uuid().required(),
48+
challengeId: Joi.string().uuid().required(),
49+
amount: Joi.number().greater(0).allow(null),
50+
status: Joi.workPeriodPaymentStatus().required(),
51+
createdAt: Joi.date().required(),
52+
createdBy: Joi.string().uuid().required(),
53+
updatedAt: Joi.date().allow(null),
54+
updatedBy: Joi.string().uuid().allow(null)
55+
}).required()
56+
}).required(),
57+
transactionId: Joi.string().required()
58+
}
59+
60+
/**
61+
* Process update entity message
62+
* @param {Object} message the kafka message
63+
* @param {String} transactionId
64+
*/
65+
async function processUpdate (message, transactionId) {
66+
const data = message.payload
67+
let workPeriod = await esClient.search({
68+
index: config.get('esConfig.ES_INDEX_WORK_PERIOD'),
69+
body: {
70+
query: {
71+
nested: {
72+
path: 'payments',
73+
query: {
74+
match: { 'payments.id': data.id }
75+
}
76+
}
77+
}
78+
}
79+
})
80+
let payments
81+
// if WorkPeriodPayment's workPeriodId changed then it must be deleted from the old WorkPeriod
82+
// and added to the new WorkPeriod
83+
if (workPeriod.body.hits.hits[0]._source.id !== data.workPeriodId) {
84+
payments = _.filter(workPeriod.body.hits.hits[0]._source.payments, (payment) => payment.id !== data.id)
85+
await esClient.update({
86+
index: config.get('esConfig.ES_INDEX_WORK_PERIOD'),
87+
id: workPeriod.body.hits.hits[0]._source.id,
88+
transactionId,
89+
body: {
90+
doc: _.assign(workPeriod.body.hits.hits[0]._source, { payments })
91+
}
92+
})
93+
workPeriod = await esClient.get({
94+
index: config.get('esConfig.ES_INDEX_WORK_PERIOD'),
95+
id: data.workPeriodId
96+
})
97+
payments = _.isArray(workPeriod.body._source.payments) ? workPeriod.body._source.payments : []
98+
payments.push(data)
99+
return esClient.update({
100+
index: config.get('esConfig.ES_INDEX_WORK_PERIOD'),
101+
id: data.workPeriodId,
102+
transactionId,
103+
body: {
104+
doc: _.assign(workPeriod.body._source, { payments })
105+
}
106+
})
107+
}
108+
109+
payments = _.map(workPeriod.body.hits.hits[0]._source.payments, (payment) => {
110+
if (payment.id === data.id) {
111+
return _.assign(payment, data)
112+
}
113+
return payment
114+
})
115+
116+
return esClient.update({
117+
index: config.get('esConfig.ES_INDEX_WORK_PERIOD'),
118+
id: data.workPeriodId,
119+
transactionId,
120+
body: {
121+
doc: _.assign(workPeriod.body.hits.hits[0]._source, { payments })
122+
},
123+
refresh: constants.esRefreshOption
124+
})
125+
}
126+
127+
processUpdate.schema = processCreate.schema
128+
129+
module.exports = {
130+
processCreate,
131+
processUpdate
132+
}
133+
134+
logger.buildService(module.exports, 'WorkPeriodPaymentProcessorService')
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"topic":"taas.workperiodpayment.create","originator":"taas-api","timestamp":"2021-04-09T20:10:33.770Z","mime-type":"application/json","payload":{"challengeId":"00000000-0000-0000-0000-000000000000","workPeriodId":"140b7407-540d-40c3-ad23-905d932aa9c8","amount":600,"status":"completed","id":"09c80ee6-21be-45a4-9c3c-7ec4c75ece79","createdBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c","updatedAt":"2021-04-09T20:10:33.755Z","createdAt":"2021-04-09T20:10:33.755Z","updatedBy":null}}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"topic":"taas.workperiodpayment.update","originator":"taas-api","timestamp":"2021-04-09T20:12:26.994Z","mime-type":"application/json","payload":{"id":"09c80ee6-21be-45a4-9c3c-7ec4c75ece79","workPeriodId":"140b7407-540d-40c3-ad23-905d932aa9c8","challengeId":"00000000-0000-0000-0000-000000000000","amount":1600,"status":"completed","createdBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c","updatedBy":"57646ff9-1cd3-4d3c-88ba-eb09a395366c","createdAt":"2021-04-09T20:10:33.755Z","updatedAt":"2021-04-09T20:12:26.966Z"}}

0 commit comments

Comments
 (0)