Skip to content

Commit ff7a455

Browse files
qmfrederikbrendandburns
authored andcommitted
RFC - Support WebSockets with channels (#66)
* Add support for demuxing streams from WebSockets. * Add missing using statement, fix unit test
1 parent 511cc60 commit ff7a455

File tree

5 files changed

+854
-1
lines changed

5 files changed

+854
-1
lines changed

src/ByteBuffer.cs

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Diagnostics;
4+
using System.Threading;
5+
6+
namespace k8s
7+
{
8+
// There may be already an async implementation that we can use:
9+
// https://github.com/StephenCleary/AsyncEx/wiki/AsyncProducerConsumerQueue
10+
// However, they focus on individual objects and may not be a good choice for use with fixed-with byte buffers
11+
12+
/// <summary>
13+
/// Represents a bounded buffer. A dedicated thread can send bytes to this buffer (the producer); while another thread can
14+
/// read bytes from this buffer (the consumer).
15+
/// </summary>
16+
/// <remarks>
17+
/// This is a producer-consumer problem (or bounded-buffer problem), see https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
18+
/// </remarks>
19+
public class ByteBuffer : IDisposable
20+
{
21+
private const int DefaultBufferSize = 4 * 1024; // 4 KB
22+
private const int DefaultMaximumSize = 40 * 1024 * 1024; // 40 MB
23+
24+
private readonly int maximumSize;
25+
private readonly AutoResetEvent dataAvailable = new AutoResetEvent(initialState: false);
26+
private readonly object lockObject = new object();
27+
28+
private byte[] buffer;
29+
private int bytesWritten = 0;
30+
private int bytesRead = 0;
31+
32+
/// <summary>
33+
/// Used by a writer to indicate the end of the file. When set, the reader will be notified that no
34+
/// more data is available.
35+
/// </summary>
36+
private bool endOfFile;
37+
38+
/// <summary>
39+
/// Initializes a new instance of the <see cref="ByteBuffer"/> class using the default buffer size and limit.
40+
/// </summary>
41+
public ByteBuffer()
42+
: this(DefaultBufferSize, DefaultMaximumSize)
43+
{
44+
}
45+
46+
/// <summary>
47+
/// Initializes a new instance of the <see cref="ByteBuffer"/> class.
48+
/// </summary>
49+
/// <param name="bufferSize">
50+
/// The initial buffer size.
51+
/// </param>
52+
/// <param name="maximumSize">
53+
/// The maximum buffer size.
54+
/// </param>
55+
public ByteBuffer(int bufferSize, int maximumSize)
56+
{
57+
this.maximumSize = maximumSize;
58+
this.buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
59+
this.endOfFile = false;
60+
}
61+
62+
/// <summary>
63+
/// Gets the current buffer size.
64+
/// </summary>
65+
public int Size
66+
{
67+
get { return this.buffer.Length; }
68+
}
69+
70+
/// <summary>
71+
/// Gets the offset from which the next byte will be read. Increased every time a caller reads data.
72+
/// </summary>
73+
public int ReadWaterMark
74+
{
75+
get;
76+
private set;
77+
}
78+
79+
/// <summary>
80+
/// Gets the offset to which the next byte will be written. Increased every time a caller writes data.
81+
/// </summary>
82+
public int WriteWaterMark
83+
{
84+
get;
85+
private set;
86+
}
87+
88+
/// <summary>
89+
/// Gets the amount of bytes availble for reading.
90+
/// </summary>
91+
public int AvailableReadableBytes
92+
{
93+
get
94+
{
95+
lock (this.lockObject)
96+
{
97+
if (this.ReadWaterMark == this.WriteWaterMark)
98+
{
99+
return 0;
100+
}
101+
else if (this.ReadWaterMark < this.WriteWaterMark)
102+
{
103+
return this.WriteWaterMark - this.ReadWaterMark;
104+
}
105+
else
106+
{
107+
return
108+
109+
// Bytes available at the end of the array
110+
this.buffer.Length - this.ReadWaterMark
111+
112+
// Bytes available at the start of the array
113+
+ this.WriteWaterMark;
114+
}
115+
}
116+
}
117+
}
118+
119+
/// <summary>
120+
/// Gets the amount of bytes available for writing.
121+
/// </summary>
122+
public int AvailableWritableBytes
123+
{
124+
get
125+
{
126+
lock (this.lockObject)
127+
{
128+
if (this.WriteWaterMark > this.ReadWaterMark)
129+
{
130+
return
131+
/* Available bytes at the end of the buffer */
132+
this.buffer.Length - this.WriteWaterMark
133+
/* Available bytes at the start of the buffer */
134+
+ this.ReadWaterMark;
135+
}
136+
else if (this.WriteWaterMark == this.ReadWaterMark)
137+
{
138+
return this.buffer.Length;
139+
}
140+
else
141+
{
142+
return this.ReadWaterMark - this.WriteWaterMark;
143+
}
144+
}
145+
}
146+
}
147+
148+
/// <inheritdoc/>
149+
public void Dispose()
150+
{
151+
ArrayPool<byte>.Shared.Return(this.buffer);
152+
}
153+
154+
/// <summary>
155+
/// Writes bytes to the buffer.
156+
/// </summary>
157+
/// <param name="data">
158+
/// The source byte array from which to read the bytes.
159+
/// </param>
160+
/// <param name="offset">
161+
/// The offset of the first byte to copy.
162+
/// </param>
163+
/// <param name="length">
164+
/// The amount of bytes to copy.
165+
/// </param>
166+
public void Write(byte[] data, int offset, int length)
167+
{
168+
lock (this.lockObject)
169+
{
170+
// Does the data fit?
171+
// We must make sure that ReadWaterMark != WriteWaterMark; that would indicate 'all data has been read' instead
172+
// instead of 'all data must be read'
173+
if (this.AvailableWritableBytes <= length)
174+
{
175+
// Grow the buffer
176+
this.Grow(this.buffer.Length + length - this.AvailableWritableBytes + 1);
177+
}
178+
179+
// Write the data; first the data that fits between the write watermark and the end of the buffer
180+
int availableBeforeWrapping = this.buffer.Length - this.WriteWaterMark;
181+
182+
Array.Copy(data, offset, this.buffer, this.WriteWaterMark, Math.Min(availableBeforeWrapping, length));
183+
this.WriteWaterMark += Math.Min(availableBeforeWrapping, length);
184+
185+
if (length > availableBeforeWrapping)
186+
{
187+
Array.Copy(data, offset + availableBeforeWrapping, this.buffer, 0, length - availableBeforeWrapping);
188+
this.WriteWaterMark = length - availableBeforeWrapping;
189+
}
190+
191+
this.bytesWritten += length;
192+
Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten);
193+
}
194+
195+
this.dataAvailable.Set();
196+
}
197+
198+
/// <summary>
199+
/// Stops writing data to the buffer, indicating that the end of file has been reached.
200+
/// </summary>
201+
public void WriteEnd()
202+
{
203+
lock (this.lockObject)
204+
{
205+
this.endOfFile = true;
206+
this.dataAvailable.Set();
207+
}
208+
}
209+
210+
/// <summary>
211+
/// Reads bytes from the buffer.
212+
/// </summary>
213+
/// <param name="data">
214+
/// The byte array into which to read the data.
215+
/// </param>
216+
/// <param name="offset">
217+
/// The offset at which to start writing the bytes.
218+
/// </param>
219+
/// <param name="count">
220+
/// The amount of bytes to read.
221+
/// </param>
222+
/// <returns>
223+
/// The total number of bytes read.
224+
/// </returns>
225+
public int Read(byte[] data, int offset, int count)
226+
{
227+
while (this.AvailableReadableBytes == 0 && !this.endOfFile)
228+
{
229+
this.dataAvailable.WaitOne();
230+
}
231+
232+
int toRead = 0;
233+
234+
lock (this.lockObject)
235+
{
236+
// Signal the end of file to the caller.
237+
if (this.AvailableReadableBytes == 0 && this.endOfFile)
238+
{
239+
return 0;
240+
}
241+
242+
toRead = Math.Min(this.AvailableReadableBytes, count);
243+
244+
int availableBeforeWrapping = this.buffer.Length - this.ReadWaterMark;
245+
246+
Array.Copy(this.buffer, this.ReadWaterMark, data, offset, Math.Min(availableBeforeWrapping, toRead));
247+
this.ReadWaterMark += Math.Min(availableBeforeWrapping, toRead);
248+
249+
if (toRead > availableBeforeWrapping)
250+
{
251+
Array.Copy(this.buffer, 0, data, offset + availableBeforeWrapping, toRead - availableBeforeWrapping);
252+
this.ReadWaterMark = toRead - availableBeforeWrapping;
253+
}
254+
255+
this.bytesRead += toRead;
256+
Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten);
257+
}
258+
259+
return toRead;
260+
}
261+
262+
/// <summary>
263+
/// Increases the buffer size. Any call to this method must be protected with a lock.
264+
/// </summary>
265+
/// <param name="size">
266+
/// The new buffer size.
267+
/// </param>
268+
private void Grow(int size)
269+
{
270+
if (size > this.maximumSize)
271+
{
272+
throw new OutOfMemoryException();
273+
}
274+
275+
var newBuffer = ArrayPool<byte>.Shared.Rent(size);
276+
277+
if (this.WriteWaterMark <= this.ReadWaterMark)
278+
{
279+
// Copy the data at the start
280+
Array.Copy(this.buffer, 0, newBuffer, 0, this.WriteWaterMark);
281+
282+
int trailingDataLength = this.buffer.Length - this.ReadWaterMark;
283+
Array.Copy(this.buffer,
284+
sourceIndex: this.ReadWaterMark,
285+
destinationArray: newBuffer,
286+
destinationIndex: newBuffer.Length - trailingDataLength,
287+
length: trailingDataLength);
288+
289+
this.ReadWaterMark += newBuffer.Length - this.buffer.Length;
290+
}
291+
else
292+
{
293+
// ... [Read WM] ... [Write WM] ... [newly available space]
294+
Array.Copy(this.buffer, 0, newBuffer, 0, this.buffer.Length);
295+
}
296+
297+
ArrayPool<byte>.Shared.Return(this.buffer);
298+
this.buffer = newBuffer;
299+
300+
Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten);
301+
}
302+
}
303+
}

