This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Sync master with dev #83

Merged 2 commits on Mar 10, 2021
2 changes: 2 additions & 0 deletions README.md
@@ -52,6 +52,8 @@ Configuration for the application is at config/default.js and config/production.
- ELASTICCLOUD_ID: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this
- ELASTICCLOUD_USERNAME: The elastic cloud username for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
- ELASTICCLOUD_PASSWORD: The elastic cloud password for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
- MAX_RESULT_SIZE: The maximum number of results returned per query. Default is `1000` (used by the DB-to-ES migration script)
- MAX_BULK_SIZE: The maximum number of documents per bulk indexing request. Default is `100` (used by the DB-to-ES migration script; see the sketch below)

For the `ES.DOCUMENTS` configuration, you will find multiple other configurations below it. Each has default values that you can override using environment variables.
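As a quick illustration of how the two new limits are consumed, here is a minimal sketch based on the migration-script changes in this PR (the `records` sample is hypothetical, and a loadable `config` directory is assumed):

```js
const _ = require('lodash')
const config = require('config')

// Hypothetical sample of rows pulled from the database
const records = [{ id: '1', userId: 'u1' }, { id: '2', userId: 'u2' }]

// ES.MAX_BULK_SIZE caps how many documents go into a single bulk request
const batches = _.chunk(records, config.get('ES.MAX_BULK_SIZE'))

// ES.MAX_RESULT_SIZE caps how many ids are resolved per search query
const idBatches = _.chunk(_.uniq(_.map(records, 'userId')), config.get('ES.MAX_RESULT_SIZE'))

console.log(batches.length, idBatches.length)
```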

4 changes: 3 additions & 1 deletion config/default.js
@@ -125,6 +125,8 @@ module.exports = {
organizationskillprovider: {
orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
}
}
},
MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000,
MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100
}
}
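A note on the default mechanics in the lines added above: `parseInt` of an unset environment variable returns `NaN`, which is falsy, so the `||` fallback supplies the default. A side effect worth knowing is that an explicit `0` also falls back to the default. A minimal sketch:

```js
// With MAX_BULK_SIZE unset, process.env.MAX_BULK_SIZE is undefined:
parseInt(undefined, 10)          // NaN
parseInt(undefined, 10) || 100   // 100: NaN is falsy, so the default applies
parseInt('250', 10) || 100       // 250: an explicit override wins
parseInt('0', 10) || 100         // 100: 0 is falsy too, so it cannot disable batching
```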
157 changes: 96 additions & 61 deletions scripts/db/dumpDbToEs.js
@@ -1,4 +1,5 @@
const _ = require('lodash')
const config = require('config')
const sequelize = require('../../src/models/index')
const dbHelper = require('../../src/common/db-helper')
const logger = require('../../src/common/logger')
@@ -130,7 +131,7 @@ async function cleanupES (keys) {
console.log('Existing data in elasticsearch has been deleted!')
}

async function insertIntoES (modelName, body) {
async function insertIntoES (modelName, dataset) {
const esResourceName = modelToESIndexMapping[modelName]

if (!esResourceName) {
@@ -140,36 +141,46 @@ async function insertIntoES (modelName, body) {
}

if (_.includes(_.keys(topResources), esResourceName)) {
await client.index({
index: topResources[esResourceName].index,
type: topResources[esResourceName].type,
id: body.id,
body,
pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
refresh: 'wait_for'
})
const chunked = _.chunk(dataset, config.get('ES.MAX_BULK_SIZE'))
for (const ds of chunked) {
const body = _.flatMap(ds, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources[esResourceName].index,
type: topResources[esResourceName].type,
body,
pipeline: topResources[esResourceName].ingest ? topResources[esResourceName].ingest.pipeline.id : undefined,
refresh: 'wait_for'
})
}
} else if (_.includes(_.keys(userResources), esResourceName)) {
const userResource = userResources[esResourceName]

let user

try {
const res = await client.getSource({
let users = []
// query all users
const idsArr = _.chunk(_.uniq(_.map(dataset, 'userId')), config.get('ES.MAX_RESULT_SIZE'))
for (const ids of idsArr) {
const res = await client.search({
index: topResources.user.index,
type: topResources.user.type,
id: body.userId
size: dataset.length,
body: {
query: {
ids: {
values: ids
}
}
}
})
users.push(..._.map(res.body.hits.hits, '_source'))
}

user = res.body
} catch (e) {
if (e.meta && e.meta.body.error.type === RESOURCE_NOT_FOUND) {
logger.info(`The ${modelName} references user with id ${body.userId}, which does not exist. Deleting the reference...`)
// remove unreferenced resource
for (const data of dataset) {
if (!_.some(users, ['id', data.userId])) {
logger.info(`The ${modelName} references user with id ${data.userId}, which does not exist. Deleting the reference...`)
// The user does not exist. Delete the reference records
await dbHelper.remove(models[modelName], body.id)
await dbHelper.remove(models[modelName], data.id)
logger.info('Reference deleted')
return
} else {
throw e
}
}

@@ -189,65 +200,89 @@ async function insertIntoES (modelName, body) {
userResource.mappingCreated = true
}

const relateId = body[userResource.relateKey]

if (!user[userResource.propertyName]) {
user[userResource.propertyName] = []
}
users = _.filter(users, user => {
if (!user[userResource.propertyName]) {
user[userResource.propertyName] = []
}
let updated = false
_.forEach(_.filter(dataset, ['userId', user.id]), body => {
const relateId = body[userResource.relateKey]
if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
} else {
updated = true
user[userResource.propertyName].push(body)
}
})
return updated
})

if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
} else {
user[userResource.propertyName].push(body)
await client.index({
const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
for (const us of chunked) {
const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources.user.index,
type: topResources.user.type,
id: body.userId,
body: user,
body,
pipeline: topResources.user.pipeline.id,
refresh: 'wait_for'
})
}
} else if (_.includes(_.keys(organizationResources), esResourceName)) {
const orgResource = organizationResources[esResourceName]

let organization

try {
const res = await client.getSource({
let organizations = []
// query all organizations
const idsArr = _.chunk(_.uniq(_.map(dataset, 'organizationId')), config.get('ES.MAX_RESULT_SIZE'))
for (const ids of idsArr) {
const res = await client.search({
index: topResources.organization.index,
type: topResources.organization.type,
id: body.organizationId
size: dataset.length,
body: {
query: {
ids: {
values: ids
}
}
}
})
organizations.push(..._.map(res.body.hits.hits, '_source'))
}

organization = res.body
} catch (e) {
if (e.meta && e.meta.body.error.type === RESOURCE_NOT_FOUND) {
logger.info(`The ${modelName} references org with id ${body.organizationId}, which does not exist. Deleting the reference...`)
// The user does not exist. Delete the reference records
await dbHelper.remove(models[modelName], body.id)
for (const data of dataset) {
if (!_.some(organizations, ['id', data.organizationId])) {
logger.info(`The ${modelName} references org with id ${data.organizationId}, which does not exist. Deleting the reference...`)
// The org does not exist. Delete the reference records
await dbHelper.remove(models[modelName], data.id)
logger.info('Reference deleted')
return
} else {
throw e
}
}

const relateId = body[orgResource.relateKey]

if (!organization[orgResource.propertyName]) {
organization[orgResource.propertyName] = []
}
organizations = _.filter(organizations, organization => {
if (!organization[orgResource.propertyName]) {
organization[orgResource.propertyName] = []
}
let updated = false
_.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
const relateId = body[orgResource.relateKey]
if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
} else {
updated = true
organization[orgResource.propertyName].push(body)
}
})
return updated
})

if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
} else {
organization[orgResource.propertyName].push(body)
await client.index({
const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
for (const os of chunked) {
const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources.organization.index,
type: topResources.organization.type,
id: body.organizationId,
body: organization,
body,
refresh: 'wait_for'
})
}
@@ -384,8 +419,8 @@ async function main () {
if (!_.isString(data[i].updatedBy)) {
data[i].updatedBy = 'tcAdmin'
}
await insertIntoES(key, data[i])
}
await insertIntoES(key, data)
logger.info('import data for ' + key + ' done')
} catch (e) {
logger.error(e)
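For readers following the diff, this is the bulk-indexing pattern that replaces the per-document `client.index` calls, extracted into a standalone sketch. The client setup, index name, and document shape are assumptions; the actual script additionally passes a `type` and, where configured, an ingest `pipeline`:

```js
const _ = require('lodash')
const { Client } = require('@elastic/elasticsearch')

const client = new Client({ node: 'http://localhost:9200' }) // assumed local node

async function bulkIndex (docs, indexName) {
  // Each document becomes an action/source pair in the bulk body:
  // { index: { _id } } tells Elasticsearch to (re)index the document that follows.
  const body = _.flatMap(docs, doc => [{ index: { _id: doc.id } }, doc])
  await client.bulk({
    index: indexName,
    body,
    refresh: 'wait_for' // block until the documents are searchable, as the script does
  })
}

// Hypothetical usage:
// bulkIndex([{ id: '1', name: 'a' }], 'user').catch(console.error)
```

Sending one bulk request per `MAX_BULK_SIZE` chunk replaces one HTTP round trip per document, which is the main practical gain of the batching introduced here.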