
Commit 132948a

Resolve issue where db to es migration script in prod would fail due to memory issues
1 parent 40eaa8a commit 132948a

File tree

3 files changed: +99 additions, -103 deletions

3 files changed

+99
-103
lines changed

README.md

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@ Configuration for the application is at config/default.js and config/production.
 - ELASTICCLOUD_ID: The elastic cloud id, if your elasticsearch instance is hosted on elastic cloud. DO NOT provide a value for ES_HOST if you are using this
 - ELASTICCLOUD_USERNAME: The elastic cloud username for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
 - ELASTICCLOUD_PASSWORD: The elastic cloud password for basic authentication. Provide this only if your elasticsearch instance is hosted on elastic cloud
+- MAX_BATCH_SIZE: Restricts the number of records held in memory during bulk insert. Default is `10000` (Used by the db to es migration script)
 - MAX_RESULT_SIZE: The Results Per Query Limits. Default is `1000` (Used by the db to es migration script)
 - MAX_BULK_SIZE: The Bulk Indexing Maximum Limits. Default is `100` (Used by the db to es migration script)
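
For orientation (not part of the commit), the limits documented above are read by the migration script through node-config. A minimal sketch, assuming the ES.* key layout used by the config.get() calls in scripts/db/dumpDbToEs.js below:

// Minimal sketch (not part of this commit) of how the migration script reads the
// limits documented above via the node-config package.
const config = require('config')

const maxBatchSize = config.get('ES.MAX_BATCH_SIZE')   // records held in memory per insert batch
const maxResultSize = config.get('ES.MAX_RESULT_SIZE') // users/orgs fetched per Elasticsearch query
const maxBulkSize = config.get('ES.MAX_BULK_SIZE')     // documents sent per bulk index request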

config/default.js

Lines changed: 1 addition & 0 deletions
@@ -126,6 +126,7 @@ module.exports = {
         orgField: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders'
       }
     },
+    MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 10000,
     MAX_RESULT_SIZE: parseInt(process.env.MAX_RESULT_SIZE, 10) || 1000,
     MAX_BULK_SIZE: parseInt(process.env.MAX_BULK_SIZE, 10) || 100
   }
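
For illustration only (not part of the commit), a production override of the new limit might look like the following, assuming the same node-config layout and the ES.* nesting implied by the config.get('ES.MAX_BATCH_SIZE') call in the script below:

// config/production.js (hypothetical override; all other keys omitted)
module.exports = {
  ES: {
    MAX_BATCH_SIZE: parseInt(process.env.MAX_BATCH_SIZE, 10) || 5000
  }
}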

scripts/db/dumpDbToEs.js

Lines changed: 97 additions & 103 deletions
@@ -155,8 +155,23 @@ async function insertIntoES (modelName, dataset) {
   } else if (_.includes(_.keys(userResources), esResourceName)) {
     const userResource = userResources[esResourceName]
 
-    let users = []
-    // query all users
+    if (userResource.nested === true && userResource.mappingCreated !== true) {
+      await client.indices.putMapping({
+        index: topResources.user.index,
+        type: topResources.user.type,
+        include_type_name: true,
+        body: {
+          properties: {
+            [userResource.propertyName]: {
+              type: 'nested'
+            }
+          }
+        }
+      })
+      userResource.mappingCreated = true
+    }
+
+    // chunk the list to process
     const idsArr = _.chunk(_.uniq(_.map(dataset, 'userId')), config.get('ES.MAX_RESULT_SIZE'))
     for (const ids of idsArr) {
       const res = await client.search({
@@ -171,68 +186,47 @@ async function insertIntoES (modelName, dataset) {
           }
         }
       })
-      users.push(..._.map(res.body.hits.hits, '_source'))
-    }
+      const users = _.filter(_.map(res.body.hits.hits, '_source'), user => {
+        if (!user[userResource.propertyName]) {
+          user[userResource.propertyName] = []
+        }
+        let updated = false
+        _.forEach(_.filter(dataset, ['userId', user.id]), body => {
+          const relateId = body[userResource.relateKey]
+          if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
+            logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
+          } else {
+            updated = true
+            user[userResource.propertyName].push(body)
+          }
+        })
+        return updated
+      })
 
-    // remove unreference resource
-    for (const data of dataset) {
-      if (!_.some(users, ['id', data.userId])) {
+      const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
+      for (const us of chunked) {
+        const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
+        await client.bulk({
+          index: topResources.user.index,
+          type: topResources.user.type,
+          body,
+          pipeline: topResources.user.pipeline.id,
+          refresh: 'wait_for'
+        })
+      }
+      const deleteRecord = _.filter(dataset, d => _.includes(ids, d.userId) && !_.some(users, ['id', d.userId]))
+      // remove unreference resource
+      for (const data of deleteRecord) {
         logger.info(`The ${modelName} references user with id ${data.userId}, which does not exist. Deleting the reference...`)
         // The user does not exist. Delete the referece records
         await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
       }
     }
-
-    if (userResource.nested === true && userResource.mappingCreated !== true) {
-      await client.indices.putMapping({
-        index: topResources.user.index,
-        type: topResources.user.type,
-        include_type_name: true,
-        body: {
-          properties: {
-            [userResource.propertyName]: {
-              type: 'nested'
-            }
-          }
-        }
-      })
-      userResource.mappingCreated = true
-    }
-
-    users = _.filter(users, user => {
-      if (!user[userResource.propertyName]) {
-        user[userResource.propertyName] = []
-      }
-      let updated = false
-      _.forEach(_.filter(dataset, ['userId', user.id]), body => {
-        const relateId = body[userResource.relateKey]
-        if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
-          logger.error(`Can't create existing ${esResourceName} with the ${userResource.relateKey}: ${relateId}, userId: ${body.userId}`)
-        } else {
-          updated = true
-          user[userResource.propertyName].push(body)
-        }
-      })
-      return updated
-    })
-
-    const chunked = _.chunk(users, config.get('ES.MAX_BULK_SIZE'))
-    for (const us of chunked) {
-      const body = _.flatMap(us, doc => [{ index: { _id: doc.id } }, doc])
-      await client.bulk({
-        index: topResources.user.index,
-        type: topResources.user.type,
-        body,
-        pipeline: topResources.user.pipeline.id,
-        refresh: 'wait_for'
-      })
-    }
   } else if (_.includes(_.keys(organizationResources), esResourceName)) {
     const orgResource = organizationResources[esResourceName]
 
-    let organizations = []
-    // query all organizations
+    // chunk the list to process
     const idsArr = _.chunk(_.uniq(_.map(dataset, 'organizationId')), config.get('ES.MAX_RESULT_SIZE'))
     for (const ids of idsArr) {
       const res = await client.search({
@@ -247,45 +241,41 @@ async function insertIntoES (modelName, dataset) {
           }
         }
       })
-      organizations.push(..._.map(res.body.hits.hits, '_source'))
-    }
+      const organizations = _.filter(_.map(res.body.hits.hits, '_source'), organization => {
+        if (!organization[orgResource.propertyName]) {
+          organization[orgResource.propertyName] = []
+        }
+        let updated = false
+        _.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
+          const relateId = body[orgResource.relateKey]
+          if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
+            logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
+          } else {
+            updated = true
+            organization[orgResource.propertyName].push(body)
+          }
+        })
+        return updated
+      })
 
-    for (const data of dataset) {
-      if (!_.some(organizations, ['id', data.organizationId])) {
+      const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
+      for (const os of chunked) {
+        const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
+        await client.bulk({
+          index: topResources.organization.index,
+          type: topResources.organization.type,
+          body,
+          refresh: 'wait_for'
+        })
+      }
+      const deleteRecord = _.filter(dataset, d => _.includes(ids, d.organizationId) && !_.some(organizations, ['id', d.organizationId]))
+      for (const data of deleteRecord) {
         logger.info(`The ${modelName} references org with id ${data.organizationId}, which does not exist. Deleting the reference...`)
         // The org does not exist. Delete the referece records
         await dbHelper.remove(models[modelName], data.id)
         logger.info('Reference deleted')
       }
     }
-
-    organizations = _.filter(organizations, organization => {
-      if (!organization[orgResource.propertyName]) {
-        organization[orgResource.propertyName] = []
-      }
-      let updated = false
-      _.forEach(_.filter(dataset, ['organizationId', organization.id]), body => {
-        const relateId = body[orgResource.relateKey]
-        if (_.some(organization[orgResource.propertyName], [orgResource.relateKey, relateId])) {
-          logger.error(`Can't create existing ${esResourceName} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${body.organizationId}`)
-        } else {
-          updated = true
-          organization[orgResource.propertyName].push(body)
-        }
-      })
-      return updated
-    })
-
-    const chunked = _.chunk(organizations, config.get('ES.MAX_BULK_SIZE'))
-    for (const os of chunked) {
-      const body = _.flatMap(os, doc => [{ index: { _id: doc.id } }, doc])
-      await client.bulk({
-        index: topResources.organization.index,
-        type: topResources.organization.type,
-        body,
-        refresh: 'wait_for'
-      })
-    }
   }
 }
 
@@ -402,25 +392,29 @@ async function main () {
   for (let i = 0; i < keys.length; i++) {
     const key = keys[i]
     try {
-      const data = await dbHelper.find(models[key], {})
-
-      for (let i = 0; i < data.length; i++) {
-        logger.info(`Inserting data ${i + 1} of ${data.length}`)
-        logger.info(JSON.stringify(data[i]))
-        if (!_.isString(data[i].created)) {
-          data[i].created = new Date()
-        }
-        if (!_.isString(data[i].updated)) {
-          data[i].updated = new Date()
-        }
-        if (!_.isString(data[i].createdBy)) {
-          data[i].createdBy = 'tcAdmin'
-        }
-        if (!_.isString(data[i].updatedBy)) {
-          data[i].updatedBy = 'tcAdmin'
+      const allData = await dbHelper.find(models[key], {})
+      let j = 0
+      const dataset = _.chunk(allData, config.get('ES.MAX_BATCH_SIZE'))
+      for (const data of dataset) {
+        for (let i = 0; i < data.length; i++) {
+          j++
+          logger.info(`Inserting data ${j} of ${allData.length}`)
+          logger.info(JSON.stringify(data[i]))
+          if (!_.isString(data[i].created)) {
+            data[i].created = new Date()
+          }
+          if (!_.isString(data[i].updated)) {
+            data[i].updated = new Date()
+          }
+          if (!_.isString(data[i].createdBy)) {
+            data[i].createdBy = 'tcAdmin'
+          }
+          if (!_.isString(data[i].updatedBy)) {
+            data[i].updatedBy = 'tcAdmin'
+          }
         }
+        await insertIntoES(key, data)
       }
-      await insertIntoES(key, data)
       logger.info('import data for ' + key + ' done')
     } catch (e) {
       logger.error(e)
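
In summary, the memory fix layers an outer MAX_BATCH_SIZE chunking step in main() on top of the existing MAX_RESULT_SIZE query limit and MAX_BULK_SIZE bulk-request limit in insertIntoES(). A condensed, illustrative sketch of that pattern follows; migrateModel, fetchAll and bulkIndex are hypothetical stand-ins for the real dbHelper.find and insertIntoES code in the diff above:

// Illustrative only: a condensed sketch of the batching pattern introduced above.
// migrateModel, fetchAll and bulkIndex are hypothetical stand-ins for the real
// dbHelper.find / insertIntoES code in scripts/db/dumpDbToEs.js.
const _ = require('lodash')
const config = require('config')

async function migrateModel (modelName, fetchAll, bulkIndex) {
  const allData = await fetchAll(modelName)
  // Process at most MAX_BATCH_SIZE records per pass instead of the whole table at once.
  for (const batch of _.chunk(allData, config.get('ES.MAX_BATCH_SIZE'))) {
    // Split each pass into bulk requests of at most MAX_BULK_SIZE documents.
    for (const docs of _.chunk(batch, config.get('ES.MAX_BULK_SIZE'))) {
      const body = _.flatMap(docs, doc => [{ index: { _id: doc.id } }, doc])
      await bulkIndex(body)
    }
  }
}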
