diff --git a/docs/client-concepts/connection/configuration-options.asciidoc b/docs/client-concepts/connection/configuration-options.asciidoc index 53095b50b60..6d1d8b960ea 100644 --- a/docs/client-concepts/connection/configuration-options.asciidoc +++ b/docs/client-concepts/connection/configuration-options.asciidoc @@ -64,6 +64,10 @@ Ensures the response bytes are always available on the `ElasticsearchResponse + IMPORTANT: Depending on the registered serializer, this may cause the response to be buffered in memory first, potentially affecting performance. +`DisableMetaHeader`:: + +Disables the meta header which is included on all requests by default. This header contains lightweight information about the client and runtime. + `DisablePing`:: When a node is used for the very first time or when it's used for the first time after it has been marked dead a ping with a very low timeout is send to the node to make sure that when it's still dead it reports it as fast as possible. You can disable these pings globally here if you rather have it fail on the possible slower original request diff --git a/src/Elasticsearch.Net/Api/RequestParameters/RequestParametersExtensions.cs b/src/Elasticsearch.Net/Api/RequestParameters/RequestParametersExtensions.cs new file mode 100644 index 00000000000..de4bb8307b6 --- /dev/null +++ b/src/Elasticsearch.Net/Api/RequestParameters/RequestParametersExtensions.cs @@ -0,0 +1,24 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; + +namespace Elasticsearch.Net +{ + internal static class RequestParametersExtensions + { + internal static void SetRequestMetaData(this IRequestParameters parameters, RequestMetaData requestMetaData) + { + if (parameters is null) + throw new ArgumentNullException(nameof(parameters)); + + if (requestMetaData is null) + throw new ArgumentNullException(nameof(requestMetaData)); + + parameters.RequestConfiguration ??= new RequestConfiguration(); + + parameters.RequestConfiguration.RequestMetaData = requestMetaData; + } + } +} diff --git a/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs b/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs index bf53f2ced7c..7232f148207 100644 --- a/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs +++ b/src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs @@ -33,36 +33,7 @@ public class ConnectionConfiguration : ConnectionConfigurationhttps://github.com/dotnet/runtime/issues/22366 /// - private static bool UsingCurlHandler - { - get - { -#if !DOTNETCORE - return false; -#else - var curlHandlerExists = typeof(HttpClientHandler).Assembly.GetType("System.Net.Http.CurlHandler") != null; - if (!curlHandlerExists) return false; - - var socketsHandlerExists = typeof(HttpClientHandler).Assembly.GetType("System.Net.Http.SocketsHttpHandler") != null; - // running on a .NET core version with CurlHandler, before the existence of SocketsHttpHandler. - // Must be using CurlHandler. - if (!socketsHandlerExists) return true; - - if (AppContext.TryGetSwitch("System.Net.Http.UseSocketsHttpHandler", out var isEnabled)) - return !isEnabled; - - var environmentVariable = - Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_HTTP_USESOCKETSHTTPHANDLER"); - - // SocketsHandler exists and no environment variable exists to disable it. - // Must be using SocketsHandler and not CurlHandler - if (environmentVariable == null) return false; - - return environmentVariable.Equals("false", StringComparison.OrdinalIgnoreCase) || - environmentVariable.Equals("0"); -#endif - } - } + private static bool UsingCurlHandler => ConnectionInfo.UsingCurlHandler; /// /// The default ping timeout. Defaults to 2 seconds @@ -153,7 +124,6 @@ public ConnectionConfiguration(IConnectionPool connectionPool, IElasticsearchSer /// A serializer implementation used to serialize requests and deserialize responses public ConnectionConfiguration(IConnectionPool connectionPool, IConnection connection, IElasticsearchSerializer serializer) : base(connectionPool, connection, serializer) { } - } [Browsable(false)] @@ -176,6 +146,7 @@ public abstract class ConnectionConfiguration : IConnectionConfigurationValue private TimeSpan? _deadTimeout; private bool _disableAutomaticProxyDetection; private bool _disableDirectStreaming; + private bool _disableMetaHeader; private bool _disablePings; private bool _enableHttpCompression; private bool _enableHttpPipelining = true; @@ -207,7 +178,7 @@ public abstract class ConnectionConfiguration : IConnectionConfigurationValue private bool _enableThreadPoolStats; private string _userAgent = ConnectionConfiguration.DefaultUserAgent; - private Func _statusCodeToResponseSuccess; + private readonly Func _statusCodeToResponseSuccess; protected ConnectionConfiguration(IConnectionPool connectionPool, IConnection connection, IElasticsearchSerializer requestResponseSerializer) { @@ -234,7 +205,6 @@ protected ConnectionConfiguration(IConnectionPool connectionPool, IConnection co _apiKeyAuthCredentials = cloudPool.ApiKeyCredentials; _enableHttpCompression = true; } - } protected IElasticsearchSerializer UseThisRequestResponseSerializer { get; set; } @@ -248,6 +218,7 @@ protected ConnectionConfiguration(IConnectionPool connectionPool, IConnection co TimeSpan? IConnectionConfigurationValues.DeadTimeout => _deadTimeout; bool IConnectionConfigurationValues.DisableAutomaticProxyDetection => _disableAutomaticProxyDetection; bool IConnectionConfigurationValues.DisableDirectStreaming => _disableDirectStreaming; + bool IConnectionConfigurationValues.DisableMetaHeader => _disableMetaHeader; bool IConnectionConfigurationValues.DisablePings => _disablePings; bool IConnectionConfigurationValues.EnableHttpCompression => _enableHttpCompression; NameValueCollection IConnectionConfigurationValues.Headers => _headers; @@ -286,6 +257,8 @@ protected ConnectionConfiguration(IConnectionPool connectionPool, IConnection co bool IConnectionConfigurationValues.TransferEncodingChunked => _transferEncodingChunked; bool IConnectionConfigurationValues.EnableTcpStats => _enableTcpStats; bool IConnectionConfigurationValues.EnableThreadPoolStats => _enableThreadPoolStats; + + MetaHeaderProvider IConnectionConfigurationValues.MetaHeaderProvider { get; } = new MetaHeaderProvider(); void IDisposable.Dispose() => DisposeManagedResources(); @@ -368,6 +341,12 @@ public T SniffOnConnectionFault(bool sniffsOnConnectionFault = true) => /// public T DisableAutomaticProxyDetection(bool disable = true) => Assign(disable, (a, v) => a._disableAutomaticProxyDetection = v); + /// + /// Disables the meta header which is included on all requests by default. This header contains lightweight information + /// about the client and runtime. + /// + public T DisableMetaHeader(bool disable = true) => Assign(disable, (a, v) => a._disableMetaHeader = v); + /// /// Instead of following a c/go like error checking on response.IsValid do throw an exception (except when is false) /// on the client when a call resulted in an exception on either the client or the Elasticsearch server. @@ -432,11 +411,11 @@ public T SniffOnConnectionFault(bool sniffsOnConnectionFault = true) => /// /// DnsRefreshTimeout for the connections. Defaults to 5 minutes. - #if DOTNETCORE +#if DOTNETCORE /// Will create new instances of after this timeout to force DNS updates - #else +#else /// Will set both and - #endif +#endif /// public T DnsRefreshTimeout(TimeSpan timeout) => Assign(timeout, (a, v) => a._dnsRefreshTimeout = v); diff --git a/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs b/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs index 6529e127042..043164d6e4e 100644 --- a/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs +++ b/src/Elasticsearch.Net/Configuration/IConnectionConfigurationValues.cs @@ -77,6 +77,8 @@ public interface IConnectionConfigurationValues : IDisposable /// bool DisableDirectStreaming { get; } + bool DisableMetaHeader { get; } + /// /// This signals that we do not want to send initial pings to unknown/previously dead nodes /// and just send the call straightaway @@ -273,5 +275,10 @@ public interface IConnectionConfigurationValues : IDisposable /// Enable statistics about thread pools to be collected when making a request /// bool EnableThreadPoolStats { get; } + + /// + /// Produces the client meta header for a request. + /// + MetaHeaderProvider MetaHeaderProvider { get; } } } diff --git a/src/Elasticsearch.Net/Configuration/RequestConfiguration.cs b/src/Elasticsearch.Net/Configuration/RequestConfiguration.cs index 3a85bdf0257..5b2987e0fb2 100644 --- a/src/Elasticsearch.Net/Configuration/RequestConfiguration.cs +++ b/src/Elasticsearch.Net/Configuration/RequestConfiguration.cs @@ -126,6 +126,11 @@ public interface IRequestConfiguration /// bool? EnableThreadPoolStats { get; set; } + + /// + /// Holds additional meta data about the request. + /// + RequestMetaData RequestMetaData { get; set; } } public class RequestConfiguration : IRequestConfiguration @@ -172,6 +177,8 @@ public class RequestConfiguration : IRequestConfiguration public bool? EnableTcpStats { get; set; } /// public bool? EnableThreadPoolStats { get; set; } + /// + public RequestMetaData RequestMetaData { get; set; } } public class RequestConfigurationDescriptor : IRequestConfiguration @@ -223,6 +230,7 @@ public RequestConfigurationDescriptor(IRequestConfiguration config) NameValueCollection IRequestConfiguration.Headers { get; set; } bool? IRequestConfiguration.EnableTcpStats { get; set; } bool? IRequestConfiguration.EnableThreadPoolStats { get; set; } + RequestMetaData IRequestConfiguration.RequestMetaData { get; set; } /// /// Submit the request on behalf in the context of a different shield user @@ -406,5 +414,12 @@ public RequestConfigurationDescriptor EnableThreadPoolStats(bool? enableThreadPo Self.EnableThreadPoolStats = enableThreadPoolStats; return this; } + + /// + internal RequestConfigurationDescriptor RequestMetaData(RequestMetaData metaData) + { + Self.RequestMetaData = metaData; + return this; + } } } diff --git a/src/Elasticsearch.Net/Configuration/RequestConfigurationExtensions.cs b/src/Elasticsearch.Net/Configuration/RequestConfigurationExtensions.cs new file mode 100644 index 00000000000..51683edc970 --- /dev/null +++ b/src/Elasticsearch.Net/Configuration/RequestConfigurationExtensions.cs @@ -0,0 +1,22 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; + +namespace Elasticsearch.Net +{ + internal static class RequestConfigurationExtensions + { + internal static void SetRequestMetaData(this IRequestConfiguration requestConfiguration, RequestMetaData requestMetaData) + { + if (requestConfiguration is null) + throw new ArgumentNullException(nameof(requestConfiguration)); + + if (requestMetaData is null) + throw new ArgumentNullException(nameof(requestMetaData)); + + requestConfiguration.RequestMetaData = requestMetaData; + } + } +} diff --git a/src/Elasticsearch.Net/Configuration/RequestMetaData.cs b/src/Elasticsearch.Net/Configuration/RequestMetaData.cs new file mode 100644 index 00000000000..ec701973c24 --- /dev/null +++ b/src/Elasticsearch.Net/Configuration/RequestMetaData.cs @@ -0,0 +1,34 @@ +using System.Collections.Generic; + +namespace Elasticsearch.Net +{ + /// + /// Holds meta data about a client request. + /// + public sealed class RequestMetaData + { + /// + /// Reserved key for a meta data entry which identifies the helper which produced the request. + /// + internal const string HelperKey = "helper"; + + private Dictionary _metaDataItems; + + internal bool TryAddMetaData (string key, string value) + { + _metaDataItems ??= new Dictionary(); + +#if NETSTANDARD2_1 + return _metaDataItems.TryAdd(key, value); +#else + if (_metaDataItems.ContainsKey(key)) + return false; + + _metaDataItems.Add(key, value); + return true; +#endif + } + + public IReadOnlyDictionary Items => _metaDataItems ?? EmptyReadOnly.Dictionary; + } +} diff --git a/src/Elasticsearch.Net/Connection/ConnectionInfo.cs b/src/Elasticsearch.Net/Connection/ConnectionInfo.cs new file mode 100644 index 00000000000..b34c9f490ac --- /dev/null +++ b/src/Elasticsearch.Net/Connection/ConnectionInfo.cs @@ -0,0 +1,48 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +#if DOTNETCORE +using System.Net.Http; +#endif + +namespace Elasticsearch.Net +{ + public static class ConnectionInfo + { + public static bool UsingCurlHandler + { + get + { +#if !DOTNETCORE + return false; +#else + var curlHandlerExists = typeof(HttpClientHandler).Assembly.GetType("System.Net.Http.CurlHandler") != null; + if (!curlHandlerExists) + return false; + + var socketsHandlerExists = typeof(HttpClientHandler).Assembly.GetType("System.Net.Http.SocketsHttpHandler") != null; + // running on a .NET core version with CurlHandler, before the existence of SocketsHttpHandler. + // Must be using CurlHandler. + if (!socketsHandlerExists) + return true; + + if (AppContext.TryGetSwitch("System.Net.Http.UseSocketsHttpHandler", out var isEnabled)) + return !isEnabled; + + var environmentVariable = + Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_HTTP_USESOCKETSHTTPHANDLER"); + + // SocketsHandler exists and no environment variable exists to disable it. + // Must be using SocketsHandler and not CurlHandler + if (environmentVariable == null) + return false; + + return environmentVariable.Equals("false", StringComparison.OrdinalIgnoreCase) || + environmentVariable.Equals("0"); +#endif + } + } + } +} diff --git a/src/Elasticsearch.Net/Connection/HttpConnection.cs b/src/Elasticsearch.Net/Connection/HttpConnection.cs index ff2cbb42ff2..cfc034b7252 100644 --- a/src/Elasticsearch.Net/Connection/HttpConnection.cs +++ b/src/Elasticsearch.Net/Connection/HttpConnection.cs @@ -37,7 +37,6 @@ internal class WebProxy : IWebProxy public bool IsBypassed(Uri host) => host.IsLoopback; } - /// The default IConnection implementation. Uses . public class HttpConnection : IConnection { @@ -139,6 +138,7 @@ public virtual async Task RequestAsync(RequestData request IDisposable receive = DiagnosticSources.SingletonDisposable; ReadOnlyDictionary tcpStats = null; ReadOnlyDictionary threadPoolStats = null; + requestData.IsAsync = true; try { @@ -333,6 +333,14 @@ protected virtual HttpRequestMessage CreateRequestMessage(RequestData requestDat if (!requestData.RunAs.IsNullOrEmpty()) requestMessage.Headers.Add(RequestData.RunAsSecurityHeader, requestData.RunAs); + if (requestData.MetaHeaderProvider is object) + { + var value = requestData.MetaHeaderProvider.ProduceHeaderValue(requestData); + + if (!string.IsNullOrEmpty(value)) + requestMessage.Headers.TryAddWithoutValidation(requestData.MetaHeaderProvider.HeaderName, value); + } + return requestMessage; } diff --git a/src/Elasticsearch.Net/Connection/HttpWebRequestConnection.cs b/src/Elasticsearch.Net/Connection/HttpWebRequestConnection.cs index 56c783fcc12..071f1a03486 100644 --- a/src/Elasticsearch.Net/Connection/HttpWebRequestConnection.cs +++ b/src/Elasticsearch.Net/Connection/HttpWebRequestConnection.cs @@ -111,6 +111,7 @@ CancellationToken cancellationToken try { + requestData.IsAsync = true; var data = requestData.PostData; var request = CreateHttpWebRequest(requestData); using (cancellationToken.Register(() => request.Abort())) @@ -238,6 +239,14 @@ protected virtual HttpWebRequest CreateWebRequest(RequestData requestData) if (requestData.Headers != null && requestData.Headers.HasKeys()) request.Headers.Add(requestData.Headers); + if (requestData.MetaHeaderProvider is object) + { + var value = requestData.MetaHeaderProvider.ProduceHeaderValue(requestData); + + if (!string.IsNullOrEmpty(value)) + request.Headers.Add(requestData.MetaHeaderProvider.HeaderName, requestData.MetaHeaderProvider.ProduceHeaderValue(requestData)); + } + var timeout = (int)requestData.RequestTimeout.TotalMilliseconds; request.Timeout = timeout; request.ReadWriteTimeout = timeout; diff --git a/src/Elasticsearch.Net/Connection/MetaData/ClientVersionInfo.cs b/src/Elasticsearch.Net/Connection/MetaData/ClientVersionInfo.cs new file mode 100644 index 00000000000..a222650210e --- /dev/null +++ b/src/Elasticsearch.Net/Connection/MetaData/ClientVersionInfo.cs @@ -0,0 +1,50 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Diagnostics; +using System.Reflection; +using System.Text.RegularExpressions; + +namespace Elasticsearch.Net +{ + internal sealed class ClientVersionInfo : VersionInfo + { + private static readonly Regex VersionRegex = new Regex(@"(\d+\.)(\d+\.)(\d)"); + + public static readonly ClientVersionInfo Empty = new ClientVersionInfo { Version = new Version(0, 0, 0), IsPrerelease = false }; + + private ClientVersionInfo() { } + + public static ClientVersionInfo Create() + { + var fullVersion = DetermineClientVersion(typeof(T)); + + var clientVersion = new ClientVersionInfo(); + clientVersion.StoreVersion(fullVersion); + return clientVersion; + } + + private static string DetermineClientVersion(Type type) + { + try + { + var productVersion = FileVersionInfo.GetVersionInfo(type.GetTypeInfo().Assembly.Location)?.ProductVersion ?? EmptyVersion; + + if (productVersion == EmptyVersion) + productVersion = Assembly.GetAssembly(type).GetName().Version.ToString(); + + var match = VersionRegex.Match(productVersion); + + return match.Success ? match.Value : EmptyVersion; + } + catch + { + // ignore failures and fall through + } + + return EmptyVersion; + } + } +} diff --git a/src/Elasticsearch.Net/Connection/MetaData/MetaDataHeader.cs b/src/Elasticsearch.Net/Connection/MetaData/MetaDataHeader.cs new file mode 100644 index 00000000000..187b805ce2e --- /dev/null +++ b/src/Elasticsearch.Net/Connection/MetaData/MetaDataHeader.cs @@ -0,0 +1,44 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Text; + +namespace Elasticsearch.Net +{ + internal sealed class MetaDataHeader + { + private const char _separator = ','; + + private readonly string _headerValue; + + public MetaDataHeader(VersionInfo version, string serviceIdentifier, bool isAsync) + { + ClientVersion = version.ToString(); + RuntimeVersion = new RuntimeVersionInfo().ToString(); + ServiceIdentifier = serviceIdentifier; + + // This code is expected to be called infrequently so we're not concerns with over optimising this + + _headerValue = new StringBuilder(64) + .Append(serviceIdentifier).Append("=").Append(ClientVersion).Append(_separator) + .Append("a=").Append(isAsync ? "1" : "0").Append(_separator) + .Append("net=").Append(RuntimeVersion).Append(_separator) + .Append(_httpClientIdentifier).Append("=").Append(RuntimeVersion) + .ToString(); + } + + private static readonly string _httpClientIdentifier = +#if DOTNETCORE + ConnectionInfo.UsingCurlHandler ? "cu" : "so"; +#else + "wr"; +#endif + + public string ServiceIdentifier { get; private set; } + public string ClientVersion { get; private set; } + public string RuntimeVersion { get; private set; } + + public override string ToString() => _headerValue; + } +} diff --git a/src/Elasticsearch.Net/Connection/MetaData/MetaHeaderProvider.cs b/src/Elasticsearch.Net/Connection/MetaData/MetaHeaderProvider.cs new file mode 100644 index 00000000000..97325ca9cde --- /dev/null +++ b/src/Elasticsearch.Net/Connection/MetaData/MetaHeaderProvider.cs @@ -0,0 +1,50 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elasticsearch.Net +{ + /// + /// Produces the meta header when this functionality is enabled in the . + /// + public class MetaHeaderProvider + { + private const string MetaHeaderName = "x-elastic-client-meta"; + + private readonly MetaDataHeader _asyncMetaDataHeader; + private readonly MetaDataHeader _syncMetaDataHeader; + + public MetaHeaderProvider() + { + var clientVersionInfo = ClientVersionInfo.Create(); + _asyncMetaDataHeader = new MetaDataHeader(clientVersionInfo, "es", true); + _syncMetaDataHeader = new MetaDataHeader(clientVersionInfo, "es", false); + } + + public string HeaderName => MetaHeaderName; + + public string ProduceHeaderValue(RequestData requestData) + { + try + { + if (requestData.ConnectionSettings.DisableMetaHeader) + return null; + + var headerValue = requestData.IsAsync + ? _asyncMetaDataHeader.ToString() + : _syncMetaDataHeader.ToString(); + + if (requestData.RequestMetaData.TryGetValue(RequestMetaData.HelperKey, out var helperSuffix)) + headerValue = $"{headerValue},h={helperSuffix}"; + + return headerValue; + } + catch + { + // don't fail the application just because we cannot create this optional header + } + + return string.Empty; + } + } +} diff --git a/src/Elasticsearch.Net/Connection/MetaData/RuntimeVersionInfo.cs b/src/Elasticsearch.Net/Connection/MetaData/RuntimeVersionInfo.cs new file mode 100644 index 00000000000..f32096bf447 --- /dev/null +++ b/src/Elasticsearch.Net/Connection/MetaData/RuntimeVersionInfo.cs @@ -0,0 +1,243 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +// Adapted from BenchmarkDotNet source https://github.com/dotnet/BenchmarkDotNet/blob/master/src/BenchmarkDotNet/Environments/Runtimes/CoreRuntime.cs +#region BenchmarkDotNet License https://github.com/dotnet/BenchmarkDotNet/blob/master/LICENSE.md +// The MIT License +// Copyright (c) 2013–2020.NET Foundation and contributors + +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software +// and associated documentation files (the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all copies or substantial +// portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT +// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +#endregion + +using System; +#if DOTNETCORE +using System.Diagnostics; +using System.Reflection; +using System.Runtime.InteropServices; +using System.Runtime.Versioning; +#else +using Microsoft.Win32; +using System.Linq; +#endif + +namespace Elasticsearch.Net +{ + /// + /// Represents the current .NET Runtime version. + /// + internal sealed class RuntimeVersionInfo : VersionInfo + { + public static readonly RuntimeVersionInfo Default = new RuntimeVersionInfo { Version = new Version(0, 0, 0), IsPrerelease = false }; + + public RuntimeVersionInfo() => StoreVersion(GetRuntimeVersion()); + + private static string GetRuntimeVersion() => +#if !DOTNETCORE + GetFullFrameworkRuntime(); +#else + GetNetCoreVersion(); +#endif + +#if DOTNETCORE + private static string GetNetCoreVersion() + { + // for .NET 5+ we can use Environment.Version + if (Environment.Version.Major >= 5) + { + const string dotNet = ".NET "; + var index = RuntimeInformation.FrameworkDescription.IndexOf(dotNet, StringComparison.OrdinalIgnoreCase); + if (index >= 0) + { + return RuntimeInformation.FrameworkDescription.Substring(dotNet.Length); + } + } + + // next, try using file version info + var systemPrivateCoreLib = FileVersionInfo.GetVersionInfo(typeof(object).Assembly.Location); + if (TryGetVersionFromProductInfo(systemPrivateCoreLib.ProductVersion, systemPrivateCoreLib.ProductName, out var runtimeVersion)) + { + return runtimeVersion; + } + + var assembly = typeof(System.Runtime.GCSettings).GetTypeInfo().Assembly; + if (TryGetVersionFromAssemblyPath(assembly, out runtimeVersion)) + { + return runtimeVersion; + } + + //At this point, we can't identify whether this is a prerelease, but a version is better than nothing! + + var frameworkName = Assembly.GetEntryAssembly()?.GetCustomAttribute()?.FrameworkName; + if (TryGetVersionFromFrameworkName(frameworkName, out runtimeVersion)) + { + return runtimeVersion; + } + + if (IsRunningInContainer) + { + var dotNetVersion = Environment.GetEnvironmentVariable("DOTNET_VERSION"); + var aspNetCoreVersion = Environment.GetEnvironmentVariable("ASPNETCORE_VERSION"); + + return dotNetVersion ?? aspNetCoreVersion; + } + + return null; + } + + private static bool TryGetVersionFromAssemblyPath(Assembly assembly, out string runtimeVersion) + { + var assemblyPath = assembly.CodeBase.Split(new[] { '/', '\\' }, StringSplitOptions.RemoveEmptyEntries); + var netCoreAppIndex = Array.IndexOf(assemblyPath, "Microsoft.NETCore.App"); + if (netCoreAppIndex > 0 && netCoreAppIndex < assemblyPath.Length - 2) + { + runtimeVersion = assemblyPath[netCoreAppIndex + 1]; + return true; + } + + runtimeVersion = null; + return false; + } + + // NOTE: 5.0.1 FrameworkDescription returns .NET 5.0.1-servicing.20575.16, so we special case servicing as NOT prerelease + protected override bool ContainsPrerelease(string version) => base.ContainsPrerelease(version) && !version.Contains("-servicing"); + + // sample input: + // 2.0: 4.6.26614.01 @BuiltBy: dlab14-DDVSOWINAGE018 @Commit: a536e7eec55c538c94639cefe295aa672996bf9b, Microsoft .NET Framework + // 2.1: 4.6.27817.01 @BuiltBy: dlab14-DDVSOWINAGE101 @Branch: release/2.1 @SrcCode: https://github.com/dotnet/coreclr/tree/6f78fbb3f964b4f407a2efb713a186384a167e5c, Microsoft .NET Framework + // 2.2: 4.6.27817.03 @BuiltBy: dlab14-DDVSOWINAGE101 @Branch: release/2.2 @SrcCode: https://github.com/dotnet/coreclr/tree/ce1d090d33b400a25620c0145046471495067cc7, Microsoft .NET Framework + // 3.0: 3.0.0-preview8.19379.2+ac25be694a5385a6a1496db40de932df0689b742, Microsoft .NET Core + // 5.0: 5.0.0-alpha1.19413.7+0ecefa44c9d66adb8a997d5778dc6c246ad393a7, Microsoft .NET Core + private static bool TryGetVersionFromProductInfo(string productVersion, string productName, out string version) + { + if (string.IsNullOrEmpty(productVersion) || string.IsNullOrEmpty(productName)) + { + version = null; + return false; + } + + // yes, .NET Core 2.X has a product name == .NET Framework... + if (productName.IndexOf(".NET Framework", StringComparison.OrdinalIgnoreCase) >= 0) + { + const string releaseVersionPrefix = "release/"; + var releaseVersionIndex = productVersion.IndexOf(releaseVersionPrefix); + if (releaseVersionIndex > 0) + { + version = productVersion.Substring(releaseVersionIndex + releaseVersionPrefix.Length); + return true; + } + } + + // matches .NET Core and also .NET 5+ + if (productName.IndexOf(".NET", StringComparison.OrdinalIgnoreCase) >= 0) + { + version = productVersion; + return true; + } + + version = null; + return false; + } + + // sample input: + // .NETCoreApp,Version=v2.0 + // .NETCoreApp,Version=v2.1 + private static bool TryGetVersionFromFrameworkName(string frameworkName, out string runtimeVersion) + { + const string versionPrefix = ".NETCoreApp,Version=v"; + if (!string.IsNullOrEmpty(frameworkName) && frameworkName.StartsWith(versionPrefix)) + { + runtimeVersion = frameworkName.Substring(versionPrefix.Length); + return true; + } + + runtimeVersion = null; + return false; + } + + private static bool IsRunningInContainer => string.Equals(Environment.GetEnvironmentVariable("DOTNET_RUNNING_IN_CONTAINER"), "true"); +#endif + +#if !DOTNETCORE + private static string GetFullFrameworkRuntime() + { + const string subkey = @"SOFTWARE\Microsoft\NET Framework Setup\NDP\v4\Full\"; + + using (var ndpKey = RegistryKey.OpenBaseKey(RegistryHive.LocalMachine, RegistryView.Registry32).OpenSubKey(subkey)) + { + if (ndpKey != null && ndpKey.GetValue("Release") != null) + { + var version = CheckFor45PlusVersion((int)ndpKey.GetValue("Release")); + + if (!string.IsNullOrEmpty(version) ) + return version; + } + } + + var fullName = RuntimeInformation.FrameworkDescription; + var servicingVersion = new string(fullName.SkipWhile(c => !char.IsDigit(c)).ToArray()); + var servicingVersionRelease = MapToReleaseVersion(servicingVersion); + + return servicingVersionRelease; + + static string MapToReleaseVersion(string servicingVersion) + { + // the following code assumes that .NET 4.6.1 is the oldest supported version + if (string.Compare(servicingVersion, "4.6.2") < 0) + return "4.6.1"; + if (string.Compare(servicingVersion, "4.7") < 0) + return "4.6.2"; + if (string.Compare(servicingVersion, "4.7.1") < 0) + return "4.7"; + if (string.Compare(servicingVersion, "4.7.2") < 0) + return "4.7.1"; + if (string.Compare(servicingVersion, "4.8") < 0) + return "4.7.2"; + + return "4.8.0"; // most probably the last major release of Full .NET Framework + } + + // Checking the version using >= enables forward compatibility. + static string CheckFor45PlusVersion(int releaseKey) + { + if (releaseKey >= 528040) + return "4.8.0"; + if (releaseKey >= 461808) + return "4.7.2"; + if (releaseKey >= 461308) + return "4.7.1"; + if (releaseKey >= 460798) + return "4.7"; + if (releaseKey >= 394802) + return "4.6.2"; + if (releaseKey >= 394254) + return "4.6.1"; + if (releaseKey >= 393295) + return "4.6"; + if (releaseKey >= 379893) + return "4.5.2"; + if (releaseKey >= 378675) + return "4.5.1"; + if (releaseKey >= 378389) + return "4.5.0"; + // This code should never execute. A non-null release key should mean + // that 4.5 or later is installed. + return null; + } + } +#endif + } +} diff --git a/src/Elasticsearch.Net/Connection/MetaData/VersionInfo.cs b/src/Elasticsearch.Net/Connection/MetaData/VersionInfo.cs new file mode 100644 index 00000000000..ea2e49d34dd --- /dev/null +++ b/src/Elasticsearch.Net/Connection/MetaData/VersionInfo.cs @@ -0,0 +1,47 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Linq; + +namespace Elasticsearch.Net +{ + internal abstract class VersionInfo + { + protected const string EmptyVersion = "0.0.0"; + + public Version Version { get; protected set; } + public bool IsPrerelease { get; protected set; } + + protected void StoreVersion(string fullVersion) + { + if (string.IsNullOrEmpty(fullVersion)) + fullVersion = EmptyVersion; + + var clientVersion = GetParsableVersionPart(fullVersion); + + if (!Version.TryParse(clientVersion, out var parsedVersion)) + throw new ArgumentException("Invalid version string", nameof(fullVersion)); + + var finalVersion = parsedVersion; + + if (parsedVersion.Minor == -1 || parsedVersion.Build == -1) + finalVersion = new Version(parsedVersion.Major, parsedVersion.Minor > -1 + ? parsedVersion.Minor + : 0, parsedVersion.Build > -1 + ? parsedVersion.Build + : 0); + + Version = finalVersion; + IsPrerelease = ContainsPrerelease(fullVersion); + } + + protected virtual bool ContainsPrerelease(string version) => version.Contains("-"); + + private static string GetParsableVersionPart(string fullVersionName) => + new string(fullVersionName.TakeWhile(c => char.IsDigit(c) || c == '.').ToArray()); + + public override string ToString() => IsPrerelease ? Version.ToString() + "p" : Version.ToString(); + } +} diff --git a/src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs b/src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs index 229bd03525c..17e5623701b 100644 --- a/src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs +++ b/src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs @@ -91,6 +91,8 @@ IMemoryStreamFactory memoryStreamFactory TransferEncodingChunked = local?.TransferEncodingChunked ?? global.TransferEncodingChunked; TcpStats = local?.EnableTcpStats ?? global.EnableTcpStats; ThreadPoolStats = local?.EnableThreadPoolStats ?? global.EnableThreadPoolStats; + MetaHeaderProvider = global.MetaHeaderProvider; + RequestMetaData = local?.RequestMetaData?.Items ?? EmptyReadOnly.Dictionary; } private readonly string _path; @@ -141,6 +143,12 @@ IMemoryStreamFactory memoryStreamFactory public Uri Uri => Node != null ? new Uri(Node.Uri, PathAndQuery) : null; public TimeSpan DnsRefreshTimeout { get; } + public MetaHeaderProvider MetaHeaderProvider { get; } + + public IReadOnlyDictionary RequestMetaData { get; } + + public bool IsAsync { get; internal set; } + public override string ToString() => $"{Method.GetStringValue()} {_path}"; // TODO This feels like its in the wrong place diff --git a/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs b/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs index f4ffcf3038a..af22fdfff78 100644 --- a/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs +++ b/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs @@ -26,7 +26,7 @@ public class BulkAllObservable : IDisposable, IObservable wh private readonly Func _retryPredicate; private Action _incrementFailed = () => { }; private Action _incrementRetries = () => { }; - private Action _bulkResponseCallback; + private readonly Action _bulkResponseCallback; public BulkAllObservable( IElasticClient client, @@ -35,7 +35,7 @@ public BulkAllObservable( ) { _client = client; - _partitionedBulkRequest = partitionedBulkRequest; + _partitionedBulkRequest = partitionedBulkRequest; _backOffRetries = _partitionedBulkRequest.BackOffRetries.GetValueOrDefault(CoordinatedRequestDefaults.BulkAllBackOffRetriesDefault); _backOffTime = _partitionedBulkRequest?.BackOffTime?.ToTimeSpan() ?? CoordinatedRequestDefaults.BulkAllBackOffTimeDefault; _bulkSize = _partitionedBulkRequest.Size ?? CoordinatedRequestDefaults.BulkAllSizeDefault; @@ -108,7 +108,20 @@ private void RefreshOnCompleted() var indices = _partitionedBulkRequest.RefreshIndices ?? _partitionedBulkRequest.Index; if (indices == null) return; - var refresh = _client.Indices.Refresh(indices); + var refresh = _client.Indices.Refresh(indices, r => r.RequestConfiguration(rc => + { + switch (_partitionedBulkRequest) + { + case IHelperCallable helperCallable when helperCallable.ParentMetaData is object: + rc.RequestMetaData(helperCallable.ParentMetaData); + break; + default: + rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData()); + break; + } + + return rc; + })); if (!refresh.IsValid) throw Throw($"Refreshing after all documents have indexed failed", refresh.ApiCall); } @@ -127,6 +140,16 @@ private async Task BulkAsync(IList buffer, long page, int ba if (request.Routing != null) s.Routing(request.Routing); if (request.WaitForActiveShards.HasValue) s.WaitForActiveShards(request.WaitForActiveShards.ToString()); + switch (_partitionedBulkRequest) + { + case IHelperCallable helperCallable when helperCallable.ParentMetaData is object: + s.RequestConfiguration(rc => rc.RequestMetaData(helperCallable.ParentMetaData)); + break; + default: + s.RequestConfiguration(rc => rc.RequestMetaData(RequestMetaDataFactory.BulkHelperRequestMetaData())); + break; + } + return s; }, _compositeCancelToken) .ConfigureAwait(false); diff --git a/src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs b/src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs index 75a3bc1e4b9..e64ed1cca9a 100644 --- a/src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs +++ b/src/Nest/Document/Multiple/BulkAll/BulkAllRequest.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Linq; +using Elasticsearch.Net; namespace Nest { @@ -101,7 +102,7 @@ public interface IBulkAllRequest where T : class Action BulkResponseCallback { get; set; } } - public class BulkAllRequest : IBulkAllRequest + public class BulkAllRequest : IBulkAllRequest, IHelperCallable where T : class { public BulkAllRequest(IEnumerable documents) @@ -163,9 +164,13 @@ public BulkAllRequest(IEnumerable documents) /// public Action BulkResponseCallback { get; set; } + + internal RequestMetaData ParentMetaData { get; set; } + + RequestMetaData IHelperCallable.ParentMetaData { get => ParentMetaData; set => ParentMetaData = value; } } - public class BulkAllDescriptor : DescriptorBase, IBulkAllRequest>, IBulkAllRequest + public class BulkAllDescriptor : DescriptorBase, IBulkAllRequest>, IBulkAllRequest, IHelperCallable where T : class { private readonly IEnumerable _documents; @@ -195,6 +200,7 @@ public BulkAllDescriptor(IEnumerable documents) Time IBulkAllRequest.Timeout { get; set; } int? IBulkAllRequest.WaitForActiveShards { get; set; } Action IBulkAllRequest.BulkResponseCallback { get; set; } + RequestMetaData IHelperCallable.ParentMetaData { get; set; } /// public BulkAllDescriptor MaxDegreeOfParallelism(int? parallelism) => diff --git a/src/Nest/Document/Multiple/Reindex/ReindexObservable.cs b/src/Nest/Document/Multiple/Reindex/ReindexObservable.cs index b808540f4fb..e6ee7ca4762 100644 --- a/src/Nest/Document/Multiple/Reindex/ReindexObservable.cs +++ b/src/Nest/Document/Multiple/Reindex/ReindexObservable.cs @@ -42,6 +42,7 @@ CancellationToken cancellationToken { _connectionSettings = connectionSettings; _reindexRequest = reindexRequest; + _client = client; _compositeCancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _compositeCancelToken = _compositeCancelTokenSource.Token; @@ -95,7 +96,8 @@ private void Reindex(IObserver observer) //by casting the observable can potentially store more meta data on the user provided observer if (observer is BulkAllObserver moreInfoObserver) observableBulk.Subscribe(moreInfoObserver); - else observableBulk.Subscribe(observer); + else + observableBulk.Subscribe(observer); } private BulkAllObservable> BulkAll(IEnumerable> scrollDocuments, @@ -120,6 +122,13 @@ private BulkAllObservable> BulkAll(IEnumerable> ScrollAll(int slices, ProducerC { var scrollAll = _reindexRequest.ScrollAll; var scroll = _reindexRequest.ScrollAll?.ScrollTime ?? TimeSpan.FromMinutes(2); - + var scrollAllRequest = new ScrollAllRequest(scroll, slices) { RoutingField = scrollAll.RoutingField, MaxDegreeOfParallelism = scrollAll.MaxDegreeOfParallelism ?? slices, Search = scrollAll.Search, - BackPressure = backPressure - }; + BackPressure = backPressure, + ParentMetaData = RequestMetaDataFactory.ReindexHelperRequestMetaData() + }; var scrollObservable = _client.ScrollAll(scrollAllRequest, _compositeCancelToken); return new GetEnumerator>() @@ -214,11 +224,15 @@ private int CreateIndex(string toIndex, IScrollAllRequest scrollAll) /// Either the number of shards from to source or the target as a slice hint to ScrollAll private int? CreateIndexIfNeeded(Indices fromIndices, string resolvedTo) { - if (_reindexRequest.OmitIndexCreation) return null; + var requestMetaData = RequestMetaDataFactory.ReindexHelperRequestMetaData(); + + if (_reindexRequest.OmitIndexCreation) + return null; var pointsToSingleSourceIndex = fromIndices.Match((a) => false, (m) => m.Indices.Count == 1); - var targetExistsAlready = _client.Indices.Exists(resolvedTo); - if (targetExistsAlready.Exists) return null; + var targetExistsAlready = _client.Indices.Exists(resolvedTo, e => e.RequestConfiguration(rc => rc.RequestMetaData(requestMetaData))); + if (targetExistsAlready.Exists) + return null; _compositeCancelToken.ThrowIfCancellationRequested(); IndexState originalIndexState = null; @@ -226,7 +240,7 @@ private int CreateIndex(string toIndex, IScrollAllRequest scrollAll) if (pointsToSingleSourceIndex) { - var getIndexResponse = _client.Indices.Get(resolvedFrom); + var getIndexResponse = _client.Indices.Get(resolvedFrom, i => i.RequestConfiguration(rc => rc.RequestMetaData(requestMetaData))); _compositeCancelToken.ThrowIfCancellationRequested(); originalIndexState = getIndexResponse.Indices[resolvedFrom]; if (_reindexRequest.OmitIndexCreation) @@ -237,6 +251,9 @@ private int CreateIndex(string toIndex, IScrollAllRequest scrollAll) (originalIndexState != null ? new CreateIndexRequest(resolvedTo, originalIndexState) : new CreateIndexRequest(resolvedTo)); + + createIndexRequest.RequestParameters.SetRequestMetaData(requestMetaData); + var createIndexResponse = _client.Indices.Create(createIndexRequest); _compositeCancelToken.ThrowIfCancellationRequested(); if (!createIndexResponse.IsValid) diff --git a/src/Nest/Document/Multiple/Reindex/ReindexRequest.cs b/src/Nest/Document/Multiple/Reindex/ReindexRequest.cs index 19d424b8e52..d8c4cd04ddc 100644 --- a/src/Nest/Document/Multiple/Reindex/ReindexRequest.cs +++ b/src/Nest/Document/Multiple/Reindex/ReindexRequest.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using Elasticsearch.Net; namespace Nest { diff --git a/src/Nest/Document/Multiple/ScrollAll/ScrollAllObservable.cs b/src/Nest/Document/Multiple/ScrollAll/ScrollAllObservable.cs index 3a75782c664..ddf9e939280 100644 --- a/src/Nest/Document/Multiple/ScrollAll/ScrollAllObservable.cs +++ b/src/Nest/Document/Multiple/ScrollAll/ScrollAllObservable.cs @@ -25,14 +25,21 @@ public class ScrollAllObservable : IDisposable, IObservable(); + + switch (_scrollAllRequest) + { + case IHelperCallable helperCallable when helperCallable.ParentMetaData is object: + _searchRequest.RequestParameters.SetRequestMetaData(helperCallable.ParentMetaData); + break; + default: + _searchRequest.RequestParameters.SetRequestMetaData(RequestMetaDataFactory.ScrollHelperRequestMetaData()); + break; + } + if (_searchRequest.Sort == null) _searchRequest.Sort = FieldSort.ByDocumentOrder; _searchRequest.RequestParameters.Scroll = _scrollAllRequest.ScrollTime.ToTimeSpan(); @@ -43,7 +50,7 @@ public ScrollAllObservable( } public bool IsDisposed { get; private set; } - + public void Dispose() { IsDisposed = true; @@ -111,6 +118,20 @@ private async Task ScrollToCompletionAsync(int slice, IObserver(request, _compositeCancelToken).ConfigureAwait(false); ThrowOnBadSearchResult(searchResult, slice, page); } diff --git a/src/Nest/Document/Multiple/ScrollAll/ScrollAllRequest.cs b/src/Nest/Document/Multiple/ScrollAll/ScrollAllRequest.cs index fe08674c756..d87bc049a3f 100644 --- a/src/Nest/Document/Multiple/ScrollAll/ScrollAllRequest.cs +++ b/src/Nest/Document/Multiple/ScrollAll/ScrollAllRequest.cs @@ -4,10 +4,11 @@ using System; using System.Linq.Expressions; +using Elasticsearch.Net; namespace Nest { - public interface IScrollAllRequest + public interface IScrollAllRequest { /// /// Simple back pressure implementation that makes sure the minimum max concurrency between producer and consumer @@ -46,7 +47,7 @@ public interface IScrollAllRequest int Slices { get; set; } } - public class ScrollAllRequest : IScrollAllRequest + public class ScrollAllRequest : IScrollAllRequest, IHelperCallable { public ScrollAllRequest(Time scrollTime, int numberOfSlices) { @@ -67,9 +68,11 @@ public ScrollAllRequest(Time scrollTime, int numberOfSlices, Field routingField) /// public Field RoutingField { get; set; } + internal RequestMetaData ParentMetaData { get; set; } + /// public ISearchRequest Search { get; set; } - + RequestMetaData IHelperCallable.ParentMetaData { get => ParentMetaData; set => ParentMetaData = value; } Time IScrollAllRequest.ScrollTime { get; set; } int IScrollAllRequest.Slices { get; set; } } diff --git a/src/Nest/Helpers/HelperIdentifiers.cs b/src/Nest/Helpers/HelperIdentifiers.cs new file mode 100644 index 00000000000..1fca4f6095c --- /dev/null +++ b/src/Nest/Helpers/HelperIdentifiers.cs @@ -0,0 +1,15 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Nest +{ + internal static class HelperIdentifiers + { + public const string SnapshotHelper = "sn"; + public const string ScrollHelper = "s"; + public const string ReindexHelper = "r"; + public const string BulkHelper = "b"; + public const string RestoreHelper = "sr"; + } +} diff --git a/src/Nest/Helpers/IHelperCallable.cs b/src/Nest/Helpers/IHelperCallable.cs new file mode 100644 index 00000000000..3dffd97cebc --- /dev/null +++ b/src/Nest/Helpers/IHelperCallable.cs @@ -0,0 +1,25 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elasticsearch.Net; + +namespace Nest +{ + /// + /// May be applied to helper requests where they may be called by an upstream helper. + /// + /// + /// For example, the reindex helper calls down into the bulk helper and scroll helpers. + /// and therefore both + /// implement this interface. + /// + internal interface IHelperCallable + { + /// + /// The of the parent helper when this requestis created by a parent + /// helper. + /// + RequestMetaData ParentMetaData { get; internal set; } + } +} diff --git a/src/Nest/Helpers/RequestMetaDataExtensions.cs b/src/Nest/Helpers/RequestMetaDataExtensions.cs new file mode 100644 index 00000000000..549efb5e3a8 --- /dev/null +++ b/src/Nest/Helpers/RequestMetaDataExtensions.cs @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using Elasticsearch.Net; + +namespace Nest +{ + internal static class RequestMetaDataExtensions + { + internal static void AddHelper(this RequestMetaData metaData, string helperValue) + { + if (!metaData.TryAddMetaData(RequestMetaData.HelperKey, helperValue)) + throw new InvalidOperationException("A helper value has already been added."); + } + + internal static void AddSnapshotHelper(this RequestMetaData metaData) => metaData.AddHelper(HelperIdentifiers.SnapshotHelper); + + internal static void AddScrollHelper(this RequestMetaData metaData) => metaData.AddHelper(HelperIdentifiers.ScrollHelper); + + internal static void AddReindexHelper(this RequestMetaData metaData) => metaData.AddHelper(HelperIdentifiers.ReindexHelper); + + internal static void AddBulkHelper(this RequestMetaData metaData) => metaData.AddHelper(HelperIdentifiers.BulkHelper); + + internal static void AddRestoreHelper(this RequestMetaData metaData) => metaData.AddHelper(HelperIdentifiers.RestoreHelper); + } + + internal static class RequestMetaDataFactory + { + internal static RequestMetaData ReindexHelperRequestMetaData() + { + var metaData = new RequestMetaData(); + metaData.AddReindexHelper(); + return metaData; + } + + internal static RequestMetaData ScrollHelperRequestMetaData() + { + var metaData = new RequestMetaData(); + metaData.AddScrollHelper(); + return metaData; + } + + internal static RequestMetaData BulkHelperRequestMetaData() + { + var metaData = new RequestMetaData(); + metaData.AddBulkHelper(); + return metaData; + } + + internal static RequestMetaData SnapshotHelperRequestMetaData() + { + var metaData = new RequestMetaData(); + metaData.AddSnapshotHelper(); + return metaData; + } + + internal static RequestMetaData RestoreHelperRequestMetaData() + { + var metaData = new RequestMetaData(); + metaData.AddRestoreHelper(); + return metaData; + } + } +} diff --git a/src/Nest/Modules/SnapshotAndRestore/Restore/RestoreObservable/RestoreObservable.cs b/src/Nest/Modules/SnapshotAndRestore/Restore/RestoreObservable/RestoreObservable.cs index 7e98bba4dcd..fe6daf75c7f 100644 --- a/src/Nest/Modules/SnapshotAndRestore/Restore/RestoreObservable/RestoreObservable.cs +++ b/src/Nest/Modules/SnapshotAndRestore/Restore/RestoreObservable/RestoreObservable.cs @@ -22,7 +22,7 @@ public class RestoreObservable : IDisposable, IObservable _errorEventHandlers; private EventHandler _nextEventHandlers; private Timer _timer; - + public RestoreObservable(IElasticClient elasticClient, IRestoreRequest restoreRequest) { elasticClient.ThrowIfNull(nameof(elasticClient)); @@ -30,7 +30,7 @@ public RestoreObservable(IElasticClient elasticClient, IRestoreRequest restoreRe _elasticClient = elasticClient; _restoreRequest = restoreRequest; - + _restoreRequest.RequestParameters.SetRequestMetaData(RequestMetaDataFactory.RestoreHelperRequestMetaData()); _restoreStatusHumbleObject = new RestoreStatusHumbleObject(elasticClient, restoreRequest); _restoreStatusHumbleObject.Completed += StopTimer; _restoreStatusHumbleObject.Error += StopTimer; @@ -162,7 +162,6 @@ public RestoreStatusHumbleObject(IElasticClient elasticClient, IRestoreRequest r _elasticClient = elasticClient; _restoreRequest = restoreRequest; - _renamePattern = string.IsNullOrEmpty(_restoreRequest.RenamePattern) ? string.Empty : _restoreRequest.RenamePattern; _renameReplacement = string.IsNullOrEmpty(_restoreRequest.RenameReplacement) ? string.Empty : _restoreRequest.RenameReplacement; } @@ -183,10 +182,13 @@ public void CheckStatus() )) .ToArray(); - var recoveryStatus = _elasticClient.Indices.RecoveryStatus(new RecoveryStatusRequest(indices) + var recoveryStatusRequest = new RecoveryStatusRequest(indices) { Detailed = true, - }); + RequestConfiguration = new RequestConfiguration() + }; + recoveryStatusRequest.RequestConfiguration.SetRequestMetaData(RequestMetaDataFactory.RestoreHelperRequestMetaData()); + var recoveryStatus = _elasticClient.Indices.RecoveryStatus(recoveryStatusRequest); if (!recoveryStatus.IsValid) throw new ElasticsearchClientException(PipelineFailure.BadResponse, "Failed getting recovery status.", recoveryStatus.ApiCall); diff --git a/src/Nest/Modules/SnapshotAndRestore/Snapshot/SnapshotObservable/SnapshotObservable.cs b/src/Nest/Modules/SnapshotAndRestore/Snapshot/SnapshotObservable/SnapshotObservable.cs index 7c94bb2d43b..f556b3a2023 100644 --- a/src/Nest/Modules/SnapshotAndRestore/Snapshot/SnapshotObservable/SnapshotObservable.cs +++ b/src/Nest/Modules/SnapshotAndRestore/Snapshot/SnapshotObservable/SnapshotObservable.cs @@ -16,7 +16,7 @@ public class SnapshotObservable : IDisposable, IObservable _completedEentHandler; + private EventHandler _completedEventHandler; private bool _disposed; private EventHandler _errorEventHandler; private EventHandler _nextEventHandler; @@ -29,6 +29,7 @@ public SnapshotObservable(IElasticClient elasticClient, ISnapshotRequest snapsho _elasticClient = elasticClient; _snapshotRequest = snapshotRequest; + _snapshotRequest.RequestParameters.SetRequestMetaData(RequestMetaDataFactory.SnapshotHelperRequestMetaData()); _snapshotStatusHumbleObject = new SnapshotStatusHumbleObject(elasticClient, snapshotRequest); _snapshotStatusHumbleObject.Completed += StopTimer; _snapshotStatusHumbleObject.Error += StopTimer; @@ -62,7 +63,7 @@ public IDisposable Subscribe(IObserver observer) EventHandler onError = (sender, args) => observer.OnError(args.Exception); _nextEventHandler = onNext; - _completedEentHandler = onCompleted; + _completedEventHandler = onCompleted; _errorEventHandler = onError; _snapshotStatusHumbleObject.Next += onNext; @@ -113,7 +114,7 @@ protected virtual void Dispose(bool disposing) if (_snapshotStatusHumbleObject != null) { _snapshotStatusHumbleObject.Next -= _nextEventHandler; - _snapshotStatusHumbleObject.Completed -= _completedEentHandler; + _snapshotStatusHumbleObject.Completed -= _completedEventHandler; _snapshotStatusHumbleObject.Error -= _errorEventHandler; _snapshotStatusHumbleObject.Completed -= StopTimer; @@ -151,7 +152,7 @@ public class SnapshotStatusHumbleObject { private readonly IElasticClient _elasticClient; private readonly ISnapshotRequest _snapshotRequest; - + public SnapshotStatusHumbleObject(IElasticClient elasticClient, ISnapshotRequest snapshotRequest) { elasticClient.ThrowIfNull(nameof(elasticClient)); @@ -169,9 +170,15 @@ public void CheckStatus() { try { + var snapshotRequest = new SnapshotStatusRequest(_snapshotRequest.RepositoryName, _snapshotRequest.Snapshot) + { + RequestConfiguration = new RequestConfiguration() + }; + + snapshotRequest.RequestConfiguration.SetRequestMetaData(RequestMetaDataFactory.SnapshotHelperRequestMetaData()); + var snapshotStatusResponse = - _elasticClient.Snapshot.Status(new SnapshotStatusRequest(_snapshotRequest.RepositoryName, - _snapshotRequest.Snapshot)); + _elasticClient.Snapshot.Status(snapshotRequest); if (!snapshotStatusResponse.IsValid) throw new ElasticsearchClientException(PipelineFailure.BadResponse, "Failed to get snapshot status.", @@ -194,19 +201,19 @@ public void CheckStatus() protected virtual void OnNext(SnapshotNextEventArgs nextEventArgs) { var handler = Next; - if (handler != null) handler(this, nextEventArgs); + handler?.Invoke(this, nextEventArgs); } protected virtual void OnCompleted(SnapshotCompletedEventArgs completedEventArgs) { var handler = Completed; - if (handler != null) handler(this, completedEventArgs); + handler?.Invoke(this, completedEventArgs); } protected virtual void OnError(SnapshotErrorEventArgs errorEventArgs) { var handler = Error; - if (handler != null) handler(this, errorEventArgs); + handler?.Invoke(this, errorEventArgs); } } } diff --git a/tests/Tests/ClientConcepts/Connection/HttpConnectionTests.cs b/tests/Tests/ClientConcepts/Connection/HttpConnectionTests.cs index b73d10e10d9..ee850f36abe 100644 --- a/tests/Tests/ClientConcepts/Connection/HttpConnectionTests.cs +++ b/tests/Tests/ClientConcepts/Connection/HttpConnectionTests.cs @@ -15,6 +15,7 @@ using Tests.Core.ManagedElasticsearch.Clusters; using HttpMethod = Elasticsearch.Net.HttpMethod; using FluentAssertions; +using System.Text.RegularExpressions; namespace Tests.ClientConcepts.Connection { @@ -57,8 +58,7 @@ [I] public async Task RespectsDnsRefreshTimeout() connection.InUseHandlers.Should().Be(1); connection.RemovedHandlers.Should().Be(1); } - - + [I] public async Task MultipleInstancesOfHttpClientWhenRequestTimeoutChanges() => await MultipleInstancesOfHttpClientWhen(() => CreateRequestData(TimeSpan.FromSeconds(30))); @@ -96,7 +96,9 @@ private RequestData CreateRequestData( Uri proxyAddress = null, bool disableAutomaticProxyDetection = false, bool httpCompression = false, - bool transferEncodingChunked = false + bool transferEncodingChunked = false, + bool disableMetaHeader = false, + Action requestMetaData = null ) { if (requestTimeout == default) requestTimeout = TimeSpan.FromSeconds(10); @@ -107,13 +109,23 @@ private RequestData CreateRequestData( .DnsRefreshTimeout(dnsRefreshTimeout ?? ConnectionConfiguration.DefaultDnsRefreshTimeout) .DisableAutomaticProxyDetection(disableAutomaticProxyDetection) .TransferEncodingChunked(transferEncodingChunked) - .EnableHttpCompression(httpCompression); + .EnableHttpCompression(httpCompression) + .DisableMetaHeader(disableMetaHeader); if (proxyAddress != null) connectionSettings.Proxy(proxyAddress, null, (string)null); + var requestParameters = new SearchRequestParameters(); + + if (requestMetaData is object) + { + requestParameters.RequestConfiguration ??= new RequestConfiguration(); + requestParameters.RequestConfiguration.RequestMetaData ??= new RequestMetaData(); + requestMetaData(requestParameters.RequestConfiguration.RequestMetaData); + } + var requestData = new RequestData(HttpMethod.POST, "/_search", "{ \"query\": { \"match_all\" : { } } }", connectionSettings, - new SearchRequestParameters(), + requestParameters, new RecyclableMemoryStreamFactory()) { Node = node }; return requestData; @@ -182,6 +194,53 @@ [I] public async Task HttpClientSetsContentLengthWhenTransferEncodingChunkedHttp await connection.RequestAsync(requestData, CancellationToken.None).ConfigureAwait(false); } + [I] public async Task HttpClientSetsMetaHeaderWhenNotDisabled() + { + var regex = new Regex(@"^[a-z]{1,}=[a-z0-9\.\-]{1,}(?:,[a-z]{1,}=[a-z0-9\.\-]+)*$"); + + var requestData = CreateRequestData(); + var connection = new TestableHttpConnection(responseMessage => + { + responseMessage.RequestMessage.Headers.TryGetValues("x-elastic-client-meta", out var headerValue).Should().BeTrue(); + headerValue.Should().HaveCount(1); + headerValue.Single().Should().NotBeNullOrEmpty(); + regex.Match(headerValue.Single()).Success.Should().BeTrue(); + }); + + connection.Request(requestData); + await connection.RequestAsync(requestData, CancellationToken.None).ConfigureAwait(false); + } + + [I] public async Task HttpClientSetsMetaHeaderWithHelperWhenNotDisabled() + { + var regex = new Regex(@"^[a-z]{1,}=[a-z0-9\.\-]{1,}(?:,[a-z]{1,}=[a-z0-9\.\-]+)*$"); + + var requestData = CreateRequestData(requestMetaData: m => m.TryAddMetaData("helper", "r")); + var connection = new TestableHttpConnection(responseMessage => + { + responseMessage.RequestMessage.Headers.TryGetValues("x-elastic-client-meta", out var headerValue).Should().BeTrue(); + headerValue.Should().HaveCount(1); + headerValue.Single().Should().NotBeNullOrEmpty(); + headerValue.Single().Should().EndWith(",h=r"); + regex.Match(headerValue.Single()).Success.Should().BeTrue(); + }); + + connection.Request(requestData); + await connection.RequestAsync(requestData, CancellationToken.None).ConfigureAwait(false); + } + + [I] public async Task HttpClientShouldNotSetMetaHeaderWhenDisabled() + { + var requestData = CreateRequestData(disableMetaHeader: true); + var connection = new TestableHttpConnection(responseMessage => + { + responseMessage.RequestMessage.Headers.TryGetValues("x-elastic-client-meta", out var headerValue).Should().BeFalse(); + }); + + connection.Request(requestData); + await connection.RequestAsync(requestData, CancellationToken.None).ConfigureAwait(false); + } + public class TestableHttpConnection : HttpConnection { private readonly Action _response; diff --git a/tests/Tests/Connection/MetaData/MetaHeaderProviderTests.cs b/tests/Tests/Connection/MetaData/MetaHeaderProviderTests.cs new file mode 100644 index 00000000000..9415bdabe09 --- /dev/null +++ b/tests/Tests/Connection/MetaData/MetaHeaderProviderTests.cs @@ -0,0 +1,105 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Text.RegularExpressions; +using Elastic.Elasticsearch.Xunit.XunitPlumbing; +using Elasticsearch.Net; +using FluentAssertions; +using Nest; + +namespace Tests.Core.Connection.MetaData +{ + public class MetaHeaderProviderTests + { + private readonly Regex _validHeaderRegex = new Regex(@"^[a-z]{1,}=[a-z0-9\.\-]{1,}(?:,[a-z]{1,}=[a-z0-9\.\-]+)*$"); + private readonly Regex _validVersionRegex = new Regex(@"^[0-9]{1,2}\.[0-9]{1,2}(?:\.[0-9]{1,3})?p?$"); + private readonly Regex _validHttpClientPart = new Regex(@"^[a-z]{2,3}=[0-9]{1,2}\.[0-9]{1,2}(?:\.[0-9]{1,3})?p?$"); + + [U] public void HeaderName_ReturnsExpectedValue() + { + var sut = new MetaHeaderProvider(); + sut.HeaderName.Should().Be("x-elastic-client-meta"); + } + + [U] public void HeaderName_ReturnsNullWhenDisabled() + { + var sut = new MetaHeaderProvider(); + + var connectionSettings = new ConnectionSettings() + .DisableMetaHeader(true); + + var requestData = new RequestData(HttpMethod.POST, "/_search", "{}", connectionSettings, + new SearchRequestParameters(), + new RecyclableMemoryStreamFactory()); + + sut.ProduceHeaderValue(requestData).Should().BeNull(); + } + + [U] public void HeaderName_ReturnsExpectedValue_ForSyncRequest_WhenNotDisabled() + { + var sut = new MetaHeaderProvider(); + + var connectionSettings = new ConnectionSettings(); + + var requestData = new RequestData(HttpMethod.POST, "/_search", "{}", connectionSettings, + new SearchRequestParameters(), + new RecyclableMemoryStreamFactory()) + { + IsAsync = false + }; + + var result = sut.ProduceHeaderValue(requestData); + + _validHeaderRegex.Match(result).Success.Should().BeTrue(); + + var parts = result.Split(','); + parts.Length.Should().Be(4); + + parts[0].Should().StartWith("es="); + var clientVersion = parts[0].Substring(3); + _validVersionRegex.Match(clientVersion).Success.Should().BeTrue(); + + parts[1].Should().Be("a=0"); + + parts[2].Should().StartWith("net="); + var runtimeVersion = parts[2].Substring(4); + _validVersionRegex.Match(runtimeVersion).Success.Should().BeTrue(); + + _validHttpClientPart.Match(parts[3]).Success.Should().BeTrue(); + } + + [U] public void HeaderName_ReturnsExpectedValue_ForAsyncRequest_WhenNotDisabled() + { + var sut = new MetaHeaderProvider(); + + var connectionSettings = new ConnectionSettings(); + + var requestData = new RequestData(HttpMethod.POST, "/_search", "{}", connectionSettings, + new SearchRequestParameters(), + new RecyclableMemoryStreamFactory()) + { + IsAsync = true + }; + + var result = sut.ProduceHeaderValue(requestData); + + _validHeaderRegex.Match(result).Success.Should().BeTrue(); + + var parts = result.Split(','); + parts.Length.Should().Be(4); + + parts[0].Should().StartWith("es="); + var clientVersion = parts[0].Substring(3); + _validVersionRegex.Match(clientVersion).Success.Should().BeTrue(); + + parts[1].Should().Be("a=1"); + + parts[2].Should().StartWith("net="); + var runtimeVersion = parts[2].Substring(4); + _validVersionRegex.Match(runtimeVersion).Success.Should().BeTrue(); + + _validHttpClientPart.Match(parts[3]).Success.Should().BeTrue(); + } + } +} diff --git a/tests/Tests/Connection/MetaData/VersionInfoTests.cs b/tests/Tests/Connection/MetaData/VersionInfoTests.cs new file mode 100644 index 00000000000..292414276a9 --- /dev/null +++ b/tests/Tests/Connection/MetaData/VersionInfoTests.cs @@ -0,0 +1,35 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using Elastic.Elasticsearch.Xunit.XunitPlumbing; +using Elasticsearch.Net; +using FluentAssertions; + +namespace Tests.Core.Connection.MetaData +{ + public class VersionInfoTests + { + [U] public void ToString_ReturnsExpectedValue_ForNonPrerelease() + { + var sut = new TestVersionInfo("1.2.3", false); + sut.ToString().Should().Be("1.2.3"); + } + + [U] public void ToString_ReturnsExpectedValue_ForPrerelease() + { + var sut = new TestVersionInfo("1.2.3", true); + sut.ToString().Should().Be("1.2.3p"); + } + + private class TestVersionInfo : VersionInfo + { + public TestVersionInfo(string version, bool isPrerelease) + { + Version = new Version(version); + IsPrerelease = isPrerelease; + } + } + } +} diff --git a/tests/Tests/MetaHeader/MetaHeaderHelperTests.cs b/tests/Tests/MetaHeader/MetaHeaderHelperTests.cs new file mode 100644 index 00000000000..20524caaa97 --- /dev/null +++ b/tests/Tests/MetaHeader/MetaHeaderHelperTests.cs @@ -0,0 +1,248 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Elasticsearch.Xunit.XunitPlumbing; +using Elasticsearch.Net; +using FluentAssertions; +using Nest; + +namespace Tests.MetaHeader +{ + public class MetaHeaderHelperTests + { + [U] public void BulkAllHelperRequestsIncludeExpectedHelperMetaData() + { + var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); + + // We can avoid specifying response bodies and this still exercises all requests. + var responses = new List<(int, string)> + { + (200, "{}"), + (200, "{}"), + (200, "{}") + }; + + var connection = new TestableInMemoryConnection(a => + a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("b"), responses); + var settings = new ConnectionSettings(pool, connection); + var client = new ElasticClient(settings); + + var documents = CreateLazyStreamOfDocuments(20); + + var observableBulk = client.BulkAll(documents, f => f + .MaxDegreeOfParallelism(8) + .BackOffTime(TimeSpan.FromSeconds(10)) + .BackOffRetries(2) + .Size(10) + .RefreshOnCompleted() + .Index("an-index") + ); + + var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => + { + foreach (var item in b.Items) + { + item.IsValid.Should().BeTrue(); + item.Id.Should().NotBeNullOrEmpty(); + } + }); + + connection.AssertExpectedCallCount(); + } + + [U] public void ScrollAllHelperRequestsIncludeExpectedHelperMetaData() + { + var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); + + var responses = new List<(int, string)> + { + (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":0,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}"), + (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[{\"_index\":\"index-a\",\"_type\":\"_doc\",\"_id\":\"ISXw0HYBAJbnbq7-Utq6\",\"_score\":null,\"_source\":{\"name\": \"name-a\"},\"sort\":[0]}]}}"), + (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":1,\"timed_out\":false,\"terminated_early\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}") + }; + + var connection = new TestableInMemoryConnection(a => + a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("s"), responses); + var settings = new ConnectionSettings(pool, connection); + var client = new ElasticClient(settings); + + var documents = CreateLazyStreamOfDocuments(20); + + var observableScroll = client.ScrollAll("5s", 2, s => s.Search(ss => ss.Size(2).Index("index-a"))); + var bulkObserver = observableScroll.Wait(TimeSpan.FromMinutes(5), _ => { }); + + connection.AssertExpectedCallCount(); + } + + [U] public void ReindexHelperRequestsIncludeExpectedHelperMetaData() + { + var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); + + var responses = new List<(int, string)> + { + (404, "{}"), + (200, "{\"index-a\":{\"aliases\":{},\"mappings\":{\"properties\":{\"name\":{\"type\":\"keyword\"}}},\"settings\":{\"index\":{\"routing\":{\"allocation\":{\"include\":{\"_tier_preference\":\"data_content\"}}},\"number_of_shards\":\"1\",\"provided_name\":\"index-a\",\"creation_date\":\"1609823178261\",\"number_of_replicas\":\"1\",\"uuid\":\"2R4H1VfTR5imfmIPkNIIxw\",\"version\":{\"created\":\"7100099\"}}}}}"), + (200, "{\"acknowledged\":true,\"shards_acknowledged\":true,\"index\":\"index-b\"}"), + (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":0,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}"), + (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":0,\"timed_out\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[{\"_index\":\"index-a\",\"_type\":\"_doc\",\"_id\":\"ISXw0HYBAJbnbq7-Utq6\",\"_score\":null,\"_source\":{\"name\": \"name-a\"},\"sort\":[0]}]}}"), + (200, "{\"_scroll_id\":\"SCROLLID\",\"took\":1,\"timed_out\":false,\"terminated_early\":false,\"_shards\":{\"total\":1,\"successful\":1,\"skipped\":0,\"failed\":0},\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"},\"max_score\":null,\"hits\":[]}}"), + (200, "{\"took\":4,\"errors\":false,\"items\":[{\"index\":{\"_index\":\"index-b\",\"_type\":\"_doc\",\"_id\":\"ISXw0HYBAJbnbq7-Utq6\",\"_version\":1,\"result\":\"created\",\"_shards\":{\"total\":2,\"successful\":1,\"failed\":0},\"_seq_no\":0,\"_primary_term\":1,\"status\":201}}]}") + }; + + var connection = new TestableInMemoryConnection(a => + a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("r"), responses); + var settings = new ConnectionSettings(pool, connection); + var client = new ElasticClient(settings); + + var reindexObserver = client.Reindex(r => r + .ScrollAll("5s", 2, s => s.Search(ss => ss.Size(2).Index("index-a"))) + .BulkAll(b => b.Size(1).Index("index-b"))) + .Wait(TimeSpan.FromMinutes(1), _ => { }); + + connection.AssertExpectedCallCount(); + } + + [U] public void SnapshotHelperRequestsIncludeExpectedHelperMetaData() + { + var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); + + // We can avoid specifying response bodies and this still exercises all requests. + var responses = new List<(int, string)> + { + (200, "{}"), + (200, "{}") + }; + + var connection = new TestableInMemoryConnection(a => + a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("sn"), responses); + var settings = new ConnectionSettings(pool, connection); + var client = new ElasticClient(settings); + + var observableSnapshot = new SnapshotObservable(client, new SnapshotRequest("repository-a", "snapshot-a")); + var observer = new SnapshotObserver(connection); + using var subscription = observableSnapshot.Subscribe(observer); + } + + private class SnapshotObserver : IObserver + { + private readonly TestableInMemoryConnection _connection; + + public SnapshotObserver(TestableInMemoryConnection connection) => _connection = connection; + + public void OnCompleted() => _connection.AssertExpectedCallCount(); + public void OnError(Exception error) => throw new NotImplementedException(); + public void OnNext(SnapshotStatusResponse value) { } + } + + [U] public void RestoreHelperRequestsIncludeExpectedHelperMetaData() + { + var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); + + // We can avoid specifying response bodies and this still exercises all requests. + var responses = new List<(int, string)> + { + (200, "{}"), + (200, "{}") + }; + + var connection = new TestableInMemoryConnection(a => + a.RequestMetaData.Single(x => x.Key == "helper").Value.Should().Be("sr"), responses); + var settings = new ConnectionSettings(pool, connection); + var client = new ElasticClient(settings); + + var observableRestore = new RestoreObservable(client, new RestoreRequest("repository-a", "snapshot-a")); + var observer = new RestoreObserver(connection); + using var subscription = observableRestore.Subscribe(observer); + } + + private class RestoreObserver : IObserver + { + private readonly TestableInMemoryConnection _connection; + + public RestoreObserver(TestableInMemoryConnection connection) => _connection = connection; + + public void OnCompleted() => _connection.AssertExpectedCallCount(); + public void OnError(Exception error) => throw new NotImplementedException(); + public void OnNext(RecoveryStatusResponse value) { } + } + + protected static IEnumerable CreateLazyStreamOfDocuments(int count) + { + for (var i = 0; i < count; i++) + yield return new SmallObject { Name = i.ToString() }; + } + + protected class SmallObject + { + public string Name { get; set; } + } + + protected class TestableInMemoryConnection : IConnection + { + internal static readonly byte[] EmptyBody = Encoding.UTF8.GetBytes(""); + + private readonly Action _perRequestAssertion; + private readonly List<(int, string)> _responses; + private int _requestCounter = -1; + + public TestableInMemoryConnection(Action assertion, List<(int, string)> responses) + { + _perRequestAssertion = assertion; + _responses = responses; + } + + public void AssertExpectedCallCount() => _requestCounter.Should().Be(_responses.Count - 1); + + async Task IConnection.RequestAsync(RequestData requestData, CancellationToken cancellationToken) + { + Interlocked.Increment(ref _requestCounter); + + _perRequestAssertion(requestData); + + await Task.Yield(); // avoids test deadlocks + + int statusCode; + string response; + + if (_responses.Count > _requestCounter) + (statusCode, response) = _responses[_requestCounter]; + else + (statusCode, response) = (500, (string)null); + + var stream = !string.IsNullOrEmpty(response) ? requestData.MemoryStreamFactory.Create(Encoding.UTF8.GetBytes(response)) : requestData.MemoryStreamFactory.Create(EmptyBody); + + return await ResponseBuilder + .ToResponseAsync(requestData, null, statusCode, null, stream, RequestData.MimeType, cancellationToken) + .ConfigureAwait(false); + } + + TResponse IConnection.Request(RequestData requestData) + { + Interlocked.Increment(ref _requestCounter); + + _perRequestAssertion(requestData); + + int statusCode; + string response; + + if (_responses.Count > _requestCounter) + (statusCode, response) = _responses[_requestCounter]; + else + (statusCode, response) = (200, (string)null); + + var stream = !string.IsNullOrEmpty(response) ? requestData.MemoryStreamFactory.Create(Encoding.UTF8.GetBytes(response)) : requestData.MemoryStreamFactory.Create(EmptyBody); + + return ResponseBuilder.ToResponse(requestData, null, statusCode, null, stream, RequestData.MimeType); + } + + public void Dispose() { } + } + } +}