Skip to content

Commit c5b27b2

Browse files
committed
Wrote a test for unaccepted connections
1 parent 9e5cd63 commit c5b27b2

File tree

5 files changed

+141
-79
lines changed

5 files changed

+141
-79
lines changed

src/Servers/Kestrel/Transport.Libuv/src/Internal/LibuvConnectionListener.cs

Lines changed: 60 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5-
using System.Collections.Concurrent;
65
using System.Collections.Generic;
76
using System.Linq;
87
using System.Net;
@@ -19,6 +18,8 @@ internal class LibuvConnectionListener : IConnectionListener
1918
{
2019
private readonly List<ListenerContext> _listeners = new List<ListenerContext>();
2120
private IAsyncEnumerator<LibuvConnection> _acceptEnumerator;
21+
private bool _stopped;
22+
private bool _disposed;
2223

2324
public LibuvConnectionListener(LibuvTransportContext context, EndPoint endPoint)
2425
: this(new LibuvFunctions(), context, endPoint)
@@ -43,7 +44,54 @@ public LibuvConnectionListener(LibuvFunctions uv, LibuvTransportContext context,
4344

4445
public EndPoint EndPoint { get; set; }
4546

46-
public async Task StopThreadsAsync()
47+
public async ValueTask<ConnectionContext> AcceptAsync(CancellationToken cancellationToken = default)
48+
{
49+
if (_disposed)
50+
{
51+
throw new ObjectDisposedException(GetType().FullName);
52+
}
53+
54+
if (await _acceptEnumerator.MoveNextAsync())
55+
{
56+
return _acceptEnumerator.Current;
57+
}
58+
59+
// null means we're done...
60+
return null;
61+
}
62+
63+
public async ValueTask StopAsync(CancellationToken cancellationToken = default)
64+
{
65+
await UnbindAsync().ConfigureAwait(false);
66+
}
67+
68+
public async ValueTask DisposeAsync()
69+
{
70+
if (_disposed)
71+
{
72+
return;
73+
}
74+
75+
_disposed = true;
76+
77+
await UnbindAsync().ConfigureAwait(false);
78+
79+
if (_acceptEnumerator != null)
80+
{
81+
while (await _acceptEnumerator.MoveNextAsync())
82+
{
83+
_acceptEnumerator.Current.Abort();
84+
}
85+
86+
await _acceptEnumerator.DisposeAsync();
87+
}
88+
89+
_listeners.Clear();
90+
91+
await StopThreadsAsync().ConfigureAwait(false);
92+
}
93+
94+
internal async Task StopThreadsAsync()
4795
{
4896
try
4997
{
@@ -71,7 +119,7 @@ await Task.WhenAll(Threads.Select(thread => thread.StopAsync(TimeSpan.FromSecond
71119
#endif
72120
}
73121

74-
public async Task BindAsync()
122+
internal async Task BindAsync()
75123
{
76124
// TODO: Move thread management to LibuvTransportFactory
77125
// TODO: Split endpoint management from thread management
@@ -143,7 +191,7 @@ private async IAsyncEnumerator<LibuvConnection> AcceptConnections()
143191
while (remainingSlots > 0)
144192
{
145193
// Calling GetAwaiter().GetResult() is safe because we know the task is completed
146-
(LibuvConnection connection, int slot) = (await Task.WhenAny(slots)).GetAwaiter().GetResult();
194+
(var connection, var slot) = (await Task.WhenAny(slots)).GetAwaiter().GetResult();
147195

148196
// If the connection is null then the listener was closed
149197
if (connection == null)
@@ -166,58 +214,26 @@ private async IAsyncEnumerator<LibuvConnection> AcceptConnections()
166214
}
167215
}
168216

169-
public async Task UnbindAsync()
217+
internal async Task UnbindAsync()
170218
{
219+
if (_stopped)
220+
{
221+
return;
222+
}
223+
224+
_stopped = true;
225+
171226
var disposeTasks = _listeners.Select(listener => ((IAsyncDisposable)listener).DisposeAsync()).ToArray();
172227

173228
if (!await WaitAsync(Task.WhenAll(disposeTasks), TimeSpan.FromSeconds(5)).ConfigureAwait(false))
174229
{
175230
Log.LogError(0, null, "Disposing listeners failed");
176231
}
177-
178-
_listeners.Clear();
179232
}
180233

181234
private static async Task<bool> WaitAsync(Task task, TimeSpan timeout)
182235
{
183236
return await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false) == task;
184237
}
185-
186-
public async ValueTask<ConnectionContext> AcceptAsync(CancellationToken cancellationToken = default)
187-
{
188-
if (await _acceptEnumerator.MoveNextAsync())
189-
{
190-
return _acceptEnumerator.Current;
191-
}
192-
193-
// null means we're done...
194-
return null;
195-
}
196-
197-
public async ValueTask StopAsync(CancellationToken cancellationToken)
198-
{
199-
await UnbindAsync().ConfigureAwait(false);
200-
}
201-
202-
public async ValueTask DisposeAsync()
203-
{
204-
// TODO: ConfigureAwait
205-
await UnbindAsync().ConfigureAwait(false);
206-
207-
if (_acceptEnumerator != null)
208-
{
209-
// TODO: Log how many connections were unaccepted
210-
if (await _acceptEnumerator.MoveNextAsync().ConfigureAwait(false))
211-
{
212-
// Abort the connection
213-
_acceptEnumerator.Current.Abort();
214-
}
215-
216-
// Dispose the enumerator
217-
await _acceptEnumerator.DisposeAsync().ConfigureAwait(false);
218-
}
219-
220-
await StopThreadsAsync().ConfigureAwait(false);
221-
}
222238
}
223239
}

src/Servers/Kestrel/Transport.Libuv/src/Internal/Listener.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ protected virtual void DispatchConnection(UvStreamHandle socket)
197197

198198
public virtual async Task DisposeAsync()
199199
{
200-
StopAcceptingConnections();
201200
// Ensure the event loop is still running.
202201
// If the event loop isn't running and we try to wait on this Post
203202
// to complete, then LibuvTransport will never be disposed and
@@ -210,6 +209,8 @@ await Thread.PostAsync(listener =>
210209

211210
listener._closed = true;
212211

212+
listener.StopAcceptingConnections();
213+
213214
}, this).ConfigureAwait(false);
214215
}
215216

src/Servers/Kestrel/Transport.Libuv/src/Internal/ListenerContext.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Diagnostics;
56
using System.IO.Pipelines;
67
using System.Net;
78
using System.Net.Sockets;
@@ -89,7 +90,8 @@ protected internal void HandleConnection(UvStreamHandle socket)
8990
var connection = new LibuvConnection(socket, TransportContext.Log, Thread, remoteEndPoint, localEndPoint, InputOptions, OutputOptions);
9091
connection.Start();
9192

92-
_acceptQueue.Writer.TryWrite(connection);
93+
bool accepted = _acceptQueue.Writer.TryWrite(connection);
94+
Debug.Assert(accepted, "The connection was not written to the channel!");
9395
}
9496
catch (Exception ex)
9597
{

src/Servers/Kestrel/Transport.Libuv/src/Internal/ListenerSecondary.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ private void FreeBuffer()
177177

178178
public async Task DisposeAsync()
179179
{
180-
StopAcceptingConnections();
181180
// Ensure the event loop is still running.
182181
// If the event loop isn't running and we try to wait on this Post
183182
// to complete, then LibuvTransport will never be disposed and
@@ -191,11 +190,15 @@ await Thread.PostAsync(listener =>
191190

192191
listener._closed = true;
193192

193+
listener.StopAcceptingConnections();
194+
194195
}, this).ConfigureAwait(false);
195196
}
196197
else
197198
{
198199
FreeBuffer();
200+
201+
StopAcceptingConnections();
199202
}
200203
}
201204
}

