diff --git a/.circleci/config.yml b/.circleci/config.yml index 6024bcd..78be9fb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -70,6 +70,7 @@ workflows: branches: only: - develop + - feature/shapeup4-cqrs-update # Production builds are exectuted only on tagged commits to the # master branch. diff --git a/config/default.js b/config/default.js index 0766f4d..eb69492 100755 --- a/config/default.js +++ b/config/default.js @@ -35,6 +35,8 @@ module.exports = { KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api', // topics + UBAHN_ERROR_TOPIC: process.env.UBAHN_ERROR_TOPIC || 'ubahn.action.error', + UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create', UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update', UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete', @@ -126,7 +128,7 @@ module.exports = { orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders' } }, - MAX_BATCH_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 10000, + MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 10000, MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000, MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100 } diff --git a/docker-pgsql-es/docker-compose.yml b/docker-pgsql-es/docker-compose.yml index 31f5a45..9ce9d2e 100644 --- a/docker-pgsql-es/docker-compose.yml +++ b/docker-pgsql-es/docker-compose.yml @@ -4,14 +4,14 @@ services: image: "postgres:12.4" volumes: - database-data:/var/lib/postgresql/data/ - ports: + ports: - "5432:5432" environment: POSTGRES_PASSWORD: ${DB_PASSWORD} POSTGRES_USER: ${DB_USERNAME} POSTGRES_DB: ${DB_NAME} esearch: - image: elasticsearch:7.7.1 + image: elasticsearch:7.13.4 container_name: ubahn-data-processor-es_es ports: - "9200:9200" diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js index 4f6147c..60bf96d 100644 --- a/scripts/db/dumpDbToEs.js +++ b/scripts/db/dumpDbToEs.js @@ 
-144,30 +144,40 @@ async function insertIntoES (modelName, dataset) { const chunked = _.chunk(dataset, config.get('ES.MAX_BULK_SIZE')) for (const ds of chunked) { const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc]) - await client.bulk({ - index: topResources[esResourceName].index, - type: topResources[esResourceName].type, - body, - pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined, - refresh: 'wait_for' - }) + try { + await client.bulk({ + index: topResources[esResourceName].index, + type: topResources[esResourceName].type, + body, + pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined, + refresh: 'wait_for' + }) + } catch (e) { + logger.error('ES, create mapping error.') + logger.error(JSON.stringify(e)) + } } } else if (_.includes(_.keys(userResources), esResourceName)) { const userResource = userResources[esResourceName] if (userResource.nested === true && userResource.mappingCreated !== true) { - await client.indices.putMapping({ - index: topResources.user.index, - type: topResources.user.type, - include_type_name: true, - body: { - properties: { - [userResource.propertyName]: { - type: 'nested' + try { + await client.indices.putMapping({ + index: topResources.user.index, + type: topResources.user.type, + include_type_name: true, + body: { + properties: { + [userResource.propertyName]: { + type: 'nested' + } } } - } - }) + }) + } catch (e) { + logger.error('ES, nested mapping error.') + logger.error(JSON.stringify(e)) + } userResource.mappingCreated = true } @@ -391,14 +401,12 @@ async function main () { for (let i = 0; i < keys.length; i++) { const key = keys[i] + const queryPage = { perPage: parseInt(config.get('ES.MAX_BATCH_SIZE'), 10), page: 1 } try { - const allData = await dbHelper.find(models[key], {}) - let j = 0 - const dataset = _.chunk(allData, config.get('ES.MAX_BATCH_SIZE')) - for (const data of dataset) { + while
(true) { + const data = await dbHelper.find(models[key], { ...queryPage }) for (let i = 0; i < data.length; i++) { - j++ - logger.info(`Inserting data ${j} of ${allData.length}`) + logger.info(`Inserting data ${(i + 1) + (queryPage.perPage * (queryPage.page - 1))}`) logger.info(JSON.stringify(data[i])) if (!_.isString(data[i].created)) { data[i].created = new Date() @@ -414,19 +422,25 @@ async function main () { } } await insertIntoES(key, data) + if (data.length < queryPage.perPage) { + logger.info('import data for ' + key + ' done') + break + } else { + queryPage.page = queryPage.page + 1 + } } - logger.info('import data for ' + key + ' done') } catch (e) { - logger.error(e) + logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4)) + logger.error(_.get(e, 'meta.meta.request.params.method', '')) + logger.error(_.get(e, 'meta.meta.request.params.path', '')) logger.warn('import data for ' + key + ' failed') continue } - try { await createAndExecuteEnrichPolicy(key) logger.info('create and execute enrich policy for ' + key + ' done') } catch (e) { - logger.error(e) + logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4)) logger.warn('create and execute enrich policy for ' + key + ' failed') } @@ -434,7 +448,7 @@ async function main () { await createEnrichProcessor(key) logger.info('create enrich processor (pipeline) for ' + key + ' done') } catch (e) { - logger.error(e) + logger.error(JSON.stringify(_.get(e, 'meta.body', ''), null, 4)) logger.warn('create enrich processor (pipeline) for ' + key + ' failed') } } diff --git a/src/common/db-helper.js b/src/common/db-helper.js index 5ba3e7e..b69bf01 100644 --- a/src/common/db-helper.js +++ b/src/common/db-helper.js @@ -105,24 +105,27 @@ async function get (model, pk, params) { * @param model the sequelize model object * @param entity entity to create * @param auth the user auth object + * @param transaction the transaction object * @returns {Promise} */ -async function create (model, entity, auth) { 
+async function create (model, entity, auth, transaction) { if (auth) { entity.createdBy = helper.getAuthUser(auth) } - return model.create(entity) + return model.create(entity, { transaction }) } /** * delete object by pk * @param model the sequelize model object * @param pk the primary key + * @param transaction the transaction object * @returns {Promise} */ -async function remove (model, pk, params) { +async function remove (model, pk, params, transaction) { const instance = await get(model, pk, params) - return instance.destroy() + const result = await instance.destroy({ transaction }) + return result } /** @@ -132,13 +135,14 @@ async function remove (model, pk, params) { * @param entity entity to create * @param auth the auth object * @param auth the path params + * @param transaction the transaction object * @returns {Promise} */ -async function update (model, pk, entity, auth, params) { +async function update (model, pk, entity, auth, params, transaction) { // insure that object exists const instance = await get(model, pk, params) entity.updatedBy = helper.getAuthUser(auth) - return instance.update(entity) + return instance.update(entity, { transaction }) } /** diff --git a/src/common/es-helper.js b/src/common/es-helper.js index e667de4..85bc543 100644 --- a/src/common/es-helper.js +++ b/src/common/es-helper.js @@ -2,6 +2,7 @@ const config = require('config') const _ = require('lodash') const querystring = require('querystring') const logger = require('../common/logger') +const helper = require('../common/helper') const appConst = require('../consts') const esClient = require('./es-client').getESClient() @@ -282,6 +283,38 @@ function escapeRegex (str) { /* eslint-enable no-useless-escape */ } +/** + * Process create entity + * @param {String} resource resource name + * @param {Object} entity entity object + */ +async function processCreate (resource, entity) { + helper.validProperties(entity, ['id']) + await esClient.index({ + index: 
DOCUMENTS[resource].index, + type: DOCUMENTS[resource].type, + id: entity.id, + body: entity, + refresh: 'wait_for' + }) + logger.info(`Insert in Elasticsearch resource ${resource} entity, ${JSON.stringify(entity, null, 2)}`) +} + +/** + * Process delete entity + * @param {String} resource resource name + * @param {Object} entity entity object + */ +async function processDelete (resource, entity) { + helper.validProperties(entity, ['id']) + await esClient.delete({ + index: DOCUMENTS[resource].index, + type: DOCUMENTS[resource].type, + id: entity.id, + refresh: 'wait_for' + }) +} + async function getOrganizationId (handle) { const dBHelper = require('../common/db-helper') const sequelize = require('../models/index') @@ -1453,6 +1486,9 @@ async function searchAchievementValues ({ organizationId, keyword }) { } module.exports = { + processCreate, + processUpdate: processCreate, + processDelete, searchElasticSearch, getFromElasticSearch, searchUsers, diff --git a/src/common/helper.js b/src/common/helper.js index 95bde1e..b68f3ee 100644 --- a/src/common/helper.js +++ b/src/common/helper.js @@ -1,4 +1,5 @@ const config = require('config') +const Joi = require('@hapi/joi') const querystring = require('querystring') const errors = require('./errors') const appConst = require('../consts') @@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper') const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL'])) +/** + * Function to validate required keys + * @param {Object} payload validated object + * @param {Array} keys required keys + * @throws {Error} if required key absent + */ +function validProperties (payload, keys) { + const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true) + const error = schema.validate(payload).error + if (error) { + throw error + } +} + /** * get auth user
handle or id * @param authUser the user @@ -145,12 +160,33 @@ async function postEvent (topic, payload) { await busApiClient.postEvent(message) } +/** + * Send error event to Kafka + * @param {String} topic the topic name + * @param {Object} payload the payload + * @param {String} action for which operation error occurred + */ + async function publishError (topic, payload, action) { + _.set(payload, 'apiAction', action) + const message = { + topic, + originator: config.KAFKA_MESSAGE_ORIGINATOR, + timestamp: new Date().toISOString(), + 'mime-type': 'application/json', + payload + } + logger.debug(`Publish error to Kafka topic ${topic}, ${JSON.stringify(message, null, 2)}`) + await busApiClient.postEvent(message) +} + module.exports = { + validProperties, getAuthUser, permissionCheck, checkIfExists, injectSearchMeta, getControllerMethods, getSubControllerMethods, - postEvent + postEvent, + publishError } diff --git a/src/common/service-helper.js b/src/common/service-helper.js index 68b55a0..b47ae48 100644 --- a/src/common/service-helper.js +++ b/src/common/service-helper.js @@ -38,25 +38,48 @@ const MODEL_TO_RESOURCE = { * Create record in es * @param resource the resource to create * @param result the resource fields + * @param toEs is to es directly */ -async function createRecordInEs (resource, entity) { try { - await publishMessage('create', resource, entity) + if (toEs) { + await esHelper.processCreate(resource, entity) + } } catch (err) { logger.logFullError(err) + throw err } + + // publish create event.
+ try { + await publishMessage("create", resource, entity); + } catch (err) { + logger.logFullError(err); + } + } /** * Patch record in es * @param resource the resource to create * @param result the resource fields + * @param toEs is to es directly */ -async function patchRecordInEs (resource, entity) { +async function patchRecordInEs (resource, entity, toEs) { try { - await publishMessage('patch', resource, entity) + if (toEs) { + await esHelper.processUpdate(resource, entity) + } } catch (err) { logger.logFullError(err) + throw err + } + + // publish patch event. + try { + await publishMessage("patch", resource, entity); + } catch (err) { + logger.logFullError(err); } } @@ -65,8 +88,9 @@ async function patchRecordInEs (resource, entity) { * @param id the id of record * @param params the params of record (like nested ids) * @param resource the resource to delete + * @param toEs is to es directly */ -async function deleteRecordFromEs (id, params, resource) { +async function deleteRecordFromEs (id, params, resource, toEs) { let payload if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) { payload = _.assign({}, params) @@ -76,9 +100,19 @@ async function deleteRecordFromEs (id, params, resource) { } } try { - await publishMessage('remove', resource, payload) + if (toEs) { + await esHelper.processDelete(resource, payload) + } } catch (err) { logger.logFullError(err) + throw err + } + if (!toEs) { + try { + await publishMessage("remove", resource, payload); + } catch (err) { + logger.logFullError(err); + } } } @@ -174,13 +208,14 @@ function sleep (ms) { } /** - * delete child of record with delay between each item deleted + * delete child of record with delay between each item deleted and with transaction * @param model the child model to delete * @param id the user id to delete * @param params the params for child * @param resourceName the es recource name + * @param transaction the transaction object */ -async function deleteChild (model, id, params, 
resourceName) { +async function deleteChild (model, id, params, resourceName, transaction) { const query = {} query[params[0]] = id const result = await dbHelper.find(model, query) @@ -194,8 +229,8 @@ async function deleteChild (model, id, params, resourceName) { params.forEach(attr => { esParams[attr] = record[attr] }) // remove from db - dbHelper.remove(model, record.id) - deleteRecordFromEs(record.id, esParams, resourceName) + await dbHelper.remove(model, record.id, transaction) + await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction) // sleep for configured time await sleep(config.CASCADE_PAUSE_MS) diff --git a/src/modules/user/service.js b/src/modules/user/service.js index 0918552..953f92c 100644 --- a/src/modules/user/service.js +++ b/src/modules/user/service.js @@ -4,8 +4,10 @@ const joi = require('@hapi/joi') const _ = require('lodash') +const config = require('config') const errors = require('../../common/errors') +const logger = require('../../common/logger') const helper = require('../../common/helper') const dbHelper = require('../../common/db-helper') const serviceHelper = require('../../common/service-helper') @@ -30,10 +32,21 @@ const uniqueFields = [['handle']] async function create (entity, auth) { await dbHelper.makeSureUnique(User, entity, uniqueFields) - const result = await dbHelper.create(User, entity, auth) - await serviceHelper.createRecordInEs(resource, result.dataValues) - - return result + let payload + try { + const result = await sequelize.transaction(async (t) => { + const userEntity = await dbHelper.create(User, entity, auth, t) + payload = userEntity.dataValues + await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true) + return userEntity + }) + return result + } catch (e) { + if (payload) { + helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.create') + } + throw e + } } create.schema = { @@ -56,10 +69,22 @@ create.schema = { async function patch (id, entity, auth, params) { await 
dbHelper.makeSureUnique(User, entity, uniqueFields) - const newEntity = await dbHelper.update(User, id, entity, auth) - await serviceHelper.patchRecordInEs(resource, newEntity.dataValues) - - return newEntity + let payload + try { + const result = await sequelize.transaction(async (t) => { + const newEntity = await dbHelper.update(User, id, entity, auth, null, t) + payload = newEntity.dataValues + await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true) + return newEntity + }) + + return result + } catch (e) { + if (payload) { + helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.update') + } + throw e + } } patch.schema = { @@ -169,6 +194,25 @@ async function beginCascadeDelete (id, params) { await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill') await dbHelper.remove(User, id) await serviceHelper.deleteRecordFromEs(id, params, resource) + //TODO: below code is not working, so simply commented our changes + /* //Start here + let payload = {id} + try { + await sequelize.transaction(async (t) => { + await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t) + await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t) + await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t) + await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t) + await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t) + await dbHelper.remove(User, id, null, t) + await serviceHelper.deleteRecordFromEs(id, params, resource, true) + }) + + } catch (e) { + helper.publishError(config.UBAHN_ERROR_TOPIC, payload, 'user.delete') + throw e + } + */ // End here } module.exports = {