Skip to content

Commit 4cfdabc

Browse files
committed
pipeline exceptions surface responses
1 parent 5b934bd commit 4cfdabc

File tree

8 files changed

+229
-77
lines changed

8 files changed

+229
-77
lines changed

src/Elasticsearch.Net/Connection/InMemoryConnection.cs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ public class InMemoryConnection : IConnection
1212
private readonly byte[] _responseBody;
1313
private readonly int _statusCode;
1414

15-
public InMemoryConnection()
15+
public InMemoryConnection()
1616
{
1717
_statusCode = 200;
1818
}
1919

20-
public InMemoryConnection(byte[] responseBody, int statusCode = 200)
20+
public InMemoryConnection(byte[] responseBody, int statusCode = 200)
2121
{
2222
_responseBody = responseBody;
2323
_statusCode = statusCode;
@@ -26,27 +26,27 @@ public InMemoryConnection(byte[] responseBody, int statusCode = 200)
2626
public virtual Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData) where TReturn : class =>
2727
Task.FromResult(this.ReturnConnectionStatus<TReturn>(requestData));
2828

29-
public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData requestData) where TReturn : class =>
29+
public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData requestData) where TReturn : class =>
3030
this.ReturnConnectionStatus<TReturn>(requestData);
3131

3232
protected ElasticsearchResponse<TReturn> ReturnConnectionStatus<TReturn>(RequestData requestData, byte[] responseBody = null, int? statusCode = null)
3333
where TReturn : class
3434
{
3535
var body = responseBody ?? _responseBody;
36-
var data = requestData.PostData;
37-
if (data != null)
38-
{
39-
using (var stream = new MemoryStream())
40-
{
41-
if (requestData.HttpCompression)
42-
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
43-
data.Write(zipStream, requestData.ConnectionSettings);
44-
else
45-
data.Write(stream, requestData.ConnectionSettings);
46-
}
47-
}
48-
49-
var builder = new ResponseBuilder<TReturn>(requestData)
36+
var data = requestData.PostData;
37+
if (data != null)
38+
{
39+
using (var stream = new MemoryStream())
40+
{
41+
if (requestData.HttpCompression)
42+
using (var zipStream = new GZipStream(stream, CompressionMode.Compress))
43+
data.Write(zipStream, requestData.ConnectionSettings);
44+
else
45+
data.Write(stream, requestData.ConnectionSettings);
46+
}
47+
}
48+
49+
var builder = new ResponseBuilder<TReturn>(requestData)
5050
{
5151
StatusCode = statusCode ?? this._statusCode,
5252
Stream = (body != null) ? new MemoryStream(body) : null
@@ -57,6 +57,6 @@ protected ElasticsearchResponse<TReturn> ReturnConnectionStatus<TReturn>(Request
5757

5858
void IDisposable.Dispose() => DisposeManagedResources();
5959

60-
protected virtual void DisposeManagedResources() {}
60+
protected virtual void DisposeManagedResources() { }
6161
}
6262
}

src/Elasticsearch.Net/Transport/Pipeline/PipelineException.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ public class PipelineException : Exception
1010
{
1111
public PipelineFailure FailureReason { get; }
1212

13+
public IApiCallDetails Response { get; internal set; }
14+
1315
public bool Recoverable =>
1416
FailureReason == PipelineFailure.BadResponse
1517
|| FailureReason == PipelineFailure.PingFailure;
@@ -27,7 +29,7 @@ public PipelineException(string message)
2729
this.FailureReason = PipelineFailure.BadResponse;
2830
}
2931

30-
public PipelineException(PipelineFailure failure, Exception innerException)
32+
public PipelineException(PipelineFailure failure, Exception innerException)
3133
: base(GetMessage(failure), innerException)
3234
{
3335
this.FailureReason = failure;

src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public IEnumerable<Node> NextNode()
191191
yield break;
192192
}
193193

194-
//This for loop allows to break out of the view state machine if we need to
194+
//This for loop allows to break out of the view state machine if we need to
195195
//force a refresh (after reseeding connectionpool). We have a hardcoded limit of only
196196
//allowing 100 of these refreshes per call
197197
var refreshed = false;
@@ -243,13 +243,14 @@ public void Ping(Node node)
243243
var response = this._connection.Request<VoidResponse>(pingData);
244244
ThrowBadAuthPipelineExceptionWhenNeeded(response);
245245
//ping should not silently accept bad but valid http responses
246-
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse);
246+
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse) { Response = response };
247247
}
248248
catch (Exception e)
249249
{
250+
var response = (e as PipelineException)?.Response;
250251
audit.Event = PingFailure;
251252
audit.Exception = e;
252-
throw new PipelineException(PipelineFailure.PingFailure, e);
253+
throw new PipelineException(PipelineFailure.PingFailure, e) { Response = response };
253254
}
254255
}
255256
}
@@ -266,21 +267,22 @@ public async Task PingAsync(Node node)
266267
var response = await this._connection.RequestAsync<VoidResponse>(pingData).ConfigureAwait(false);
267268
ThrowBadAuthPipelineExceptionWhenNeeded(response);
268269
//ping should not silently accept bad but valid http responses
269-
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse);
270+
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse) { Response = response };
270271
}
271272
catch (Exception e)
272273
{
274+
var response = (e as PipelineException)?.Response;
273275
audit.Event = PingFailure;
274276
audit.Exception = e;
275-
throw new PipelineException(PipelineFailure.PingFailure, e);
276-
}
277+
throw new PipelineException(PipelineFailure.PingFailure, e) { Response = response };
278+
}
277279
}
278280
}
279281

