Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit d8ee78f

Browse files
author
sachin-maheshwari
authored
Merge pull request #107 from topcoder-platform/develop
Shapeup4 : CQRS standards update, part 1
2 parents 9cab614 + a0b1bbe commit d8ee78f

File tree

9 files changed

+228
-56
lines changed

9 files changed

+228
-56
lines changed

.circleci/config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ workflows:
7070
branches:
7171
only:
7272
- develop
73+
- feature/shapeup4-cqrs-update
7374

7475
# Production builds are exectuted only on tagged commits to the
7576
# master branch.

config/default.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ module.exports = {
3535
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',
3636

3737
// topics
38+
UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'ubahn.action.error',
39+
3840
UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
3941
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
4042
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
@@ -126,7 +128,7 @@ module.exports = {
126128
orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
127129
}
128130
},
129-
MAX_BATCH_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 10000,
131+
MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 10000,
130132
MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000,
131133
MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100
132134
}

docker-pgsql-es/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ services:
44
image: "postgres:12.4"
55
volumes:
66
- database-data:/var/lib/postgresql/data/
7-
ports:
7+
ports:
88
- "5432:5432"
99
environment:
1010
POSTGRES_PASSWORD: ${DB_PASSWORD}
1111
POSTGRES_USER: ${DB_USERNAME}
1212
POSTGRES_DB: ${DB_NAME}
1313
esearch:
14-
image: elasticsearch:7.7.1
14+
image: elasticsearch:7.13.4
1515
container_name: ubahn-data-processor-es_es
1616
ports:
1717
- "9200:9200"

scripts/db/dumpDbToEs.js

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -144,30 +144,40 @@ async function insertIntoES (modelName, dataset) {
144144
const chunked = _.chunk(dataset, config.get('ES.MAX_BULK_SIZE'))
145145
for (const ds of chunked) {
146146
const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc])
147-
await client.bulk({
148-
index: topResources[esResourceName].index,
149-
type: topResources[esResourceName].type,
150-
body,
151-
pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
152-
refresh: 'wait_for'
153-
})
147+
try {
148+
await client.bulk({
149+
index: topResources[esResourceName].index,
150+
type: topResources[esResourceName].type,
151+
body,
152+
pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
153+
refresh: 'wait_for'
154+
})
155+
} catch (e) {
156+
logger.error('ES, create mapping error.')
157+
logger.error(JSON.stringify(e))
158+
}
154159
}
155160
} else if (_.includes(_.keys(userResources), esResourceName)) {
156161
const userResource = userResources[esResourceName]
157162

158163
if (userResource.nested === true && userResource.mappingCreated !== true) {
159-
await client.indices.putMapping({
160-
index: topResources.user.index,
161-
type: topResources.user.type,
162-
include_type_name: true,
163-
body: {
164-
properties: {
165-
[userResource.propertyName]: {
166-
type: 'nested'
164+
try {
165+
await client.indices.putMapping({
166+
index: topResources.user.index,
167+
type: topResources.user.type,
168+
include_type_name: true,
169+
body: {
170+
properties: {
171+
[userResource.propertyName]: {
172+
type: 'nested'
173+
}
167174
}
168175
}
169-
}
170-
})
176+
})
177+
} catch (e) {
178+
logger.error('ES, nexted mapping error.')
179+
logger.error(JSON.stringify(e))
180+
}
171181
userResource.mappingCreated = true
172182
}
173183

@@ -391,14 +401,12 @@ async function main () {
391401

392402
for (let i = 0; i < keys.length; i++) {
393403
const key = keys[i]
404+
const queryPage = { perPage: parseInt(config.get('ES.MAX_BATCH_SIZE'), 10), page: 1 }
394405
try {
395-
const allData = await dbHelper.find(models[key], {})
396-
let j = 0
397-
const dataset = _.chunk(allData, config.get('ES.MAX_BATCH_SIZE'))
398-
for (const data of dataset) {
406+
while (true) {
407+
const data = await dbHelper.find(models[key], { ...queryPage })
399408
for (let i = 0; i < data.length; i++) {
400-
j++
401-
logger.info(`Inserting data ${j} of ${allData.length}`)
409+
logger.info(`Inserting data ${(i + 1) + (queryPage.perPage * (queryPage.page - 1))}`)
402410
logger.info(JSON.stringify(data[i]))
403411
if (!_.isString(data[i].created)) {
404412
data[i].created = new Date()
@@ -414,27 +422,33 @@ async function main () {
414422
}
415423
}
416424
await insertIntoES(key, data)
425+
if (data.length < queryPage.perPage) {
426+
logger.info('import data for ' + key + ' done')
427+
break
428+
} else {
429+
queryPage.page = queryPage.page + 1
430+
}
417431
}
418-
logger.info('import data for ' + key + ' done')
419432
} catch (e) {
420-
logger.error(e)
433+
logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4))
434+
logger.error(_.get(e, 'meta.meta.request.params.method', ''))
435+
logger.error(_.get(e, 'meta.meta.request.params.path', ''))
421436
logger.warn('import data for ' + key + ' failed')
422437
continue
423438
}
424-
425439
try {
426440
await createAndExecuteEnrichPolicy(key)
427441
logger.info('create and execute enrich policy for ' + key + ' done')
428442
} catch (e) {
429-
logger.error(e)
443+
logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4))
430444
logger.warn('create and execute enrich policy for ' + key + ' failed')
431445
}
432446

433447
try {
434448
await createEnrichProcessor(key)
435449
logger.info('create enrich processor (pipeline) for ' + key + ' done')
436450
} catch (e) {
437-
logger.error(e)
451+
logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4))
438452
logger.warn('create enrich processor (pipeline) for ' + key + ' failed')
439453
}
440454
}

