Skip to content

Commit 225e844

Browse files
committed
Merge pull request #1888 from elastic/fix/1856
Fix #1856 respect DisableDirectStreaming setting on error responses
2 parents 236ef25 + a9a07fa commit 225e844

File tree

4 files changed

+111
-22
lines changed

4 files changed

+111
-22
lines changed

src/Elasticsearch.Net/Transport/Pipeline/ResponseBuilder.cs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,17 @@ private ElasticsearchResponse<TReturn> Initialize(int? statusCode, Exception exc
5454
}
5555

5656
private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
57-
{
57+
{
58+
byte[] bytes = null;
59+
if (NeedsToEagerReadStream())
60+
{
61+
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
62+
stream.CopyTo(inMemoryStream, BufferSize);
63+
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
64+
}
65+
5866
if (response.Success)
5967
{
60-
byte[] bytes = null;
61-
if (NeedsToEagerReadStream())
62-
{
63-
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
64-
stream.CopyTo(inMemoryStream, BufferSize);
65-
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
66-
}
67-
6868
if (!SetSpecialTypes(stream, response, bytes))
6969
{
7070
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
@@ -76,21 +76,23 @@ private void SetBody(ElasticsearchResponse<TReturn> response, Stream stream)
7676
ServerError serverError;
7777
if (ServerError.TryCreate(stream, out serverError))
7878
response.ServerError = serverError;
79+
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
80+
response.ResponseBodyInBytes = bytes;
7981
}
8082
}
8183

8284
private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream stream)
83-
{
85+
{
86+
byte[] bytes = null;
87+
if (NeedsToEagerReadStream())
88+
{
89+
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
90+
await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false);
91+
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
92+
}
93+
8494
if (response.Success)
8595
{
86-
byte[] bytes = null;
87-
if (NeedsToEagerReadStream())
88-
{
89-
var inMemoryStream = this._requestData.MemoryStreamFactory.Create();
90-
await stream.CopyToAsync(inMemoryStream, BufferSize, this._requestData.CancellationToken).ConfigureAwait(false);
91-
bytes = this.SwapStreams(ref stream, ref inMemoryStream);
92-
}
93-
9496
if (!SetSpecialTypes(stream, response, bytes))
9597
{
9698
if (this._requestData.CustomConverter != null) response.Body = this._requestData.CustomConverter(response, stream) as TReturn;
@@ -100,6 +102,8 @@ private async Task SetBodyAsync(ElasticsearchResponse<TReturn> response, Stream
100102
else if (response.HttpStatusCode != null)
101103
{
102104
response.ServerError = await ServerError.TryCreateAsync(stream, this._requestData.CancellationToken).ConfigureAwait(false);
105+
if (this._requestData.ConnectionSettings.DisableDirectStreaming)
106+
response.ResponseBodyInBytes = bytes;
103107
}
104108
}
105109

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using Elasticsearch.Net;
2+
using FluentAssertions;
3+
using Nest;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Text;
8+
using System.Threading.Tasks;
9+
using Tests.Framework;
10+
11+
namespace Tests.ClientConcepts.LowLevel
12+
{
13+
public class DirectStreaming
14+
{
15+
[U]
16+
public void DisableDirectStreamingOnError()
17+
{
18+
Action<IResponse> assert = r =>
19+
{
20+
r.ApiCall.Should().NotBeNull();
21+
r.ApiCall.RequestBodyInBytes.Should().NotBeNull();
22+
r.ApiCall.ResponseBodyInBytes.Should().NotBeNull();
23+
};
24+
25+
var client = TestClient.GetFixedReturnClient(new { }, 404, s => s.DisableDirectStreaming());
26+
var response = client.Search<object>(s => s);
27+
assert(response);
28+
response = client.SearchAsync<object>(s => s).Result;
29+
assert(response);
30+
}
31+
32+
[U]
33+
public void EnableDirectStreamingOnError()
34+
{
35+
Action<IResponse> assert = r =>
36+
{
37+
r.ApiCall.Should().NotBeNull();
38+
r.ApiCall.RequestBodyInBytes.Should().BeNull();
39+
r.ApiCall.ResponseBodyInBytes.Should().BeNull();
40+
};
41+
42+
var client = TestClient.GetFixedReturnClient(new { }, 404);
43+
var response = client.Search<object>(s => s);
44+
assert(response);
45+
response = client.SearchAsync<object>(s => s).Result;
46+
assert(response);
47+
}
48+
49+
[U]
50+
public void DisableDirectStreamingOnSuccess()
51+
{
52+
Action<IResponse> assert = r =>
53+
{
54+
r.ApiCall.Should().NotBeNull();
55+
r.ApiCall.RequestBodyInBytes.Should().NotBeNull();
56+
r.ApiCall.ResponseBodyInBytes.Should().NotBeNull();
57+
};
58+
59+
var client = TestClient.GetFixedReturnClient(new { }, 200, s => s.DisableDirectStreaming());
60+
var response = client.Search<object>(s => s);
61+
assert(response);
62+
response = client.SearchAsync<object>(s => s).Result;
63+
assert(response);
64+
}
65+
66+
[U]
67+
public void EnableDirectStreamingOnSuccess()
68+
{
69+
Action<IResponse> assert = r =>
70+
{
71+
r.ApiCall.Should().NotBeNull();
72+
r.ApiCall.RequestBodyInBytes.Should().BeNull();
73+
r.ApiCall.ResponseBodyInBytes.Should().BeNull();
74+
};
75+
76+
var client = TestClient.GetFixedReturnClient(new { });
77+
var response = client.Search<object>(s => s);
78+
assert(response);
79+
response = client.SearchAsync<object>(s => s).Result;
80+
assert(response);
81+
}
82+
}
83+
}

src/Tests/Framework/TestClient.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public static IConnection CreateConnection(ConnectionSettings settings = null, b
7676
? ((IConnection)new HttpConnection())
7777
: new InMemoryConnection();
7878

79-
public static IElasticClient GetFixedReturnClient(object responseJson)
79+
public static IElasticClient GetFixedReturnClient(
80+
object responseJson, int statusCode = 200, Func<ConnectionSettings, ConnectionSettings> modifySettings = null)
8081
{
8182
var serializer = new JsonNetSerializer(new ConnectionSettings());
8283
byte[] fixedResult;
@@ -85,9 +86,10 @@ public static IElasticClient GetFixedReturnClient(object responseJson)
8586
serializer.Serialize(responseJson, ms);
8687
fixedResult = ms.ToArray();
8788
}
88-
var connection = new InMemoryConnection(fixedResult);
89+
var connection = new InMemoryConnection(fixedResult, statusCode);
8990
var connectionPool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
90-
var settings = new ConnectionSettings(connectionPool, connection);
91+
var defaultSettings = new ConnectionSettings(connectionPool, connection);
92+
var settings = (modifySettings != null) ? modifySettings(defaultSettings) : defaultSettings;
9193
return new ElasticClient(settings);
9294
}
9395

src/Tests/tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# mode either u (unit test), i (integration test) or m (mixed mode)
2-
mode: i
2+
mode: u
33
# the elasticsearch version that should be started
44
elasticsearch_version: 2.0.1
55
# whether we want to forcefully reseed on the node, if you are starting the tests with a node already running

0 commit comments

Comments
 (0)