280-
private void ThrowBadAuthPipelineExceptionWhenNeeded<TReturn>(ElasticsearchResponse<TReturn> response)
282+
private void ThrowBadAuthPipelineExceptionWhenNeeded(IApiCallDetails response)
281283
{
282284
if (response.HttpStatusCode == 401)
283-
throw new PipelineException(PipelineFailure.BadAuthentication, response.OriginalException);
285+
throw new PipelineException(PipelineFailure.BadAuthentication, response.OriginalException) { Response = response };
284286
}
285287

286288
public string SniffPath => "_nodes/_all/settings?flat_settings&timeout=" + this.PingTimeout.ToTimeUnit();
@@ -319,7 +321,7 @@ public void Sniff()
319321
var response = this._connection.Request<SniffResponse>(requestData);
320322
ThrowBadAuthPipelineExceptionWhenNeeded(response);
321323
//sniff should not silently accept bad but valid http responses
322-
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse);
324+
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse) { Response = response };
323325
var nodes = response.Body.ToNodes(this._connectionPool.UsingSsl);
324326
this._connectionPool.Reseed(nodes);
325327
this.Refresh = true;
@@ -352,7 +354,7 @@ public async Task SniffAsync()
352354
var response = await this._connection.RequestAsync<SniffResponse>(requestData).ConfigureAwait(false);
353355
ThrowBadAuthPipelineExceptionWhenNeeded(response);
354356
//sniff should not silently accept bad but valid http responses
355-
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse);
357+
if (!response.Success) throw new PipelineException(PipelineFailure.BadResponse) { Response = response };
356358
this._connectionPool.Reseed(response.Body.ToNodes(this._connectionPool.UsingSsl));
357359
this.Refresh = true;
358360
return;
@@ -426,40 +428,43 @@ public async Task<ElasticsearchResponse<TReturn>> CallElasticsearchAsync<TReturn
426428
public void BadResponse<TReturn>(ref ElasticsearchResponse<TReturn> response, RequestData data, List<PipelineException> pipelineExceptions)
427429
where TReturn : class
428430
{
431+
var callDetails = response ?? pipelineExceptions.LastOrDefault()?.Response;
429432
var pipelineFailure = PipelineFailure.BadResponse;
430433
if (pipelineExceptions.HasAny())
431434
pipelineFailure = pipelineExceptions.Last().FailureReason;
432435

433436
var innerException = pipelineExceptions.HasAny()
434437
? new AggregateException(pipelineExceptions)
435-
: response?.OriginalException;
438+
: callDetails?.OriginalException;
436439

437440
var exceptionMessage = innerException?.Message ?? "Could not complete the request to Elasticsearch.";
438441

439442
if (this.IsTakingTooLong)
440443
{
441-
pipelineFailure = PipelineFailure.MaxTimeoutReached;
444+
pipelineFailure = PipelineFailure.MaxTimeoutReached;
442445
this.Audit(MaxTimeoutReached);
443446
exceptionMessage = "Maximum timout reached while retrying request";
444447
}
445448
else if (this.Retried >= this.MaxRetries && this.MaxRetries > 0)
446449
{
447-
pipelineFailure = PipelineFailure.MaxRetriesReached;
450+
pipelineFailure = PipelineFailure.MaxRetriesReached;
448451
this.Audit(MaxRetriesReached);
449452
exceptionMessage = "Maximum number of retries reached.";
450453
}
451454

452455
var clientException = new ElasticsearchClientException(pipelineFailure, exceptionMessage, innerException)
453456
{
454457
Request = data,
455-
Response = response,
458+
Response = callDetails,
456459
AuditTrail = this.AuditTrail
457460
};
458461

459462
if (_settings.ThrowExceptions) throw clientException;
460463

461464
if (response == null)
465+
{
462466
response = new ResponseBuilder<TReturn>(data) { Exception = clientException }.ToResponse();
467+
}
463468

464469
response.AuditTrail = this.AuditTrail;
465470
}
@@ -468,4 +473,4 @@ public void BadResponse<TReturn>(ref ElasticsearchResponse<TReturn> response, Re
468473

469474
protected virtual void Dispose() { }
470475
}
471-
}
476+
}

