Skip to content

Fix #1856 respect DisableDirectStreaming setting on error responses #1888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ private ElasticsearchResponse<TReturn> Initialize(int? statusCode, Exception exc
}

private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
{
{
byte[] bytes = null;
if (NeedsToEagerReadStream())
{
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
stream.CopyTo(inMemoryStream, BufferSize);
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
}

if (response.Success)
{
byte[] bytes = null;
if (NeedsToEagerReadStream())
{
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
stream.CopyTo(inMemoryStream, BufferSize);
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
}

if (!SetSpecialTypes(stream, response, bytes))
{
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
Expand All @@ -76,21 +76,23 @@ private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
ServerError serverError;
if (ServerError.TryCreate(stream, out serverError))
response.ServerError = serverError;
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
response.ResponseBodyInBytes = bytes;
}
}

private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream stream)
{
{
byte[] bytes = null;
if (NeedsToEagerReadStream())
{
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false);
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
}

if (response.Success)
{
byte[] bytes = null;
if (NeedsToEagerReadStream())
{
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false);
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
}

if (!SetSpecialTypes(stream, response, bytes))
{
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
Expand All @@ -100,6 +102,8 @@ private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream
else if (response.HttpStatusCode != null)
{
response.ServerError = await ServerError.TryCreateAsync(stream, this._requestData.CancellationToken).ConfigureAwait(false);
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
response.ResponseBodyInBytes = bytes;
}
}

Expand Down
83 changes: 83 additions & 0 deletions src/Tests/ClientConcepts/LowLevel/DirectStreaming.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using Elasticsearch.Net;
using FluentAssertions;
using Nest;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Tests.Framework;

namespace Tests.ClientConcepts.LowLevel
{
public class DirectStreaming
{
[U]
public void DisableDirectStreamingOnError()
{
Action<IResponse> assert = r =>
{
r.ApiCall.Should().NotBeNull();
r.ApiCall.RequestBodyInBytes.Should().NotBeNull();
r.ApiCall.ResponseBodyInBytes.Should().NotBeNull();
};

var client = TestClient.GetFixedReturnClient(new { }, 404, s => s.DisableDirectStreaming());
var response = client.Search<object>(s => s);
assert(response);
response = client.SearchAsync<object>(s => s).Result;
assert(response);
}

[U]
public void EnableDirectStreamingOnError()
{
Action<IResponse> assert = r =>
{
r.ApiCall.Should().NotBeNull();
r.ApiCall.RequestBodyInBytes.Should().BeNull();
r.ApiCall.ResponseBodyInBytes.Should().BeNull();
};

var client = TestClient.GetFixedReturnClient(new { }, 404);
var response = client.Search<object>(s => s);
assert(response);
response = client.SearchAsync<object>(s => s).Result;
assert(response);
}

[U]
public void DisableDirectStreamingOnSuccess()
{
Action<IResponse> assert = r =>
{
r.ApiCall.Should().NotBeNull();
r.ApiCall.RequestBodyInBytes.Should().NotBeNull();
r.ApiCall.ResponseBodyInBytes.Should().NotBeNull();
};

var client = TestClient.GetFixedReturnClient(new { }, 200, s => s.DisableDirectStreaming());
var response = client.Search<object>(s => s);
assert(response);
response = client.SearchAsync<object>(s => s).Result;
assert(response);
}

[U]
public void EnableDirectStreamingOnSuccess()
{
Action<IResponse> assert = r =>
{
r.ApiCall.Should().NotBeNull();
r.ApiCall.RequestBodyInBytes.Should().BeNull();
r.ApiCall.ResponseBodyInBytes.Should().BeNull();
};

var client = TestClient.GetFixedReturnClient(new { });
var response = client.Search<object>(s => s);
assert(response);
response = client.SearchAsync<object>(s => s).Result;
assert(response);
}
}
}
8 changes: 5 additions & 3 deletions src/Tests/Framework/TestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public static IConnection CreateConnection(ConnectionSettings settings = null, b
? ((IConnection)new HttpConnection())
: new InMemoryConnection();

public static IElasticClient GetFixedReturnClient(object responseJson)
public static IElasticClient GetFixedReturnClient(
object responseJson, int statusCode = 200, Func<ConnectionSettings, ConnectionSettings> modifySettings = null)
{
var serializer = new JsonNetSerializer(new ConnectionSettings());
byte[] fixedResult;
Expand All @@ -85,9 +86,10 @@ public static IElasticClient GetFixedReturnClient(object responseJson)
serializer.Serialize(responseJson, ms);
fixedResult = ms.ToArray();
}
var connection = new InMemoryConnection(fixedResult);
var connection = new InMemoryConnection(fixedResult, statusCode);
var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
var settings = new ConnectionSettings(connectionPool, connection);
var defaultSettings = new ConnectionSettings(connectionPool, connection);
var settings = (modifySettings != null) ? modifySettings(defaultSettings) : defaultSettings;
return new ElasticClient(settings);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Tests/tests.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# mode either u (unit test), i (integration test) or m (mixed mode)
mode: i
mode: u
# the elasticsearch version that should be started
elasticsearch_version: 2.0.1
# whether we want to forcefully reseed on the node, if you are starting the tests with a node already running
Expand Down