From 06d13414ee467688cc58ea5ae951b8ad1ef356e7 Mon Sep 17 00:00:00 2001 From: xxcxy Date: Wed, 14 Apr 2021 19:26:31 +0800 Subject: [PATCH 01/23] Recollect dumpDbToEs.js script --- scripts/db/dumpDbToEs.js | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js index 4f6147c..2f48c5b 100644 --- a/scripts/db/dumpDbToEs.js +++ b/scripts/db/dumpDbToEs.js @@ -391,14 +391,12 @@ async function main () { for (let i = 0; i < keys.length; i++) { const key = keys[i] + const queryPage = { perPage: parseInt(config.get('ES.MAX_BATCH_SIZE')), page: 1 } try { - const allData = await dbHelper.find(models[key], {}) - let j = 0 - const dataset = _.chunk(allData, config.get('ES.MAX_BATCH_SIZE')) - for (const data of dataset) { + while (true) { + const data = await dbHelper.find(models[key], { ...queryPage }) for (let i = 0; i < data.length; i++) { - j++ - logger.info(`Inserting data ${j} of ${allData.length}`) + logger.info(`Inserting data ${i + 1} of ${data.length}`) logger.info(JSON.stringify(data[i])) if (!_.isString(data[i].created)) { data[i].created = new Date() @@ -414,14 +412,18 @@ async function main () { } } await insertIntoES(key, data) + logger.info('import data for ' + key + ' done') + if (data.length < queryPage.perPage) { + break + } else { + queryPage.page = queryPage.page + 1 + } } - logger.info('import data for ' + key + ' done') } catch (e) { logger.error(e) logger.warn('import data for ' + key + ' failed') continue } - try { await createAndExecuteEnrichPolicy(key) logger.info('create and execute enrich policy for ' + key + ' done') From 1d01022a19ce11f256ccd266564a2959a92408ef Mon Sep 17 00:00:00 2001 From: Mithun Kamath Date: Wed, 14 Apr 2021 18:39:07 +0530 Subject: [PATCH 02/23] #90 - misc --- config/default.js | 2 +- scripts/db/dumpDbToEs.js | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/default.js b/config/default.js index 0766f4d..1df223b 100755 --- a/config/default.js +++ b/config/default.js @@ -126,7 +126,7 @@ module.exports = { orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders' } }, - MAX_BATCH_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 10000, + MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 10000, MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000, MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100 } diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js index 2f48c5b..6f3d85d 100644 --- a/scripts/db/dumpDbToEs.js +++ b/scripts/db/dumpDbToEs.js @@ -391,12 +391,12 @@ async function main () { for (let i = 0; i < keys.length; i++) { const key = keys[i] - const queryPage = { perPage: parseInt(config.get('ES.MAX_BATCH_SIZE')), page: 1 } + const queryPage = { perPage: parseInt(config.get('ES.MAX_BATCH_SIZE'), 10), page: 1 } try { while (true) { const data = await dbHelper.find(models[key], { ...queryPage }) for (let i = 0; i < data.length; i++) { - logger.info(`Inserting data ${i + 1} of ${data.length}`) + logger.info(`Inserting data ${(i + 1) + (queryPage.perPage * (queryPage.page - 1))}`) logger.info(JSON.stringify(data[i])) if (!_.isString(data[i].created)) { data[i].created = new Date() @@ -412,8 +412,8 @@ async function main () { } } await insertIntoES(key, data) - logger.info('import data for ' + key + ' done') if (data.length < queryPage.perPage) { + logger.info('import data for ' + key + ' done') break } else { queryPage.page = queryPage.page + 1 From 444c2574df3f846c5b440ce2cf82cfaeed839ded Mon Sep 17 00:00:00 2001 From: Mithun Kamath Date: Sat, 17 Apr 2021 17:49:30 +0530 Subject: [PATCH 03/23] log more details when error occurs during db to es migration --- scripts/db/dumpDbToEs.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js index 6f3d85d..a7245c8 100644 --- a/scripts/db/dumpDbToEs.js +++ b/scripts/db/dumpDbToEs.js @@ -420,7 +420,9 @@ async function main () { } } } catch (e) { - logger.error(e) + logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4)) + logger.error(_.get(e, 'meta.meta.request.params.method', '')) + logger.error(_.get(e, 'meta.meta.request.params.path', '')) logger.warn('import data for ' + key + ' failed') continue } @@ -428,7 +430,7 @@ async function main () { await createAndExecuteEnrichPolicy(key) logger.info('create and execute enrich policy for ' + key + ' done') } catch (e) { - logger.error(e) + logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4)) logger.warn('create and execute enrich policy for ' + key + ' failed') } @@ -436,7 +438,7 @@ async function main () { await createEnrichProcessor(key) logger.info('create enrich processor (pipeline) for ' + key + ' done') } catch (e) { - logger.error(e) + logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4)) logger.warn('create enrich processor (pipeline) for ' + key + ' failed') } } From 878b94d0db970c66fe66d2bf2f6b0f9076b3114b Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Wed, 21 Apr 2021 14:20:25 +0530 Subject: [PATCH 04/23] for fixing nested looping error --- scripts/db/dumpDbToEs.js | 44 ++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js index a7245c8..60bf96d 100644 --- a/scripts/db/dumpDbToEs.js +++ b/scripts/db/dumpDbToEs.js @@ -144,30 +144,40 @@ async function insertIntoES (modelName, dataset) { const chunked = _.chunk(dataset, config.get('ES.MAX_BULK_SIZE')) for (const ds of chunked) { const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc]) - await client.bulk({ - index: topResources[esResourceName].index, - type: topResources[esResourceName].type, - body, - pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined, - refresh: 'wait_for' - }) + try { + await client.bulk({ + index: topResources[esResourceName].index, + type: topResources[esResourceName].type, + body, + pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined, + refresh: 'wait_for' + }) + } catch (e) { + logger.error('ES, create mapping error.') + logger.error(JSON.stringify(e)) + } } } else if (_.includes(_.keys(userResources), esResourceName)) { const userResource = userResources[esResourceName] if (userResource.nested === true && userResource.mappingCreated !== true) { - await client.indices.putMapping({ - index: topResources.user.index, - type: topResources.user.type, - include_type_name: true, - body: { - properties: { - [userResource.propertyName]: { - type: 'nested' + try { + await client.indices.putMapping({ + index: topResources.user.index, + type: topResources.user.type, + include_type_name: true, + body: { + properties: { + [userResource.propertyName]: { + type: 'nested' + } } } - } - }) + }) + } catch (e) { + logger.error('ES, nexted mapping error.') + logger.error(JSON.stringify(e)) + } userResource.mappingCreated = true } From 3f0daa52f2ad0838250316f6f14e7fa18a33bae3 Mon Sep 17 00:00:00 2001 From: yoution Date: Tue, 20 Jul 2021 12:05:04 +0800 Subject: [PATCH 05/23] CQRS Standards & Updates - Users endpoints --- src/common/db-helper.js | 16 +++++++++----- src/common/es-helper.js | 35 +++++++++++++++++++++++++++++ src/common/helper.js | 16 ++++++++++++++ src/common/service-helper.js | 43 ++++++++++++++++++++++++++++++------ src/modules/user/service.js | 32 +++++++++++++++++---------- 5 files changed, 117 insertions(+), 25 deletions(-) diff --git a/src/common/db-helper.js b/src/common/db-helper.js index 5ba3e7e..b69bf01 100644 --- a/src/common/db-helper.js +++ b/src/common/db-helper.js @@ -105,24 +105,27 @@ async function get (model, pk, params) { * @param model the sequelize model object * @param entity entity to create * @param auth the user auth object + * @param transaction the transaction object * @returns {Promise} */ -async function create (model, entity, auth) { +async function create (model, entity, auth, transaction) { if (auth) { entity.createdBy = helper.getAuthUser(auth) } - return model.create(entity) + return model.create(entity, { transaction }) } /** * delete object by pk * @param model the sequelize model object * @param pk the primary key + * @param transaction the transaction object * @returns {Promise} */ -async function remove (model, pk, params) { +async function remove (model, pk, params, transaction) { const instance = await get(model, pk, params) - return instance.destroy() + const result = await instance.destroy({ transaction }) + return result } /** @@ -132,13 +135,14 @@ async function remove (model, pk, params) { * @param entity entity to create * @param auth the auth object * @param auth the path params + * @param transaction the transaction object * @returns {Promise} */ -async function update (model, pk, entity, auth, params) { +async function update (model, pk, entity, auth, params, transaction) { // insure that object exists const instance = await get(model, pk, params) entity.updatedBy = helper.getAuthUser(auth) - return instance.update(entity) + return instance.update(entity, { transaction }) } /** diff --git a/src/common/es-helper.js b/src/common/es-helper.js index e667de4..14a5e69 100644 --- a/src/common/es-helper.js +++ b/src/common/es-helper.js @@ -2,6 +2,7 @@ const config = require('config') const _ = require('lodash') const querystring = require('querystring') const logger = require('../common/logger') +const helper = require('../common/helper') const appConst = require('../consts') const esClient = require('./es-client').getESClient() @@ -282,6 +283,37 @@ function escapeRegex (str) { /* eslint-enable no-useless-escape */ } +/** + * Process create entity + * @param {String} resource resource name + * @param {Object} entity entity object + */ +async function processCreate (resource, entity) { + helper.validProperties(entity, ['id']) + await esClient.index({ + index: DOCUMENTS[resource].index, + type: DOCUMENTS[resource].type, + id: entity.id, + body: entity, + refresh: 'wait_for' + }) +} + +/** + * Process delete entity + * @param {String} resource resource name + * @param {Object} entity entity object + */ +async function processDelete (resource, entity) { + helper.validProperties(entity, ['id']) + await esClient.delete({ + index: DOCUMENTS[resource].index, + type: DOCUMENTS[resource].type, + id: entity.id, + refresh: 'wait_for' + }) +} + async function getOrganizationId (handle) { const dBHelper = require('../common/db-helper') const sequelize = require('../models/index') @@ -1453,6 +1485,9 @@ async function searchAchievementValues ({ organizationId, keyword }) { } module.exports = { + processCreate, + processUpdate: processCreate, + processDelete, searchElasticSearch, getFromElasticSearch, searchUsers, diff --git a/src/common/helper.js b/src/common/helper.js index 95bde1e..0dcc811 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -1,4 +1,5 @@ const config = require('config') +const Joi = require('@hapi/joi') const querystring = require('querystring') const errors = require('./errors') const appConst = require('../consts') @@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper') const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL'])) +/** + * Function to valid require keys + * @param {Object} payload validated object + * @param {Array} keys required keys + * @throws {Error} if required key absent + */ +function validProperties (payload, keys) { + const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true) + const error = schema.validate(payload).error + if (error) { + throw error + } +} + /** * get auth user handle or id * @param authUser the user @@ -146,6 +161,7 @@ async function postEvent (topic, payload) { } module.exports = { + validProperties, getAuthUser, permissionCheck, checkIfExists, diff --git a/src/common/service-helper.js b/src/common/service-helper.js index 68b55a0..45e5ed0 100644 --- a/src/common/service-helper.js +++ b/src/common/service-helper.js @@ -38,8 +38,17 @@ const MODEL_TO_RESOURCE = { * Create record in es * @param resource the resource to create * @param result the resource fields + * @param toEs is to es directly */ -async function createRecordInEs (resource, entity) { +async function createRecordInEs (resource, entity, toEs) { + try { + if (toEs) { + await esHelper.processCreate(resource, entity) + } + } catch (err) { + logger.logFullError(err) + throw err + } try { await publishMessage('create', resource, entity) } catch (err) { @@ -51,8 +60,17 @@ async function createRecordInEs (resource, entity) { * Patch record in es * @param resource the resource to create * @param result the resource fields + * @param toEs is to es directly */ -async function patchRecordInEs (resource, entity) { +async function patchRecordInEs (resource, entity, toEs) { + try { + if (toEs) { + await esHelper.processUpdate(resource, entity) + } + } catch (err) { + logger.logFullError(err) + throw err + } try { await publishMessage('patch', resource, entity) } catch (err) { @@ -65,8 +83,9 @@ async function patchRecordInEs (resource, entity) { * @param id the id of record * @param params the params of record (like nested ids) * @param resource the resource to delete + * @param toEs is to es directly */ -async function deleteRecordFromEs (id, params, resource) { +async function deleteRecordFromEs (id, params, resource, toEs) { let payload if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) { payload = _.assign({}, params) @@ -75,6 +94,15 @@ async function deleteRecordFromEs (id, params, resource) { id } } + try { + if (toEs) { + await esHelper.processDelete(resource, payload) + } + } catch (err) { + logger.logFullError(err) + throw err + } + try { await publishMessage('remove', resource, payload) } catch (err) { @@ -174,13 +202,14 @@ function sleep (ms) { } /** - * delete child of record with delay between each item deleted + * delete child of record with delay between each item deleted and with transaction * @param model the child model to delete * @param id the user id to delete * @param params the params for child * @param resourceName the es recource name + * @param transaction the transaction object */ -async function deleteChild (model, id, params, resourceName) { +async function deleteChild (model, id, params, resourceName, transaction) { const query = {} query[params[0]] = id const result = await dbHelper.find(model, query) @@ -194,8 +223,8 @@ async function deleteChild (model, id, params, resourceName) { params.forEach(attr => { esParams[attr] = record[attr] }) // remove from db - dbHelper.remove(model, record.id) - deleteRecordFromEs(record.id, esParams, resourceName) + await dbHelper.remove(model, record.id, transaction) + await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction) // sleep for configured time await sleep(config.CASCADE_PAUSE_MS) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 0918552..ca3101a 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -30,8 +30,11 @@ const uniqueFields = [['handle']] async function create (entity, auth) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - const result = await dbHelper.create(User, entity, auth) - await serviceHelper.createRecordInEs(resource, result.dataValues) + const result = await sequelize.transaction(async (t) => { + const userEntity = await dbHelper.create(User, entity, auth, t) + await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) + return userEntity + }) return result } @@ -56,10 +59,13 @@ create.schema = { async function patch (id, entity, auth, params) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - const newEntity = await dbHelper.update(User, id, entity, auth) - await serviceHelper.patchRecordInEs(resource, newEntity.dataValues) + const result = await sequelize.transaction(async (t) => { + const newEntity = await dbHelper.update(User, id, entity, auth, null, t) + await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) + return newEntity + }) - return newEntity + return result } patch.schema = { @@ -162,13 +168,15 @@ async function remove (id, auth, params) { * @param params the path params */ async function beginCascadeDelete (id, params) { - await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement') - await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile') - await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute') - await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole') - await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill') - await dbHelper.remove(User, id) - await serviceHelper.deleteRecordFromEs(id, params, resource) + await sequelize.transaction(async (t) => { + await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t) + await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t) + await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t) + await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t) + await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) + await dbHelper.remove(User, id, null, t) + await serviceHelper.deleteRecordFromEs(id, params, resource) + }) } module.exports = { From a15dd36ffbd4621ab1ed9211ea22886237197784 Mon Sep 17 00:00:00 2001 From: yoution Date: Wed, 21 Jul 2021 22:01:37 +0800 Subject: [PATCH 06/23] fix: issue when delete user --- src/modules/user/service.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index ca3101a..33ea58e 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -159,7 +159,7 @@ search.schema = { * @return {Promise} no data returned */ async function remove (id, auth, params) { - beginCascadeDelete(id, params) + await beginCascadeDelete(id, params) } /** @@ -175,7 +175,7 @@ async function beginCascadeDelete (id, params) { await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t) await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) await dbHelper.remove(User, id, null, t) - await serviceHelper.deleteRecordFromEs(id, params, resource) + await serviceHelper.deleteRecordFromEs(id, params, resource, true) }) } From 7efafae0aede94337c350d7a252d347a6bca9165 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Thu, 22 Jul 2021 11:21:28 +0530 Subject: [PATCH 07/23] delopying on dev --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 6024bcd..78be9fb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -70,6 +70,7 @@ workflows: branches: only: - develop + - feature/shapeup4-cqrs-update # Production builds are exectuted only on tagged commits to the # master branch. From 1c7d754d3780fdf9301c34949da3c5e9a054f1ce Mon Sep 17 00:00:00 2001 From: yoution Date: Mon, 26 Jul 2021 15:51:55 +0800 Subject: [PATCH 08/23] feat: add user added topic --- config/default.js | 1 + src/modules/user/service.js | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/config/default.js b/config/default.js index 1df223b..09e9948 100755 --- a/config/default.js +++ b/config/default.js @@ -35,6 +35,7 @@ module.exports = { KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api', // topics + UBAHN_CREATE_USER_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'user.action.topic.create', UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create', UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update', UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete', diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 33ea58e..f700047 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -4,8 +4,10 @@ const joi = require('@hapi/joi') const _ = require('lodash') +const config = require('config') const errors = require('../../common/errors') +const logger = require('../../common/logger') const helper = require('../../common/helper') const dbHelper = require('../../common/db-helper') const serviceHelper = require('../../common/service-helper') @@ -33,7 +35,11 @@ async function create (entity, auth) { const result = await sequelize.transaction(async (t) => { const userEntity = await dbHelper.create(User, entity, auth, t) await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) - return userEntity + try { + await helper.postEvent(config.UBAHN_CREATE_USER_TOPIC, userEntity.dataValues) + } catch (err) { + logger.logFullError(err) + } }) return result From 14e73a9cb0ccf1c452481803cc0eab0315ebffdd Mon Sep 17 00:00:00 2001 From: yoution Date: Mon, 26 Jul 2021 16:33:52 +0800 Subject: [PATCH 09/23] fix: update elasticsearch to 7.13.4 --- docker-pgsql-es/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-pgsql-es/docker-compose.yml b/docker-pgsql-es/docker-compose.yml index 31f5a45..9ce9d2e 100644 --- a/docker-pgsql-es/docker-compose.yml +++ b/docker-pgsql-es/docker-compose.yml @@ -4,14 +4,14 @@ services: image: "postgres:12.4" volumes: - database-data:/var/lib/postgresql/data/ - ports: + ports: - "5432:5432" environment: POSTGRES_PASSWORD: ${DB_PASSWORD} POSTGRES_USER: ${DB_USERNAME} POSTGRES_DB: ${DB_NAME} esearch: - image: elasticsearch:7.7.1 + image: elasticsearch:7.13.4 container_name: ubahn-data-processor-es_es ports: - "9200:9200" From 34b6cbd814512eda5bcf4a4319a9afff28cd1fb0 Mon Sep 17 00:00:00 2001 From: yoution Date: Tue, 27 Jul 2021 08:04:56 +0800 Subject: [PATCH 10/23] fix: add update and delete topic for user --- config/default.js | 4 +++- src/modules/user/service.js | 11 +++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/config/default.js b/config/default.js index 09e9948..9093e2d 100755 --- a/config/default.js +++ b/config/default.js @@ -35,7 +35,9 @@ module.exports = { KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api', // topics - UBAHN_CREATE_USER_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'user.action.topic.create', + UBAHN_CREATE_USER_TOPIC: process.env.UBAHN_CREATE_USER_TOPIC || 'user.action.topic.create', + UBAHN_UPDATE_USER_TOPIC: process.env.UBAHN_UPDATE_USER_TOPIC || 'user.action.topic.update', + UBAHN_DELETE_USER_TOPIC: process.env.UBAHN_DELETE_USER_TOPIC || 'user.action.topic.delete', UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create', UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update', UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete', diff --git a/src/modules/user/service.js b/src/modules/user/service.js index f700047..6a63e86 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -68,6 +68,12 @@ async function patch (id, entity, auth, params) { const result = await sequelize.transaction(async (t) => { const newEntity = await dbHelper.update(User, id, entity, auth, null, t) await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) + + try { + await helper.postEvent(config.UBAHN_UPDATE_USER_TOPIC, newEntity.dataValues) + } catch (err) { + logger.logFullError(err) + } return newEntity }) @@ -182,6 +188,11 @@ async function beginCascadeDelete (id, params) { await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) await dbHelper.remove(User, id, null, t) await serviceHelper.deleteRecordFromEs(id, params, resource, true) + try { + await helper.postEvent(config.UBAHN_DELETE_USER_TOPIC, {id}) + } catch (err) { + logger.logFullError(err) + } }) } From 5c1645c57435d0eb9bf0bd5f4afa63122f96ede5 Mon Sep 17 00:00:00 2001 From: yoution Date: Tue, 27 Jul 2021 19:33:43 +0800 Subject: [PATCH 11/23] fix: sendtopic after error happened --- src/modules/user/service.js | 81 +++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 6a63e86..80768e1 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -32,17 +32,20 @@ const uniqueFields = [['handle']] async function create (entity, auth) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - const result = await sequelize.transaction(async (t) => { - const userEntity = await dbHelper.create(User, entity, auth, t) - await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) - try { - await helper.postEvent(config.UBAHN_CREATE_USER_TOPIC, userEntity.dataValues) - } catch (err) { - logger.logFullError(err) + let payload + try { + const result = await sequelize.transaction(async (t) => { + const userEntity = await dbHelper.create(User, entity, auth, t) + payload = userEntity.dataValues + await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) + }) + return result + } catch (e) { + if (payload) { + helper.postEvent(config.UBAHN_CREATE_USER_TOPIC, payload) } - }) - - return result + throw e + } } create.schema = { @@ -65,19 +68,24 @@ create.schema = { async function patch (id, entity, auth, params) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - const result = await sequelize.transaction(async (t) => { - const newEntity = await dbHelper.update(User, id, entity, auth, null, t) - await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) + let payload + try { + const result = await sequelize.transaction(async (t) => { + const newEntity = await dbHelper.update(User, id, entity, auth, null, t) + payload = newEntity.dataValues - try { - await helper.postEvent(config.UBAHN_UPDATE_USER_TOPIC, newEntity.dataValues) - } catch (err) { - logger.logFullError(err) - } - return newEntity - }) + await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) + + return newEntity + }) - return result + return result + } catch (e) { + if (payload) { + helper.postEvent(config.UBAHN_UPDATE_USER_TOPIC, payload) + } + throw e + } } patch.schema = { @@ -180,20 +188,23 @@ async function remove (id, auth, params) { * @param params the path params */ async function beginCascadeDelete (id, params) { - await sequelize.transaction(async (t) => { - await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t) - await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t) - await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t) - await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t) - await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) - await dbHelper.remove(User, id, null, t) - await serviceHelper.deleteRecordFromEs(id, params, resource, true) - try { - await helper.postEvent(config.UBAHN_DELETE_USER_TOPIC, {id}) - } catch (err) { - logger.logFullError(err) - } - }) + let payload = {id} + try { + await sequelize.transaction(async (t) => { + await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t) + aa.bb + await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t) + await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t) + await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t) + await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) + await dbHelper.remove(User, id, null, t) + await serviceHelper.deleteRecordFromEs(id, params, resource, true) + }) + + } catch (e) { + helper.postEvent(config.UBAHN_DELETE_USER_TOPIC, payload) + throw e + } } module.exports = { From 144a8c1575cdf2595a93ced79200efd734db7b11 Mon Sep 17 00:00:00 2001 From: yoution Date: Tue, 27 Jul 2021 21:12:38 +0800 Subject: [PATCH 12/23] fix: return entity when create --- src/modules/user/service.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 80768e1..7b0cab1 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -32,12 +32,13 @@ const uniqueFields = [['handle']] async function create (entity, auth) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - let payload + let payload try { const result = await sequelize.transaction(async (t) => { const userEntity = await dbHelper.create(User, entity, auth, t) payload = userEntity.dataValues await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) + return userEntity }) return result } catch (e) { @@ -200,7 +201,7 @@ async function beginCascadeDelete (id, params) { await dbHelper.remove(User, id, null, t) await serviceHelper.deleteRecordFromEs(id, params, resource, true) }) - + } catch (e) { helper.postEvent(config.UBAHN_DELETE_USER_TOPIC, payload) throw e From ebc0a4df04e281b841ba0f0cc54a6150a3c798d7 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 27 Jul 2021 20:22:12 +0530 Subject: [PATCH 13/23] common error topic, no aggregation needed during error publishing --- config/default.js | 5 ++--- src/common/helper.js | 22 +++++++++++++++++++++- src/modules/user/service.js | 8 +++----- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/config/default.js b/config/default.js index 9093e2d..41dd1e9 100755 --- a/config/default.js +++ b/config/default.js @@ -35,9 +35,8 @@ module.exports = { KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api', // topics - UBAHN_CREATE_USER_TOPIC: process.env.UBAHN_CREATE_USER_TOPIC || 'user.action.topic.create', - UBAHN_UPDATE_USER_TOPIC: process.env.UBAHN_UPDATE_USER_TOPIC || 'user.action.topic.update', - UBAHN_DELETE_USER_TOPIC: process.env.UBAHN_DELETE_USER_TOPIC || 'user.action.topic.delete', + UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'u-bahn.action.error', + UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create', UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update', UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete', diff --git a/src/common/helper.js b/src/common/helper.js index 0dcc811..ec68e28 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -160,6 +160,25 @@ async function postEvent (topic, payload) { await busApiClient.postEvent(message) } +/** + * Send error event to Kafka + * @params {String} topic the topic name + * @params {Object} payload the payload + * @params {String} action for which operation error occurred + */ + async function publishError (topic, payload, action) { + logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(payload, null, 2)}`) + _.set(payload, 'apiAction', action) + const message = { + topic, + originator: config.KAFKA_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload + } + await busApiClient.postEvent(message) +} + module.exports = { validProperties, getAuthUser, @@ -168,5 +187,6 @@ module.exports = { injectSearchMeta, getControllerMethods, getSubControllerMethods, - postEvent + postEvent, + publishError } diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 7b0cab1..f4c7ec4 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -43,7 +43,7 @@ async function create (entity, auth) { return result } catch (e) { if (payload) { - helper.postEvent(config.UBAHN_CREATE_USER_TOPIC, payload) + helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.create') } throw e } @@ -74,16 +74,14 @@ async function patch (id, entity, auth, params) { const result = await sequelize.transaction(async (t) => { const newEntity = await dbHelper.update(User, id, entity, auth, null, t) payload = newEntity.dataValues - await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) - return newEntity }) return result } catch (e) { if (payload) { - helper.postEvent(config.UBAHN_UPDATE_USER_TOPIC, payload) + helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.update') } throw e } @@ -203,7 +201,7 @@ async function beginCascadeDelete (id, params) { }) } catch (e) { - helper.postEvent(config.UBAHN_DELETE_USER_TOPIC, payload) + helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.delete') throw e } } From 51b07192e4b9d4759305b43d2afd74e344aeb55b Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 27 Jul 2021 21:41:26 +0530 Subject: [PATCH 14/23] ES error testing.... --- src/common/service-helper.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/service-helper.js b/src/common/service-helper.js index 45e5ed0..119be5a 100644 --- a/src/common/service-helper.js +++ b/src/common/service-helper.js @@ -65,7 +65,8 @@ async function createRecordInEs (resource, entity, toEs) { async function patchRecordInEs (resource, entity, toEs) { try { if (toEs) { - await esHelper.processUpdate(resource, entity) + //await esHelper.processUpdate(resource, entity) + throw "throwing error" } } catch (err) { logger.logFullError(err) From a3dfbce54520d0efa1cc10622ef82adb91d8c5b9 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 27 Jul 2021 22:09:58 +0530 Subject: [PATCH 15/23] ES error testing, logging --- src/common/helper.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/helper.js b/src/common/helper.js index ec68e28..39e1407 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -176,6 +176,7 @@ async function postEvent (topic, payload) { 'mime-type': 'application/json', payload } + logger.debug(`Final message, ${JSON.stringify(message, null, 2)}`) await busApiClient.postEvent(message) } From 93fad26889ce91f94e0caacdfd5a105952e6ed8c Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 27 Jul 2021 23:03:16 +0530 Subject: [PATCH 16/23] topic name issue --- config/default.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/default.js b/config/default.js index 41dd1e9..eb69492 100755 --- a/config/default.js +++ b/config/default.js @@ -35,7 +35,7 @@ module.exports = { KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api', // topics - UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'u-bahn.action.error', + UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'ubahn.action.error', UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create', UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update', From 7edbda604018e3ebee9c702e55bce94ded5b261b Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Tue, 27 Jul 2021 23:12:35 +0530 Subject: [PATCH 17/23] testing done --- src/common/helper.js | 3 +-- src/common/service-helper.js | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/common/helper.js b/src/common/helper.js index 39e1407..b68f3ee 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -167,7 +167,6 @@ async function postEvent (topic, payload) { * @params {String} action for which operation error occurred */ async function publishError (topic, payload, action) { - logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(payload, null, 2)}`) _.set(payload, 'apiAction', action) const message = { topic, @@ -176,7 +175,7 @@ async function postEvent (topic, payload) { 'mime-type': 'application/json', payload } - logger.debug(`Final message, ${JSON.stringify(message, null, 2)}`) + logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`) await busApiClient.postEvent(message) } diff --git a/src/common/service-helper.js b/src/common/service-helper.js index 119be5a..45e5ed0 100644 --- a/src/common/service-helper.js +++ b/src/common/service-helper.js @@ -65,8 +65,7 @@ async function createRecordInEs (resource, entity, toEs) { async function patchRecordInEs (resource, entity, toEs) { try { if (toEs) { - //await esHelper.processUpdate(resource, entity) - throw "throwing error" + await esHelper.processUpdate(resource, entity) } } catch (err) { logger.logFullError(err) From 5a21c71e9e97904ae2004335f811f0414248386f Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Wed, 28 Jul 2021 12:37:52 +0530 Subject: [PATCH 18/23] stopping event publishing for user object --- src/common/service-helper.js | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/common/service-helper.js b/src/common/service-helper.js index 45e5ed0..7eb518b 100644 --- a/src/common/service-helper.js +++ b/src/common/service-helper.js @@ -49,11 +49,15 @@ async function createRecordInEs (resource, entity, toEs) { logger.logFullError(err) throw err } - try { - await publishMessage('create', resource, entity) - } catch (err) { - logger.logFullError(err) + + if (!toEs) { + try { + await publishMessage("create", resource, entity); + } catch (err) { + logger.logFullError(err); + } } + } /** @@ -71,10 +75,12 @@ async function patchRecordInEs (resource, entity, toEs) { logger.logFullError(err) throw err } - try { - await publishMessage('patch', resource, entity) - } catch (err) { - logger.logFullError(err) + if (!toEs) { + try { + await publishMessage("patch", resource, entity); + } catch (err) { + logger.logFullError(err); + } } } @@ -102,11 +108,12 @@ async function deleteRecordFromEs (id, params, resource, toEs) { logger.logFullError(err) throw err } - - try { - await publishMessage('remove', resource, payload) - } catch (err) { - logger.logFullError(err) + if (!toEs) { + try { + await publishMessage("remove", resource, payload); + } catch (err) { + logger.logFullError(err); + } } } From 1d7130df06684fef5fad3d7f0a92ea97fd448c29 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Wed, 28 Jul 2021 13:40:24 +0530 Subject: [PATCH 19/23] adding more log --- src/common/es-helper.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/es-helper.js b/src/common/es-helper.js index 14a5e69..85bc543 100644 --- a/src/common/es-helper.js +++ b/src/common/es-helper.js @@ -297,6 +297,7 @@ async function processCreate (resource, entity) { body: entity, refresh: 'wait_for' }) + logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`) } /** From c718f54154e30f8e1ae49944ad0b5760027a940d Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Wed, 28 Jul 2021 18:14:19 +0530 Subject: [PATCH 20/23] clean-up issue --- src/modules/user/service.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index f4c7ec4..6b39d9a 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -191,7 +191,6 @@ async function beginCascadeDelete (id, params) { try { await sequelize.transaction(async (t) => { await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t) - aa.bb await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t) await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t) await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t) From 707c209119e46784f133e9d39ef715837bdf734c Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 30 Jul 2021 10:57:28 +0530 Subject: [PATCH 21/23] reverting CQRS changes for 'user delete' --- src/modules/user/service.js | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 6b39d9a..ae16d03 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -187,6 +187,15 @@ async function remove (id, auth, params) { * @param params the path params */ async function beginCascadeDelete (id, params) { + await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement') + await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile') + await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute') + await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole') + await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill') + await dbHelper.remove(User, id) + await serviceHelper.deleteRecordFromEs(id, params, resource) + //TODO: below code is not working, so simply commented our changes + /* //Start here let payload = {id} try { await sequelize.transaction(async (t) => { @@ -203,6 +212,7 @@ async function beginCascadeDelete (id, params) { helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.delete') throw e } + */ // End here } module.exports = { From 61bb014d6ab0167b681a0c9c7e89c6561650eeb1 Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Fri, 30 Jul 2021 12:01:25 +0530 Subject: [PATCH 22/23] fixing gateway timeout --- src/modules/user/service.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index ae16d03..953f92c 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -178,7 +178,7 @@ search.schema = { * @return {Promise} no data returned */ async function remove (id, auth, params) { - await beginCascadeDelete(id, params) + beginCascadeDelete(id, params) } /** From 67b98f148fe737afa664b70732c304034da8b062 Mon Sep 17 00:00:00 2001 From: phead198708 <85940583+phead198708@users.noreply.github.com> Date: Tue, 3 Aug 2021 13:27:05 +0800 Subject: [PATCH 23/23] Update service-helper.js --- src/common/service-helper.js | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/common/service-helper.js b/src/common/service-helper.js index 7eb518b..b47ae48 100644 --- a/src/common/service-helper.js +++ b/src/common/service-helper.js @@ -50,12 +50,11 @@ async function createRecordInEs (resource, entity, toEs) { throw err } - if (!toEs) { - try { - await publishMessage("create", resource, entity); - } catch (err) { - logger.logFullError(err); - } + // publish create event. + try { + await publishMessage("create", resource, entity); + } catch (err) { + logger.logFullError(err); } } @@ -75,12 +74,12 @@ async function patchRecordInEs (resource, entity, toEs) { logger.logFullError(err) throw err } - if (!toEs) { - try { - await publishMessage("patch", resource, entity); - } catch (err) { - logger.logFullError(err); - } + + // publish patch event. + try { + await publishMessage("patch", resource, entity); + } catch (err) { + logger.logFullError(err); } }