Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Shapeup4 : CQRS standards update, part 1 #107

Merged
merged 30 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
06d1341
Recollect dumpDbToEs.js script
xxcxy Apr 14, 2021
1d01022
#90 - misc
callmekatootie Apr 14, 2021
9078533
Merge pull request #89 from topcoder-platform/fix-recollect-dbtoes
callmekatootie Apr 14, 2021
444c257
log more details when error occurs during db to es migration
callmekatootie Apr 17, 2021
878b94d
for fixing nested looping error
Apr 21, 2021
3f0daa5
CQRS Standards & Updates - Users endpoints
yoution Jul 20, 2021
a15dd36
fix: issue when delete user
yoution Jul 21, 2021
c6565f2
Merge pull request #99 from yoution/feature/CQRS-Standards
Jul 22, 2021
7efafae
deploying on dev
Jul 22, 2021
1c7d754
feat: add user added topic
yoution Jul 26, 2021
14e73a9
fix: update elasticsearch to 7.13.4
yoution Jul 26, 2021
34b6cbd
fix: add update and delete topic for user
yoution Jul 27, 2021
e85d2d3
Merge pull request #100 from yoution/feature/CQRS-Standards
Jul 27, 2021
5c1645c
fix: sendtopic after error happened
yoution Jul 27, 2021
144a8c1
fix: return entity when create
yoution Jul 27, 2021
3a01c82
Merge pull request #101 from yoution/feature/CQRS-Standards
Jul 27, 2021
ebc0a4d
common error topic, no aggregation needed during error publishing
Jul 27, 2021
51b0719
ES error testing....
Jul 27, 2021
a3dfbce
ES error testing, logging
Jul 27, 2021
93fad26
topic name issue
Jul 27, 2021
7edbda6
testing done
Jul 27, 2021
5a21c71
stopping event publishing for user object
Jul 28, 2021
1d7130d
adding more log
Jul 28, 2021
c718f54
clean-up issue
Jul 28, 2021
707c209
reverting CQRS changes for 'user delete'
Jul 30, 2021
61bb014
fixing gateway timeout
Jul 30, 2021
dbfddd5
Merge pull request #103 from topcoder-platform/feature/shapeup4-cqrs-…
Jul 30, 2021
67b98f1
Update service-helper.js
phead198708 Aug 3, 2021
74bbede
Merge pull request #105 from phead198708/develop
Aug 3, 2021
a0b1bbe
Merge pull request #106 from topcoder-platform/feature/shapeup4-cqrs-…
Aug 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ workflows:
branches:
only:
- develop
- feature/shapeup4-cqrs-update

