Closed
Description
Currently, you can't manipulate the document that gets passed to the bulk helper. If you need to do that, you should pipe the datasource through a transform stream or wrap it in an async generator before giving it to the bulk helper:
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

async function * manipulate (stream) {
  for await (const chunk of stream) {
    // chunk manipulation
    yield chunk
  }
}

const stream = createReadStream('./dataset.ndjson').pipe(split())
const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: manipulate(stream),
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  }
})
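To make the "chunk manipulation" step concrete, here is a minimal sketch. It assumes every line of the dataset is a JSON document and that we want to add one field before indexing; `addIngestedAt` and the `ingestedAt` field are hypothetical names used only for illustration.

```javascript
// Hypothetical helper: parses one NDJSON line and adds a field to the document.
function addIngestedAt (line, now = new Date()) {
  const doc = JSON.parse(line)
  return { ...doc, ingestedAt: now.toISOString() }
}

// Async generator that applies the helper to every line of the source stream.
async function * manipulate (stream) {
  for await (const line of stream) {
    if (line.trim() === '') continue // skip blank lines
    yield addIngestedAt(line)
  }
}
```

The generator can then be passed as `datasource: manipulate(stream)`, exactly as in the example above.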
While the solution above works well, it does not cover the case where the document carries metadata that should not be indexed, such as the index name:
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }
const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    return {
      index: { _index: doc.index, _id: doc.id }
    }
  }
})
In the code above, the document metadata gets indexed as well.
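One possible workaround with the current API is to keep the metadata in a side table. This is only a sketch: it assumes the helper hands `onDocument` the very same object the datasource yielded, and `unwrap`, `stripMetadata`, and the `metadata` map are hypothetical names.

```javascript
// Side table: body object -> metadata, kept out of the indexed document.
const metadata = new WeakMap()

// Hypothetical helper: remembers the metadata and returns only the body.
function unwrap (record) {
  metadata.set(record.body, { index: record.index, id: record.id })
  return record.body
}

// Async generator to pass as `datasource`.
async function * stripMetadata (stream) {
  for await (const record of stream) {
    yield unwrap(record)
  }
}

// `onDocument` callback that reads the metadata back from the side table.
function onDocument (body) {
  const { index, id } = metadata.get(body)
  return { index: { _index: index, _id: id } }
}
```

This relies on object identity and on state shared between the generator and the callback, which is clumsy enough to motivate first-class support in the helper.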
To solve this problem, we could introduce another callback that allows manipulating the document after the bulk action has been created:
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }
const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    return {
      index: { _index: doc.index, _id: doc.id }
    }
  },
  transformDocument (doc) {
    return doc.body
  }
})
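The intended semantics of the proposed callback could be sketched as follows. This is not the helper's real implementation; `buildBulkPair` is a hypothetical function, and the identity default for `transformDocument` is an assumption meant to mirror today's behavior.

```javascript
// Hypothetical sketch of how the helper might build one action/document pair.
function buildBulkPair (doc, onDocument, transformDocument = d => d) {
  const action = onDocument(doc) // sees the full record, metadata included
  const source = transformDocument(doc) // only this part would be indexed
  return [JSON.stringify(action), JSON.stringify(source)]
}

const record = { index: 'my-index', id: '1', body: { title: 'hello' } }
const pair = buildBulkPair(
  record,
  doc => ({ index: { _index: doc.index, _id: doc.id } }),
  doc => doc.body
)
console.log(pair.join('\n'))
// {"index":{"_index":"my-index","_id":"1"}}
// {"title":"hello"}
```

Without a `transformDocument` callback, the second line would be the whole record, which is the behavior described above.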
Another solution, which could be a breaking change in some cases (not a big deal, given that the helpers are still experimental), is to return the document alongside the action from onDocument:
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }
const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    return {
      index: { _index: doc.index, _id: doc.id },
      document: doc.body
    }
  }
})
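For this second option, the helper would need to separate the `document` key from the action. A minimal sketch, assuming a fallback to the original item when no `document` key is returned (`splitResult` is a hypothetical name, not part of the client):

```javascript
// Hypothetical sketch: split the value returned by onDocument into
// the bulk action and the document to index.
function splitResult (result, original) {
  const { document, ...action } = result
  return {
    action,
    // fall back to the original item when no `document` key is returned
    source: document === undefined ? original : document
  }
}

const item = { index: 'my-index', id: '1', body: { title: 'hello' } }
const split = splitResult(
  { index: { _index: item.index, _id: item.id }, document: item.body },
  item
)
console.log(split.action) // { index: { _index: 'my-index', _id: '1' } }
console.log(split.source) // { title: 'hello' }
```

Under this sketch, callbacks that return only the action keep indexing the original item; whether the real change would preserve that is a design decision the proposal leaves open.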