Skip to content

Commit 8c42fdd

Browse files
committed
Simplify exec action by making stream close finish rather than dispose
1 parent d9a2ef6 commit 8c42fdd

File tree

3 files changed

+33
-43
lines changed

3 files changed

+33
-43
lines changed

src/KubernetesClient/Kubernetes.Exec.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ public async Task<int> NamespacedPodExecAsync(string name, string @namespace, st
2020
throw new ArgumentNullException(nameof(action));
2121
}
2222

23-
Stream stdIn = new PipeStream(), stdOut = new PipeStream(), stdErr = new PipeStream();
23+
PipeStream stdIn = new PipeStream(), stdOut = new PipeStream(), stdErr = new PipeStream();
2424
Task<V1Status> execTask = Request<V1Pod>(@namespace, name).Body(stdIn)
2525
.ExecCommandAsync(command.First(), command.Skip(1).ToArray(), container, stdOut, stdErr, tty, false, cancellationToken);
26-
await action(stdIn, stdOut, stdErr).ConfigureAwait(false);
27-
stdIn.Dispose(); // close STDIN just in case the action failed to do so and the remote process is waiting for it
26+
Task actionTask = action(stdIn, stdOut, stdErr);
2827
var status = await execTask.ConfigureAwait(false);
29-
if (status.Code.Value < 0)
28+
stdOut.Close(); // complete the output streams in case the action is blocked waiting for them
29+
stdErr.Close();
30+
await actionTask.ConfigureAwait(false);
31+
if (status.Code.Value < 0) // if the exit code is unknown, throw an exception
3032
{
3133
throw new KubernetesException(status);
3234
}

src/KubernetesClient/KubernetesRequest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public async Task<V1Status> ExecCommandAsync(
237237
V1Status status = await new SPDYExec(spdyConn, headers, stdin, stdout, stderr).RunAsync(cancelToken).ConfigureAwait(false);
238238
if (throwOnFailure && status.Status == "Failure") throw new KubernetesException(status);
239239
return status;
240-
}
240+
}
241241

242242
/// <summary>Executes the request and returns the deserialized response body (or the default value of type
243243
/// <typeparamref name="T"/> if the response was 404 Not Found).

src/KubernetesClient/PipeStream.cs

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,13 @@ public void Write(ReadOnlySpan<byte> span)
369369

370370
#region PipeStream
371371
/// <summary>Represents a stream backed by a <see cref="Pipe"/>. Data written to the stream can be read back out.</summary>
372-
/// <remarks>The stream supports a single reader and single writer operating in parallel.</remarks>
372+
/// <remarks>The stream supports a single reader and single writer operating in parallel. After the stream is closed, the remaining
373+
/// data can still be read from it.
374+
/// </remarks>
373375
sealed class PipeStream : Stream
374376
{
375-
public PipeStream() : this(new Pipe(), true) { }
376-
public PipeStream(Pipe pipe, bool ownPipe) => (this.readPipe, this.ownPipe) = (pipe, ownPipe);
377+
public PipeStream() : this(new Pipe()) { }
378+
public PipeStream(Pipe pipe) => this.readPipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
377379

378380
/// <inheritdoc/>
379381
public override bool CanRead => true;
@@ -382,7 +384,7 @@ public PipeStream() : this(new Pipe(), true) { }
382384
public override bool CanSeek => false;
383385

384386
/// <inheritdoc/>
385-
public override bool CanWrite => false;
387+
public override bool CanWrite => true;
386388

387389
/// <inheritdoc/>
388390
/// <remarks>This property is not supported and throws <see cref="NotSupportedException"/>.</remarks>
@@ -393,41 +395,23 @@ public PipeStream() : this(new Pipe(), true) { }
393395
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
394396

395397
/// <inheritdoc/>
396-
public override void Flush() => AssertNotDisposed(); // TODO: should we make Flush wait until data has been sent over the wire?
397-
398-
/// <summary>Queues data that can later be read from the stream.</summary>
399-
public void QueueData(byte[] data, int count) => readPipe.Write(data, 0, count);
400-
401-
/// <summary>Queues data that can later be read from the stream.</summary>
402-
public void QueueData(ReadOnlySpan<byte> data) => readPipe.Write(data);
398+
public override void Flush() { }
403399

404400
/// <inheritdoc/>
405-
public override int Read(byte[] buffer, int offset, int count)
406-
{
407-
AssertNotDisposed();
408-
return readPipe.Read(buffer, offset, count);
409-
}
401+
public override int Read(byte[] buffer, int offset, int count) => readPipe.Read(buffer, offset, count);
410402

411403
#if NETCOREAPP2_1
412404
/// <inheritdoc/>
413-
public override int Read(Span<byte> buffer)
414-
{
415-
AssertNotDisposed();
416-
return readPipe.Read(buffer);
417-
}
405+
public override int Read(Span<byte> buffer) => readPipe.Read(buffer);
418406

419407
/// <inheritdoc/>
420-
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
421-
{
422-
AssertNotDisposed();
423-
return readPipe.ReadAsync(buffer, cancellationToken);
424-
}
408+
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) =>
409+
readPipe.ReadAsync(buffer, cancellationToken);
425410
#endif
426411

427412
/// <inheritdoc/>
428413
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken)
429414
{
430-
AssertNotDisposed();
431415
#if NETCOREAPP2_1
432416
return readPipe.ReadAsync(buffer, offset, count, cancelToken).AsTask();
433417
#else
@@ -444,27 +428,31 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
444428
public override void SetLength(long value) => throw new NotSupportedException();
445429

446430
/// <inheritdoc/>
447-
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
431+
public override void Write(byte[] buffer, int offset, int count) => readPipe.Write(buffer, offset, count);
432+
433+
/// <inheritdoc/>
434+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
435+
{
436+
readPipe.Write(buffer, offset, count);
437+
return Task.FromResult(false);
438+
}
448439

449440
#if NETCOREAPP2_1
450441
/// <inheritdoc/>
451-
public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException();
452-
#endif
442+
public override void Write(ReadOnlySpan<byte> buffer) => readPipe.Write(buffer);
453443

454444
/// <inheritdoc/>
455-
protected override void Dispose(bool manuallyDisposed)
445+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
456446
{
457-
disposed = true;
458-
if(ownPipe) readPipe.Dispose();
447+
readPipe.Write(buffer.Span);
448+
return default;
459449
}
450+
#endif
460451

461-
void AssertNotDisposed()
462-
{
463-
if(disposed) throw new ObjectDisposedException(GetType().Name);
464-
}
452+
/// <inheritdoc/>
453+
protected override void Dispose(bool manuallyDisposed) => readPipe.Finish();
465454

466455
readonly Pipe readPipe;
467-
bool disposed, ownPipe;
468456
}
469457
#endregion
470458
}

0 commit comments

Comments
 (0)