This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Sync master with prod #88

Merged 2 commits on Apr 6, 2021
Changes from all commits
README.md (1 change: 1 addition & 0 deletions)
@@ -52,6 +52,7 @@ Configuration for the application is at config/default.js and config/production.
- ELASTICCLOUD_ID: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this
- ELASTICCLOUD_USERNAME: The elastic cloud username for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
- ELASTICCLOUD_PASSWORD: The elastic cloud password for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
- MAX_BATCH_SIZE: The maximum number of records held in memory during bulk insert. Default is `10000` (Used by the db to es migration script; see the sketch below)
- MAX_RESULT_SIZE: The maximum number of results fetched per query. Default is `1000` (Used by the db to es migration script)
- MAX_BULK_SIZE: The maximum number of documents per bulk indexing request. Default is `100` (Used by the db to es migration script)
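The three limits apply at different levels of the migration: `MAX_BATCH_SIZE` bounds how many DB records are loaded into memory at once, `MAX_RESULT_SIZE` bounds how many ids go into a single Elasticsearch query, and `MAX_BULK_SIZE` bounds how many documents go into a single bulk request. A minimal sketch of how they nest, assuming the `ES.*` config paths used by `scripts/db/dumpDbToEs.js`; the function name, record source, index name, and query field are placeholders, not the actual script:

```js
const _ = require('lodash')
const config = require('config')

// Sketch only: shows how the three limits relate, not the real migration logic.
async function migrateToEs (allRecords, client, index) {
  // MAX_BATCH_SIZE: how many DB records are held in memory per outer batch
  for (const batch of _.chunk(allRecords, config.get('ES.MAX_BATCH_SIZE'))) {
    // MAX_RESULT_SIZE: how many ids are sent in a single ES search
    for (const ids of _.chunk(_.uniq(_.map(batch, 'id')), config.get('ES.MAX_RESULT_SIZE'))) {
      // query field and shape are assumptions for illustration
      await client.search({ index, body: { query: { terms: { id: ids } }, size: ids.length } })
    }
    // MAX_BULK_SIZE: how many documents go into a single bulk indexing request
    for (const docs of _.chunk(batch, config.get('ES.MAX_BULK_SIZE'))) {
      const body = _.flatMap(docs, doc => [{ index: { _id: doc.id } }, doc])
      await client.bulk({ index, body, refresh: 'wait_for' })
    }
  }
}
```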

app.js (1 change: 0 additions & 1 deletion)
@@ -51,7 +51,6 @@ _.each(routes, (verbs, url) => {
return next(errors.newAuthError('Action is not allowed for invalid token'))
}
req.auth = req.authUser
req.auth.sub = req.auth.userId
if (req.authUser.roles) {
// all access are allowed
if (_.isEmpty(access)) {
config/default.js (1 change: 1 addition & 0 deletions)
@@ -126,6 +126,7 @@ module.exports = {
orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
}
},
MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 10000,
MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000,
MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100
}
scripts/db/dumpDbToEs.js (200 changes: 97 additions & 103 deletions)
@@ -155,8 +155,23 @@ async function insertIntoES (modelName, dataset) {
} else if (_.includes(_.keys(userResources), esResourceName)) {
const userResource = userResources[esResourceName]

let users = []
// query all users
if (userResource.nested === true && userResource.mappingCreated !== true) {
await client.indices.putMapping({
index: topResources.user.index,
type: topResources.user.type,
include_type_name: true,
body: {
properties: {
[userResource.propertyName]: {
type: 'nested'
}
}
}
})
userResource.mappingCreated = true
}

// chunk the list to process
const idsArr = _.chunk(_.uniq(_.map(dataset, 'userId')), config.get('ES.MAX_RESULT_SIZE'))
for (const ids of idsArr) {
const res = await client.search({
@@ -171,68 +186,47 @@
}
}
})
users.push(..._.map(res.body.hits.hits, '_source'))
}
const users = _.filter(_.map(res.body.hits.hits, '_source'), user => {
if (!user[userResource.propertyName]) {
user[userResource.propertyName] = []
}
let updated = false
_.forEach(_.filter(dataset, ['userId', user.id]), body => {
const relateId = body[userResource.relateKey]
if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
} else {
updated = true
user[userResource.propertyName].push(body)
}
})
return updated
})

// remove unreferenced resources
for (const data of dataset) {
if (!_.some(users, ['id', data.userId])) {
const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
for (const us of chunked) {
const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources.user.index,
type: topResources.user.type,
body,
pipeline: topResources.user.pipeline.id,
refresh: 'wait_for'
})
}
const deleteRecord = _.filter(dataset, d => _.includes(ids, d.userId) && !_.some(users, ['id', d.userId]))
// remove unreferenced resources
for (const data of deleteRecord) {
logger.info(`The ${modelName} references user with id ${data.userId}, which does not exist. Deleting the reference...`)
// The user does not exist. Delete the reference records
await dbHelper.remove(models[modelName], data.id)
logger.info('Reference deleted')
}
}

if (userResource.nested === true && userResource.mappingCreated !== true) {
await client.indices.putMapping({
index: topResources.user.index,
type: topResources.user.type,
include_type_name: true,
body: {
properties: {
[userResource.propertyName]: {
type: 'nested'
}
}
}
})
userResource.mappingCreated = true
}

users = _.filter(users, user => {
if (!user[userResource.propertyName]) {
user[userResource.propertyName] = []
}
let updated = false
_.forEach(_.filter(dataset, ['userId', user.id]), body => {
const relateId = body[userResource.relateKey]
if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
} else {
updated = true
user[userResource.propertyName].push(body)
}
})
return updated
})

const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
for (const us of chunked) {
const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources.user.index,
type: topResources.user.type,
body,
pipeline: topResources.user.pipeline.id,
refresh: 'wait_for'
})
}
} else if (_.includes(_.keys(organizationResources), esResourceName)) {
const orgResource = organizationResources[esResourceName]

let organizations = []
// query all organizations
// chunk the list to process
const idsArr = _.chunk(_.uniq(_.map(dataset, 'organizationId')), config.get('ES.MAX_RESULT_SIZE'))
for (const ids of idsArr) {
const res = await client.search({
@@ -247,45 +241,41 @@
}
}
})
organizations.push(..._.map(res.body.hits.hits, '_source'))
}
const organizations = _.filter(_.map(res.body.hits.hits, '_source'), organization => {
if (!organization[orgResource.propertyName]) {
organization[orgResource.propertyName] = []
}
let updated = false
_.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
const relateId = body[orgResource.relateKey]
if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
} else {
updated = true
organization[orgResource.propertyName].push(body)
}
})
return updated
})

for (const data of dataset) {
if (!_.some(organizations, ['id', data.organizationId])) {
const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
for (const os of chunked) {
const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources.organization.index,
type: topResources.organization.type,
body,
refresh: 'wait_for'
})
}
const deleteRecord = _.filter(dataset, d => _.includes(ids, d.organizationId) && !_.some(organizations, ['id', d.organizationId]))
for (const data of deleteRecord) {
logger.info(`The ${modelName} references org with id ${data.organizationId}, which does not exist. Deleting the reference...`)
// The org does not exist. Delete the reference records
await dbHelper.remove(models[modelName], data.id)
logger.info('Reference deleted')
}
}

organizations = _.filter(organizations, organization => {
if (!organization[orgResource.propertyName]) {
organization[orgResource.propertyName] = []
}
let updated = false
_.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
const relateId = body[orgResource.relateKey]
if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
} else {
updated = true
organization[orgResource.propertyName].push(body)
}
})
return updated
})

const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
for (const os of chunked) {
const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
await client.bulk({
index: topResources.organization.index,
type: topResources.organization.type,
body,
refresh: 'wait_for'
})
}
}
}

@@ -402,25 +392,29 @@ async function main () {
for (let i = 0; i < keys.length; i++) {
const key = keys[i]
try {
const data = await dbHelper.find(models[key], {})

for (let i = 0; i < data.length; i++) {
logger.info(`Inserting data ${i + 1} of ${data.length}`)
logger.info(JSON.stringify(data[i]))
if (!_.isString(data[i].created)) {
data[i].created = new Date()
}
if (!_.isString(data[i].updated)) {
data[i].updated = new Date()
}
if (!_.isString(data[i].createdBy)) {
data[i].createdBy = 'tcAdmin'
}
if (!_.isString(data[i].updatedBy)) {
data[i].updatedBy = 'tcAdmin'
const allData = await dbHelper.find(models[key], {})
let j = 0
const dataset = _.chunk(allData, config.get('ES.MAX_BATCH_SIZE'))
for (const data of dataset) {
for (let i = 0; i < data.length; i++) {
j++
logger.info(`Inserting data ${j} of ${allData.length}`)
logger.info(JSON.stringify(data[i]))
if (!_.isString(data[i].created)) {
data[i].created = new Date()
}
if (!_.isString(data[i].updated)) {
data[i].updated = new Date()
}
if (!_.isString(data[i].createdBy)) {
data[i].createdBy = 'tcAdmin'
}
if (!_.isString(data[i].updatedBy)) {
data[i].updatedBy = 'tcAdmin'
}
}
await insertIntoES(key, data)
}
await insertIntoES(key, data)
logger.info('import data for ' + key + ' done')
} catch (e) {
logger.error(e)