diff --git a/README.md b/README.md index d4bb982..861c35c 100755 --- a/README.md +++ b/README.md @@ -37,6 +37,9 @@ Configuration for the application is at config/default.js and config/production. - UBAHN_AGGREGATE_TOPIC: Kafka topic that is used to combine all create, update and delete message(s) - ES.HOST: Elasticsearch host - ES.DOCUMENTS: Elasticsearch index, type and id mapping for resources. +- ATTRIBUTE_GROUP_PIPELINE_ID: The pipeline id for enrichment with attribute group. Default is `attributegroup-pipeline` +- SKILL_PROVIDER_PIPELINE_ID: The pipeline id for enrichment with skill provider. Default is `skillprovider-pipeline` +- USER_PIPELINE_ID: The pipeline id for enrichment of user details. Default is `user-pipeline` For `ES.DOCUMENTS` configuration, you will find multiple other configurations below it. Each has default values that you can override using the environment variables diff --git a/config/default.js b/config/default.js index e824552..c5e9f3a 100755 --- a/config/default.js +++ b/config/default.js @@ -51,7 +51,6 @@ module.exports = { // ElasticSearch ES: { HOST: process.env.ES_HOST || 'http://localhost:9200', - ENRICH_USER_PIPELINE_NAME: process.env.ENRICH_USER_PIPELINE_NAME || 'enrich_user', // es mapping: _index, _type, _id DOCUMENTS: { achievementprovider: { @@ -64,7 +63,8 @@ module.exports = { }, attributegroup: { index: process.env.ATTRIBUTE_GROUP_INDEX || 'attribute_group', - type: '_doc' + type: '_doc', + pipelineId: process.env.ATTRIBUTE_GROUP_PIPELINE_ID || 'attributegroup-pipeline' }, organization: { index: process.env.ORGANIZATION_INDEX || 'organization', @@ -80,11 +80,13 @@ module.exports = { }, skillprovider: { index: process.env.SKILL_PROVIDER_INDEX || 'skill_provider', - type: '_doc' + type: '_doc', + pipelineId: process.env.SKILL_PROVIDER_PIPELINE_ID || 'skillprovider-pipeline' }, user: { index: process.env.USER_INDEX || 'user', - type: '_doc' + type: '_doc', + pipelineId: process.env.USER_PIPELINE_ID || 'user-pipeline' }, // sub 
resources under user achievement: { diff --git a/scripts/constants.js b/scripts/constants.js index 42acd44..de5d6af 100644 --- a/scripts/constants.js +++ b/scripts/constants.js @@ -6,42 +6,136 @@ const config = require('config') const topResources = { + skillprovider: { + index: config.get('ES.DOCUMENTS.skillprovider.index'), + type: config.get('ES.DOCUMENTS.skillprovider.type'), + enrich: { + policyName: 'skillprovider-policy', + matchField: 'id', + enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy'] + }, + pipeline: { + id: config.get('ES.DOCUMENTS.skillprovider.pipelineId'), + field: 'skillProviderId', + targetField: 'skillprovider', + maxMatches: '1' + } + }, + + role: { + index: config.get('ES.DOCUMENTS.role.index'), + type: config.get('ES.DOCUMENTS.role.type'), + enrich: { + policyName: 'role-policy', + matchField: 'id', + enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy'] + } + }, + achievementprovider: { index: config.get('ES.DOCUMENTS.achievementprovider.index'), - enrichPolicy: 'achievementprovider-policy', - type: config.get('ES.DOCUMENTS.achievementprovider.type') - }, - attribute: { - index: config.get('ES.DOCUMENTS.attribute.index'), - enrichPolicy: 'attribute-policy', - type: config.get('ES.DOCUMENTS.attribute.type') + type: config.get('ES.DOCUMENTS.achievementprovider.type'), + enrich: { + policyName: 'achievementprovider-policy', + matchField: 'id', + enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy'] + } }, + attributegroup: { index: config.get('ES.DOCUMENTS.attributegroup.index'), - type: config.get('ES.DOCUMENTS.attributegroup.type') - }, - organization: { - index: config.get('ES.DOCUMENTS.organization.index'), - enrichPolicy: 'organization-policy', - type: config.get('ES.DOCUMENTS.organization.type') - }, - role: { - index: config.get('ES.DOCUMENTS.role.index'), - enrichPolicy: 'role-policy', - type: config.get('ES.DOCUMENTS.role.type') + type: 
config.get('ES.DOCUMENTS.attributegroup.type'), + enrich: { + policyName: 'attributegroup-policy', + matchField: 'id', + enrichFields: ['id', 'name', 'organizationId', 'created', 'updated', 'createdBy', 'updatedBy'] + }, + pipeline: { + id: config.get('ES.DOCUMENTS.attributegroup.pipelineId'), + field: 'attributeGroupId', + targetField: 'attributegroup', + maxMatches: '1' + } }, + skill: { index: config.get('ES.DOCUMENTS.skill.index'), - enrichPolicy: 'skill-policy', - type: config.get('ES.DOCUMENTS.skill.type') + type: config.get('ES.DOCUMENTS.skill.type'), + enrich: { + policyName: 'skill-policy', + matchField: 'id', + enrichFields: ['id', 'skillProviderId', 'name', 'externalId', 'uri', 'created', 'updated', 'createdBy', 'updatedBy', 'skillprovider'] + }, + ingest: { + pipeline: { + id: config.get('ES.DOCUMENTS.skillprovider.pipelineId') + } + } }, - skillprovider: { - index: config.get('ES.DOCUMENTS.skillprovider.index'), - type: config.get('ES.DOCUMENTS.skillprovider.type') + + attribute: { + index: config.get('ES.DOCUMENTS.attribute.index'), + type: config.get('ES.DOCUMENTS.attribute.type'), + enrich: { + policyName: 'attribute-policy', + matchField: 'id', + enrichFields: ['id', 'name', 'attributeGroupId', 'created', 'updated', 'createdBy', 'updatedBy', 'attributegroup'] + }, + ingest: { + pipeline: { + id: config.get('ES.DOCUMENTS.attributegroup.pipelineId') + } + } }, + + organization: { + index: config.get('ES.DOCUMENTS.organization.index'), + type: config.get('ES.DOCUMENTS.organization.type') + }, + user: { index: config.get('ES.DOCUMENTS.user.index'), - type: config.get('ES.DOCUMENTS.user.type') + type: config.get('ES.DOCUMENTS.user.type'), + pipeline: { + id: config.get('ES.DOCUMENTS.user.pipelineId'), + processors: [ + { + referenceField: config.get('ES.DOCUMENTS.achievement.userField'), + enrichPolicyName: 'achievementprovider-policy', + field: '_ingest._value.achievementsProviderId', + targetField: '_ingest._value.achievementprovider', + maxMatches: 
'1' + }, + { + referenceField: config.get('ES.DOCUMENTS.externalprofile.userField'), + enrichPolicyName: 'organization-policy', + field: '_ingest._value.organizationId', + targetField: '_ingest._value.organization', + maxMatches: '1' + }, + { + referenceField: config.get('ES.DOCUMENTS.userattribute.userField'), + enrichPolicyName: 'attribute-policy', + field: '_ingest._value.attributeId', + targetField: '_ingest._value.attribute', + maxMatches: '1' + }, + { + referenceField: config.get('ES.DOCUMENTS.userrole.userField'), + enrichPolicyName: 'role-policy', + field: '_ingest._value.roleId', + targetField: '_ingest._value.role', + maxMatches: '1' + }, + { + referenceField: config.get('ES.DOCUMENTS.userskill.userField'), + enrichPolicyName: 'skill-policy', + field: '_ingest._value.skillId', + targetField: '_ingest._value.skill', + maxMatches: '1' + } + ] + } } } @@ -72,7 +166,12 @@ const userResources = { const organizationResources = { organizationskillprovider: { propertyName: config.get('ES.DOCUMENTS.organizationskillprovider.orgField'), - relateKey: 'skillProviderId' + relateKey: 'skillProviderId', + enrich: { + policyName: 'organization-policy', + matchField: 'id', + enrichFields: ['id', 'name', 'created', 'updated', 'createdBy', 'updatedBy', 'skillProviders'] + } } } diff --git a/scripts/db/dropAll.js b/scripts/db/dropAll.js index cef1924..a30a2cf 100644 --- a/scripts/db/dropAll.js +++ b/scripts/db/dropAll.js @@ -4,11 +4,33 @@ const _ = require('lodash') const models = require('../../src/models') const logger = require('../../src/common/logger') -const { topResources, modelToESIndexMapping } = require('../constants') +const { + topResources, + organizationResources, + modelToESIndexMapping +} = require('../constants') const { getESClient } = require('../../src/common/es-client') async function main () { const client = getESClient() + + try { + logger.info('Deleting all pipelines...') + await client.ingest.deletePipeline({ + id: topResources.user.pipeline.id + }) 
+ await client.ingest.deletePipeline({ + id: topResources.skillprovider.pipeline.id + }) + await client.ingest.deletePipeline({ + id: topResources.attributegroup.pipeline.id + }) + logger.info('Successfully deleted all pipelines') + } catch (e) { + console.error(e) + logger.warn('Delete all ingest pipelines failed') + } + const keys = Object.keys(models) for (let i = 0; i < keys.length; i++) { const key = keys[i] @@ -16,12 +38,29 @@ async function main () { const esResourceName = modelToESIndexMapping[key] try { if (_.includes(_.keys(topResources), esResourceName)) { + if (topResources[esResourceName].enrich) { + logger.info(`Deleting enrich policy for ${esResourceName}`) + await client.enrich.deletePolicy({ + name: topResources[esResourceName].enrich.policyName + }) + logger.info(`Successfully deleted enrich policy for ${esResourceName}`) + } + logger.info(`Deleting index for ${esResourceName}`) await client.indices.delete({ index: topResources[esResourceName].index }) + logger.info(`Successfully deleted index for ${esResourceName}`) + } else if (_.includes(_.keys(organizationResources), esResourceName)) { + logger.info('Deleting enrich policy for organization') + await client.enrich.deletePolicy({ + name: organizationResources[esResourceName].enrich.policyName + }) + logger.info('Successfully deleted enrich policy for organization') + } + logger.info(`Deleting data in QLDB for ${esResourceName}`) await models.DBHelper.clear(models[key]) + logger.info(`Successfully deleted data in QLDB for ${esResourceName}`) } catch (e) { console.error(e) logger.warn(`drop table ${key} failed`) diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js index 0d12e73..c5a23fd 100644 --- a/scripts/db/dumpDbToEs.js +++ b/scripts/db/dumpDbToEs.js @@ -1,5 +1,4 @@ const _ = require('lodash') -const config = require('config') const models = require('../../src/models') const logger = require('../../src/common/logger') const { getESClient } = require('../../src/common/es-client') @@ -10,14 
+9,62 @@ const { modelToESIndexMapping } = require('../constants') -async function cleanupES () { - const client = getESClient() +// Declares the ordering of the resource data insertion, to ensure that enrichment happens correctly +const RESOURCES_IN_ORDER = [ + 'skillprovider', + 'role', + 'achievementprovider', + 'attributegroup', + 'skill', + 'attribute', + 'organization', + 'organizationskillprovider', + 'user', + 'userskill', + 'achievement', + 'userrole', + 'externalprofile', + 'userattribute' +] - await client.indices.delete({ - index: '_all' - }) +const client = getESClient() - console.log('Existing indices have been deleted!') +/** + * Cleans up the data in elasticsearch + * @param {Array} keys Array of models + */ +async function cleanupES (keys) { + const client = getESClient() + await client.ingest.deletePipeline({ + id: topResources.user.pipeline.id + }) + await client.ingest.deletePipeline({ + id: topResources.skillprovider.pipeline.id + }) + await client.ingest.deletePipeline({ + id: topResources.attributegroup.pipeline.id + }) + for (let i = 0; i < keys.length; i++) { + const key = keys[i] + if (models[key].tableName) { + const esResourceName = modelToESIndexMapping[key] + if (_.includes(_.keys(topResources), esResourceName)) { + if (topResources[esResourceName].enrich) { + await client.enrich.deletePolicy({ + name: topResources[esResourceName].enrich.policyName + }) + } + await client.indices.delete({ + index: topResources[esResourceName].index + }) + } else if (_.includes(_.keys(organizationResources), esResourceName)) { + await client.enrich.deletePolicy({ + name: organizationResources[esResourceName].enrich.policyName + }) + } + } + } + console.log('Existing data in elasticsearch has been deleted!') } async function insertIntoES (modelName, body) { @@ -29,19 +76,15 @@ async function insertIntoES (modelName, body) { return } - const client = getESClient() - if (_.includes(_.keys(topResources), esResourceName)) { - await client.create({ + await 
client.index({ index: topResources[esResourceName].index, type: topResources[esResourceName].type, id: body.id, body, - refresh: 'true' + pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined, + refresh: 'wait_for' }) - if (topResources[esResourceName].enrichPolicy) { - await client.enrich.executePolicy({ name: topResources[esResourceName].enrichPolicy }) - } } else if (_.includes(_.keys(userResources), esResourceName)) { const userResource = userResources[esResourceName] @@ -82,8 +125,8 @@ async function insertIntoES (modelName, body) { type: topResources.user.type, id: body.userId, body: user, - pipeline: config.get('ES.ENRICH_USER_PIPELINE_NAME'), - refresh: 'true' + pipeline: topResources.user.pipeline.id, + refresh: 'wait_for' }) } } else if (_.includes(_.keys(organizationResources), esResourceName)) { @@ -105,12 +148,102 @@ async function insertIntoES (modelName, body) { logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`) } else { organization[orgResource.propertyName].push(body) - await client.update({ + await client.index({ index: topResources.organization.index, type: topResources.organization.type, id: body.organizationId, - body: { doc: organization }, - refresh: 'true' + body: organization, + refresh: 'wait_for' + }) + } + } +} + +/** + * Creates and executes the enrich policy for the provided model + * @param {String} modelName The model name + */ +async function createAndExecuteEnrichPolicy (modelName) { + const esResourceName = modelToESIndexMapping[modelName] + + if (_.includes(_.keys(topResources), esResourceName) && topResources[esResourceName].enrich) { + await client.enrich.putPolicy({ + name: topResources[esResourceName].enrich.policyName, + body: { + match: { + indices: topResources[esResourceName].index, + match_field: topResources[esResourceName].enrich.matchField, + enrich_fields: 
topResources[esResourceName].enrich.enrichFields + } + } + }) + await client.enrich.executePolicy({ name: topResources[esResourceName].enrich.policyName }) + } else if (_.includes(_.keys(organizationResources), esResourceName)) { + // For organization, execute enrich policy AFTER the sub documents on the org (namely orgskillprovider) is in + // This is because external profile on user is enriched with org, and it needs to have the orgskillprovider details in it + await client.enrich.putPolicy({ + name: organizationResources[esResourceName].enrich.policyName, + body: { + match: { + indices: topResources.organization.index, + match_field: organizationResources[esResourceName].enrich.matchField, + enrich_fields: organizationResources[esResourceName].enrich.enrichFields + } + } + }) + await client.enrich.executePolicy({ name: organizationResources[esResourceName].enrich.policyName }) + } +} + +/** + * Creates the ingest pipeline using the enrich policy + * @param {String} modelName The model name + */ +async function createEnrichProcessor (modelName) { + const esResourceName = modelToESIndexMapping[modelName] + + if (_.includes(_.keys(topResources), esResourceName) && topResources[esResourceName].pipeline) { + if (topResources[esResourceName].pipeline.processors) { + const processors = [] + + for (let i = 0; i < topResources[esResourceName].pipeline.processors.length; i++) { + const ep = topResources[esResourceName].pipeline.processors[i] + processors.push({ + foreach: { + field: ep.referenceField, + ignore_missing: true, + processor: { + enrich: { + policy_name: ep.enrichPolicyName, + ignore_missing: true, + field: ep.field, + target_field: ep.targetField, + max_matches: ep.maxMatches + } + } + } + }) + } + + await client.ingest.putPipeline({ + id: topResources[esResourceName].pipeline.id, + body: { + processors + } + }) + } else { + await client.ingest.putPipeline({ + id: topResources[esResourceName].pipeline.id, + body: { + processors: [{ + enrich: { + policy_name: 
topResources[esResourceName].enrich.policyName, + field: topResources[esResourceName].pipeline.field, + target_field: topResources[esResourceName].pipeline.targetField, + max_matches: topResources[esResourceName].pipeline.maxMatches + } + }] + } }) } } @@ -122,32 +255,50 @@ async function insertIntoES (modelName, body) { */ async function main () { let keys = Object.keys(models) - keys = _.orderBy(keys, k => { - const esResourceName = modelToESIndexMapping[k] - // Create parent data first - if (_.includes(_.keys(topResources), esResourceName)) { - return -1 + // Sort the models in the order of insertion (for correct enrichment) + const temp = Array(keys.length).fill(null) + keys.forEach(k => { + if (models[k].tableName) { + const esResourceName = modelToESIndexMapping[k] + const index = RESOURCES_IN_ORDER.indexOf(esResourceName) + temp[index] = k } - - return 1 }) + keys = _.compact(temp) - await cleanupES() + await cleanupES(keys) for (let i = 0; i < keys.length; i++) { const key = keys[i] - if (models[key].tableName) { - try { - const data = await models.DBHelper.find(models[key], []) - for (let i = 0; i < data.length; i++) { - await insertIntoES(key, data[i]) - } - logger.info('import data for ' + key + ' done') - } catch (e) { - logger.error(e) - logger.warn('import data for ' + key + ' failed') + try { + const data = await models.DBHelper.find(models[key], []) + + for (let i = 0; i < data.length; i++) { + logger.info(`Inserting data ${i + 1} of ${data.length}`) + await insertIntoES(key, data[i]) } + logger.info('import data for ' + key + ' done') + } catch (e) { + logger.error(e) + logger.warn('import data for ' + key + ' failed') + continue + } + + try { + await createAndExecuteEnrichPolicy(key) + logger.info('create and execute enrich policy for ' + key + ' done') + } catch (e) { + logger.error(e) + logger.warn('create and execute enrich policy for ' + key + ' failed') + } + + try { + await createEnrichProcessor(key) + logger.info('create enrich processor 
(pipeline) for ' + key + ' done') + } catch (e) { + logger.error(e) + logger.warn('create enrich processor (pipeline) for ' + key + ' failed') } } logger.info('all done') diff --git a/scripts/db/genData.js b/scripts/db/genData.js index e11dca9..97681ac 100644 --- a/scripts/db/genData.js +++ b/scripts/db/genData.js @@ -1,5 +1,4 @@ const _ = require('lodash') -const config = require('config') const models = require('../../src/models') const logger = require('../../src/common/logger') const { getESClient } = require('../../src/common/es-client') @@ -10,6 +9,26 @@ const { modelToESIndexMapping } = require('../constants') +// Declares the ordering of the resource data insertion, to ensure that enrichment happens correctly +const RESOURCES_IN_ORDER = [ + 'skillprovider', + 'role', + 'achievementprovider', + 'attributegroup', + 'skill', + 'attribute', + 'organization', + 'organizationskillprovider', + 'user', + 'userskill', + 'achievement', + 'userrole', + 'externalprofile', + 'userattribute' +] + +const client = getESClient() + async function insertIntoES (modelName, body) { const esResourceName = modelToESIndexMapping[modelName] @@ -19,19 +38,15 @@ async function insertIntoES (modelName, body) { return } - const client = getESClient() - if (_.includes(_.keys(topResources), esResourceName)) { - await client.create({ + await client.index({ index: topResources[esResourceName].index, type: topResources[esResourceName].type, id: body.id, body, - refresh: 'true' + pipeline: topResources[esResourceName].ingest ? 
topResources[esResourceName].ingest.pipeline.id : undefined, + refresh: 'wait_for' }) - if (topResources[esResourceName].enrichPolicy) { - await client.enrich.executePolicy({ name: topResources[esResourceName].enrichPolicy }) - } } else if (_.includes(_.keys(userResources), esResourceName)) { const userResource = userResources[esResourceName] @@ -72,7 +87,7 @@ async function insertIntoES (modelName, body) { type: topResources.user.type, id: body.userId, body: user, - pipeline: config.get('ES.ENRICH_USER_PIPELINE_NAME'), + pipeline: topResources.user.pipeline.id, refresh: 'wait_for' }) } @@ -95,12 +110,102 @@ async function insertIntoES (modelName, body) { logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`) } else { organization[orgResource.propertyName].push(body) - await client.update({ + await client.index({ index: topResources.organization.index, type: topResources.organization.type, id: body.organizationId, - body: { doc: organization }, - refresh: 'true' + body: organization, + refresh: 'wait_for' + }) + } + } +} + +/** + * Creates and executes the enrich policy for the provided model + * @param {String} modelName The model name + */ +async function createAndExecuteEnrichPolicy (modelName) { + const esResourceName = modelToESIndexMapping[modelName] + + if (_.includes(_.keys(topResources), esResourceName) && topResources[esResourceName].enrich) { + await client.enrich.putPolicy({ + name: topResources[esResourceName].enrich.policyName, + body: { + match: { + indices: topResources[esResourceName].index, + match_field: topResources[esResourceName].enrich.matchField, + enrich_fields: topResources[esResourceName].enrich.enrichFields + } + } + }) + await client.enrich.executePolicy({ name: topResources[esResourceName].enrich.policyName }) + } else if (_.includes(_.keys(organizationResources), esResourceName)) { + // For organization, execute enrich policy AFTER the sub 
documents on the org (namely orgskillprovider) is in + // This is because external profile on user is enriched with org, and it needs to have the orgskillprovider details in it + await client.enrich.putPolicy({ + name: organizationResources[esResourceName].enrich.policyName, + body: { + match: { + indices: topResources.organization.index, + match_field: organizationResources[esResourceName].enrich.matchField, + enrich_fields: organizationResources[esResourceName].enrich.enrichFields + } + } + }) + await client.enrich.executePolicy({ name: organizationResources[esResourceName].enrich.policyName }) + } +} + +/** + * Creates the ingest pipeline using the enrich policy + * @param {String} modelName The model name + */ +async function createEnrichProcessor (modelName) { + const esResourceName = modelToESIndexMapping[modelName] + + if (_.includes(_.keys(topResources), esResourceName) && topResources[esResourceName].pipeline) { + if (topResources[esResourceName].pipeline.processors) { + const processors = [] + + for (let i = 0; i < topResources[esResourceName].pipeline.processors.length; i++) { + const ep = topResources[esResourceName].pipeline.processors[i] + processors.push({ + foreach: { + field: ep.referenceField, + ignore_missing: true, + processor: { + enrich: { + policy_name: ep.enrichPolicyName, + ignore_missing: true, + field: ep.field, + target_field: ep.targetField, + max_matches: ep.maxMatches + } + } + } + }) + } + + await client.ingest.putPipeline({ + id: topResources[esResourceName].pipeline.id, + body: { + processors + } + }) + } else { + await client.ingest.putPipeline({ + id: topResources[esResourceName].pipeline.id, + body: { + processors: [{ + enrich: { + policy_name: topResources[esResourceName].enrich.policyName, + field: topResources[esResourceName].pipeline.field, + target_field: topResources[esResourceName].pipeline.targetField, + max_matches: topResources[esResourceName].pipeline.maxMatches + } + }] + } }) } } @@ -114,32 +219,49 @@ async function 
main () { await models.init() let keys = Object.keys(models) - keys = _.orderBy(keys, k => { - const esResourceName = modelToESIndexMapping[k] - // Create parent data first - if (_.includes(_.keys(topResources), esResourceName)) { - return -1 + // Sort the models in the order of insertion (for correct enrichment) + const temp = Array(keys.length).fill(null) + keys.forEach(k => { + if (models[k].tableName) { + const esResourceName = modelToESIndexMapping[k] + const index = RESOURCES_IN_ORDER.indexOf(esResourceName) + temp[index] = k } - - return 1 }) + keys = _.compact(temp) for (let i = 0; i < keys.length; i++) { const key = keys[i] - if (models[key].tableName) { - try { - const data = require(`./data/${key}.json`) - await models.DBHelper.clear(models[key]) - for (let i = 0; i < data.length; i++) { - await models.DBHelper.save(models[key], new models[key]().from(data[i]), true) - await insertIntoES(key, data[i]) - } - logger.info('import data for ' + key + ' done') - } catch (e) { - logger.error(e) - logger.warn('import data for ' + key + ' failed') + try { + const data = require(`./data/${key}.json`) + await models.DBHelper.clear(models[key]) + for (let i = 0; i < data.length; i++) { + logger.info(`Inserting data ${i + 1} of ${data.length}`) + await models.DBHelper.save(models[key], new models[key]().from(data[i]), true) + await insertIntoES(key, data[i]) } + logger.info('import data for ' + key + ' done') + } catch (e) { + logger.error(e) + logger.warn('import data for ' + key + ' failed') + continue + } + + try { + await createAndExecuteEnrichPolicy(key) + logger.info('create and execute enrich policy for ' + key + ' done') + } catch (e) { + logger.error(e) + logger.warn('create and execute enrich policy for ' + key + ' failed') + } + + try { + await createEnrichProcessor(key) + logger.info('create enrich processor (pipeline) for ' + key + ' done') + } catch (e) { + logger.error(e) + logger.warn('create enrich processor (pipeline) for ' + key + ' failed') } } 
logger.info('all done') diff --git a/src/common/es-helper.js b/src/common/es-helper.js index 581ce3f..3cdf833 100644 --- a/src/common/es-helper.js +++ b/src/common/es-helper.js @@ -215,13 +215,11 @@ const FILTER_CHAIN = { skill: { filterNext: 'userskill', queryField: 'skillId', - enrichNext: 'skillprovider', idField: 'skillProviderId' }, attribute: { filterNext: 'userattribute', queryField: 'attributeId', - enrichNext: 'attributegroup', idField: 'attributeGroupId' }, attributegroup: { @@ -247,29 +245,23 @@ const FILTER_CHAIN = { // sub resource userskill: { queryField: 'skillId', - enrichNext: 'skill', idField: 'skillId' }, userrole: { queryField: 'roleId', - enrichNext: 'role', idField: 'roleId' }, externalprofile: { - enrichNext: 'organization', idField: 'organizationId' }, achievement: { - enrichNext: 'achievementprovider', idField: 'achievementsProviderId' }, userattribute: { - enrichNext: 'attribute', idField: 'attributeId' }, organizationskillprovider: { queryField: 'skillProviderId', - enrichNext: 'skillprovider', idField: 'skillProviderId' } }