src/Tests/ClientConcepts/ConnectionPooling/Exceptions/UnrecoverableExceptions.doc.cs

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
using System;
22
using System.Linq;
33
using System.Net;
4+
using System.Text;
45
using System.Threading.Tasks;
56
using Elasticsearch.Net;
67
using FluentAssertions;
8+
using Nest;
79
using Tests.Framework;
10+
using Tests.Framework.MockData;
811

912
namespace Tests.ClientConcepts.ConnectionPooling.Exceptions
1013
{
1114
public class UnrecoverableExceptions
1215
{
13-
/** == Unrecoverable exceptions
14-
* Unrecoverable exceptions are excepted exceptions that are grounds to exit the client pipeline immediately.
15-
* By default the client won't throw on any ElasticsearchClientException but return an invalid response.
16+
/** == Unrecoverable exceptions
17+
* Unrecoverable exceptions are excepted exceptions that are grounds to exit the client pipeline immediately.
18+
* By default the client won't throw on any ElasticsearchClientException but return an invalid response.
1619
* You can configure the client to throw using ThrowExceptions() on ConnectionSettings. The following test
17-
* both a client that throws and one that returns an invalid response with an `.OriginalException` exposed
20+
* both a client that throws and one that returns an invalid response with an `.OriginalException` exposed
1821
*/
1922

2023
[U] public void SomePipelineFailuresAreRecoverable()
@@ -59,5 +62,92 @@ [U] public async Task BadAuthenticationIsUnrecoverable()
5962
}
6063
);
6164
}
65+
66+
private static byte[] ResponseHtml = Encoding.UTF8.GetBytes(@"<html>
67+
<head><title>401 Authorization Required</title></head>
68+
<body bgcolor=""white"">
69+
<center><h1>401 Authorization Required</h1></center>
70+
<hr><center>nginx/1.4.6 (Ubuntu)</center>
71+
</body>
72+
</html>
73+
");
74+
[U] public async Task BadAuthenticationHtmlResponseIsIgnored()
75+
{
76+
var audit = new Auditor(() => Framework.Cluster
77+
.Nodes(10)
78+
.Ping(r => r.SucceedAlways())
79+
.ClientCalls(r => r.FailAlways(401).ReturnResponse(ResponseHtml))
80+
.StaticConnectionPool()
81+
.AllDefaults()
82+
);
83+
84+
audit = await audit.TraceElasticsearchException(
85+
new ClientCall {
86+
{ AuditEvent.PingSuccess, 9200 },
87+
{ AuditEvent.BadResponse, 9200 },
88+
},
89+
(e) =>
90+
{
91+
e.FailureReason.Should().Be(PipelineFailure.BadAuthentication);
92+
e.Response.ResponseBodyInBytes.Should().BeNull();
93+
}
94+
);
95+
}
96+
97+
[U] public async Task BadAuthenticationHtmlResponseStillExposedWhenUsingDisableDirectStreaming()
98+
{
99+
var audit = new Auditor(() => Framework.Cluster
100+
.Nodes(10)
101+
.Ping(r => r.SucceedAlways())
102+
.ClientCalls(r => r.FailAlways(401).ReturnResponse(ResponseHtml))
103+
.StaticConnectionPool()
104+
.Settings(s=>s.DisableDirectStreaming())
105+
);
106+
107+
audit = await audit.TraceElasticsearchException(
108+
new ClientCall {
109+
{ AuditEvent.PingSuccess, 9200 },
110+
{ AuditEvent.BadResponse, 9200 },
111+
},
112+
(e) =>
113+
{
114+
e.FailureReason.Should().Be(PipelineFailure.BadAuthentication);
115+
e.Response.ResponseBodyInBytes.Should().NotBeNull();
116+
var responseString = Encoding.UTF8.GetString(e.Response.ResponseBodyInBytes);
117+
responseString.Should().Contain("nginx/");
118+
e.DebugInformation.Should().Contain("nginx/");
119+
}
120+
);
121+
}
122+
123+
[U] public async Task BadAuthOnGetClientCallDoesNotThrowSerializationException()
124+
{
125+
var audit = new Auditor(() => Framework.Cluster
126+
.Nodes(10)
127+
.Ping(r => r.SucceedAlways())
128+
.ClientCalls(r => r.FailAlways(401).ReturnResponse(ResponseHtml))
129+
.StaticConnectionPool()
130+
.Settings(s=>s.DisableDirectStreaming().DefaultIndex("default-index"))
131+
.ClientProxiesTo(
132+
(c, r) => c.Get<Project>("1", s=>s.RequestConfiguration(r)),
133+
async (c, r) => await c.GetAsync<Project>("1", s=>s.RequestConfiguration(r)) as IResponse
134+
)
135+
);
136+
137+
audit = await audit.TraceElasticsearchException(
138+
new ClientCall {
139+
{ AuditEvent.PingSuccess, 9200 },
140+
{ AuditEvent.BadResponse, 9200 },
141+
},
142+
(e) =>
143+
{
144+
e.FailureReason.Should().Be(PipelineFailure.BadAuthentication);
145+
e.Response.ResponseBodyInBytes.Should().NotBeNull();
146+
var responseString = Encoding.UTF8.GetString(e.Response.ResponseBodyInBytes);
147+
responseString.Should().Contain("nginx/");
148+
e.DebugInformation.Should().Contain("nginx/");
149+
}
150+
);
151+
}
62152
}
63153
}

