diff --git a/src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs b/src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs index 1dccde4232a..22acf195560 100644 --- a/src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs +++ b/src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs @@ -54,17 +54,17 @@ private ElasticsearchResponse Initialize(int? statusCode, Exception exc } private void SetBody(ElasticsearchResponse response, Stream stream) - { + { + byte[] bytes = null; + if (NeedsToEagerReadStream()) + { + var inMemoryStream = this._requestData.MemoryStreamFactory.Create(); + stream.CopyTo(inMemoryStream, BufferSize); + bytes = this.SwapStreams(ref stream, ref inMemoryStream); + } + if (response.Success) { - byte[] bytes = null; - if (NeedsToEagerReadStream()) - { - var inMemoryStream = this._requestData.MemoryStreamFactory.Create(); - stream.CopyTo(inMemoryStream, BufferSize); - bytes = this.SwapStreams(ref stream, ref inMemoryStream); - } - if (!SetSpecialTypes(stream, response, bytes)) { if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn; @@ -76,21 +76,23 @@ private void SetBody(ElasticsearchResponse response, Stream stream) ServerError serverError; if (ServerError.TryCreate(stream, out serverError)) response.ServerError = serverError; + if (this._requestData.ConnectionSettings.DisableDirectStreaming) + response.ResponseBodyInBytes = bytes; } } private async Task SetBodyAsync(ElasticsearchResponse response, Stream stream) - { + { + byte[] bytes = null; + if (NeedsToEagerReadStream()) + { + var inMemoryStream = this._requestData.MemoryStreamFactory.Create(); + await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false); + bytes = this.SwapStreams(ref stream, ref inMemoryStream); + } + if (response.Success) { - byte[] bytes = null; - if (NeedsToEagerReadStream()) - { - var inMemoryStream = this._requestData.MemoryStreamFactory.Create(); - await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false); - bytes = this.SwapStreams(ref stream, ref inMemoryStream); - } - if (!SetSpecialTypes(stream, response, bytes)) { if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn; @@ -100,6 +102,8 @@ private async Task SetBodyAsync(ElasticsearchResponse response, Stream else if (response.HttpStatusCode != null) { response.ServerError = await ServerError.TryCreateAsync(stream, this._requestData.CancellationToken).ConfigureAwait(false); + if (this._requestData.ConnectionSettings.DisableDirectStreaming) + response.ResponseBodyInBytes = bytes; } } diff --git a/src/Tests/ClientConcepts/LowLevel/DirectStreaming.cs b/src/Tests/ClientConcepts/LowLevel/DirectStreaming.cs new file mode 100644 index 00000000000..db96343261a --- /dev/null +++ b/src/Tests/ClientConcepts/LowLevel/DirectStreaming.cs @@ -0,0 +1,83 @@ +using Elasticsearch.Net; +using FluentAssertions; +using Nest; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Tests.Framework; + +namespace Tests.ClientConcepts.LowLevel +{ + public class DirectStreaming + { + [U] + public void DisableDirectStreamingOnError() + { + Action assert = r => + { + r.ApiCall.Should().NotBeNull(); + r.ApiCall.RequestBodyInBytes.Should().NotBeNull(); + r.ApiCall.ResponseBodyInBytes.Should().NotBeNull(); + }; + + var client = TestClient.GetFixedReturnClient(new { }, 404, s => s.DisableDirectStreaming()); + var response = client.Search(s => s); + assert(response); + response = client.SearchAsync(s => s).Result; + assert(response); + } + + [U] + public void EnableDirectStreamingOnError() + { + Action assert = r => + { + r.ApiCall.Should().NotBeNull(); + r.ApiCall.RequestBodyInBytes.Should().BeNull(); + r.ApiCall.ResponseBodyInBytes.Should().BeNull(); + }; + + var client = TestClient.GetFixedReturnClient(new { }, 404); + var response = client.Search(s => s); + assert(response); + response = client.SearchAsync(s => s).Result; + assert(response); + } + + [U] + public void DisableDirectStreamingOnSuccess() + { + Action assert = r => + { + r.ApiCall.Should().NotBeNull(); + r.ApiCall.RequestBodyInBytes.Should().NotBeNull(); + r.ApiCall.ResponseBodyInBytes.Should().NotBeNull(); + }; + + var client = TestClient.GetFixedReturnClient(new { }, 200, s => s.DisableDirectStreaming()); + var response = client.Search(s => s); + assert(response); + response = client.SearchAsync(s => s).Result; + assert(response); + } + + [U] + public void EnableDirectStreamingOnSuccess() + { + Action assert = r => + { + r.ApiCall.Should().NotBeNull(); + r.ApiCall.RequestBodyInBytes.Should().BeNull(); + r.ApiCall.ResponseBodyInBytes.Should().BeNull(); + }; + + var client = TestClient.GetFixedReturnClient(new { }); + var response = client.Search(s => s); + assert(response); + response = client.SearchAsync(s => s).Result; + assert(response); + } + } +} diff --git a/src/Tests/Framework/TestClient.cs b/src/Tests/Framework/TestClient.cs index 70d06251bcf..97da680ea0e 100644 --- a/src/Tests/Framework/TestClient.cs +++ b/src/Tests/Framework/TestClient.cs @@ -76,7 +76,8 @@ public static IConnection CreateConnection(ConnectionSettings settings = null, b ? ((IConnection)new HttpConnection()) : new InMemoryConnection(); - public static IElasticClient GetFixedReturnClient(object responseJson) + public static IElasticClient GetFixedReturnClient( + object responseJson, int statusCode = 200, Func modifySettings = null) { var serializer = new JsonNetSerializer(new ConnectionSettings()); byte[] fixedResult; @@ -85,9 +86,10 @@ public static IElasticClient GetFixedReturnClient(object responseJson) serializer.Serialize(responseJson, ms); fixedResult = ms.ToArray(); } - var connection = new InMemoryConnection(fixedResult); + var connection = new InMemoryConnection(fixedResult, statusCode); var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200")); - var settings = new ConnectionSettings(connectionPool, connection); + var defaultSettings = new ConnectionSettings(connectionPool, connection); + var settings = (modifySettings != null) ? modifySettings(defaultSettings) : defaultSettings; return new ElasticClient(settings); } diff --git a/src/Tests/tests.yaml b/src/Tests/tests.yaml index cb0f500286c..0bf5e1a4827 100644 --- a/src/Tests/tests.yaml +++ b/src/Tests/tests.yaml @@ -1,5 +1,5 @@ # mode either u (unit test), i (integration test) or m (mixed mode) -mode: i +mode: u # the elasticsearch version that should be started elasticsearch_version: 2.0.1 # whether we want to forcefully reseed on the node, if you are starting the tests with a node already running