Skip to content

Commit f510809

Browse files
committed
Support product checking on virtual cluster
1 parent 82c1b65 commit f510809

File tree

5 files changed

+147
-59
lines changed

5 files changed

+147
-59
lines changed

src/Elasticsearch.Net.VirtualizedCluster/Audit/Auditor.cs

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,17 @@ public void ChangeTime(Func<DateTime, DateTime> selector)
5353
public async Task<Auditor> TraceStartup(ClientCall callTrace = null)
5454
{
5555
//synchronous code path
56-
_cluster = _cluster ?? Cluster();
56+
_cluster ??= Cluster();
5757
if (!StartedUp) AssertPoolBeforeStartup?.Invoke(_cluster.ConnectionPool);
5858
AssertPoolBeforeCall?.Invoke(_cluster.ConnectionPool);
59+
// ReSharper disable once MethodHasAsyncOverload
5960
Response = _cluster.ClientCall(callTrace?.RequestOverrides);
6061
AuditTrail = Response.ApiCall.AuditTrail;
6162
if (!StartedUp) AssertPoolAfterStartup?.Invoke(_cluster.ConnectionPool);
6263
AssertPoolAfterCall?.Invoke(_cluster.ConnectionPool);
6364

6465
//async code path
65-
_clusterAsync = _clusterAsync ?? Cluster();
66+
_clusterAsync ??= Cluster();
6667
if (!StartedUp) AssertPoolBeforeStartup?.Invoke(_clusterAsync.ConnectionPool);
6768
AssertPoolBeforeCall?.Invoke(_clusterAsync.ConnectionPool);
6869
ResponseAsync = await _clusterAsync.ClientCallAsync(callTrace?.RequestOverrides).ConfigureAwait(false);
@@ -71,19 +72,25 @@ public async Task<Auditor> TraceStartup(ClientCall callTrace = null)
7172
AssertPoolAfterCall?.Invoke(_clusterAsync.ConnectionPool);
7273
return new Auditor(_cluster, _clusterAsync);
7374
}
74-
75+
7576
public async Task<Auditor> TraceCall(ClientCall callTrace, int nthCall = 0)
7677
{
7778
await TraceStartup(callTrace).ConfigureAwait(false);
78-
return AssertAuditTrails(callTrace, nthCall);
79+
return AssertAuditTrails(callTrace, nthCall, true);
80+
}
81+
82+
public async Task<Auditor> TraceCall(bool skipProductCheck, ClientCall callTrace, int nthCall = 0)
83+
{
84+
await TraceStartup(callTrace).ConfigureAwait(false);
85+
return AssertAuditTrails(callTrace, nthCall, skipProductCheck);
7986
}
8087