src/Servers/Kestrel/Transport.Libuv/test/LibuvTransportTests.cs

Lines changed: 72 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System.Buffers;
45
using System.Collections.Generic;
56
using System.Linq;
67
using System.Net;
78
using System.Net.Http;
89
using System.Net.Sockets;
910
using System.Text;
10-
using System.Threading;
1111
using System.Threading.Tasks;
1212
using Microsoft.AspNetCore.Http;
1313
using Microsoft.AspNetCore.Server.Kestrel.Core;
1414
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal;
15-
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
1615
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal;
1716
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests.TestHelpers;
1817
using Microsoft.AspNetCore.Testing;
@@ -23,15 +22,6 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Tests
2322
{
2423
public class LibuvTransportTests
2524
{
26-
public static TheoryData<ListenOptions> ConnectionAdapterData => new TheoryData<ListenOptions>
27-
{
28-
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0)),
29-
new ListenOptions(new IPEndPoint(IPAddress.Loopback, 0))
30-
{
31-
ConnectionAdapters = { new PassThroughConnectionAdapter() }
32-
}
33-
};
34-
3525
public static IEnumerable<object[]> OneToTen => Enumerable.Range(1, 10).Select(i => new object[] { i });
3626

3727
[Fact]
@@ -42,7 +32,7 @@ public async Task TransportCanBindAndStop()
4232

4333
// The transport can no longer start threads without binding to an endpoint.
4434
await transport.BindAsync();
45-
await transport.StopThreadsAsync();
35+
await transport.DisposeAsync();
4636
}
4737

4838
[Fact]
@@ -52,42 +42,92 @@ public async Task TransportCanBindUnbindAndStop()
5242
var transport = new LibuvConnectionListener(transportContext, new IPEndPoint(IPAddress.Loopback, 0));
5343

