From b734b478b45d508b5e4442841430957d0c0a0b88 Mon Sep 17 00:00:00 2001 From: Florian Bernd Date: Mon, 3 Jun 2024 15:52:45 +0200 Subject: [PATCH] Add `Esql.QueryAsObjectsAsync` high level API --- .../Client/ElasticsearchClient.Esql.cs | 96 +++++++++++++++++++ .../Client/NamespacedClientProxy.cs | 12 +-- 2 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.Esql.cs diff --git a/src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.Esql.cs b/src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.Esql.cs new file mode 100644 index 00000000000..d378ddde9ad --- /dev/null +++ b/src/Elastic.Clients.Elasticsearch.Shared/Client/ElasticsearchClient.Esql.cs @@ -0,0 +1,96 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +#if !ELASTICSEARCH_SERVERLESS + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Nodes; +using System.Threading.Tasks; +using System.Threading; + +#if ELASTICSEARCH_SERVERLESS +namespace Elastic.Clients.Elasticsearch.Esql.Serverless; +#else + +namespace Elastic.Clients.Elasticsearch.Esql; +#endif + +public partial class EsqlNamespacedClient +{ + public virtual async Task> QueryAsObjectsAsync( + Action> configureRequest, + CancellationToken cancellationToken = default) + { + if (configureRequest is null) + throw new ArgumentNullException(nameof(configureRequest)); + + var response = await QueryAsync(Configure, cancellationToken).ConfigureAwait(false); + + return EsqlToObject(Client, response); + + void Configure(EsqlQueryRequestDescriptor descriptor) + { + configureRequest(descriptor); + descriptor.Format("JSON"); + descriptor.Columnar(false); + } + } + + private static IEnumerable EsqlToObject(ElasticsearchClient client, EsqlQueryResponse response) + { + // TODO: Improve performance + + using var doc = JsonSerializer.Deserialize(response.Data) ?? throw new JsonException(); + + if (!doc.RootElement.TryGetProperty("columns"u8, out var columns) || (columns.ValueKind is not JsonValueKind.Array)) + throw new JsonException(""); + + if (!doc.RootElement.TryGetProperty("values"u8, out var values) || (values.ValueKind is not JsonValueKind.Array)) + yield break; + + var names = columns.EnumerateArray() + .Select(x => + { + if (!x.TryGetProperty("name"u8, out var prop)) + { + throw new JsonException(); + } + + var result = prop.GetString() ?? throw new JsonException(); + + return result; + }) + .ToArray(); + + var obj = new JsonObject(); + using var ms = new MemoryStream(); + using var writer = new Utf8JsonWriter(ms); + + foreach (var document in values.EnumerateArray()) + { + obj.Clear(); + ms.SetLength(0); + writer.Reset(); + + var properties = names.Zip(document.EnumerateArray(), + (key, value) => new KeyValuePair(key, JsonValue.Create(value))); + foreach (var property in properties) + obj.Add(property); + + obj.WriteTo(writer); + writer.Flush(); + ms.Position = 0; + + var result = client.SourceSerializer.Deserialize(ms) ?? throw new JsonException(""); + + yield return result; + } + } +} + +#endif diff --git a/src/Elastic.Clients.Elasticsearch.Shared/Client/NamespacedClientProxy.cs b/src/Elastic.Clients.Elasticsearch.Shared/Client/NamespacedClientProxy.cs index 1e34e7b0d43..ee5a4ed93d5 100644 --- a/src/Elastic.Clients.Elasticsearch.Shared/Client/NamespacedClientProxy.cs +++ b/src/Elastic.Clients.Elasticsearch.Shared/Client/NamespacedClientProxy.cs @@ -24,14 +24,14 @@ public abstract class NamespacedClientProxy private const string InvalidOperation = "The client has not been initialised for proper usage as may have been partially mocked. Ensure you are using a " + "new instance of ElasticsearchClient to perform requests over a network to Elasticsearch."; - private readonly ElasticsearchClient _client; + protected ElasticsearchClient Client { get; } /// /// Initializes a new instance for mocking. /// protected NamespacedClientProxy() { } - internal NamespacedClientProxy(ElasticsearchClient client) => _client = client; + internal NamespacedClientProxy(ElasticsearchClient client) => Client = client; internal TResponse DoRequest(TRequest request) where TRequest : Request @@ -46,10 +46,10 @@ internal TResponse DoRequest( where TResponse : ElasticsearchResponse, new() where TRequestParameters : RequestParameters, new() { - if (_client is null) + if (Client is null) ThrowHelper.ThrowInvalidOperationException(InvalidOperation); - return _client.DoRequest(request, forceConfiguration); + return Client.DoRequest(request, forceConfiguration); } internal Task DoRequestAsync( @@ -68,9 +68,9 @@ internal Task DoRequestAsync where TResponse : ElasticsearchResponse, new() where TRequestParameters : RequestParameters, new() { - if (_client is null) + if (Client is null) ThrowHelper.ThrowInvalidOperationException(InvalidOperation); - return _client.DoRequestAsync(request, forceConfiguration, cancellationToken); + return Client.DoRequestAsync(request, forceConfiguration, cancellationToken); } }