8188
#pragma warning disable 1998 // Async method lacks 'await' operators and will run synchronously
8289
private async Task TraceException<TException>(ClientCall callTrace, Action<TException> assert)
8390
#pragma warning restore 1998 // Async method lacks 'await' operators and will run synchronously
8491
where TException : ElasticsearchClientException
8592
{
86-
_cluster = _cluster ?? Cluster();
93+
_cluster ??= Cluster();
8794
_cluster.ClientThrows(true);
8895
AssertPoolBeforeCall?.Invoke(_cluster.ConnectionPool);
8996

@@ -94,7 +101,7 @@ private async Task TraceException<TException>(ClientCall callTrace, Action<TExce
94101
AuditTrail = exception.AuditTrail;
95102
AssertPoolAfterCall?.Invoke(_cluster.ConnectionPool);
96103

97-
_clusterAsync = _clusterAsync ?? Cluster();
104+
_clusterAsync ??= Cluster();
98105
_clusterAsync.ClientThrows(true);
99106
Func<Task> callAsync = async () => ResponseAsync = await _clusterAsync.ClientCallAsync(callTrace?.RequestOverrides).ConfigureAwait(false);
100107
exception = await TryCallAsync(callAsync, assert).ConfigureAwait(false);
@@ -121,7 +128,7 @@ public async Task<Auditor> TraceUnexpectedElasticsearchException(ClientCall call
121128
public async Task<Auditor> TraceElasticsearchExceptionOnResponse(ClientCall callTrace, Action<ElasticsearchClientException> assert)
122129
#pragma warning restore 1998
123130
{
124-
_cluster = _cluster ?? Cluster();
131+
_cluster ??= Cluster();
125132
_cluster.ClientThrows(false);
126133
AssertPoolBeforeCall?.Invoke(_cluster.ConnectionPool);
127134

@@ -130,14 +137,15 @@ public async Task<Auditor> TraceElasticsearchExceptionOnResponse(ClientCall call
130137

131138
if (Response.ApiCall.Success) throw new Exception("Expected call to not be valid");
132139

133-
var exception = Response.ApiCall.OriginalException as ElasticsearchClientException;
134-
if (exception == null) throw new Exception("OriginalException on response is not expected ElasticsearchClientException");
140+
if (Response.ApiCall.OriginalException is not ElasticsearchClientException exception)
141+
throw new Exception("OriginalException on response is not expected ElasticsearchClientException");
142+
135143
assert(exception);
136144

137145
AuditTrail = exception.AuditTrail;
138146
AssertPoolAfterCall?.Invoke(_cluster.ConnectionPool);
139147

140-
_clusterAsync = _clusterAsync ?? Cluster();
148+
_clusterAsync ??= Cluster();
141149
_clusterAsync.ClientThrows(false);
142150
Func<Task> callAsync = async () => { ResponseAsync = await _clusterAsync.ClientCallAsync(callTrace?.RequestOverrides).ConfigureAwait(false); };
143151
await callAsync().ConfigureAwait(false);
@@ -157,30 +165,38 @@ public async Task<Auditor> TraceElasticsearchExceptionOnResponse(ClientCall call
157165
public async Task<Auditor> TraceUnexpectedException(ClientCall callTrace, Action<UnexpectedElasticsearchClientException> assert)
158166
#pragma warning restore 1998
159167
{
160-
_cluster = _cluster ?? Cluster();
168+
_cluster ??= Cluster();
161169
AssertPoolBeforeCall?.Invoke(_cluster.ConnectionPool);
162170

163-
Action call = () => Response = _cluster.ClientCall(callTrace?.RequestOverrides);
164-
var exception = TryCall(call, assert);
171+
172+
var exception = TryCall(Call, assert);
165173
assert(exception);
166174

167175
AuditTrail = exception.AuditTrail;
168176
AssertPoolAfterCall?.Invoke(_cluster.ConnectionPool);
169177

170-
_clusterAsync = _clusterAsync ?? Cluster();
171-
Func<Task> callAsync = async () => ResponseAsync = await _clusterAsync.ClientCallAsync(callTrace?.RequestOverrides).ConfigureAwait(false);
172-
exception = await TryCallAsync(callAsync, assert).ConfigureAwait(false);
178+
_clusterAsync ??= Cluster();
179+
180+
exception = await TryCallAsync(CallAsync, assert).ConfigureAwait(false);
173181
assert(exception);
174182

175183
AsyncAuditTrail = exception.AuditTrail;
176184
AssertPoolAfterCall?.Invoke(_clusterAsync.ConnectionPool);
185+
177186
return new Auditor(_cluster, _clusterAsync);
187+
188+
void Call() => Response = _cluster.ClientCall(callTrace?.RequestOverrides);
189+
async Task CallAsync() => ResponseAsync = await _clusterAsync.ClientCallAsync(callTrace?.RequestOverrides).ConfigureAwait(false);
178190
}
179191

180-
private Auditor AssertAuditTrails(ClientCall callTrace, int nthCall)
192+
private Auditor AssertAuditTrails(ClientCall callTrace, int nthCall, bool skipProductCheck)
181193
{
182194
var nl = Environment.NewLine;
183-
if (AuditTrail.Count != AsyncAuditTrail.Count)
195+
196+
if (skipProductCheck)
197+
AuditTrail.RemoveAll(a => a.Event is AuditEvent.ProductCheckOnStartup or AuditEvent.ProductCheckSuccess or AuditEvent.ProductCheckFailure);
198+
199+
if (AuditTrail.Count(Predicate) != AsyncAuditTrail.Count(Predicate))
184200
throw new Exception($"{nthCall} has a mismatch between sync and async. {nl}async:{AuditTrail}{nl}sync:{AsyncAuditTrail}");
185201

186202
AssertTrailOnResponse(callTrace, AuditTrail, true, nthCall);
@@ -192,8 +208,13 @@ private Auditor AssertAuditTrails(ClientCall callTrace, int nthCall)
192208
callTrace?.AssertPoolAfterCall?.Invoke(_cluster.ConnectionPool);
193209
callTrace?.AssertPoolAfterCall?.Invoke(_clusterAsync.ConnectionPool);
194210
return new Auditor(_cluster, _clusterAsync);
195-
}
196211

212+
// These happen one time only so should not be counted when comparing equality of audit trails
213+
static bool Predicate(Net.Audit auditEvent) =>
214+
auditEvent.Event != AuditEvent.ProductCheckOnStartup &&
215+
auditEvent.Event != AuditEvent.ProductCheckFailure &&
216+
auditEvent.Event != AuditEvent.ProductCheckSuccess;
217+
}
197218

198219
public void VisualizeCalls(int numberOfCalls)
199220
{
@@ -218,14 +239,24 @@ private static string AuditTrailToString(List<Elasticsearch.Net.Audit> auditTrai
218239
return actualAuditTrail;
219240
}
220241

242+
221243
public async Task<Auditor> TraceCalls(params ClientCall[] audits)
222244
{
223245
var auditor = this;
224-
foreach (var a in audits.Select((a, i) => new { a, i })) auditor = await auditor.TraceCall(a.a, a.i).ConfigureAwait(false);
246+
foreach (var a in audits.Select((a, i) => new { a, i }))
247+
auditor = await auditor.TraceCall(a.a, a.i).ConfigureAwait(false);
248+
return auditor;
249+
}
250+
251+
public async Task<Auditor> TraceCalls(bool skipProductCheck, params ClientCall[] audits)
252+
{
253+
var auditor = this;
254+
foreach (var a in audits.Select((a, i) => new { a, i }))
255+
auditor = await auditor.TraceCall(skipProductCheck, a.a, a.i).ConfigureAwait(false);
225256
return auditor;
226257
}
227258

228-
private static void AssertTrailOnResponse(ClientCall callTrace, List<Elasticsearch.Net.Audit> auditTrail, bool sync, int nthCall)
259+
private static void AssertTrailOnResponse(ClientCall callTrace, IReadOnlyCollection<Net.Audit> auditTrail, bool sync, int nthCall)
229260
{
230261
var typeOfTrail = (sync ? "synchronous" : "asynchronous") + " audit trail";
231262
var nthClientCall = (nthCall + 1).ToOrdinal();
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System;
6+
7+
namespace Elasticsearch.Net.VirtualizedCluster.Rules
8+
{
9+
public class ProductCheckRule : RuleBase<ProductCheckRule>
10+
{
11+
private IRule Self => this;
12+
13+
public ProductCheckRule Fails(RuleOption<TimesHelper.AllTimes, int> times, RuleOption<Exception, int> errorState = null)
14+
{
15+
Self.Times = times;
16+
Self.Succeeds = false;
17+
Self.Return = errorState;
18+
return this;
19+
}
20+
21+
public ProductCheckRule Succeeds(RuleOption<TimesHelper.AllTimes, int> times, int? validResponseCode = 200)
22+
{
23+
Self.Times = times;
24+
Self.Succeeds = true;
25+
Self.Return = validResponseCode;
26+
return this;
27+
}
28+
29+
public ProductCheckRule SucceedAlways(int? validResponseCode = 200) => Succeeds(TimesHelper.Always, validResponseCode);
30+
31+
public ProductCheckRule FailAlways(RuleOption<Exception, int> errorState = null) => Fails(TimesHelper.Always, errorState);
32+
}
33+
}

src/Elasticsearch.Net.VirtualizedCluster/VirtualCluster.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,23 @@ public class VirtualCluster
1515
{
1616
private readonly List<Node> _nodes;
1717

18-
public VirtualCluster(IEnumerable<Node> nodes) => _nodes = nodes.ToList();
18+
public VirtualCluster(IEnumerable<Node> nodes, bool productCheckSucceeds = true)
19+
{
20+
_nodes = nodes.ToList();
21+
22+
if (productCheckSucceeds)
23+
ProductCheckRules.Add(new ProductCheckRule().SucceedAlways());
24+
}
1925

20-
public List<IClientCallRule> ClientCallRules { get; } = new List<IClientCallRule>();
21-
public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider();
26+
public List<IClientCallRule> ClientCallRules { get; } = new();
27+
public TestableDateTimeProvider DateTimeProvider { get; } = new();
2228

2329
public IReadOnlyList<Node> Nodes => _nodes;
24-
public List<IRule> PingingRules { get; } = new List<IRule>();
2530

26-
public List<ISniffRule> SniffingRules { get; } = new List<ISniffRule>();
31+
public List<IRule> PingingRules { get; } = new();
32+
public List<ISniffRule> SniffingRules { get; } = new();
33+
public List<IRule> ProductCheckRules { get; } = new();
34+
2735
internal string PublishAddressOverride { get; private set; }
2836

2937
internal bool SniffShouldReturnFqnd { get; private set; }
@@ -85,6 +93,12 @@ public VirtualCluster Sniff(Func<SniffRule, ISniffRule> selector)
8593
SniffingRules.Add(selector(new SniffRule()));
8694
return this;
8795
}
96+
97+
public VirtualCluster ProductCheck(Func<ProductCheckRule, IRule> selector)
98+
{
99+
ProductCheckRules.Add(selector(new ProductCheckRule()));
100+
return this;
101+
}
88102

89103
public VirtualCluster ClientCalls(Func<ClientCallRule, IClientCallRule> selector)
90104
{

src/Elasticsearch.Net.VirtualizedCluster/VirtualClusterConnection.cs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66
using System.Collections.Generic;
77
using System.IO;
88
using System.Linq;
9-
#if DOTNETCORE
10-
using TheException = System.Net.Http.HttpRequestException;
11-
#else
12-
using TheException = System.Net.WebException;
13-
#endif
149
using System.Threading;
1510
using System.Threading.Tasks;
1611
using Elasticsearch.Net.VirtualizedCluster.MockResponses;
1712
using Elasticsearch.Net.VirtualizedCluster.Providers;
1813
using Elasticsearch.Net.VirtualizedCluster.Rules;
14+
#if DOTNETCORE
15+
using TheException = System.Net.Http.HttpRequestException;
16+
#else
17+
using TheException = System.Net.WebException;
18+
#endif
1919

2020
namespace Elasticsearch.Net.VirtualizedCluster
2121
{
@@ -31,7 +31,7 @@ namespace Elasticsearch.Net.VirtualizedCluster
3131
/// </summary>
3232
public class VirtualClusterConnection : InMemoryConnection
3333
{
34-
private static readonly object Lock = new object();
34+
private static readonly object Lock = new();
3535

3636
private static byte[] _defaultResponseBytes;
3737

@@ -101,6 +101,9 @@ public bool IsPingRequest(RequestData requestData) =>
101101
requestData.Method == HttpMethod.HEAD &&
102102
(requestData.PathAndQuery == string.Empty || requestData.PathAndQuery.StartsWith("?"));
103103

104+
public bool IsProductCheckRequest(RequestData requestData) =>
105+
requestData.Uri.AbsolutePath.Equals("/", StringComparison.Ordinal) && requestData.Method == HttpMethod.GET;
106+
104107
public override TResponse Request<TResponse>(RequestData requestData)
105108
{
106109
if (!_calls.ContainsKey(requestData.Uri.Port))
@@ -117,8 +120,8 @@ public override TResponse Request<TResponse>(RequestData requestData)
117120
nameof(VirtualCluster.Sniff),
118121
_cluster.SniffingRules,
119122
requestData.RequestTimeout,
120-
(r) => UpdateCluster(r.NewClusterState),
121-
(r) => SniffResponseBytes.Create(_cluster.Nodes, _cluster.ElasticsearchVersion,_cluster.PublishAddressOverride, _cluster.SniffShouldReturnFqnd)
123+
r => UpdateCluster(r.NewClusterState),
124+
_ => SniffResponseBytes.Create(_cluster.Nodes, _cluster.ElasticsearchVersion,_cluster.PublishAddressOverride, _cluster.SniffShouldReturnFqnd)
122125
);
123126
}
124127
if (IsPingRequest(requestData))
@@ -129,8 +132,20 @@ public override TResponse Request<TResponse>(RequestData requestData)
129132
nameof(VirtualCluster.Ping),
130133
_cluster.PingingRules,
131134
requestData.PingTimeout,
132-
(r) => { },
133-
(r) => null //HEAD request
135+
_ => { },
136+
_ => null //HEAD request
137+
);
138+
}
139+
if (IsProductCheckRequest(requestData))
140+
{
141+
_ = Interlocked.Increment(ref state.ProductChecked);
142+
return HandleRules<TResponse, IRule>(
143+
requestData,
144+
nameof(VirtualCluster.ProductCheck),
145+
_cluster.ProductCheckRules,
146+
requestData.RequestTimeout,
147+
_ => { },
148+
_ => ValidProductCheckResponse().ResponseBytes
134149
);
135150
}
136151
_ = Interlocked.Increment(ref state.Called);
@@ -139,7 +154,7 @@ public override TResponse Request<TResponse>(RequestData requestData)
139154
nameof(VirtualCluster.ClientCalls),
140155
_cluster.ClientCallRules,
141156
requestData.RequestTimeout,
142-
(r) => { },
157+
_ => { },
143158
CallResponse
144159
);
145160
}
@@ -247,14 +262,17 @@ private TResponse Fail<TResponse, TRule>(RequestData requestData, TRule rule, Ru
247262
throw new TheException();
248263

249264
return ret.Match(
250-
(e) => throw e,
251-
(statusCode) => ReturnConnectionStatus<TResponse>(requestData, CallResponse(rule),
265+
e => throw e,
266+
statusCode => ReturnConnectionStatus<TResponse>(requestData, CallResponse(rule),
252267
//make sure we never return a valid status code in Fail responses because of a bad rule.
253-
statusCode >= 200 && statusCode < 300 ? 502 : statusCode, rule.ReturnContentType)
268+
statusCode is >= 200 and < 300 ? 502 : statusCode, rule.ReturnContentType)
254269
);
255270
}
256271

257-
private TResponse Success<TResponse, TRule>(RequestData requestData, Action<TRule> beforeReturn, Func<TRule, byte[]> successResponse,
272+
private TResponse Success<TResponse, TRule>(
273+
RequestData requestData,
274+
Action<TRule> beforeReturn,
275+
Func<TRule, byte[]> successResponse,
258276
TRule rule
259277
)
260278
where TResponse : class, IElasticsearchResponse, new()
@@ -277,11 +295,10 @@ private static byte[] CallResponse<TRule>(TRule rule)
277295
if (_defaultResponseBytes != null) return _defaultResponseBytes;
278296

279297
var response = DefaultResponse;
280-
using (var ms = RecyclableMemoryStreamFactory.Default.Create())
281-
{
282-
LowLevelRequestResponseSerializer.Instance.Serialize(response, ms);
283-
_defaultResponseBytes = ms.ToArray();
284-
}
298+
299+
using var ms = RecyclableMemoryStreamFactory.Default.Create();
300+
LowLevelRequestResponseSerializer.Instance.Serialize(response, ms);
301+
_defaultResponseBytes = ms.ToArray();
285302
return _defaultResponseBytes;
286303
}
287304

@@ -295,6 +312,7 @@ private class State
295312
public int Pinged;
296313
public int Sniffed;
297314
public int Successes;
315+
public int ProductChecked;
298316
}
299317
}
300318
}

0 commit comments

Comments
 (0)