diff --git a/docs/reference/client-helpers.md b/docs/reference/client-helpers.md index 731df31cc..ad170d5bf 100644 --- a/docs/reference/client-helpers.md +++ b/docs/reference/client-helpers.md @@ -619,7 +619,7 @@ Added in `v8.16.0` ES|QL can return results in multiple binary formats, including [Apache Arrow](https://arrow.apache.org/)'s streaming format. Because it is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And, because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets. -`toArrowReader` returns a [`RecordBatchStreamReader`](https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.md). +`toArrowReader` returns an [`AsyncRecordBatchStreamReader`](https://github.com/apache/arrow/blob/520ae44272d491bbb52eb3c9b84864ed7088f11a/js/src/ipc/reader.ts#L216). ```ts const reader = await client.helpers @@ -627,7 +627,7 @@ const reader = await client.helpers .toArrowReader() // print each record as JSON -for (const recordBatch of reader) { +for await (const recordBatch of reader) { for (const record of recordBatch) { console.log(record.toJSON()) } diff --git a/src/helpers.ts b/src/helpers.ts index 39e5a3e70..e8a64545a 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -11,7 +11,7 @@ import assert from 'node:assert' import * as timersPromises from 'node:timers/promises' import { Readable } from 'node:stream' import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport' -import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node' +import { Table, TypeMap, tableFromIPC, AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow.node' import Client from './client' import * as T from './api/types' import { Id } from './api/types' @@ -135,7 +135,7 @@ export interface EsqlColumn { export interface EsqlHelper { toRecords: () => Promise> toArrowTable: () => Promise> - toArrowReader: () => Promise + toArrowReader: () => Promise } export interface EsqlToRecords { @@ -1000,7 +1000,7 @@ export default class Helpers { return tableFromIPC(response) }, - async toArrowReader (): Promise { + async toArrowReader (): Promise { if (metaHeader !== null) { reqOptions.headers = reqOptions.headers ?? {} reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa` @@ -1009,9 +1009,9 @@ export default class Helpers { params.format = 'arrow' - // @ts-expect-error the return type will be ArrayBuffer when the format is set to 'arrow' - const response: ArrayBuffer = await client.esql.query(params, reqOptions) - return RecordBatchStreamReader.from(response) + // @ts-expect-error response is a Readable when asStream is true + const response: Readable = await client.esql.query(params, reqOptions) + return await AsyncRecordBatchStreamReader.from(Readable.from(response)) } } diff --git a/test/unit/helpers/esql.test.ts b/test/unit/helpers/esql.test.ts index af09c18ba..dace000c6 100644 --- a/test/unit/helpers/esql.test.ts +++ b/test/unit/helpers/esql.test.ts @@ -182,17 +182,28 @@ test('ES|QL helper', t => { t.end() }) - test('toArrowReader', t => { - t.test('Parses a binary response into an Arrow stream reader', async t => { - const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + test('toArrowReader', async t => { + const testRecords = [ + { amount: 4.900000095367432, }, + { amount: 8.199999809265137, }, + { amount: 15.5, }, + { amount: 9.899999618530273, }, + { amount: 13.899999618530273, }, + ] + + // build reusable Arrow table + const table = arrow.tableFromJSON(testRecords) + const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array() + t.test('Parses a binary response into an Arrow stream reader', async t => { const MockConnection = connection.buildMockConnection({ onRequest (_params) { return { - body: Buffer.from(binaryContent, 'base64'), + body: Buffer.from(rawData), statusCode: 200, headers: { - 'content-type': 'application/vnd.elasticsearch+arrow+stream' + 'content-type': 'application/vnd.elasticsearch+arrow+stream', + 'transfer-encoding': 'chunked' } } } @@ -206,30 +217,8 @@ test('ES|QL helper', t => { const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader() t.ok(result.isStream()) - const testRecords = [ - { - amount: 4.900000095367432, - date: 1729532586965, - }, - { - amount: 8.199999809265137, - date: 1729446186965, - }, - { - amount: 15.5, - date: 1729359786965, - }, - { - amount: 9.899999618530273, - date: 1729273386965, - }, - { - amount: 13.899999618530273, - date: 1729186986965, - }, - ] let count = 0 - for (const recordBatch of result) { + for await (const recordBatch of result) { for (const record of recordBatch) { t.same(record.toJSON(), testRecords[count]) count++ @@ -240,17 +229,16 @@ test('ES|QL helper', t => { }) t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => { - const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' - const MockConnection = connection.buildMockConnection({ onRequest (params) { const header = params.headers?.['x-elastic-client-meta'] ?? '' t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`) return { - body: Buffer.from(binaryContent, 'base64'), + body: Buffer.from(rawData), statusCode: 200, headers: { - 'content-type': 'application/vnd.elasticsearch+arrow+stream' + 'content-type': 'application/vnd.elasticsearch+arrow+stream', + 'transfer-encoding': 'chunked' } } } @@ -289,10 +277,12 @@ test('ES|QL helper', t => { new arrow.RecordBatch(schema, batch3.data), ]) + const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array() + const MockConnection = connection.buildMockConnection({ onRequest (_params) { return { - body: Buffer.from(arrow.tableToIPC(table, "stream")), + body: Buffer.from(rawData), statusCode: 200, headers: { 'content-type': 'application/vnd.elasticsearch+arrow+stream' @@ -310,7 +300,7 @@ test('ES|QL helper', t => { t.ok(result.isStream()) let counter = 0 - for (const batch of result) { + for await (const batch of result) { for (const row of batch) { counter++ const { id, val } = row.toJSON()