5444
await transport.BindAsync();
55-
await transport.UnbindAsync();
56-
await transport.StopThreadsAsync();
45+
await transport.StopAsync();
46+
await transport.DisposeAsync();
5747
}
5848

59-
[Theory]
60-
[MemberData(nameof(ConnectionAdapterData))]
61-
public async Task ConnectionCanReadAndWrite(ListenOptions listenOptions)
49+
[Fact]
50+
public async Task ConnectionCanReadAndWrite()
6251
{
63-
var serviceContext = new TestServiceContext();
64-
listenOptions.UseHttpServer(listenOptions.ConnectionAdapters, serviceContext, new DummyApplication(TestApp.EchoApp), HttpProtocols.Http1);
52+
var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
6553

6654
var transportContext = new TestLibuvTransportContext();
67-
var transport = new LibuvConnectionListener(transportContext, listenOptions.EndPoint);
55+
var transport = new LibuvConnectionListener(transportContext, endpoint);
6856

6957
await transport.BindAsync();
70-
listenOptions.EndPoint = transport.EndPoint;
58+
endpoint = (IPEndPoint)transport.EndPoint;
7159

72-
var dispatcher = new ConnectionDispatcher(serviceContext, listenOptions.Build());
73-
_ = dispatcher.StartAcceptingConnections(transport);
60+
async Task EchoServerAsync()
61+
{
62+
await using var connection = await transport.AcceptAsync();
7463

75-
using (var socket = TestConnection.CreateConnectedLoopbackSocket(listenOptions.IPEndPoint.Port))
64+
while (true)
65+
{
66+
var result = await connection.Transport.Input.ReadAsync();
67+
68+
if (result.IsCompleted)
69+
{
70+
break;
71+
}
72+
await connection.Transport.Output.WriteAsync(result.Buffer.ToArray());
73+
74+
connection.Transport.Input.AdvanceTo(result.Buffer.End);
75+
}
76+
}
77+
78+
var serverTask = EchoServerAsync();
79+
80+
using (var socket = TestConnection.CreateConnectedLoopbackSocket(endpoint.Port))
7681
{
77-
var data = "Hello World";
78-
socket.Send(Encoding.ASCII.GetBytes($"POST / HTTP/1.0\r\nContent-Length: 11\r\n\r\n{data}"));
82+
var data = Encoding.ASCII.GetBytes("Hello World");
83+
socket.Send(data);
84+
7985
var buffer = new byte[data.Length];
8086
var read = 0;
8187
while (read < data.Length)
8288
{
8389
read += socket.Receive(buffer, read, buffer.Length - read, SocketFlags.None);
8490
}
91+
92+
Assert.Equal(data, buffer);
8593
}
8694

87-
88-
Assert.True(await serviceContext.ConnectionManager.CloseAllConnectionsAsync(new CancellationTokenSource(TestConstants.DefaultTimeout).Token));
89-
await transport.UnbindAsync();
90-
await transport.StopThreadsAsync();
95+
await serverTask.DefaultTimeout();
96+
97+
await transport.StopAsync();
98+
await transport.DisposeAsync();
99+
}
100+
101+
[Fact]
102+
public async Task UnacceptedConnectionsAreAborted()
103+
{
104+
var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
105+
106+
var transportContext = new TestLibuvTransportContext();
107+
var transport = new LibuvConnectionListener(transportContext, endpoint);
108+
109+
await transport.BindAsync();
110+
endpoint = (IPEndPoint)transport.EndPoint;
111+
112+
async Task ConnectAsync()
113+
{
114+
using (var socket = TestConnection.CreateConnectedLoopbackSocket(endpoint.Port))
115+
{
116+
var read = await socket.ReceiveAsync(new byte[10], SocketFlags.None);
117+
Assert.Equal(0, read);
118+
}
119+
}
120+
121+
var connectTask = ConnectAsync();
122+
123+
await transport.StopAsync();
124+
await transport.DisposeAsync();
125+
126+
// The connection was accepted because libuv eagerly accepts connections
127+
// they sit in a queue in each listener, we want to make sure that resources
128+
// are cleaned up if they are never accepted by the caller
129+
130+
await connectTask.DefaultTimeout();
91131
}
92132

93133
[ConditionalTheory]
@@ -132,7 +172,7 @@ public async Task OneToTenThreads(int threadCount)
132172
}
133173
}
134174

135-
await transport.UnbindAsync().ConfigureAwait(false);
175+
await transport.StopAsync().ConfigureAwait(false);
136176

137177
await acceptTask.ConfigureAwait(false);
138178

@@ -141,7 +181,7 @@ public async Task OneToTenThreads(int threadCount)
141181
await serviceContext.ConnectionManager.AbortAllConnectionsAsync().ConfigureAwait(false);
142182
}
143183

144-
await transport.StopThreadsAsync().ConfigureAwait(false);
184+
await transport.DisposeAsync().ConfigureAwait(false);
145185
}
146186
}
147187
}

0 commit comments

Comments
 (0)