src/common/db-helper.js

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,24 +105,27 @@ async function get (model, pk, params) {
105105
* @param model the sequelize model object
106106
* @param entity entity to create
107107
* @param auth the user auth object
108+
* @param transaction the transaction object
108109
* @returns {Promise<void>}
109110
*/
110-
async function create (model, entity, auth) {
111+
async function create (model, entity, auth, transaction) {
111112
if (auth) {
112113
entity.createdBy = helper.getAuthUser(auth)
113114
}
114-
return model.create(entity)
115+
return model.create(entity, { transaction })
115116
}
116117

117118
/**
118119
* delete object by pk
119120
* @param model the sequelize model object
120121
* @param pk the primary key
122+
* @param transaction the transaction object
121123
* @returns {Promise<void>}
122124
*/
123-
async function remove (model, pk, params) {
125+
async function remove (model, pk, params, transaction) {
124126
const instance = await get(model, pk, params)
125-
return instance.destroy()
127+
const result = await instance.destroy({ transaction })
128+
return result
126129
}
127130

128131
/**
@@ -132,13 +135,14 @@ async function remove (model, pk, params) {
132135
* @param entity entity to create
133136
* @param auth the auth object
134137
* @param auth the path params
138+
* @param transaction the transaction object
135139
* @returns {Promise<void>}
136140
*/
137-
async function update (model, pk, entity, auth, params) {
141+
async function update (model, pk, entity, auth, params, transaction) {
138142
// insure that object exists
139143
const instance = await get(model, pk, params)
140144
entity.updatedBy = helper.getAuthUser(auth)
141-
return instance.update(entity)
145+
return instance.update(entity, { transaction })
142146
}
143147

144148
/**

src/common/es-helper.js

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const config = require('config')
22
const _ = require('lodash')
33
const querystring = require('querystring')
44
const logger = require('../common/logger')
5+
const helper = require('../common/helper')
56
const appConst = require('../consts')
67
const esClient = require('./es-client').getESClient()
78

@@ -282,6 +283,38 @@ function escapeRegex (str) {
282283
/* eslint-enable no-useless-escape */
283284
}
284285

286+
/**
287+
* Process create entity
288+
* @param {String} resource resource name
289+
* @param {Object} entity entity object
290+
*/
291+
async function processCreate (resource, entity) {
292+
helper.validProperties(entity, ['id'])
293+
await esClient.index({
294+
index: DOCUMENTS[resource].index,
295+
type: DOCUMENTS[resource].type,
296+
id: entity.id,
297+
body: entity,
298+
refresh: 'wait_for'
299+
})
300+
logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`)
301+
}
302+
303+
/**
304+
* Process delete entity
305+
* @param {String} resource resource name
306+
* @param {Object} entity entity object
307+
*/
308+
async function processDelete (resource, entity) {
309+
helper.validProperties(entity, ['id'])
310+
await esClient.delete({
311+
index: DOCUMENTS[resource].index,
312+
type: DOCUMENTS[resource].type,
313+
id: entity.id,
314+
refresh: 'wait_for'
315+
})
316+
}
317+
285318
async function getOrganizationId (handle) {
286319
const dBHelper = require('../common/db-helper')
287320
const sequelize = require('../models/index')
@@ -1453,6 +1486,9 @@ async function searchAchievementValues ({ organizationId, keyword }) {
14531486
}
14541487

14551488
module.exports = {
1489+
processCreate,
1490+
processUpdate: processCreate,
1491+
processDelete,
14561492
searchElasticSearch,
14571493
getFromElasticSearch,
14581494
searchUsers,

src/common/helper.js

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const config = require('config')
2+
const Joi = require('@hapi/joi')
23
const querystring = require('querystring')
34
const errors = require('./errors')
45
const appConst = require('../consts')
@@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper')
910
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
1011
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
1112

13+
/**
14+
* Function to valid require keys
15+
* @param {Object} payload validated object
16+
* @param {Array} keys required keys
17+
* @throws {Error} if required key absent
18+
*/
19+
function validProperties (payload, keys) {
20+
const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true)
21+
const error = schema.validate(payload).error
22+
if (error) {
23+
throw error
24+
}
25+
}
26+
1227
/**
1328
* get auth user handle or id
1429
* @param authUser the user
@@ -145,12 +160,33 @@ async function postEvent (topic, payload) {
145160
await busApiClient.postEvent(message)
146161
}
147162

163+
/**
164+
* Send error event to Kafka
165+
* @params {String} topic the topic name
166+
* @params {Object} payload the payload
167+
* @params {String} action for which operation error occurred
168+
*/
169+
async function publishError (topic, payload, action) {
170+
_.set(payload, 'apiAction', action)
171+
const message = {
172+
topic,
173+
originator: config.KAFKA_MESSAGE_ORIGINATOR,
174+
timestamp: new Date().toISOString(),
175+
'mime-type': 'application/json',
176+
payload
177+
}
178+
logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
179+
await busApiClient.postEvent(message)
180+
}
181+
148182
module.exports = {
183+
validProperties,
149184
getAuthUser,
150185
permissionCheck,
151186
checkIfExists,
152187
injectSearchMeta,
153188
getControllerMethods,
154189
getSubControllerMethods,
155-
postEvent
190+
postEvent,
191+
publishError
156192
}

0 commit comments

Comments
 (0)