From 2d60749e8600eff54f8176ec626a0ef70eef77a0 Mon Sep 17 00:00:00 2001 From: Frederik Carlier Date: Wed, 6 Dec 2017 22:39:21 +0100 Subject: [PATCH 1/2] Add support for demuxing streams from WebSockets. --- src/ByteBuffer.cs | 303 ++++++++++++++++++++++++++++++++++ src/MuxedStream.cs | 78 +++++++++ src/StreamDemuxer.cs | 127 +++++++++++++++ tests/AuthTests.cs | 2 +- tests/ByteBufferTests.cs | 344 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 853 insertions(+), 1 deletion(-) create mode 100644 src/ByteBuffer.cs create mode 100644 src/MuxedStream.cs create mode 100644 src/StreamDemuxer.cs create mode 100644 tests/ByteBufferTests.cs diff --git a/src/ByteBuffer.cs b/src/ByteBuffer.cs new file mode 100644 index 000000000..8fc8afb0e --- /dev/null +++ b/src/ByteBuffer.cs @@ -0,0 +1,303 @@ +using System; +using System.Buffers; +using System.Diagnostics; +using System.Threading; + +namespace k8s +{ + // There may be already an async implementation that we can use: + // https://github.com/StephenCleary/AsyncEx/wiki/AsyncProducerConsumerQueue + // However, they focus on individual objects and may not be a good choice for use with fixed-with byte buffers + + /// + /// Represents a bounded buffer. A dedicated thread can send bytes to this buffer (the producer); while another thread can + /// read bytes from this buffer (the consumer). + /// + /// + /// This is a producer-consumer problem (or bounded-buffer problem), see https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem + /// + public class ByteBuffer : IDisposable + { + private const int DefaultBufferSize = 4 * 1024; // 4 KB + private const int DefaultMaximumSize = 40 * 1024 * 1024; // 40 MB + + private readonly int maximumSize; + private readonly AutoResetEvent dataAvailable = new AutoResetEvent(initialState: false); + private readonly object lockObject = new object(); + + private byte[] buffer; + private int bytesWritten = 0; + private int bytesRead = 0; + + /// + /// Used by a writer to indicate the end of the file. When set, the reader will be notified that no + /// more data is available. + /// + private bool endOfFile; + + /// + /// Initializes a new instance of the class using the default buffer size and limit. + /// + public ByteBuffer() + : this(DefaultBufferSize, DefaultMaximumSize) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// The initial buffer size. + /// + /// + /// The maximum buffer size. + /// + public ByteBuffer(int bufferSize, int maximumSize) + { + this.maximumSize = maximumSize; + this.buffer = ArrayPool.Shared.Rent(bufferSize); + this.endOfFile = false; + } + + /// + /// Gets the current buffer size. + /// + public int Size + { + get { return this.buffer.Length; } + } + + /// + /// Gets the offset from which the next byte will be read. Increased every time a caller reads data. + /// + public int ReadWaterMark + { + get; + private set; + } + + /// + /// Gets the offset to which the next byte will be written. Increased every time a caller writes data. + /// + public int WriteWaterMark + { + get; + private set; + } + + /// + /// Gets the amount of bytes availble for reading. + /// + public int AvailableReadableBytes + { + get + { + lock (this.lockObject) + { + if (this.ReadWaterMark == this.WriteWaterMark) + { + return 0; + } + else if (this.ReadWaterMark < this.WriteWaterMark) + { + return this.WriteWaterMark - this.ReadWaterMark; + } + else + { + return + + // Bytes available at the end of the array + this.buffer.Length - this.ReadWaterMark + + // Bytes available at the start of the array + + this.WriteWaterMark; + } + } + } + } + + /// + /// Gets the amount of bytes available for writing. + /// + public int AvailableWritableBytes + { + get + { + lock (this.lockObject) + { + if (this.WriteWaterMark > this.ReadWaterMark) + { + return + /* Available bytes at the end of the buffer */ + this.buffer.Length - this.WriteWaterMark + /* Available bytes at the start of the buffer */ + + this.ReadWaterMark; + } + else if (this.WriteWaterMark == this.ReadWaterMark) + { + return this.buffer.Length; + } + else + { + return this.ReadWaterMark - this.WriteWaterMark; + } + } + } + } + + /// + public void Dispose() + { + ArrayPool.Shared.Return(this.buffer); + } + + /// + /// Writes bytes to the buffer. + /// + /// + /// The source byte array from which to read the bytes. + /// + /// + /// The offset of the first byte to copy. + /// + /// + /// The amount of bytes to copy. + /// + public void Write(byte[] data, int offset, int length) + { + lock (this.lockObject) + { + // Does the data fit? + // We must make sure that ReadWaterMark != WriteWaterMark; that would indicate 'all data has been read' instead + // instead of 'all data must be read' + if (this.AvailableWritableBytes <= length) + { + // Grow the buffer + this.Grow(this.buffer.Length + length - this.AvailableWritableBytes + 1); + } + + // Write the data; first the data that fits between the write watermark and the end of the buffer + int availableBeforeWrapping = this.buffer.Length - this.WriteWaterMark; + + Array.Copy(data, offset, this.buffer, this.WriteWaterMark, Math.Min(availableBeforeWrapping, length)); + this.WriteWaterMark += Math.Min(availableBeforeWrapping, length); + + if (length > availableBeforeWrapping) + { + Array.Copy(data, offset + availableBeforeWrapping, this.buffer, 0, length - availableBeforeWrapping); + this.WriteWaterMark = length - availableBeforeWrapping; + } + + this.bytesWritten += length; + Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten); + } + + this.dataAvailable.Set(); + } + + /// + /// Stops writing data to the buffer, indicating that the end of file has been reached. + /// + public void WriteEnd() + { + lock (this.lockObject) + { + this.endOfFile = true; + this.dataAvailable.Set(); + } + } + + /// + /// Reads bytes from the buffer. + /// + /// + /// The byte array into which to read the data. + /// + /// + /// The offset at which to start writing the bytes. + /// + /// + /// The amount of bytes to read. + /// + /// + /// The total number of bytes read. + /// + public int Read(byte[] data, int offset, int count) + { + while (this.AvailableReadableBytes == 0 && !this.endOfFile) + { + this.dataAvailable.WaitOne(); + } + + int toRead = 0; + + lock (this.lockObject) + { + // Signal the end of file to the caller. + if (this.AvailableReadableBytes == 0 && this.endOfFile) + { + return 0; + } + + toRead = Math.Min(this.AvailableReadableBytes, count); + + int availableBeforeWrapping = this.buffer.Length - this.ReadWaterMark; + + Array.Copy(this.buffer, this.ReadWaterMark, data, offset, Math.Min(availableBeforeWrapping, toRead)); + this.ReadWaterMark += Math.Min(availableBeforeWrapping, toRead); + + if (toRead > availableBeforeWrapping) + { + Array.Copy(this.buffer, 0, data, offset + availableBeforeWrapping, toRead - availableBeforeWrapping); + this.ReadWaterMark = toRead - availableBeforeWrapping; + } + + this.bytesRead += toRead; + Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten); + } + + return toRead; + } + + /// + /// Increases the buffer size. Any call to this method must be protected with a lock. + /// + /// + /// The new buffer size. + /// + private void Grow(int size) + { + if (size > this.maximumSize) + { + throw new OutOfMemoryException(); + } + + var newBuffer = ArrayPool.Shared.Rent(size); + + if (this.WriteWaterMark <= this.ReadWaterMark) + { + // Copy the data at the start + Array.Copy(this.buffer, 0, newBuffer, 0, this.WriteWaterMark); + + int trailingDataLength = this.buffer.Length - this.ReadWaterMark; + Array.Copy(this.buffer, + sourceIndex: this.ReadWaterMark, + destinationArray: newBuffer, + destinationIndex: newBuffer.Length - trailingDataLength, + length: trailingDataLength); + + this.ReadWaterMark += newBuffer.Length - this.buffer.Length; + } + else + { + // ... [Read WM] ... [Write WM] ... [newly available space] + Array.Copy(this.buffer, 0, newBuffer, 0, this.buffer.Length); + } + + ArrayPool.Shared.Return(this.buffer); + this.buffer = newBuffer; + + Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten); + } + } +} diff --git a/src/MuxedStream.cs b/src/MuxedStream.cs new file mode 100644 index 000000000..021529fe3 --- /dev/null +++ b/src/MuxedStream.cs @@ -0,0 +1,78 @@ +using System; +using System.IO; + +namespace k8s +{ + public class MuxedStream : Stream + { + private ByteBuffer inputBuffer; + private byte? outputIndex; + private StreamDemuxer muxer; + + public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputIndex) + { + this.inputBuffer = inputBuffer; + this.outputIndex = outputIndex; + + if (this.inputBuffer == null && outputIndex == null) + { + throw new ArgumentException("You must specify at least inputBuffer or outputIndex"); + } + + this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer)); + } + + public override bool CanRead => this.inputBuffer != null; + + public override bool CanSeek => false; + + public override bool CanWrite => this.outputIndex != null; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + if (this.outputIndex == null) + { + throw new InvalidOperationException(); + } + else + { + this.muxer.Write(this.outputIndex.Value, buffer, offset, count).GetAwaiter().GetResult(); + } + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (this.inputBuffer == null) + { + throw new InvalidOperationException(); + } + else + { + return this.inputBuffer.Read(buffer, offset, count); + } + } + + public override void Flush() + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/StreamDemuxer.cs b/src/StreamDemuxer.cs new file mode 100644 index 000000000..f37d91949 --- /dev/null +++ b/src/StreamDemuxer.cs @@ -0,0 +1,127 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace k8s +{ + public class StreamDemuxer : IDisposable + { + private readonly WebSocket webSocket; + private readonly Dictionary buffers = new Dictionary(); + private readonly CancellationTokenSource cts = new CancellationTokenSource(); + private Task runLoop; + + public StreamDemuxer(WebSocket webSocket) + { + this.webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); + } + + public event EventHandler ConnectionClosed; + + public void Start() + { + this.runLoop = this.RunLoop(this.cts.Token); + } + + public void Dispose() + { + if (this.runLoop != null) + { + this.cts.Cancel(); + this.runLoop.Wait(); + } + } + + public Stream GetStream(byte? inputIndex, byte? outputIndex) + { + if (inputIndex != null && !this.buffers.ContainsKey(inputIndex.Value)) + { + lock (this.buffers) + { + var buffer = new ByteBuffer(); + this.buffers.Add(inputIndex.Value, buffer); + } + } + + var inputBuffer = inputIndex == null ? null : this.buffers[inputIndex.Value]; + return new MuxedStream(this, inputBuffer, outputIndex); + } + + public async Task Write(byte index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken)) + { + byte[] writeBuffer = ArrayPool.Shared.Rent(count + 1); + + try + { + writeBuffer[0] = (byte)index; + Array.Copy(buffer, offset, writeBuffer, 1, count); + ArraySegment segment = new ArraySegment(writeBuffer, 0, count + 1); + await this.webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken).ConfigureAwait(false); + } + finally + { + ArrayPool.Shared.Return(writeBuffer); + } + } + + protected async Task RunLoop(CancellationToken cancellationToken) + { + // Get a 1KB buffer + byte[] buffer = ArrayPool.Shared.Rent(1024 * 1024); + + try + { + var segment = new ArraySegment(buffer); + + while (!cancellationToken.IsCancellationRequested && this.webSocket.CloseStatus == null) + { + // We always get data in this format: + // [stream index] (1 for stdout, 2 for stderr) + // [payload] + var result = await this.webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false); + + // Ignore empty messages + if (result.Count < 2) + { + continue; + } + + var streamIndex = buffer[0]; + var extraByteCount = 1; + + while (true) + { + if (this.buffers.ContainsKey(streamIndex)) + { + this.buffers[streamIndex].Write(buffer, extraByteCount, result.Count - extraByteCount); + } + + if (result.EndOfMessage == true) + { + break; + } + + extraByteCount = 0; + result = await this.webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false); + } + } + + } + finally + { + ArrayPool.Shared.Return(buffer); + this.runLoop = null; + + foreach (var b in this.buffers.Values) + { + b.WriteEnd(); + } + + this.ConnectionClosed?.Invoke(this, EventArgs.Empty); + } + } + } +} diff --git a/tests/AuthTests.cs b/tests/AuthTests.cs index dccd6724d..1e2a4b6db 100644 --- a/tests/AuthTests.cs +++ b/tests/AuthTests.cs @@ -315,4 +315,4 @@ public void Token() } } } -} +} diff --git a/tests/ByteBufferTests.cs b/tests/ByteBufferTests.cs new file mode 100644 index 000000000..aa4cb7db2 --- /dev/null +++ b/tests/ByteBufferTests.cs @@ -0,0 +1,344 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace k8s.tests +{ + /// + /// Tests the class. + /// + public class ByteBufferTests + { + private readonly byte[] writeData = new byte[] { 0xF0, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF }; + + /// + /// Tests a sequential read and write operation. + /// + [Fact] + public void LinearReadWriteTest() + { + ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100); + + // There's no real guarantee that this will be the case because the ArrayPool does not guarantee + // a specific buffer size. So let's assert this first to make sure the test fails should this + // assumption not hold. + Assert.Equal(0x10, buffer.Size); + + // Assert the initial values. + Assert.Equal(0, buffer.AvailableReadableBytes); + Assert.Equal(0x10, buffer.AvailableWritableBytes); + Assert.Equal(0, buffer.ReadWaterMark); + Assert.Equal(0, buffer.WriteWaterMark); + + // Write two bytes + buffer.Write(this.writeData, 0, 2); + + Assert.Equal(2, buffer.AvailableReadableBytes); + Assert.Equal(0x0E, buffer.AvailableWritableBytes); + Assert.Equal(0, buffer.ReadWaterMark); + Assert.Equal(2, buffer.WriteWaterMark); + + // Read two bytes, one byte at a time + byte[] readData = new byte[0x10]; + + var read = buffer.Read(readData, 0, 1); + Assert.Equal(1, read); + + // Verify the result of the read operation. + Assert.Equal(0xF0, readData[0]); + Assert.Equal(0, readData[1]); // Make sure no additional data was read + + // Check the state of the buffer + Assert.Equal(1, buffer.AvailableReadableBytes); + Assert.Equal(0x0F, buffer.AvailableWritableBytes); + Assert.Equal(1, buffer.ReadWaterMark); + Assert.Equal(2, buffer.WriteWaterMark); + + // Read another byte + read = buffer.Read(readData, 1, 1); + Assert.Equal(1, read); + + // Verify the result of the read operation. + Assert.Equal(0xF1, readData[1]); + Assert.Equal(0, readData[2]); // Make sure no additional data was read + + // Check the state of the buffer + Assert.Equal(0, buffer.AvailableReadableBytes); + Assert.Equal(0x10, buffer.AvailableWritableBytes); + Assert.Equal(2, buffer.ReadWaterMark); + Assert.Equal(2, buffer.WriteWaterMark); + } + + /// + /// Tests reading a writing which crosses the boundary (end) of the circular buffer. + /// + [Fact] + public void BoundaryReadWriteTest() + { + ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100); + + // There's no real guarantee that this will be the case because the ArrayPool does not guarantee + // a specific buffer size. So let's assert this first to make sure the test fails should this + // assumption not hold. + Assert.Equal(0x10, buffer.Size); + + // Write out 0x0A bytes to the buffer, to increase the high water level for writing bytes + buffer.Write(this.writeData, 0, 0x0A); + + // Assert the initial values. + Assert.Equal(0x0A, buffer.AvailableReadableBytes); + Assert.Equal(0x06, buffer.AvailableWritableBytes); + Assert.Equal(0, buffer.ReadWaterMark); + Assert.Equal(0x0A, buffer.WriteWaterMark); + + // Read 0x0A bytes, to increase the high water level for reading bytes + byte[] readData = new byte[0x10]; + var read = buffer.Read(readData, 0, 0x0A); + Assert.Equal(0x0A, read); + + Assert.Equal(0x00, buffer.AvailableReadableBytes); + Assert.Equal(0x10, buffer.AvailableWritableBytes); + Assert.Equal(0x0A, buffer.ReadWaterMark); + Assert.Equal(0x0A, buffer.WriteWaterMark); + + // Write an additional 0x0A bytes, but now in reverse order. This will cause the data + // to be wrapped. + Array.Reverse(this.writeData); + + buffer.Write(this.writeData, 0, 0x0A); + + // Assert the resulting state of the buffer. + Assert.Equal(0x0A, buffer.AvailableReadableBytes); + Assert.Equal(0x06, buffer.AvailableWritableBytes); + Assert.Equal(0x0A, buffer.ReadWaterMark); + Assert.Equal(0x04, buffer.WriteWaterMark); + + // Read ten bytes, this will be a wrapped read + read = buffer.Read(readData, 0, 0x0A); + Assert.Equal(0x0A, read); + + // Verify the result of the read operation. + Assert.Equal(0xFF, readData[0]); + Assert.Equal(0xFE, readData[1]); + Assert.Equal(0xFD, readData[2]); + Assert.Equal(0xFC, readData[3]); + Assert.Equal(0xFB, readData[4]); + Assert.Equal(0xFA, readData[5]); + Assert.Equal(0xF9, readData[6]); + Assert.Equal(0xF8, readData[7]); + Assert.Equal(0xF7, readData[8]); + Assert.Equal(0xF6, readData[9]); + Assert.Equal(0, readData[10]); // Make sure no additional data was read + + // Check the state of the buffer + Assert.Equal(0, buffer.AvailableReadableBytes); + Assert.Equal(0x10, buffer.AvailableWritableBytes); + Assert.Equal(4, buffer.ReadWaterMark); + Assert.Equal(4, buffer.WriteWaterMark); + } + + /// + /// Tests resizing of the class. + /// + [Fact] + public void ResizeWriteTest() + { + ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100); + + // There's no real guarantee that this will be the case because the ArrayPool does not guarantee + // a specific buffer size. So let's assert this first to make sure the test fails should this + // assumption not hold. + Assert.Equal(0x10, buffer.Size); + + // Write out 0x0A bytes to the buffer, to increase the high water level for writing bytes + buffer.Write(this.writeData, 0, 0x0A); + + byte[] readData = new byte[0x20]; + + // Read these 0x0A bytes. + var read = buffer.Read(readData, 0, 0x0A); + Assert.Equal(0x0A, read); + + // Assert the initial state of the buffer + Assert.Equal(0x00, buffer.AvailableReadableBytes); + Assert.Equal(0x10, buffer.AvailableWritableBytes); + Assert.Equal(0x0A, buffer.ReadWaterMark); + Assert.Equal(0x0A, buffer.WriteWaterMark); + + // Write out 0x0A bytes to the buffer, this will cause the buffer to wrap + buffer.Write(this.writeData, 0, 0x0A); + + Assert.Equal(0x0A, buffer.AvailableReadableBytes); + Assert.Equal(0x06, buffer.AvailableWritableBytes); + Assert.Equal(0x0A, buffer.ReadWaterMark); + Assert.Equal(0x04, buffer.WriteWaterMark); + + // Write an additional 0x0A bytes, but now in reverse order. This will cause the buffer to be resized. + Array.Reverse(this.writeData); + + buffer.Write(this.writeData, 0, 0x0A); + + // Make sure the buffer has been resized. + Assert.Equal(0x20, buffer.Size); + Assert.Equal(0x14, buffer.AvailableReadableBytes); // 2 * 0x0A = 0x14 + Assert.Equal(0x0C, buffer.AvailableWritableBytes); // 0x20 - 0x14 = 0x0C + Assert.Equal(0x0A, buffer.ReadWaterMark); + Assert.Equal(0x1E, buffer.WriteWaterMark); + + // Read data, and verify the read data + read = buffer.Read(readData, 0, 0x14); + Assert.Equal(0xF0, readData[0]); + Assert.Equal(0xF1, readData[1]); + Assert.Equal(0xF2, readData[2]); + Assert.Equal(0xF3, readData[3]); + Assert.Equal(0xF4, readData[4]); + Assert.Equal(0xF5, readData[5]); + Assert.Equal(0xF6, readData[6]); + Assert.Equal(0xF7, readData[7]); + Assert.Equal(0xF8, readData[8]); + Assert.Equal(0xF9, readData[9]); + + Assert.Equal(0xFF, readData[10]); + Assert.Equal(0xFE, readData[11]); + Assert.Equal(0xFD, readData[12]); + Assert.Equal(0xFC, readData[13]); + Assert.Equal(0xFB, readData[14]); + Assert.Equal(0xFA, readData[15]); + Assert.Equal(0xF9, readData[16]); + Assert.Equal(0xF8, readData[17]); + Assert.Equal(0xF7, readData[18]); + Assert.Equal(0xF6, readData[19]); + } + + /// + /// Tests a call to which wants to read more data + /// than is available. + /// + [Fact] + public void ReadTooMuchDataTest() + { + var buffer = new ByteBuffer(); + + var readData = new byte[0x10]; + + // Read 0x010 bytes of data when only 0x06 are available + buffer.Write(this.writeData, 0, 0x06); + + var read = buffer.Read(readData, 0, readData.Length); + Assert.Equal(0x06, read); + + Assert.Equal(0xF0, readData[0]); + Assert.Equal(0xF1, readData[1]); + Assert.Equal(0xF2, readData[2]); + Assert.Equal(0xF3, readData[3]); + Assert.Equal(0xF4, readData[4]); + Assert.Equal(0xF5, readData[5]); + Assert.Equal(0x00, readData[6]); + } + + /// + /// Tests a call to when no data is available; and makes + /// sure the call blocks until data is available. + /// + [Fact] + public void ReadBlocksUntilDataAvailableTest() + { + // Makes sure that the Read method does not return until data is available. + var buffer = new ByteBuffer(); + var readData = new byte[0x10]; + var read = 0; + + // Kick off a read operation + var readTask = Task.Run(() => read = buffer.Read(readData, 0, readData.Length)); + Thread.Sleep(250); + Assert.False(readTask.IsCompleted); + + // Write data to the buffer + buffer.Write(this.writeData, 0, 0x03); + + Thread.Sleep(250); + + Assert.True(readTask.IsCompleted); + + Assert.Equal(3, read); + Assert.Equal(0xF0, readData[0]); + Assert.Equal(0xF1, readData[1]); + Assert.Equal(0xF2, readData[2]); + Assert.Equal(0x00, readData[3]); + } + + /// + /// Tests reading until the end of the file. + /// + [Fact] + public void ReadUntilEndOfFileTest() + { + ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100); + + // There's no real guarantee that this will be the case because the ArrayPool does not guarantee + // a specific buffer size. So let's assert this first to make sure the test fails should this + // assumption not hold. + Assert.Equal(0x10, buffer.Size); + + buffer.Write(this.writeData, 0, 2); + buffer.Write(this.writeData, 2, 2); + buffer.WriteEnd(); + + // Assert the initial state of the buffer + Assert.Equal(0x04, buffer.AvailableReadableBytes); + Assert.Equal(0x0C, buffer.AvailableWritableBytes); + Assert.Equal(0x00, buffer.ReadWaterMark); + Assert.Equal(0x04, buffer.WriteWaterMark); + + // Read the data on a chunk-by-chunk basis + byte[] readData = new byte[0x03]; + var read = buffer.Read(readData, 0, 3); + Assert.Equal(3, read); + Assert.Equal(0xF0, readData[0]); + Assert.Equal(0xF1, readData[1]); + Assert.Equal(0xF2, readData[2]); + + read = buffer.Read(readData, 0, 3); + Assert.Equal(1, read); + Assert.Equal(0xF3, readData[0]); + } + + /// + /// Tests reading until the end of a file, piecemeal. + /// + [Fact] + public void ReadUntilEndOfFileTest2() + { + ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100); + + // There's no real guarantee that this will be the case because the ArrayPool does not guarantee + // a specific buffer size. So let's assert this first to make sure the test fails should this + // assumption not hold. + Assert.Equal(0x10, buffer.Size); + + buffer.Write(this.writeData, 0, 2); + buffer.Write(this.writeData, 2, 2); + buffer.WriteEnd(); + + // Assert the initial state of the buffer + Assert.Equal(0x04, buffer.AvailableReadableBytes); + Assert.Equal(0x0C, buffer.AvailableWritableBytes); + Assert.Equal(0x00, buffer.ReadWaterMark); + Assert.Equal(0x04, buffer.WriteWaterMark); + + // Read the data at once + byte[] readData = new byte[0x10]; + var read = buffer.Read(readData, 0, 0x10); + Assert.Equal(4, read); + Assert.Equal(0xF0, readData[0]); + Assert.Equal(0xF1, readData[1]); + Assert.Equal(0xF2, readData[2]); + Assert.Equal(0xF3, readData[3]); + Assert.Equal(0x00, readData[4]); + + read = buffer.Read(readData, 0, 0x10); + Assert.Equal(0, read); + } + } +} From 5e5606a2900156a1157506fa263ff6e4606288e3 Mon Sep 17 00:00:00 2001 From: Frederik Carlier Date: Fri, 22 Dec 2017 11:05:43 +0100 Subject: [PATCH 2/2] Add missing using statement, fix unit test --- src/StreamDemuxer.cs | 1 + tests/ByteBufferTests.cs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/StreamDemuxer.cs b/src/StreamDemuxer.cs index f37d91949..4712b0d14 100644 --- a/src/StreamDemuxer.cs +++ b/src/StreamDemuxer.cs @@ -1,6 +1,7 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.IO; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; diff --git a/tests/ByteBufferTests.cs b/tests/ByteBufferTests.cs index aa4cb7db2..5bdca7cd3 100644 --- a/tests/ByteBufferTests.cs +++ b/tests/ByteBufferTests.cs @@ -183,8 +183,8 @@ public void ResizeWriteTest() Assert.Equal(0x20, buffer.Size); Assert.Equal(0x14, buffer.AvailableReadableBytes); // 2 * 0x0A = 0x14 Assert.Equal(0x0C, buffer.AvailableWritableBytes); // 0x20 - 0x14 = 0x0C - Assert.Equal(0x0A, buffer.ReadWaterMark); - Assert.Equal(0x1E, buffer.WriteWaterMark); + Assert.Equal(0x1A, buffer.ReadWaterMark); + Assert.Equal(0x0E, buffer.WriteWaterMark); // Read data, and verify the read data read = buffer.Read(readData, 0, 0x14);