Skip to content

Commit cfa0776

Browse files
committed
Add async version of concurrent first usage test
1 parent c3f7bd6 commit cfa0776

File tree

1 file changed

+71
-0
lines changed

1 file changed

+71
-0
lines changed

tests/Tests/ClientConcepts/ConnectionPooling/BuildingBlocks/RequestPipelines.doc.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,77 @@ public void FirstUsageCheckConcurrentThreads()
189189
semaphoreSlim.CurrentCount.Should().Be(1);
190190
}
191191

192+
// hide
193+
[U] public async Task FirstUsageCheckConcurrentThreadsAsync()
194+
{
195+
//hide
196+
var response = new
197+
{
198+
cluster_name = "elasticsearch",
199+
nodes = new
200+
{
201+
node1 = new
202+
{
203+
name = "Node Name 1",
204+
transport_address = "127.0.0.1:9300",
205+
host = "127.0.0.1",
206+
ip = "127.0.01",
207+
version = "5.0.0-alpha3",
208+
build_hash = "e455fd0",
209+
roles = new List<string>(),
210+
http = new
211+
{
212+
bound_address = new[] { "127.0.0.1:9200" }
213+
},
214+
settings = new Dictionary<string, object>
215+
{
216+
{ "cluster.name", "elasticsearch" },
217+
{ "node.name", "Node Name 1" }
218+
}
219+
}
220+
}
221+
};
222+
223+
//hide
224+
var responseBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response));
225+
226+
/** We can demonstrate this with the following example. First, let's configure
227+
* a custom `IConnection` implementation that's simply going to return a known
228+
* 200 response after one second
229+
*/
230+
var inMemoryConnection = new WaitingInMemoryConnection(
231+
TimeSpan.FromSeconds(1),
232+
responseBody);
233+
234+
/**
235+
* Next, we create a <<sniffing-connection-pool, Sniffing connection pool>> using our
236+
* custom connection and a timeout for how long a request can take before the client
237+
* times out
238+
*/
239+
var sniffingPipeline = CreatePipeline(
240+
uris => new SniffingConnectionPool(uris),
241+
connection: inMemoryConnection,
242+
settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2)));
243+
244+
/**Now, with a `SemaphoreSlim` in place that allows only one thread to enter at a time,
245+
* start three tasks that will initiate a sniff on startup.
246+
*
247+
* The first task will successfully sniff on startup with the remaining two waiting
248+
* tasks exiting without exception. The `SemaphoreSlim` is also released, ready for
249+
* when sniffing needs to take place again
250+
*/
251+
var semaphoreSlim = new SemaphoreSlim(1, 1);
252+
253+
var task1 = sniffingPipeline.FirstPoolUsageAsync(semaphoreSlim, CancellationToken.None);
254+
var task2 = sniffingPipeline.FirstPoolUsageAsync(semaphoreSlim, CancellationToken.None);
255+
var task3 = sniffingPipeline.FirstPoolUsageAsync(semaphoreSlim, CancellationToken.None);
256+
257+
var exception = await Record.ExceptionAsync(async () => await Task.WhenAll(task1, task2, task3));
258+
259+
exception.Should().BeNull();
260+
semaphoreSlim.CurrentCount.Should().Be(1);
261+
}
262+
192263
/**==== Sniff on connection failure */
193264
[U]
194265
public void SniffsOnConnectionFailure()

0 commit comments

Comments
 (0)