src/Tests/Framework/Audit/Auditor.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ private Auditor(VirtualizedCluster cluster, VirtualizedCluster clusterAsync)
2727
_clusterAsync = clusterAsync;
2828
}
2929

30-
public ISearchResponse<Project> Response { get; internal set; }
31-
public ISearchResponse<Project> ResponseAsync { get; internal set; }
30+
public IResponse Response { get; internal set; }
31+
public IResponse ResponseAsync { get; internal set; }
3232

3333
public List<Audit> AsyncAuditTrail { get; set; }
3434
public List<Audit> AuditTrail { get; set; }
@@ -170,7 +170,7 @@ public async Task<Auditor> TraceCalls(params ClientCall[] audits)
170170
var auditor = this;
171171
foreach (var a in audits.Select((a, i)=> new { a, i }))
172172
{
173-
auditor = await auditor.TraceCall(a.a, a.i);
173+
auditor = await auditor.TraceCall(a.a, a.i);
174174
}
175175
return auditor;
176176
}
@@ -197,4 +197,4 @@ private static void AssertTrailOnResponse(ClientCall callTrace, List<Audit> audi
197197
callTrace.Count.Should().Be(auditTrail.Count);
198198
}
199199
}
200-
}
200+
}

0 commit comments

Comments
 (0)