Skip to content

Commit e88999f

Browse files
JoshMockrobdasilva
andauthored
[Backport 8.9] Allow document to be overwritten in onDocument iteratee of bulk helper (#1732)Co-authored-by: Josh Mock <joshua.mock@elastic.co> (#1953)
Co-authored-by: Robert Da Silva <mail@robdasilva.com>
1 parent 485e57a commit e88999f

File tree

2 files changed

+120
-14
lines changed

2 files changed

+120
-14
lines changed

src/helpers.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ export interface BulkStats {
7474
aborted: boolean
7575
}
7676

77-
interface IndexAction {
77+
interface IndexActionOperation {
7878
index: T.BulkIndexOperation
7979
}
8080

81-
interface CreateAction {
81+
interface CreateActionOperation {
8282
create: T.BulkCreateOperation
8383
}
8484

@@ -90,7 +90,9 @@ interface DeleteAction {
9090
delete: T.BulkDeleteOperation
9191
}
9292

93-
type UpdateAction = [UpdateActionOperation, Record<string, any>]
93+
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
94+
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
95+
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
9496
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
9597

9698
export interface OnDropDocument<TDocument = unknown> {
@@ -618,22 +620,21 @@ export default class Helpers {
618620
for await (const chunk of datasource) {
619621
if (shouldAbort) break
620622
timeoutRef.refresh()
621-
const action = onDocument(chunk)
622-
const operation = Array.isArray(action)
623-
? Object.keys(action[0])[0]
624-
: Object.keys(action)[0]
623+
const result = onDocument(chunk)
624+
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
625+
const operation = Object.keys(action)[0]
625626
if (operation === 'index' || operation === 'create') {
626627
actionBody = serializer.serialize(action)
627-
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
628+
payloadBody = typeof payload === 'string'
629+
? payload
630+
: serializer.serialize(payload)
628631
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
629632
bulkBody.push(actionBody, payloadBody)
630633
} else if (operation === 'update') {
631-
// @ts-expect-error in case of update action is an array
632-
actionBody = serializer.serialize(action[0])
634+
actionBody = serializer.serialize(action)
633635
payloadBody = typeof chunk === 'string'
634636
? `{"doc":${chunk}}`
635-
// @ts-expect-error in case of update action is an array
636-
: serializer.serialize({ doc: chunk, ...action[1] })
637+
: serializer.serialize({ doc: chunk, ...payload })
637638
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
638639
bulkBody.push(actionBody, payloadBody)
639640
} else if (operation === 'delete') {

test/unit/helpers/bulk.test.ts

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
* under the License.
1818
*/
1919

20-
import * as http from 'http'
20+
import FakeTimers from '@sinonjs/fake-timers'
2121
import { createReadStream } from 'fs'
22+
import * as http from 'http'
2223
import { join } from 'path'
2324
import split from 'split2'
24-
import FakeTimers from '@sinonjs/fake-timers'
2525
import { test } from 'tap'
2626
import { Client, errors } from '../../../'
2727
import { buildServer, connection } from '../../utils'
@@ -785,6 +785,59 @@ test('bulk index', t => {
785785
t.end()
786786
})
787787

788+
t.test('Should use payload returned by `onDocument`', async t => {
789+
let count = 0
790+
const updatedAt = '1970-01-01T12:00:00.000Z'
791+
const MockConnection = connection.buildMockConnection({
792+
onRequest (params) {
793+
t.equal(params.path, '/_bulk')
794+
t.match(params.headers, {
795+
'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8',
796+
'x-elastic-client-meta': `es=${clientVersion},js=${nodeVersion},t=${transportVersion},hc=${nodeVersion},h=bp`
797+
})
798+
// @ts-expect-error
799+
const [action, payload] = params.body.split('\n')
800+
t.same(JSON.parse(action), { index: { _index: 'test' } })
801+
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
802+
return { body: { errors: false, items: [{}] } }
803+
}
804+
})
805+
806+
const client = new Client({
807+
node: 'http://localhost:9200',
808+
Connection: MockConnection
809+
})
810+
const result = await client.helpers.bulk<Document>({
811+
datasource: dataset.slice(),
812+
flushBytes: 1,
813+
concurrency: 1,
814+
onDocument (doc) {
815+
t.type(doc.user, 'string') // testing that doc is type of Document
816+
return [
817+
{
818+
index: {
819+
_index: 'test'
820+
}
821+
},
822+
{ ...doc, updatedAt }
823+
]
824+
},
825+
onDrop (doc) {
826+
t.fail('This should never be called')
827+
}
828+
})
829+
830+
t.type(result.time, 'number')
831+
t.type(result.bytes, 'number')
832+
t.match(result, {
833+
total: 3,
834+
successful: 3,
835+
retry: 0,
836+
failed: 0,
837+
aborted: false
838+
})
839+
})
840+
788841
t.end()
789842
})
790843

@@ -835,6 +888,58 @@ test('bulk create', t => {
835888
aborted: false
836889
})
837890
})
891+
892+
t.test('Should use payload returned by `onDocument`', async t => {
893+
let count = 0
894+
const updatedAt = '1970-01-01T12:00:00.000Z'
895+
const MockConnection = connection.buildMockConnection({
896+
onRequest (params) {
897+
t.equal(params.path, '/_bulk')
898+
t.match(params.headers, { 'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8' })
899+
// @ts-expect-error
900+
const [action, payload] = params.body.split('\n')
901+
t.same(JSON.parse(action), { create: { _index: 'test', _id: count } })
902+
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
903+
return { body: { errors: false, items: [{}] } }
904+
}
905+
})
906+
907+
const client = new Client({
908+
node: 'http://localhost:9200',
909+
Connection: MockConnection
910+
})
911+
let id = 0
912+
const result = await client.helpers.bulk({
913+
datasource: dataset.slice(),
914+
flushBytes: 1,
915+
concurrency: 1,
916+
onDocument (doc) {
917+
return [
918+
{
919+
create: {
920+
_index: 'test',
921+
_id: String(id++)
922+
}
923+
},
924+
{ ...doc, updatedAt }
925+
]
926+
},
927+
onDrop (doc) {
928+
t.fail('This should never be called')
929+
}
930+
})
931+
932+
t.type(result.time, 'number')
933+
t.type(result.bytes, 'number')
934+
t.match(result, {
935+
total: 3,
936+
successful: 3,
937+
retry: 0,
938+
failed: 0,
939+
aborted: false
940+
})
941+
})
942+
838943
t.end()
839944
})
840945

0 commit comments

Comments
 (0)