diff --git a/docs/reference/client-helpers.md b/docs/reference/client-helpers.md
index c80562db4..2aad979e6 100644
--- a/docs/reference/client-helpers.md
+++ b/docs/reference/client-helpers.md
@@ -475,7 +475,7 @@ Added in `v8.16.0`

 ES|QL can return results in multiple binary formats, including [Apache Arrow](https://arrow.apache.org/)'s streaming format. Because it is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And, because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets.

-`toArrowReader` returns a [`RecordBatchStreamReader`](https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.md).
+`toArrowReader` returns an [`AsyncRecordBatchStreamReader`](https://github.com/apache/arrow/blob/520ae44272d491bbb52eb3c9b84864ed7088f11a/js/src/ipc/reader.ts#L216).

 ```ts
 const reader = await client.helpers
@@ -483,7 +483,7 @@ const reader = await client.helpers
   .toArrowReader()

 // print each record as JSON
-for (const recordBatch of reader) {
+for await (const recordBatch of reader) {
   for (const record of recordBatch) {
     console.log(record.toJSON())
   }
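The `for await` change above is what makes the larger-than-memory use case the docs describe practical: batches can be folded into a running aggregate as they arrive, instead of materializing the whole result first. A minimal sketch of that pattern, assuming the query result has a numeric `amount` column; the client setup, query, and `sumAmounts` helper are illustrative and not part of this patch:

```ts
import { Client } from '@elastic/elasticsearch'

const client = new Client({ node: 'http://localhost:9200' })

// Sum one column across all record batches as they stream in. Peak memory
// is bounded by the batch size rather than by the total result size.
// `amount` is a hypothetical field, chosen for illustration.
async function sumAmounts (): Promise<number> {
  const reader = await client.helpers
    .esql({ query: 'FROM sample_data' })
    .toArrowReader()

  let total = 0
  for await (const batch of reader) {
    for (const row of batch) {
      total += row?.toJSON().amount ?? 0
    }
  }
  return total
}
```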
diff --git a/src/helpers.ts b/src/helpers.ts
index 39e5a3e70..e8a64545a 100644
--- a/src/helpers.ts
+++ b/src/helpers.ts
@@ -11,7 +11,7 @@ import assert from 'node:assert'
 import * as timersPromises from 'node:timers/promises'
 import { Readable } from 'node:stream'
 import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
-import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node'
+import { Table, TypeMap, tableFromIPC, AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow.node'
 import Client from './client'
 import * as T from './api/types'
 import { Id } from './api/types'
@@ -135,7 +135,7 @@ export interface EsqlColumn {
 export interface EsqlHelper {
   toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
   toArrowTable: () => Promise<Table<TypeMap>>
-  toArrowReader: () => Promise<RecordBatchStreamReader>
+  toArrowReader: () => Promise<AsyncRecordBatchStreamReader>
 }

 export interface EsqlToRecords<TDocument> {
@@ -1000,7 +1000,7 @@ export default class Helpers {
        return tableFromIPC(response)
      },

-      async toArrowReader (): Promise<RecordBatchStreamReader> {
+      async toArrowReader (): Promise<AsyncRecordBatchStreamReader> {
        if (metaHeader !== null) {
          reqOptions.headers = reqOptions.headers ?? {}
          reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
@@ -1009,9 +1009,10 @@ export default class Helpers {

        params.format = 'arrow'
+        reqOptions.asStream = true

-        // @ts-expect-error the return type will be ArrayBuffer when the format is set to 'arrow'
-        const response: ArrayBuffer = await client.esql.query(params, reqOptions)
-        return RecordBatchStreamReader.from(response)
+        // @ts-expect-error response is a Readable when asStream is true
+        const response: Readable = await client.esql.query(params, reqOptions)
+        return await AsyncRecordBatchStreamReader.from(Readable.from(response))
      }
    }
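The core of the helper change: with `asStream` set, the transport hands back the raw HTTP body as a Node.js `Readable` instead of a fully buffered `ArrayBuffer`, and `AsyncRecordBatchStreamReader` consumes that stream incrementally. A self-contained sketch of the same wiring, with a `Readable` built from in-memory bytes standing in for the HTTP response body; the `demo` function and sample rows are illustrative assumptions:

```ts
import { Readable } from 'node:stream'
import {
  tableFromJSON,
  RecordBatchStreamWriter,
  AsyncRecordBatchStreamReader
} from 'apache-arrow'

async function demo (): Promise<void> {
  // Serialize a small table to Arrow IPC stream bytes.
  const table = tableFromJSON([{ n: 1 }, { n: 2 }, { n: 3 }])
  const bytes = await RecordBatchStreamWriter.writeAll(table).toUint8Array()

  // Wrap the bytes in a Readable, as if they were a streamed response body,
  // then let the async reader pull record batches off it incrementally.
  const source = Readable.from([bytes])
  const reader = await AsyncRecordBatchStreamReader.from(source)

  for await (const batch of reader) {
    console.log(batch.numRows) // 3
  }
}

demo().catch(console.error)
```

The `Readable.from(response)` wrapping in the patch serves the same purpose: it normalizes the body the transport returns into an async byte source the Arrow reader accepts.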
diff --git a/test/unit/helpers/esql.test.ts b/test/unit/helpers/esql.test.ts
index 3a66ee7d4..32f363674 100644
--- a/test/unit/helpers/esql.test.ts
+++ b/test/unit/helpers/esql.test.ts
@@ -158,17 +158,28 @@ test('ES|QL helper', t => {
     t.end()
   })

-  test('toArrowReader', t => {
-    t.test('Parses a binary response into an Arrow stream reader', async t => {
-      const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
+  test('toArrowReader', async t => {
+    const testRecords = [
+      { amount: 4.900000095367432 },
+      { amount: 8.199999809265137 },
+      { amount: 15.5 },
+      { amount: 9.899999618530273 },
+      { amount: 13.899999618530273 }
+    ]
+
+    // build reusable Arrow table
+    const table = arrow.tableFromJSON(testRecords)
+    const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
+
+    t.test('Parses a binary response into an Arrow stream reader', async t => {
       const MockConnection = connection.buildMockConnection({
         onRequest (_params) {
           return {
-            body: Buffer.from(binaryContent, 'base64'),
+            body: Buffer.from(rawData),
             statusCode: 200,
             headers: {
-              'content-type': 'application/vnd.elasticsearch+arrow+stream'
+              'content-type': 'application/vnd.elasticsearch+arrow+stream',
+              'transfer-encoding': 'chunked'
             }
           }
         }
@@ -182,26 +193,28 @@ test('ES|QL helper', t => {
       const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
       t.ok(result.isStream())

-      const recordBatch = result.next().value
-      t.same(recordBatch.get(0)?.toJSON(), {
-        amount: 4.900000095367432,
-        date: 1729532586965,
-      })
+      let count = 0
+      for await (const recordBatch of result) {
+        for (const record of recordBatch) {
+          t.same(record.toJSON(), testRecords[count])
+          count++
+        }
+      }
+
       t.end()
     })

     t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
-      const binaryContent =
-        '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
-
       const MockConnection = connection.buildMockConnection({
         onRequest (params) {
           const header = params.headers?.['x-elastic-client-meta'] ?? ''
           t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
           return {
-            body: Buffer.from(binaryContent, 'base64'),
+            body: Buffer.from(rawData),
             statusCode: 200,
             headers: {
-              'content-type': 'application/vnd.elasticsearch+arrow+stream'
+              'content-type': 'application/vnd.elasticsearch+arrow+stream',
+              'transfer-encoding': 'chunked'
             }
           }
         }
@@ -240,10 +253,12 @@ test('ES|QL helper', t => {
         new arrow.RecordBatch(schema, batch3.data),
       ])

+      const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
+
       const MockConnection = connection.buildMockConnection({
         onRequest (_params) {
           return {
-            body: Buffer.from(arrow.tableToIPC(table, "stream")),
+            body: Buffer.from(rawData),
             statusCode: 200,
             headers: {
               'content-type': 'application/vnd.elasticsearch+arrow+stream'
@@ -261,7 +276,7 @@ test('ES|QL helper', t => {
       t.ok(result.isStream())

       let counter = 0
-      for (const batch of result) {
+      for await (const batch of result) {
         for (const row of batch) {
           counter++
           const { id, val } = row.toJSON()
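A note on the fixture strategy: generating `rawData` with `RecordBatchStreamWriter` keeps the tests legible and avoids opaque base64 blobs that can drift out of sync with the assertions. The write/read round trip those tests rely on can be sanity-checked in isolation; a sketch, where the `roundTrip` helper and its sample rows are hypothetical and not part of the suite:

```ts
import * as arrow from 'apache-arrow'

// Write rows to Arrow IPC stream bytes, read them back, and confirm the
// decoded rows match the input.
async function roundTrip (): Promise<boolean> {
  const rows = [{ amount: 15.5 }, { amount: 9.899999618530273 }]
  const table = arrow.tableFromJSON(rows)
  const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()

  // An in-memory Uint8Array is a synchronous source, so the plain stream
  // reader (rather than the async variant) is sufficient here.
  const reader = await arrow.RecordBatchStreamReader.from(rawData)
  const decoded: Array<Record<string, unknown>> = []
  for (const batch of reader) {
    for (const row of batch) {
      decoded.push(row.toJSON())
    }
  }
  return JSON.stringify(decoded) === JSON.stringify(rows)
}

roundTrip()
  .then(ok => console.log(ok ? 'fixture round-trips' : 'mismatch'))
  .catch(console.error)
```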