diff --git a/src/ByteBuffer.cs b/src/ByteBuffer.cs
new file mode 100644
index 000000000..8fc8afb0e
--- /dev/null
+++ b/src/ByteBuffer.cs
@@ -0,0 +1,303 @@
+using System;
+using System.Buffers;
+using System.Diagnostics;
+using System.Threading;
+
+namespace k8s
+{
+ // There may be already an async implementation that we can use:
+ // https://github.com/StephenCleary/AsyncEx/wiki/AsyncProducerConsumerQueue
+ // However, they focus on individual objects and may not be a good choice for use with fixed-with byte buffers
+
+ ///
+ /// Represents a bounded buffer. A dedicated thread can send bytes to this buffer (the producer); while another thread can
+ /// read bytes from this buffer (the consumer).
+ ///
+ ///
+ /// This is a producer-consumer problem (or bounded-buffer problem), see https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
+ ///
+ public class ByteBuffer : IDisposable
+ {
+ private const int DefaultBufferSize = 4 * 1024; // 4 KB
+ private const int DefaultMaximumSize = 40 * 1024 * 1024; // 40 MB
+
+ private readonly int maximumSize;
+ private readonly AutoResetEvent dataAvailable = new AutoResetEvent(initialState: false);
+ private readonly object lockObject = new object();
+
+ private byte[] buffer;
+ private int bytesWritten = 0;
+ private int bytesRead = 0;
+
+ ///
+ /// Used by a writer to indicate the end of the file. When set, the reader will be notified that no
+ /// more data is available.
+ ///
+ private bool endOfFile;
+
+ ///
+ /// Initializes a new instance of the class using the default buffer size and limit.
+ ///
+ public ByteBuffer()
+ : this(DefaultBufferSize, DefaultMaximumSize)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The initial buffer size.
+ ///
+ ///
+ /// The maximum buffer size.
+ ///
+ public ByteBuffer(int bufferSize, int maximumSize)
+ {
+ this.maximumSize = maximumSize;
+ this.buffer = ArrayPool.Shared.Rent(bufferSize);
+ this.endOfFile = false;
+ }
+
+ ///
+ /// Gets the current buffer size.
+ ///
+ public int Size
+ {
+ get { return this.buffer.Length; }
+ }
+
+ ///
+ /// Gets the offset from which the next byte will be read. Increased every time a caller reads data.
+ ///
+ public int ReadWaterMark
+ {
+ get;
+ private set;
+ }
+
+ ///
+ /// Gets the offset to which the next byte will be written. Increased every time a caller writes data.
+ ///
+ public int WriteWaterMark
+ {
+ get;
+ private set;
+ }
+
+ ///
+ /// Gets the amount of bytes availble for reading.
+ ///
+ public int AvailableReadableBytes
+ {
+ get
+ {
+ lock (this.lockObject)
+ {
+ if (this.ReadWaterMark == this.WriteWaterMark)
+ {
+ return 0;
+ }
+ else if (this.ReadWaterMark < this.WriteWaterMark)
+ {
+ return this.WriteWaterMark - this.ReadWaterMark;
+ }
+ else
+ {
+ return
+
+ // Bytes available at the end of the array
+ this.buffer.Length - this.ReadWaterMark
+
+ // Bytes available at the start of the array
+ + this.WriteWaterMark;
+ }
+ }
+ }
+ }
+
+ ///
+ /// Gets the amount of bytes available for writing.
+ ///
+ public int AvailableWritableBytes
+ {
+ get
+ {
+ lock (this.lockObject)
+ {
+ if (this.WriteWaterMark > this.ReadWaterMark)
+ {
+ return
+ /* Available bytes at the end of the buffer */
+ this.buffer.Length - this.WriteWaterMark
+ /* Available bytes at the start of the buffer */
+ + this.ReadWaterMark;
+ }
+ else if (this.WriteWaterMark == this.ReadWaterMark)
+ {
+ return this.buffer.Length;
+ }
+ else
+ {
+ return this.ReadWaterMark - this.WriteWaterMark;
+ }
+ }
+ }
+ }
+
+ ///
+ public void Dispose()
+ {
+ ArrayPool.Shared.Return(this.buffer);
+ }
+
+ ///
+ /// Writes bytes to the buffer.
+ ///
+ ///
+ /// The source byte array from which to read the bytes.
+ ///
+ ///
+ /// The offset of the first byte to copy.
+ ///
+ ///
+ /// The amount of bytes to copy.
+ ///
+ public void Write(byte[] data, int offset, int length)
+ {
+ lock (this.lockObject)
+ {
+ // Does the data fit?
+ // We must make sure that ReadWaterMark != WriteWaterMark; that would indicate 'all data has been read' instead
+ // instead of 'all data must be read'
+ if (this.AvailableWritableBytes <= length)
+ {
+ // Grow the buffer
+ this.Grow(this.buffer.Length + length - this.AvailableWritableBytes + 1);
+ }
+
+ // Write the data; first the data that fits between the write watermark and the end of the buffer
+ int availableBeforeWrapping = this.buffer.Length - this.WriteWaterMark;
+
+ Array.Copy(data, offset, this.buffer, this.WriteWaterMark, Math.Min(availableBeforeWrapping, length));
+ this.WriteWaterMark += Math.Min(availableBeforeWrapping, length);
+
+ if (length > availableBeforeWrapping)
+ {
+ Array.Copy(data, offset + availableBeforeWrapping, this.buffer, 0, length - availableBeforeWrapping);
+ this.WriteWaterMark = length - availableBeforeWrapping;
+ }
+
+ this.bytesWritten += length;
+ Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten);
+ }
+
+ this.dataAvailable.Set();
+ }
+
+ ///
+ /// Stops writing data to the buffer, indicating that the end of file has been reached.
+ ///
+ public void WriteEnd()
+ {
+ lock (this.lockObject)
+ {
+ this.endOfFile = true;
+ this.dataAvailable.Set();
+ }
+ }
+
+ ///
+ /// Reads bytes from the buffer.
+ ///
+ ///
+ /// The byte array into which to read the data.
+ ///
+ ///
+ /// The offset at which to start writing the bytes.
+ ///
+ ///
+ /// The amount of bytes to read.
+ ///
+ ///
+ /// The total number of bytes read.
+ ///
+ public int Read(byte[] data, int offset, int count)
+ {
+ while (this.AvailableReadableBytes == 0 && !this.endOfFile)
+ {
+ this.dataAvailable.WaitOne();
+ }
+
+ int toRead = 0;
+
+ lock (this.lockObject)
+ {
+ // Signal the end of file to the caller.
+ if (this.AvailableReadableBytes == 0 && this.endOfFile)
+ {
+ return 0;
+ }
+
+ toRead = Math.Min(this.AvailableReadableBytes, count);
+
+ int availableBeforeWrapping = this.buffer.Length - this.ReadWaterMark;
+
+ Array.Copy(this.buffer, this.ReadWaterMark, data, offset, Math.Min(availableBeforeWrapping, toRead));
+ this.ReadWaterMark += Math.Min(availableBeforeWrapping, toRead);
+
+ if (toRead > availableBeforeWrapping)
+ {
+ Array.Copy(this.buffer, 0, data, offset + availableBeforeWrapping, toRead - availableBeforeWrapping);
+ this.ReadWaterMark = toRead - availableBeforeWrapping;
+ }
+
+ this.bytesRead += toRead;
+ Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten);
+ }
+
+ return toRead;
+ }
+
+ ///
+ /// Increases the buffer size. Any call to this method must be protected with a lock.
+ ///
+ ///
+ /// The new buffer size.
+ ///
+ private void Grow(int size)
+ {
+ if (size > this.maximumSize)
+ {
+ throw new OutOfMemoryException();
+ }
+
+ var newBuffer = ArrayPool.Shared.Rent(size);
+
+ if (this.WriteWaterMark <= this.ReadWaterMark)
+ {
+ // Copy the data at the start
+ Array.Copy(this.buffer, 0, newBuffer, 0, this.WriteWaterMark);
+
+ int trailingDataLength = this.buffer.Length - this.ReadWaterMark;
+ Array.Copy(this.buffer,
+ sourceIndex: this.ReadWaterMark,
+ destinationArray: newBuffer,
+ destinationIndex: newBuffer.Length - trailingDataLength,
+ length: trailingDataLength);
+
+ this.ReadWaterMark += newBuffer.Length - this.buffer.Length;
+ }
+ else
+ {
+ // ... [Read WM] ... [Write WM] ... [newly available space]
+ Array.Copy(this.buffer, 0, newBuffer, 0, this.buffer.Length);
+ }
+
+ ArrayPool.Shared.Return(this.buffer);
+ this.buffer = newBuffer;
+
+ Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten);
+ }
+ }
+}
diff --git a/src/MuxedStream.cs b/src/MuxedStream.cs
new file mode 100644
index 000000000..021529fe3
--- /dev/null
+++ b/src/MuxedStream.cs
@@ -0,0 +1,78 @@
+using System;
+using System.IO;
+
+namespace k8s
+{
+ public class MuxedStream : Stream
+ {
+ private ByteBuffer inputBuffer;
+ private byte? outputIndex;
+ private StreamDemuxer muxer;
+
+ public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputIndex)
+ {
+ this.inputBuffer = inputBuffer;
+ this.outputIndex = outputIndex;
+
+ if (this.inputBuffer == null && outputIndex == null)
+ {
+ throw new ArgumentException("You must specify at least inputBuffer or outputIndex");
+ }
+
+ this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer));
+ }
+
+ public override bool CanRead => this.inputBuffer != null;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => this.outputIndex != null;
+
+ public override long Length => throw new NotSupportedException();
+
+ public override long Position
+ {
+ get => throw new NotSupportedException();
+ set => throw new NotSupportedException();
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ if (this.outputIndex == null)
+ {
+ throw new InvalidOperationException();
+ }
+ else
+ {
+ this.muxer.Write(this.outputIndex.Value, buffer, offset, count).GetAwaiter().GetResult();
+ }
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (this.inputBuffer == null)
+ {
+ throw new InvalidOperationException();
+ }
+ else
+ {
+ return this.inputBuffer.Read(buffer, offset, count);
+ }
+ }
+
+ public override void Flush()
+ {
+ throw new NotSupportedException();
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void SetLength(long value)
+ {
+ throw new NotSupportedException();
+ }
+ }
+}
diff --git a/src/StreamDemuxer.cs b/src/StreamDemuxer.cs
new file mode 100644
index 000000000..4712b0d14
--- /dev/null
+++ b/src/StreamDemuxer.cs
@@ -0,0 +1,128 @@
+using System;
+using System.Buffers;
+using System.Collections.Generic;
+using System.IO;
+using System.Net.WebSockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace k8s
+{
+ public class StreamDemuxer : IDisposable
+ {
+ private readonly WebSocket webSocket;
+ private readonly Dictionary buffers = new Dictionary();
+ private readonly CancellationTokenSource cts = new CancellationTokenSource();
+ private Task runLoop;
+
+ public StreamDemuxer(WebSocket webSocket)
+ {
+ this.webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
+ }
+
+ public event EventHandler ConnectionClosed;
+
+ public void Start()
+ {
+ this.runLoop = this.RunLoop(this.cts.Token);
+ }
+
+ public void Dispose()
+ {
+ if (this.runLoop != null)
+ {
+ this.cts.Cancel();
+ this.runLoop.Wait();
+ }
+ }
+
+ public Stream GetStream(byte? inputIndex, byte? outputIndex)
+ {
+ if (inputIndex != null && !this.buffers.ContainsKey(inputIndex.Value))
+ {
+ lock (this.buffers)
+ {
+ var buffer = new ByteBuffer();
+ this.buffers.Add(inputIndex.Value, buffer);
+ }
+ }
+
+ var inputBuffer = inputIndex == null ? null : this.buffers[inputIndex.Value];
+ return new MuxedStream(this, inputBuffer, outputIndex);
+ }
+
+ public async Task Write(byte index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ byte[] writeBuffer = ArrayPool.Shared.Rent(count + 1);
+
+ try
+ {
+ writeBuffer[0] = (byte)index;
+ Array.Copy(buffer, offset, writeBuffer, 1, count);
+ ArraySegment segment = new ArraySegment(writeBuffer, 0, count + 1);
+ await this.webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(writeBuffer);
+ }
+ }
+
+ protected async Task RunLoop(CancellationToken cancellationToken)
+ {
+ // Get a 1KB buffer
+ byte[] buffer = ArrayPool.Shared.Rent(1024 * 1024);
+
+ try
+ {
+ var segment = new ArraySegment(buffer);
+
+ while (!cancellationToken.IsCancellationRequested && this.webSocket.CloseStatus == null)
+ {
+ // We always get data in this format:
+ // [stream index] (1 for stdout, 2 for stderr)
+ // [payload]
+ var result = await this.webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false);
+
+ // Ignore empty messages
+ if (result.Count < 2)
+ {
+ continue;
+ }
+
+ var streamIndex = buffer[0];
+ var extraByteCount = 1;
+
+ while (true)
+ {
+ if (this.buffers.ContainsKey(streamIndex))
+ {
+ this.buffers[streamIndex].Write(buffer, extraByteCount, result.Count - extraByteCount);
+ }
+
+ if (result.EndOfMessage == true)
+ {
+ break;
+ }
+
+ extraByteCount = 0;
+ result = await this.webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(buffer);
+ this.runLoop = null;
+
+ foreach (var b in this.buffers.Values)
+ {
+ b.WriteEnd();
+ }
+
+ this.ConnectionClosed?.Invoke(this, EventArgs.Empty);
+ }
+ }
+ }
+}
diff --git a/tests/AuthTests.cs b/tests/AuthTests.cs
index dccd6724d..1e2a4b6db 100644
--- a/tests/AuthTests.cs
+++ b/tests/AuthTests.cs
@@ -315,4 +315,4 @@ public void Token()
}
}
}
-}
+}
diff --git a/tests/ByteBufferTests.cs b/tests/ByteBufferTests.cs
new file mode 100644
index 000000000..5bdca7cd3
--- /dev/null
+++ b/tests/ByteBufferTests.cs
@@ -0,0 +1,344 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace k8s.tests
+{
+ ///
+ /// Tests the class.
+ ///
+ public class ByteBufferTests
+ {
+ private readonly byte[] writeData = new byte[] { 0xF0, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF };
+
+ ///
+ /// Tests a sequential read and write operation.
+ ///
+ [Fact]
+ public void LinearReadWriteTest()
+ {
+ ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100);
+
+ // There's no real guarantee that this will be the case because the ArrayPool does not guarantee
+ // a specific buffer size. So let's assert this first to make sure the test fails should this
+ // assumption not hold.
+ Assert.Equal(0x10, buffer.Size);
+
+ // Assert the initial values.
+ Assert.Equal(0, buffer.AvailableReadableBytes);
+ Assert.Equal(0x10, buffer.AvailableWritableBytes);
+ Assert.Equal(0, buffer.ReadWaterMark);
+ Assert.Equal(0, buffer.WriteWaterMark);
+
+ // Write two bytes
+ buffer.Write(this.writeData, 0, 2);
+
+ Assert.Equal(2, buffer.AvailableReadableBytes);
+ Assert.Equal(0x0E, buffer.AvailableWritableBytes);
+ Assert.Equal(0, buffer.ReadWaterMark);
+ Assert.Equal(2, buffer.WriteWaterMark);
+
+ // Read two bytes, one byte at a time
+ byte[] readData = new byte[0x10];
+
+ var read = buffer.Read(readData, 0, 1);
+ Assert.Equal(1, read);
+
+ // Verify the result of the read operation.
+ Assert.Equal(0xF0, readData[0]);
+ Assert.Equal(0, readData[1]); // Make sure no additional data was read
+
+ // Check the state of the buffer
+ Assert.Equal(1, buffer.AvailableReadableBytes);
+ Assert.Equal(0x0F, buffer.AvailableWritableBytes);
+ Assert.Equal(1, buffer.ReadWaterMark);
+ Assert.Equal(2, buffer.WriteWaterMark);
+
+ // Read another byte
+ read = buffer.Read(readData, 1, 1);
+ Assert.Equal(1, read);
+
+ // Verify the result of the read operation.
+ Assert.Equal(0xF1, readData[1]);
+ Assert.Equal(0, readData[2]); // Make sure no additional data was read
+
+ // Check the state of the buffer
+ Assert.Equal(0, buffer.AvailableReadableBytes);
+ Assert.Equal(0x10, buffer.AvailableWritableBytes);
+ Assert.Equal(2, buffer.ReadWaterMark);
+ Assert.Equal(2, buffer.WriteWaterMark);
+ }
+
+ ///
+ /// Tests reading a writing which crosses the boundary (end) of the circular buffer.
+ ///
+ [Fact]
+ public void BoundaryReadWriteTest()
+ {
+ ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100);
+
+ // There's no real guarantee that this will be the case because the ArrayPool does not guarantee
+ // a specific buffer size. So let's assert this first to make sure the test fails should this
+ // assumption not hold.
+ Assert.Equal(0x10, buffer.Size);
+
+ // Write out 0x0A bytes to the buffer, to increase the high water level for writing bytes
+ buffer.Write(this.writeData, 0, 0x0A);
+
+ // Assert the initial values.
+ Assert.Equal(0x0A, buffer.AvailableReadableBytes);
+ Assert.Equal(0x06, buffer.AvailableWritableBytes);
+ Assert.Equal(0, buffer.ReadWaterMark);
+ Assert.Equal(0x0A, buffer.WriteWaterMark);
+
+ // Read 0x0A bytes, to increase the high water level for reading bytes
+ byte[] readData = new byte[0x10];
+ var read = buffer.Read(readData, 0, 0x0A);
+ Assert.Equal(0x0A, read);
+
+ Assert.Equal(0x00, buffer.AvailableReadableBytes);
+ Assert.Equal(0x10, buffer.AvailableWritableBytes);
+ Assert.Equal(0x0A, buffer.ReadWaterMark);
+ Assert.Equal(0x0A, buffer.WriteWaterMark);
+
+ // Write an additional 0x0A bytes, but now in reverse order. This will cause the data
+ // to be wrapped.
+ Array.Reverse(this.writeData);
+
+ buffer.Write(this.writeData, 0, 0x0A);
+
+ // Assert the resulting state of the buffer.
+ Assert.Equal(0x0A, buffer.AvailableReadableBytes);
+ Assert.Equal(0x06, buffer.AvailableWritableBytes);
+ Assert.Equal(0x0A, buffer.ReadWaterMark);
+ Assert.Equal(0x04, buffer.WriteWaterMark);
+
+ // Read ten bytes, this will be a wrapped read
+ read = buffer.Read(readData, 0, 0x0A);
+ Assert.Equal(0x0A, read);
+
+ // Verify the result of the read operation.
+ Assert.Equal(0xFF, readData[0]);
+ Assert.Equal(0xFE, readData[1]);
+ Assert.Equal(0xFD, readData[2]);
+ Assert.Equal(0xFC, readData[3]);
+ Assert.Equal(0xFB, readData[4]);
+ Assert.Equal(0xFA, readData[5]);
+ Assert.Equal(0xF9, readData[6]);
+ Assert.Equal(0xF8, readData[7]);
+ Assert.Equal(0xF7, readData[8]);
+ Assert.Equal(0xF6, readData[9]);
+ Assert.Equal(0, readData[10]); // Make sure no additional data was read
+
+ // Check the state of the buffer
+ Assert.Equal(0, buffer.AvailableReadableBytes);
+ Assert.Equal(0x10, buffer.AvailableWritableBytes);
+ Assert.Equal(4, buffer.ReadWaterMark);
+ Assert.Equal(4, buffer.WriteWaterMark);
+ }
+
+ ///
+ /// Tests resizing of the class.
+ ///
+ [Fact]
+ public void ResizeWriteTest()
+ {
+ ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100);
+
+ // There's no real guarantee that this will be the case because the ArrayPool does not guarantee
+ // a specific buffer size. So let's assert this first to make sure the test fails should this
+ // assumption not hold.
+ Assert.Equal(0x10, buffer.Size);
+
+ // Write out 0x0A bytes to the buffer, to increase the high water level for writing bytes
+ buffer.Write(this.writeData, 0, 0x0A);
+
+ byte[] readData = new byte[0x20];
+
+ // Read these 0x0A bytes.
+ var read = buffer.Read(readData, 0, 0x0A);
+ Assert.Equal(0x0A, read);
+
+ // Assert the initial state of the buffer
+ Assert.Equal(0x00, buffer.AvailableReadableBytes);
+ Assert.Equal(0x10, buffer.AvailableWritableBytes);
+ Assert.Equal(0x0A, buffer.ReadWaterMark);
+ Assert.Equal(0x0A, buffer.WriteWaterMark);
+
+ // Write out 0x0A bytes to the buffer, this will cause the buffer to wrap
+ buffer.Write(this.writeData, 0, 0x0A);
+
+ Assert.Equal(0x0A, buffer.AvailableReadableBytes);
+ Assert.Equal(0x06, buffer.AvailableWritableBytes);
+ Assert.Equal(0x0A, buffer.ReadWaterMark);
+ Assert.Equal(0x04, buffer.WriteWaterMark);
+
+ // Write an additional 0x0A bytes, but now in reverse order. This will cause the buffer to be resized.
+ Array.Reverse(this.writeData);
+
+ buffer.Write(this.writeData, 0, 0x0A);
+
+ // Make sure the buffer has been resized.
+ Assert.Equal(0x20, buffer.Size);
+ Assert.Equal(0x14, buffer.AvailableReadableBytes); // 2 * 0x0A = 0x14
+ Assert.Equal(0x0C, buffer.AvailableWritableBytes); // 0x20 - 0x14 = 0x0C
+ Assert.Equal(0x1A, buffer.ReadWaterMark);
+ Assert.Equal(0x0E, buffer.WriteWaterMark);
+
+ // Read data, and verify the read data
+ read = buffer.Read(readData, 0, 0x14);
+ Assert.Equal(0xF0, readData[0]);
+ Assert.Equal(0xF1, readData[1]);
+ Assert.Equal(0xF2, readData[2]);
+ Assert.Equal(0xF3, readData[3]);
+ Assert.Equal(0xF4, readData[4]);
+ Assert.Equal(0xF5, readData[5]);
+ Assert.Equal(0xF6, readData[6]);
+ Assert.Equal(0xF7, readData[7]);
+ Assert.Equal(0xF8, readData[8]);
+ Assert.Equal(0xF9, readData[9]);
+
+ Assert.Equal(0xFF, readData[10]);
+ Assert.Equal(0xFE, readData[11]);
+ Assert.Equal(0xFD, readData[12]);
+ Assert.Equal(0xFC, readData[13]);
+ Assert.Equal(0xFB, readData[14]);
+ Assert.Equal(0xFA, readData[15]);
+ Assert.Equal(0xF9, readData[16]);
+ Assert.Equal(0xF8, readData[17]);
+ Assert.Equal(0xF7, readData[18]);
+ Assert.Equal(0xF6, readData[19]);
+ }
+
+ ///
+ /// Tests a call to which wants to read more data
+ /// than is available.
+ ///
+ [Fact]
+ public void ReadTooMuchDataTest()
+ {
+ var buffer = new ByteBuffer();
+
+ var readData = new byte[0x10];
+
+ // Read 0x010 bytes of data when only 0x06 are available
+ buffer.Write(this.writeData, 0, 0x06);
+
+ var read = buffer.Read(readData, 0, readData.Length);
+ Assert.Equal(0x06, read);
+
+ Assert.Equal(0xF0, readData[0]);
+ Assert.Equal(0xF1, readData[1]);
+ Assert.Equal(0xF2, readData[2]);
+ Assert.Equal(0xF3, readData[3]);
+ Assert.Equal(0xF4, readData[4]);
+ Assert.Equal(0xF5, readData[5]);
+ Assert.Equal(0x00, readData[6]);
+ }
+
+ ///
+ /// Tests a call to when no data is available; and makes
+ /// sure the call blocks until data is available.
+ ///
+ [Fact]
+ public void ReadBlocksUntilDataAvailableTest()
+ {
+ // Makes sure that the Read method does not return until data is available.
+ var buffer = new ByteBuffer();
+ var readData = new byte[0x10];
+ var read = 0;
+
+ // Kick off a read operation
+ var readTask = Task.Run(() => read = buffer.Read(readData, 0, readData.Length));
+ Thread.Sleep(250);
+ Assert.False(readTask.IsCompleted);
+
+ // Write data to the buffer
+ buffer.Write(this.writeData, 0, 0x03);
+
+ Thread.Sleep(250);
+
+ Assert.True(readTask.IsCompleted);
+
+ Assert.Equal(3, read);
+ Assert.Equal(0xF0, readData[0]);
+ Assert.Equal(0xF1, readData[1]);
+ Assert.Equal(0xF2, readData[2]);
+ Assert.Equal(0x00, readData[3]);
+ }
+
+ ///
+ /// Tests reading until the end of the file.
+ ///
+ [Fact]
+ public void ReadUntilEndOfFileTest()
+ {
+ ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100);
+
+ // There's no real guarantee that this will be the case because the ArrayPool does not guarantee
+ // a specific buffer size. So let's assert this first to make sure the test fails should this
+ // assumption not hold.
+ Assert.Equal(0x10, buffer.Size);
+
+ buffer.Write(this.writeData, 0, 2);
+ buffer.Write(this.writeData, 2, 2);
+ buffer.WriteEnd();
+
+ // Assert the initial state of the buffer
+ Assert.Equal(0x04, buffer.AvailableReadableBytes);
+ Assert.Equal(0x0C, buffer.AvailableWritableBytes);
+ Assert.Equal(0x00, buffer.ReadWaterMark);
+ Assert.Equal(0x04, buffer.WriteWaterMark);
+
+ // Read the data on a chunk-by-chunk basis
+ byte[] readData = new byte[0x03];
+ var read = buffer.Read(readData, 0, 3);
+ Assert.Equal(3, read);
+ Assert.Equal(0xF0, readData[0]);
+ Assert.Equal(0xF1, readData[1]);
+ Assert.Equal(0xF2, readData[2]);
+
+ read = buffer.Read(readData, 0, 3);
+ Assert.Equal(1, read);
+ Assert.Equal(0xF3, readData[0]);
+ }
+
+ ///
+ /// Tests reading until the end of a file, piecemeal.
+ ///
+ [Fact]
+ public void ReadUntilEndOfFileTest2()
+ {
+ ByteBuffer buffer = new ByteBuffer(bufferSize: 0x10, maximumSize: 0x100);
+
+ // There's no real guarantee that this will be the case because the ArrayPool does not guarantee
+ // a specific buffer size. So let's assert this first to make sure the test fails should this
+ // assumption not hold.
+ Assert.Equal(0x10, buffer.Size);
+
+ buffer.Write(this.writeData, 0, 2);
+ buffer.Write(this.writeData, 2, 2);
+ buffer.WriteEnd();
+
+ // Assert the initial state of the buffer
+ Assert.Equal(0x04, buffer.AvailableReadableBytes);
+ Assert.Equal(0x0C, buffer.AvailableWritableBytes);
+ Assert.Equal(0x00, buffer.ReadWaterMark);
+ Assert.Equal(0x04, buffer.WriteWaterMark);
+
+ // Read the data at once
+ byte[] readData = new byte[0x10];
+ var read = buffer.Read(readData, 0, 0x10);
+ Assert.Equal(4, read);
+ Assert.Equal(0xF0, readData[0]);
+ Assert.Equal(0xF1, readData[1]);
+ Assert.Equal(0xF2, readData[2]);
+ Assert.Equal(0xF3, readData[3]);
+ Assert.Equal(0x00, readData[4]);
+
+ read = buffer.Read(readData, 0, 0x10);
+ Assert.Equal(0, read);
+ }
+ }
+}