From b3cdf8ab1e940b30244846fbb7d71a99725bdb1f Mon Sep 17 00:00:00 2001
From: Mithun Kamath
Date: Thu, 4 Mar 2021 19:53:09 +0530
Subject: [PATCH] #79 - Update db to es migration script to use ES bulk API

---
 README.md                |   2 +
 config/default.js        |   4 +-
 scripts/db/dumpDbToEs.js | 157 ++++++++++++++++++++++++---------
 3 files changed, 101 insertions(+), 62 deletions(-)

diff --git a/README.md b/README.md
index ad064c9..86fd63e 100755
--- a/README.md
+++ b/README.md
@@ -52,6 +52,8 @@ Configuration for the application is at config/default.js and config/production.
 - ELASTICCLOUD_ID: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this
 - ELASTICCLOUD_USERNAME: The elastic cloud username for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
 - ELASTICCLOUD_PASSWORD: The elastic cloud password for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
+- MAX_RESULT_SIZE: The maximum number of results to fetch per elasticsearch query. Default is `1000` (used by the db to es migration script)
+- MAX_BULK_SIZE: The maximum number of documents to index per bulk request. Default is `100` (used by the db to es migration script)
 
 For `ES.DOCUMENTS` configuration, you will find multiple other configurations below it. Each has default values that you can override using the environment variables
diff --git a/config/default.js b/config/default.js
index 9eccc61..bbff95f 100755
--- a/config/default.js
+++ b/config/default.js
@@ -125,6 +125,8 @@ module.exports = {
       organizationskillprovider: {
         orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
       }
-    }
+    },
+    MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000,
+    MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100
   }
 }
diff --git a/scripts/db/dumpDbToEs.js b/scripts/db/dumpDbToEs.js
index bc1bf3b..2d9b96f 100644
--- a/scripts/db/dumpDbToEs.js
+++ b/scripts/db/dumpDbToEs.js
@@ -1,4 +1,5 @@
 const _ = require('lodash')
+const config = require('config')
 const sequelize = require('../../src/models/index')
 const dbHelper = require('../../src/common/db-helper')
 const logger = require('../../src/common/logger')
@@ -130,7 +131,7 @@
   console.log('Existing data in elasticsearch has been deleted!')
 }
 
-async function insertIntoES (modelName, body) {
+async function insertIntoES (modelName, dataset) {
   const esResourceName = modelToESIndexMapping[modelName]
 
   if (!esResourceName) {
@@ -140,36 +141,46 @@
   }
 
   if (_.includes(_.keys(topResources), esResourceName)) {
-    await client.index({
-      index: topResources[esResourceName].index,
-      type: topResources[esResourceName].type,
-      id: body.id,
-      body,
-      pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
-      refresh: 'wait_for'
-    })
+    const chunked = _.chunk(dataset, config.get('ES.MAX_BULK_SIZE'))
+    for (const ds of chunked) {
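+      // The bulk API payload pairs an action line with each document: the
+      // action carries the metadata ({ index: { _id } }) and the next line
+      // is the document source itself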
+      const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc])
+      await client.bulk({
+        index: topResources[esResourceName].index,
+        type: topResources[esResourceName].type,
+        body,
+        pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
+        refresh: 'wait_for'
+      })
+    }
   } else if (_.includes(_.keys(userResources), esResourceName)) {
     const userResource = userResources[esResourceName]
 
-    let user
-
-    try {
-      const res = await client.getSource({
+    let users = []
+    // query all users in batches
+    const idsArr = _.chunk(_.uniq(_.map(dataset, 'userId')), config.get('ES.MAX_RESULT_SIZE'))
+    for (const ids of idsArr) {
+      const res = await client.search({
         index: topResources.user.index,
         type: topResources.user.type,
-        id: body.userId
+        size: ids.length,
+        body: {
+          query: {
+            ids: {
+              values: ids
+            }
+          }
+        }
       })
+      users.push(..._.map(res.body.hits.hits, '_source'))
+    }
 
-      user = res.body
-    } catch (e) {
-      if (e.meta && e.meta.body.error.type === RESOURCE_NOT_FOUND) {
-        logger.info(`The ${modelName} references user with id ${body.userId}, which does not exist. Deleting the reference...`)
+    // remove records that reference a missing user
+    for (const data of dataset) {
+      if (!_.some(users, ['id', data.userId])) {
+        logger.info(`The ${modelName} references user with id ${data.userId}, which does not exist. Deleting the reference...`)
         // The user does not exist. Delete the referece records
-        await dbHelper.remove(models[modelName], body.id)
+        await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
-        return
-      } else {
-        throw e
       }
     }
 
@@ -189,21 +200,30 @@
       userResource.mappingCreated = true
     }
 
-    const relateId = body[userResource.relateKey]
-
-    if (!user[userResource.propertyName]) {
-      user[userResource.propertyName] = []
-    }
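+    // Group the records under their parent user documents; a user is kept
+    // for re-indexing only when at least one new record was attached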
+    users = _.filter(users, user => {
+      if (!user[userResource.propertyName]) {
+        user[userResource.propertyName] = []
+      }
+      let updated = false
+      _.forEach(_.filter(dataset, ['userId', user.id]), body => {
+        const relateId = body[userResource.relateKey]
+        if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
+          logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
+        } else {
+          updated = true
+          user[userResource.propertyName].push(body)
+        }
+      })
+      return updated
+    })
 
-    if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
-      logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
-    } else {
-      user[userResource.propertyName].push(body)
-      await client.index({
+    const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
+    for (const us of chunked) {
+      const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
+      await client.bulk({
         index: topResources.user.index,
         type: topResources.user.type,
-        id: body.userId,
-        body: user,
+        body,
         pipeline: topResources.user.pipeline.id,
         refresh: 'wait_for'
       })
     }
@@ -211,43 +231,58 @@
   } else if (_.includes(_.keys(organizationResources), esResourceName)) {
     const orgResource = organizationResources[esResourceName]
 
-    let organization
-
-    try {
-      const res = await client.getSource({
+    let organizations = []
+    // query all organizations in batches
+    const idsArr = _.chunk(_.uniq(_.map(dataset, 'organizationId')), config.get('ES.MAX_RESULT_SIZE'))
+    for (const ids of idsArr) {
+      const res = await client.search({
         index: topResources.organization.index,
         type: topResources.organization.type,
-        id: body.organizationId
+        size: ids.length,
+        body: {
+          query: {
+            ids: {
+              values: ids
+            }
+          }
+        }
       })
+      organizations.push(..._.map(res.body.hits.hits, '_source'))
+    }
 
-      organization = res.body
-    } catch (e) {
-      if (e.meta && e.meta.body.error.type === RESOURCE_NOT_FOUND) {
-        logger.info(`The ${modelName} references org with id ${body.organizationId}, which does not exist. Deleting the reference...`)
-        // The user does not exist. Delete the referece records
-        await dbHelper.remove(models[modelName], body.id)
+    // remove records that reference a missing organization
+    for (const data of dataset) {
+      if (!_.some(organizations, ['id', data.organizationId])) {
+        logger.info(`The ${modelName} references org with id ${data.organizationId}, which does not exist. Deleting the reference...`)
+        // The org does not exist. Delete the reference records
+        await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
-        return
-      } else {
-        throw e
       }
     }
 
-    const relateId = body[orgResource.relateKey]
-
-    if (!organization[orgResource.propertyName]) {
-      organization[orgResource.propertyName] = []
-    }
+    organizations = _.filter(organizations, organization => {
+      if (!organization[orgResource.propertyName]) {
+        organization[orgResource.propertyName] = []
+      }
+      let updated = false
+      _.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
+        const relateId = body[orgResource.relateKey]
+        if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
+          logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
+        } else {
+          updated = true
+          organization[orgResource.propertyName].push(body)
+        }
+      })
+      return updated
+    })
 
-    if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
-      logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
-    } else {
-      organization[orgResource.propertyName].push(body)
-      await client.index({
+    const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
+    for (const os of chunked) {
+      const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
+      await client.bulk({
         index: topResources.organization.index,
         type: topResources.organization.type,
-        id: body.organizationId,
-        body: organization,
+        body,
         refresh: 'wait_for'
       })
     }
@@ -384,8 +419,8 @@ async function main () {
         if (!_.isString(data[i].updatedBy)) {
          data[i].updatedBy = 'tcAdmin'
         }
-        await insertIntoES(key, data[i])
       }
+      await insertIntoES(key, data)
       logger.info('import data for ' + key + ' done')
     } catch (e) {
       logger.error(e)