# Production builds are executed only on tagged commits to the
# master branch.
Expand Down
4 changes: 3 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ module.exports = {
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',

// topics
UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'ubahn.action.error',

UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
Expand Down Expand Up @@ -126,7 +128,7 @@ module.exports = {
orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
}
},
MAX_BATCH_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 10000,
MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 10000,
MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000,
MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100
}
Expand Down
4 changes: 2 additions & 2 deletions docker-pgsql-es/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ services:
image: "postgres:12.4"
volumes:
- database-data:/var/lib/postgresql/data/
ports:
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_USER: ${DB_USERNAME}
POSTGRES_DB: ${DB_NAME}
esearch:
image: elasticsearch:7.7.1
image: elasticsearch:7.13.4
container_name: ubahn-data-processor-es_es
ports:
- "9200:9200"
Expand Down
70 changes: 42 additions & 28 deletions scripts/db/dumpDbToEs.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,30 +144,40 @@ async function insertIntoES (modelName, dataset) {
const chunked = _.chunk(dataset, config.get('ES.MAX_BULK_SIZE'))
for (const ds of chunked) {
const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources[esResourceName].index,
type: topResources[esResourceName].type,
body,
pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
refresh: 'wait_for'
})
try {
await client.bulk({
index: topResources[esResourceName].index,
type: topResources[esResourceName].type,
body,
pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
refresh: 'wait_for'
})
} catch (e) {
logger.error('ES, create mapping error.')
logger.error(JSON.stringify(e))
}
}
} else if (_.includes(_.keys(userResources), esResourceName)) {
const userResource = userResources[esResourceName]

if (userResource.nested === true && userResource.mappingCreated !== true) {
await client.indices.putMapping({
index: topResources.user.index,
type: topResources.user.type,
include_type_name: true,
body: {
properties: {
[userResource.propertyName]: {
type: 'nested'
try {
await client.indices.putMapping({
index: topResources.user.index,
type: topResources.user.type,
include_type_name: true,
body: {
properties: {
[userResource.propertyName]: {
type: 'nested'
}
}
}
}
})
})
} catch (e) {
logger.error('ES, nexted mapping error.')
logger.error(JSON.stringify(e))
}
userResource.mappingCreated = true
}

Expand Down Expand Up @@ -391,14 +401,12 @@ async function main () {

for (let i = 0; i < keys.length; i++) {
const key = keys[i]
const queryPage = { perPage: parseInt(config.get('ES.MAX_BATCH_SIZE'), 10), page: 1 }
try {
const allData = await dbHelper.find(models[key], {})
let j = 0
const dataset = _.chunk(allData, config.get('ES.MAX_BATCH_SIZE'))
for (const data of dataset) {
while (true) {
const data = await dbHelper.find(models[key], { ...queryPage })
for (let i = 0; i < data.length; i++) {
j++
logger.info(`Inserting data ${j} of ${allData.length}`)
logger.info(`Inserting data ${(i + 1) + (queryPage.perPage * (queryPage.page - 1))}`)
logger.info(JSON.stringify(data[i]))
if (!_.isString(data[i].created)) {
data[i].created = new Date()
Expand All @@ -414,27 +422,33 @@ async function main () {
}
}
await insertIntoES(key, data)
if (data.length < queryPage.perPage) {
logger.info('import data for ' + key + ' done')
break
} else {
queryPage.page = queryPage.page + 1
}
}
logger.info('import data for ' + key + ' done')
} catch (e) {
logger.error(e)
logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4))
logger.error(_.get(e, 'meta.meta.request.params.method', ''))
logger.error(_.get(e, 'meta.meta.request.params.path', ''))
logger.warn('import data for ' + key + ' failed')
continue
}

try {
await createAndExecuteEnrichPolicy(key)
logger.info('create and execute enrich policy for ' + key + ' done')
} catch (e) {
logger.error(e)
logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4))
logger.warn('create and execute enrich policy for ' + key + ' failed')
}

try {
await createEnrichProcessor(key)
logger.info('create enrich processor (pipeline) for ' + key + ' done')
} catch (e) {
logger.error(e)
logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4))
logger.warn('create enrich processor (pipeline) for ' + key + ' failed')
}
}
Expand Down
16 changes: 10 additions & 6 deletions src/common/db-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,27 @@ async function get (model, pk, params) {
/**
 * Create an entity using the given sequelize model.
 * @param model the sequelize model object
 * @param entity entity to create
 * @param auth the user auth object; when present, `createdBy` is stamped on the entity
 * @param transaction the sequelize transaction object (optional)
 * @returns {Promise<Object>} the created model instance
 */
async function create (model, entity, auth, transaction) {
  if (auth) {
    // record who created the entity
    entity.createdBy = helper.getAuthUser(auth)
  }
  return model.create(entity, { transaction })
}

/**
 * Delete an entity by primary key.
 * @param model the sequelize model object
 * @param pk the primary key
 * @param params the path params used to locate the instance
 * @param transaction the sequelize transaction object (optional)
 * @returns {Promise<Object>} the destroyed instance
 */
async function remove (model, pk, params, transaction) {
  // ensure the object exists before attempting to destroy it (get throws if absent)
  const instance = await get(model, pk, params)
  return instance.destroy({ transaction })
}

/**
 * Update an entity by primary key.
 * @param model the sequelize model object
 * @param pk the primary key
 * @param entity the fields to update
 * @param auth the auth object; used to stamp `updatedBy`
 * @param params the path params used to locate the instance
 * @param transaction the sequelize transaction object (optional)
 * @returns {Promise<Object>} the updated instance
 */
async function update (model, pk, entity, auth, params, transaction) {
  // ensure that the object exists (get throws if absent)
  const instance = await get(model, pk, params)
  entity.updatedBy = helper.getAuthUser(auth)
  return instance.update(entity, { transaction })
}

/**
Expand Down
36 changes: 36 additions & 0 deletions src/common/es-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const config = require('config')
const _ = require('lodash')
const querystring = require('querystring')
const logger = require('../common/logger')
const helper = require('../common/helper')
const appConst = require('../consts')
const esClient = require('./es-client').getESClient()

Expand Down Expand Up @@ -282,6 +283,38 @@ function escapeRegex (str) {
/* eslint-enable no-useless-escape */
}