src/MuxedStream.cs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
using System;
2+
using System.IO;
3+
4+
namespace k8s
5+
{
6+
public class MuxedStream : Stream
7+
{
8+
private ByteBuffer inputBuffer;
9+
private byte? outputIndex;
10+
private StreamDemuxer muxer;
11+
12+
public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputIndex)
13+
{
14+
this.inputBuffer = inputBuffer;
15+
this.outputIndex = outputIndex;
16+
17+
if (this.inputBuffer == null && outputIndex == null)
18+
{
19+
throw new ArgumentException("You must specify at least inputBuffer or outputIndex");
20+
}
21+
22+
this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer));
23+
}
24+
25+
public override bool CanRead => this.inputBuffer != null;
26+
27+
public override bool CanSeek => false;
28+
29+
public override bool CanWrite => this.outputIndex != null;
30+
31+
public override long Length => throw new NotSupportedException();
32+
33+
public override long Position
34+
{
35+
get => throw new NotSupportedException();
36+
set => throw new NotSupportedException();
37+
}
38+
39+
public override void Write(byte[] buffer, int offset, int count)
40+
{
41+
if (this.outputIndex == null)
42+
{
43+
throw new InvalidOperationException();
44+
}
45+
else
46+
{
47+
this.muxer.Write(this.outputIndex.Value, buffer, offset, count).GetAwaiter().GetResult();
48+
}
49+
}
50+
51+
public override int Read(byte[] buffer, int offset, int count)
52+
{
53+
if (this.inputBuffer == null)
54+
{
55+
throw new InvalidOperationException();
56+
}
57+
else
58+
{
59+
return this.inputBuffer.Read(buffer, offset, count);
60+
}
61+
}
62+
63+
public override void Flush()
64+
{
65+
throw new NotSupportedException();
66+
}
67+
68+
public override long Seek(long offset, SeekOrigin origin)
69+
{
70+
throw new NotSupportedException();
71+
}
72+
73+
public override void SetLength(long value)
74+
{
75+
throw new NotSupportedException();
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)