1
1
const _ = require ( 'lodash' )
2
+ const config = require ( 'config' )
2
3
const sequelize = require ( '../../src/models/index' )
3
4
const dbHelper = require ( '../../src/common/db-helper' )
4
5
const logger = require ( '../../src/common/logger' )
@@ -130,7 +131,7 @@ async function cleanupES (keys) {
130
131
console . log ( 'Existing data in elasticsearch has been deleted!' )
131
132
}
132
133
133
- async function insertIntoES ( modelName , body ) {
134
+ async function insertIntoES ( modelName , dataset ) {
134
135
const esResourceName = modelToESIndexMapping [ modelName ]
135
136
136
137
if ( ! esResourceName ) {
@@ -140,36 +141,46 @@ async function insertIntoES (modelName, body) {
140
141
}
141
142
142
143
if ( _ . includes ( _ . keys ( topResources ) , esResourceName ) ) {
143
- await client . index ( {
144
- index : topResources [ esResourceName ] . index ,
145
- type : topResources [ esResourceName ] . type ,
146
- id : body . id ,
147
- body,
148
- pipeline : topResources [ esResourceName ] . ingest ? topResources [ esResourceName ] . ingest . pipeline . id : undefined ,
149
- refresh : 'wait_for'
150
- } )
144
+ const chunked = _ . chunk ( dataset , config . get ( 'ES.MAX_BULK_SIZE' ) )
145
+ for ( const ds of chunked ) {
146
+ const body = _ . flatMap ( ds , doc => [ { index : { _id : doc . id } } , doc ] )
147
+ await client . bulk ( {
148
+ index : topResources [ esResourceName ] . index ,
149
+ type : topResources [ esResourceName ] . type ,
150
+ body,
151
+ pipeline : topResources [ esResourceName ] . ingest ? topResources [ esResourceName ] . ingest . pipeline . id : undefined ,
152
+ refresh : 'wait_for'
153
+ } )
154
+ }
151
155
} else if ( _ . includes ( _ . keys ( userResources ) , esResourceName ) ) {
152
156
const userResource = userResources [ esResourceName ]
153
157
154
- let user
155
-
156
- try {
157
- const res = await client . getSource ( {
158
+ let users = [ ]
159
+ // query all users
160
+ const idsArr = _ . chunk ( _ . uniq ( _ . map ( dataset , 'userId' ) ) , config . get ( 'ES.MAX_RESULT_SIZE' ) )
161
+ for ( const ids of idsArr ) {
162
+ const res = await client . search ( {
158
163
index : topResources . user . index ,
159
164
type : topResources . user . type ,
160
- id : body . userId
165
+ size : dataset . length ,
166
+ body : {
167
+ query : {
168
+ ids : {
169
+ values : ids
170
+ }
171
+ }
172
+ }
161
173
} )
174
+ users . push ( ..._ . map ( res . body . hits . hits , '_source' ) )
175
+ }
162
176
163
- user = res . body
164
- } catch ( e ) {
165
- if ( e . meta && e . meta . body . error . type === RESOURCE_NOT_FOUND ) {
166
- logger . info ( `The ${ modelName } references user with id ${ body . userId } , which does not exist. Deleting the reference...` )
177
+ // remove unreference resource
178
+ for ( const data of dataset ) {
179
+ if ( ! _ . some ( users , [ 'id' , data . userId ] ) ) {
180
+ logger . info ( `The ${ modelName } references user with id ${ data . userId } , which does not exist. Deleting the reference...` )
167
181
// The user does not exist. Delete the referece records
168
- await dbHelper . remove ( models [ modelName ] , body . id )
182
+ await dbHelper . remove ( models [ modelName ] , data . id )
169
183
logger . info ( 'Reference deleted' )
170
- return
171
- } else {
172
- throw e
173
184
}
174
185
}
175
186
@@ -189,65 +200,89 @@ async function insertIntoES (modelName, body) {
189
200
userResource . mappingCreated = true
190
201
}
191
202
192
- const relateId = body [ userResource . relateKey ]
193
-
194
- if ( ! user [ userResource . propertyName ] ) {
195
- user [ userResource . propertyName ] = [ ]
196
- }
203
+ users = _ . filter ( users , user => {
204
+ if ( ! user [ userResource . propertyName ] ) {
205
+ user [ userResource . propertyName ] = [ ]
206
+ }
207
+ let updated = false
208
+ _ . forEach ( _ . filter ( dataset , [ 'userId' , user . id ] ) , body => {
209
+ const relateId = body [ userResource . relateKey ]
210
+ if ( _ . some ( user [ userResource . propertyName ] , [ userResource . relateKey , relateId ] ) ) {
211
+ logger . error ( `Can't create existing ${ esResourceName } with the ${ userResource . relateKey } : ${ relateId } , userId: ${ body . userId } ` )
212
+ } else {
213
+ updated = true
214
+ user [ userResource . propertyName ] . push ( body )
215
+ }
216
+ } )
217
+ return updated
218
+ } )
197
219
198
- if ( _ . some ( user [ userResource . propertyName ] , [ userResource . relateKey , relateId ] ) ) {
199
- logger . error ( `Can't create existing ${ esResourceName } with the ${ userResource . relateKey } : ${ relateId } , userId: ${ body . userId } ` )
200
- } else {
201
- user [ userResource . propertyName ] . push ( body )
202
- await client . index ( {
220
+ const chunked = _ . chunk ( users , config . get ( 'ES.MAX_BULK_SIZE' ) )
221
+ for ( const us of chunked ) {
222
+ const body = _ . flatMap ( us , doc => [ { index : { _id : doc . id } } , doc ] )
223
+ await client . bulk ( {
203
224
index : topResources . user . index ,
204
225
type : topResources . user . type ,
205
- id : body . userId ,
206
- body : user ,
226
+ body,
207
227
pipeline : topResources . user . pipeline . id ,
208
228
refresh : 'wait_for'
209
229
} )
210
230
}
211
231
} else if ( _ . includes ( _ . keys ( organizationResources ) , esResourceName ) ) {
212
232
const orgResource = organizationResources [ esResourceName ]
213
233
214
- let organization
215
-
216
- try {
217
- const res = await client . getSource ( {
234
+ let organizations = [ ]
235
+ // query all organizations
236
+ const idsArr = _ . chunk ( _ . uniq ( _ . map ( dataset , 'organizationId' ) ) , config . get ( 'ES.MAX_RESULT_SIZE' ) )
237
+ for ( const ids of idsArr ) {
238
+ const res = await client . search ( {
218
239
index : topResources . organization . index ,
219
240
type : topResources . organization . type ,
220
- id : body . organizationId
241
+ size : dataset . length ,
242
+ body : {
243
+ query : {
244
+ ids : {
245
+ values : ids
246
+ }
247
+ }
248
+ }
221
249
} )
250
+ organizations . push ( ..._ . map ( res . body . hits . hits , '_source' ) )
251
+ }
222
252
223
- organization = res . body
224
- } catch ( e ) {
225
- if ( e . meta && e . meta . body . error . type === RESOURCE_NOT_FOUND ) {
226
- logger . info ( `The ${ modelName } references org with id ${ body . organizationId } , which does not exist. Deleting the reference...` )
227
- // The user does not exist. Delete the referece records
228
- await dbHelper . remove ( models [ modelName ] , body . id )
253
+ for ( const data of dataset ) {
254
+ if ( ! _ . some ( organizations , [ 'id' , data . organizationId ] ) ) {
255
+ logger . info ( `The ${ modelName } references org with id ${ data . organizationId } , which does not exist. Deleting the reference...` )
256
+ // The org does not exist. Delete the referece records
257
+ await dbHelper . remove ( models [ modelName ] , data . id )
229
258
logger . info ( 'Reference deleted' )
230
- return
231
- } else {
232
- throw e
233
259
}
234
260
}
235
261
236
- const relateId = body [ orgResource . relateKey ]
237
-
238
- if ( ! organization [ orgResource . propertyName ] ) {
239
- organization [ orgResource . propertyName ] = [ ]
240
- }
262
+ organizations = _ . filter ( organizations , organization => {
263
+ if ( ! organization [ orgResource . propertyName ] ) {
264
+ organization [ orgResource . propertyName ] = [ ]
265
+ }
266
+ let updated = false
267
+ _ . forEach ( _ . filter ( dataset , [ 'organizationId' , organization . id ] ) , body => {
268
+ const relateId = body [ orgResource . relateKey ]
269
+ if ( _ . some ( organization [ orgResource . propertyName ] , [ orgResource . relateKey , relateId ] ) ) {
270
+ logger . error ( `Can't create existing ${ esResourceName } with the ${ orgResource . relateKey } : ${ relateId } , organizationId: ${ body . organizationId } ` )
271
+ } else {
272
+ updated = true
273
+ organization [ orgResource . propertyName ] . push ( body )
274
+ }
275
+ } )
276
+ return updated
277
+ } )
241
278
242
- if ( _ . some ( organization [ orgResource . propertyName ] , [ orgResource . relateKey , relateId ] ) ) {
243
- logger . error ( `Can't create existing ${ esResourceName } with the ${ orgResource . relateKey } : ${ relateId } , organizationId: ${ body . organizationId } ` )
244
- } else {
245
- organization [ orgResource . propertyName ] . push ( body )
246
- await client . index ( {
279
+ const chunked = _ . chunk ( organizations , config . get ( 'ES.MAX_BULK_SIZE' ) )
280
+ for ( const os of chunked ) {
281
+ const body = _ . flatMap ( os , doc => [ { index : { _id : doc . id } } , doc ] )
282
+ await client . bulk ( {
247
283
index : topResources . organization . index ,
248
284
type : topResources . organization . type ,
249
- id : body . organizationId ,
250
- body : organization ,
285
+ body,
251
286
refresh : 'wait_for'
252
287
} )
253
288
}
@@ -384,8 +419,8 @@ async function main () {
384
419
if ( ! _ . isString ( data [ i ] . updatedBy ) ) {
385
420
data [ i ] . updatedBy = 'tcAdmin'
386
421
}
387
- await insertIntoES ( key , data [ i ] )
388
422
}
423
+ await insertIntoES ( key , data )
389
424
logger . info ( 'import data for ' + key + ' done' )
390
425
} catch ( e ) {
391
426
logger . error ( e )
0 commit comments