
Bulk helper document manipulation #1209

Closed
@delvedor


Currently, you can't manipulate the document that gets passed to the bulk helper. If you need to do that, you should pipe the datasource through a transform stream or an async generator before giving it to the bulk helper:

const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// Transforms every chunk before it reaches the bulk helper
async function * manipulate (stream) {
  for await (const chunk of stream) {
    // chunk manipulation goes here
    yield chunk
  }
}

// top-level await is not available in CommonJS, so wrap the call
async function run () {
  const stream = createReadStream('./dataset.ndjson').pipe(split())

  const client = new Client({ node: 'http://localhost:9200' })
  const result = await client.helpers.bulk({
    datasource: manipulate(stream),
    onDocument (doc) {
      return {
        index: { _index: 'my-index' }
      }
    }
  })
  console.log(result)
}

run().catch(console.error)
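
To try the generator pattern without an Elasticsearch cluster, here is a small sketch that uses only Node.js built-ins: `Readable.from` stands in for the NDJSON stream and a plain `for await` loop stands in for the bulk helper. The `addTimestamp` generator and its `indexedAt` field are hypothetical, chosen only to illustrate a manipulation.

```javascript
const { Readable } = require('stream')

// Same pattern as `manipulate`: wrap the source in an async generator and
// transform each chunk before the consumer sees it.
async function * addTimestamp (source) {
  for await (const doc of source) {
    // hypothetical manipulation: add a fixed `indexedAt` field
    yield { ...doc, indexedAt: '2020-01-01T00:00:00.000Z' }
  }
}

// Drains an async iterable into an array (stand-in for the bulk helper)
async function collect (iterable) {
  const out = []
  for await (const item of iterable) out.push(item)
  return out
}

const source = Readable.from([{ title: 'a' }, { title: 'b' }])
collect(addTimestamp(source)).then(docs => console.log(docs))
```

Because the generator is just an async iterable, it can be chained with other generators before being handed to `datasource`.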

While the solution above works, it does not cover the case where the document carries metadata that should not be indexed, such as the index name. `onDocument` needs that metadata to build the action, so it cannot be stripped beforehand:

const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }

async function run () {
  const client = new Client({ node: 'http://localhost:9200' })
  const result = await client.helpers.bulk({
    datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
    // the metadata is read here, but the whole object is still indexed
    onDocument (doc) {
      return {
        index: { _index: doc.index, _id: doc.id }
      }
    }
  })
  console.log(result)
}

run().catch(console.error)
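
A simplified simulation makes the problem visible. It assumes each index action is serialized on one line and followed by the full datasource document on the next line, as in the Elasticsearch bulk NDJSON format. `buildBulkBody` is a hypothetical illustration, not the client's actual code.

```javascript
// Hypothetical, simplified bulk body builder: one action line from
// `onDocument`, then the *entire* datasource document as the source line.
function buildBulkBody (docs, onDocument) {
  const lines = []
  for (const doc of docs) {
    lines.push(JSON.stringify(onDocument(doc)))
    lines.push(JSON.stringify(doc))
  }
  return lines.join('\n') + '\n'
}

const body = buildBulkBody(
  [{ index: 'my-index', id: '1', body: { title: 'hello' } }],
  doc => ({ index: { _index: doc.index, _id: doc.id } })
)
console.log(body)
// {"index":{"_index":"my-index","_id":"1"}}
// {"index":"my-index","id":"1","body":{"title":"hello"}}
```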

In the `client.helpers.bulk` call above, the metadata fields (`index` and `id`) are indexed along with `body`.
To solve this, we could introduce another callback that manipulates the document after the bulk action has been created:

const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }

async function run () {
  const client = new Client({ node: 'http://localhost:9200' })
  const result = await client.helpers.bulk({
    datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
    onDocument (doc) {
      return {
        index: { _index: doc.index, _id: doc.id }
      }
    },
    // proposed option: decides what is sent as the document
    transformDocument (doc) {
      return doc.body
    }
  })
  console.log(result)
}

run().catch(console.error)
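
A sketch of how the proposed `transformDocument` could be applied, assuming only index-style actions (each followed by a document line). `onDocument` still receives the full object, so it can read the metadata, while `transformDocument` picks the payload. The option is a proposal and `buildBulkBody` is hypothetical; neither is an existing client API.

```javascript
// Hypothetical bulk body builder supporting the proposed callback.
// Without `transformDocument`, the document is sent unchanged.
function buildBulkBody (docs, { onDocument, transformDocument = doc => doc }) {
  const lines = []
  for (const doc of docs) {
    const action = onDocument(doc)         // built from the original document
    const payload = transformDocument(doc) // metadata can be dropped here
    lines.push(JSON.stringify(action), JSON.stringify(payload))
  }
  return lines.join('\n') + '\n'
}

const body = buildBulkBody(
  [{ index: 'my-index', id: '1', body: { title: 'hello' } }],
  {
    onDocument: doc => ({ index: { _index: doc.index, _id: doc.id } }),
    transformDocument: doc => doc.body
  }
)
console.log(body)
// {"index":{"_index":"my-index","_id":"1"}}
// {"title":"hello"}
```

Defaulting `transformDocument` to the identity function keeps existing `onDocument`-only code behaving as before.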

Another solution is to let `onDocument` return the document to index alongside the action. This could be a breaking change in some cases, which is not a big deal given that the helpers are still experimental:

const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')

// assuming the document has the following shape:
// { index: string, id: string, body: object }

async function run () {
  const client = new Client({ node: 'http://localhost:9200' })
  const result = await client.helpers.bulk({
    datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
    onDocument (doc) {
      return {
        index: { _index: doc.index, _id: doc.id },
        // proposed: the document to index, returned with the action
        document: doc.body
      }
    }
  })
  console.log(result)
}

run().catch(console.error)
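
A sketch of how this second proposal could be handled, again assuming only index-style actions. The `document` property is separated from the action and used as the payload. Falling back to the original object when `document` is absent is an assumption made here, not part of the proposal; `buildBulkBody` is hypothetical.

```javascript
// Hypothetical bulk body builder for the second proposal: `onDocument`
// returns the action plus an optional `document` property.
function buildBulkBody (docs, onDocument) {
  const lines = []
  for (const doc of docs) {
    const { document, ...action } = onDocument(doc)
    lines.push(JSON.stringify(action))
    // assumption: keep the old behavior when no `document` is returned
    lines.push(JSON.stringify(document === undefined ? doc : document))
  }
  return lines.join('\n') + '\n'
}

const body = buildBulkBody(
  [{ index: 'my-index', id: '1', body: { title: 'hello' } }],
  doc => ({ index: { _index: doc.index, _id: doc.id }, document: doc.body })
)
console.log(body)
// {"index":{"_index":"my-index","_id":"1"}}
// {"title":"hello"}
```

With the fallback, callbacks that return only an action keep working; an action object that already used a `document` key for something else would now be read differently.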
