Skip to content

Commit 624b03a

Browse files
committed
feat: add transaction for WorkPeriodService, WorkPeriodPaymentService and ResourceBookingService
1 parent 23ac85f commit 624b03a

File tree

8 files changed

+441
-29
lines changed

8 files changed

+441
-29
lines changed

config/default.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ module.exports = {
9393
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
9494
// The originator value for the kafka messages
9595
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'taas-api',
96+
97+
// topics for error
98+
TAAS_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'taas.action.error',
9699
// topics for job service
97100
// the create job entity Kafka message topic
98101
TAAS_JOB_CREATE_TOPIC: process.env.TAAS_JOB_CREATE_TOPIC || 'taas.job.create',

src/common/helper.js

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,26 @@ async function postEvent (topic, payload, options = {}) {
987987
await eventDispatcher.handleEvent(topic, { value: payload, options })
988988
}
989989

990+
/**
991+
* Send error event to Kafka
992+
* @params {String} topic the topic name
993+
* @params {Object} payload the payload
994+
* @params {String} action for which operation error occurred
995+
*/
996+
async function postErrorEvent (topic, payload, action) {
997+
_.set(payload, 'apiAction', action)
998+
const client = getBusApiClient()
999+
const message = {
1000+
topic,
1001+
originator: config.KAFKA_MESSAGE_ORIGINATOR,
1002+
timestamp: new Date().toISOString(),
1003+
'mime-type': 'application/json',
1004+
payload
1005+
}
1006+
logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
1007+
await client.postEvent(message)
1008+
}
1009+
9901010
/**
9911011
* Test if an error is document missing exception
9921012
*
@@ -2036,6 +2056,7 @@ module.exports = {
20362056
getM2MToken,
20372057
getM2MUbahnToken,
20382058
postEvent,
2059+
postErrorEvent,
20392060
getBusApiClient,
20402061
isDocumentMissingException,
20412062
getProjects,
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* ResourceBooking Processor
3+
*/
4+
5+
const helper = require('../common/helper')
6+
const config = require('config')
7+
8+
const esClient = helper.getESClient()
9+
10+
/**
11+
* Process create entity message
12+
* @param {Object} entity entity object
13+
*/
14+
async function processCreate (entity) {
15+
await esClient.create({
16+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
17+
id: entity.id,
18+
body: entity,
19+
refresh: 'wait_for'
20+
})
21+
}
22+
23+
/**
24+
* Process update entity message
25+
* @param {Object} entity entity object
26+
*/
27+
async function processUpdate (entity) {
28+
await esClient.update({
29+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
30+
id: entity.id,
31+
body: {
32+
doc: entity
33+
},
34+
refresh: 'wait_for'
35+
})
36+
}
37+
38+
/**
39+
* Process delete entity message
40+
* @param {Object} entity entity object
41+
*/
42+
async function processDelete (entity) {
43+
await esClient.delete({
44+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
45+
id: entity.id,
46+
refresh: 'wait_for'
47+
})
48+
}
49+
50+
module.exports = {
51+
processCreate,
52+
processUpdate,
53+
processDelete
54+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* WorkPeriodPayment Processor
3+
*/
4+
5+
const config = require('config')
6+
const helper = require('../common/helper')
7+
8+
const esClient = helper.getESClient()
9+
10+
/**
11+
* Process create entity
12+
* @param {Object} entity entity object
13+
*/
14+
async function processCreate (entity) {
15+
// find related resourceBooking
16+
const resourceBooking = await esClient.search({
17+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
18+
body: {
19+
query: {
20+
nested: {
21+
path: 'workPeriods',
22+
query: {
23+
match: { 'workPeriods.id': entity.workPeriodId }
24+
}
25+
}
26+
}
27+
}
28+
})
29+
if (!resourceBooking.body.hits.total.value) {
30+
throw new Error(`id: ${entity.workPeriodId} "WorkPeriod" not found`)
31+
}
32+
await esClient.update({
33+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
34+
id: resourceBooking.body.hits.hits[0]._id,
35+
body: {
36+
script: {
37+
lang: 'painless',
38+
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.workPeriodPayment.workPeriodId); if(!wp.containsKey("payments") || wp.payments == null){wp["payments"]=[]}wp.payments.add(params.workPeriodPayment)',
39+
params: { workPeriodPayment: entity }
40+
}
41+
},
42+
refresh: 'wait_for'
43+
})
44+
}
45+
46+
/**
47+
* Process update entity
48+
* @param {Object} entity entity object
49+
*/
50+
async function processUpdate (entity) {
51+
// find workPeriodPayment in it's parent ResourceBooking
52+
const resourceBooking = await esClient.search({
53+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
54+
body: {
55+
query: {
56+
nested: {
57+
path: 'workPeriods.payments',
58+
query: {
59+
match: { 'workPeriods.payments.id': entity.id }
60+
}
61+
}
62+
}
63+
}
64+
})
65+
if (!resourceBooking.body.hits.total.value) {
66+
throw new Error(`id: ${entity.id} "WorkPeriodPayment" not found`)
67+
}
68+
await esClient.update({
69+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
70+
id: resourceBooking.body.hits.hits[0]._id,
71+
body: {
72+
script: {
73+
lang: 'painless',
74+
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.workPeriodId); wp.payments.removeIf(payment -> payment.id == params.data.id); wp.payments.add(params.data)',
75+
params: { data: entity }
76+
}
77+
},
78+
refresh: 'wait_for'
79+
})
80+
}
81+
82+
module.exports = {
83+
processCreate,
84+
processUpdate
85+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/**
2+
* WorkPeriod Processor
3+
*/
4+
5+
const helper = require('../common/helper')
6+
const config = require('config')
7+
const esClient = helper.getESClient()
8+
9+
/**
10+
* Process create entity
11+
* @param {Object} entity entity object
12+
*/
13+
async function processCreate (entity, options) {
14+
// Find related resourceBooking
15+
const resourceBooking = await esClient.getSource({
16+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
17+
id: entity.resourceBookingId
18+
})
19+
await esClient.update({
20+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
21+
id: resourceBooking.body.id,
22+
body: {
23+
script: {
24+
lang: 'painless',
25+
source: 'if(!ctx._source.containsKey("workPeriods") || ctx._source.workPeriods == null){ctx._source["workPeriods"]=[]}ctx._source.workPeriods.add(params.workPeriod)',
26+
params: { workPeriod: entity }
27+
}
28+
},
29+
refresh: 'wait_for'
30+
})
31+
}
32+
33+
/**
34+
* Process update entity
35+
* @param {Object} entity entity object
36+
*/
37+
async function processUpdate (entity) {
38+
// find workPeriod in it's parent ResourceBooking
39+
const resourceBooking = await esClient.search({
40+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
41+
body: {
42+
query: {
43+
nested: {
44+
path: 'workPeriods',
45+
query: {
46+
match: { 'workPeriods.id': entity.id }
47+
}
48+
}
49+
}
50+
}
51+
})
52+
if (!resourceBooking.body.hits.total.value) {
53+
throw new Error(`id: ${entity.id} "WorkPeriod" not found`)
54+
}
55+
await esClient.update({
56+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
57+
id: resourceBooking.body.hits.hits[0]._id,
58+
body: {
59+
script: {
60+
lang: 'painless',
61+
source: 'def wp = ctx._source.workPeriods.find(workPeriod -> workPeriod.id == params.data.id); ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id); params.data.payments = wp.payments; ctx._source.workPeriods.add(params.data)',
62+
params: { data: entity }
63+
}
64+
},
65+
refresh: 'wait_for'
66+
})
67+
}
68+
69+
/**
70+
* Process delete entity
71+
* @param {Object} entity entity object
72+
*/
73+
async function processDelete (entity) {
74+
// Find related ResourceBooking
75+
const resourceBooking = await esClient.search({
76+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
77+
body: {
78+
query: {
79+
nested: {
80+
path: 'workPeriods',
81+
query: {
82+
match: { 'workPeriods.id': entity.id }
83+
}
84+
}
85+
}
86+
}
87+
})
88+
if (!resourceBooking.body.hits.total.value) {
89+
const resourceBookingId = entity.key.replace('resourceBooking.id:', '')
90+
if (resourceBookingId) {
91+
try {
92+
await esClient.getSource({
93+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
94+
id: resourceBookingId
95+
})
96+
if (!resourceBooking) {
97+
return
98+
}
99+
100+
throw new Error(`id: ${entity.id} "WorkPeriod" not found`)
101+
} catch (e) {
102+
// if ResourceBooking is deleted, ignore
103+
if (e.message === 'resource_not_found_exception') {
104+
return
105+
}
106+
throw e
107+
}
108+
}
109+
// if ResourceBooking is deleted, ignore, else throw error
110+
if (resourceBooking) {
111+
throw new Error(`id: ${entity.id} "WorkPeriod" not found`)
112+
}
113+
}
114+
await esClient.update({
115+
index: config.get('esConfig.ES_INDEX_RESOURCE_BOOKING'),
116+
id: resourceBooking.body.hits.hits[0]._id,
117+
body: {
118+
script: {
119+
lang: 'painless',
120+
source: 'ctx._source.workPeriods.removeIf(workPeriod -> workPeriod.id == params.data.id)',
121+
params: { data: entity }
122+
}
123+
},
124+
refresh: 'wait_for'
125+
})
126+
}
127+
128+
module.exports = {
129+
processCreate,
130+
processUpdate,
131+
processDelete
132+
}

0 commit comments

Comments
 (0)