/**
 * Index (create or replace) an entity document in Elasticsearch.
 * @param {String} resource resource name (key into DOCUMENTS)
 * @param {Object} entity entity object; must carry a UUID `id`
 * @throws {Error} if the entity has no valid `id` property
 */
async function processCreate (resource, entity) {
  // the document id is mandatory — it becomes the ES _id
  helper.validProperties(entity, ['id'])
  await esClient.index({
    index: DOCUMENTS[resource].index,
    type: DOCUMENTS[resource].type,
    id: entity.id,
    body: entity,
    refresh: 'wait_for' // block until the change is searchable
  })
  logger.info(`Insert in Elasticsearch resource ${resource} entity, ${JSON.stringify(entity, null, 2)}`)
}

/**
 * Remove an entity document from Elasticsearch.
 * @param {String} resource resource name (key into DOCUMENTS)
 * @param {Object} entity entity object; must carry a UUID `id`
 */
async function processDelete (resource, entity) {
  helper.validProperties(entity, ['id'])
  const { index, type } = DOCUMENTS[resource]
  await esClient.delete({
    index,
    type,
    id: entity.id,
    refresh: 'wait_for' // block until the deletion is searchable
  })
}

async function getOrganizationId (handle) {
const dBHelper = require('../common/db-helper')
const sequelize = require('../models/index')
Expand Down Expand Up @@ -1453,6 +1486,9 @@ async function searchAchievementValues ({ organizationId, keyword }) {
}

module.exports = {
processCreate,
processUpdate: processCreate,
processDelete,
searchElasticSearch,
getFromElasticSearch,
searchUsers,
Expand Down
38 changes: 37 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const config = require('config')
const Joi = require('@hapi/joi')
const querystring = require('querystring')
const errors = require('./errors')
const appConst = require('../consts')
Expand All @@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper')
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))

/**
 * Validate that the given keys are present on the payload as UUID strings.
 * Extra keys on the payload are allowed (`unknown(true)`).
 * @param {Object} payload validated object
 * @param {Array} keys required keys (each must be a UUID string)
 * @throws {Error} if any required key is absent or not a UUID
 */
function validProperties (payload, keys) {
  const fields = {}
  for (const key of keys) {
    fields[key] = Joi.string().uuid().required()
  }
  const { error } = Joi.object(fields).unknown(true).validate(payload)
  if (error) {
    throw error
  }
}

/**
* get auth user handle or id
* @param authUser the user
Expand Down Expand Up @@ -145,12 +160,33 @@ async function postEvent (topic, payload) {
await busApiClient.postEvent(message)
}

/**
 * Send an error event to Kafka.
 * @param {String} topic the topic name
 * @param {Object} payload the payload (not mutated; copied before the action is attached)
 * @param {String} action the operation during which the error occurred
 */
async function publishError (topic, payload, action) {
  const message = {
    topic,
    originator: config.KAFKA_MESSAGE_ORIGINATOR,
    timestamp: new Date().toISOString(),
    'mime-type': 'application/json',
    // attach the failed action without mutating the caller's payload
    payload: { ...payload, apiAction: action }
  }
  logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`)
  await busApiClient.postEvent(message)
}

module.exports = {
validProperties,
getAuthUser,
permissionCheck,
checkIfExists,
injectSearchMeta,
getControllerMethods,
getSubControllerMethods,
postEvent
postEvent,
publishError
}
Loading