@@ -369,11 +369,13 @@ public void Write(ReadOnlySpan<byte> span)
369
369
370
370
#region PipeStream
371
371
/// <summary>Represents a stream backed by a <see cref="Pipe"/>. Data written to the stream can be read back out.</summary>
372
- /// <remarks>The stream supports a single reader and single writer operating in parallel.</remarks>
372
+ /// <remarks>The stream supports a single reader and single writer operating in parallel. After the stream is closed, the remaining
373
+ /// data can still be read from it.
374
+ /// </remarks>
373
375
sealed class PipeStream : Stream
374
376
{
375
- public PipeStream ( ) : this ( new Pipe ( ) , true ) { }
376
- public PipeStream ( Pipe pipe , bool ownPipe ) => ( this . readPipe , this . ownPipe ) = ( pipe , ownPipe ) ;
377
+ public PipeStream ( ) : this ( new Pipe ( ) ) { }
378
+ public PipeStream ( Pipe pipe ) => this . readPipe = pipe ?? throw new ArgumentNullException ( nameof ( pipe ) ) ;
377
379
378
380
/// <inheritdoc/>
379
381
public override bool CanRead => true ;
@@ -382,7 +384,7 @@ public PipeStream() : this(new Pipe(), true) { }
382
384
public override bool CanSeek => false ;
383
385
384
386
/// <inheritdoc/>
385
- public override bool CanWrite => false ;
387
+ public override bool CanWrite => true ;
386
388
387
389
/// <inheritdoc/>
388
390
/// <remarks>This property is not supported and throws <see cref="NotSupportedException"/>.</remarks>
@@ -393,41 +395,23 @@ public PipeStream() : this(new Pipe(), true) { }
393
395
public override long Position { get => throw new NotSupportedException ( ) ; set => throw new NotSupportedException ( ) ; }
394
396
395
397
/// <inheritdoc/>
396
- public override void Flush ( ) => AssertNotDisposed ( ) ; // TODO: should we make Flush wait until data has been sent over the wire?
397
-
398
- /// <summary>Queues data that can later be read from the stream.</summary>
399
- public void QueueData ( byte [ ] data , int count ) => readPipe . Write ( data , 0 , count ) ;
400
-
401
- /// <summary>Queues data that can later be read from the stream.</summary>
402
- public void QueueData ( ReadOnlySpan < byte > data ) => readPipe . Write ( data ) ;
398
+ public override void Flush ( ) { }
403
399
404
400
/// <inheritdoc/>
405
- public override int Read ( byte [ ] buffer , int offset , int count )
406
- {
407
- AssertNotDisposed ( ) ;
408
- return readPipe . Read ( buffer , offset , count ) ;
409
- }
401
+ public override int Read ( byte [ ] buffer , int offset , int count ) => readPipe . Read ( buffer , offset , count ) ;
410
402
411
403
#if NETCOREAPP2_1
412
404
/// <inheritdoc/>
413
- public override int Read ( Span < byte > buffer )
414
- {
415
- AssertNotDisposed ( ) ;
416
- return readPipe . Read ( buffer ) ;
417
- }
405
+ public override int Read ( Span < byte > buffer ) => readPipe . Read ( buffer ) ;
418
406
419
407
/// <inheritdoc/>
420
- public override ValueTask < int > ReadAsync ( Memory < byte > buffer , CancellationToken cancellationToken = default )
421
- {
422
- AssertNotDisposed ( ) ;
423
- return readPipe . ReadAsync ( buffer , cancellationToken ) ;
424
- }
408
+ public override ValueTask < int > ReadAsync ( Memory < byte > buffer , CancellationToken cancellationToken = default ) =>
409
+ readPipe . ReadAsync ( buffer , cancellationToken ) ;
425
410
#endif
426
411
427
412
/// <inheritdoc/>
428
413
public override Task < int > ReadAsync ( byte [ ] buffer , int offset , int count , CancellationToken cancelToken )
429
414
{
430
- AssertNotDisposed ( ) ;
431
415
#if NETCOREAPP2_1
432
416
return readPipe . ReadAsync ( buffer , offset , count , cancelToken ) . AsTask ( ) ;
433
417
#else
@@ -444,27 +428,31 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
444
428
public override void SetLength ( long value ) => throw new NotSupportedException ( ) ;
445
429
446
430
/// <inheritdoc/>
447
- public override void Write ( byte [ ] buffer , int offset , int count ) => throw new NotSupportedException ( ) ;
431
+ public override void Write ( byte [ ] buffer , int offset , int count ) => readPipe . Write ( buffer , offset , count ) ;
432
+
433
+ /// <inheritdoc/>
434
+ public override Task WriteAsync ( byte [ ] buffer , int offset , int count , CancellationToken cancellationToken )
435
+ {
436
+ readPipe . Write ( buffer , offset , count ) ;
437
+ return Task . FromResult ( false ) ;
438
+ }
448
439
449
440
#if NETCOREAPP2_1
450
441
/// <inheritdoc/>
451
- public override void Write ( ReadOnlySpan < byte > buffer ) => throw new NotSupportedException ( ) ;
452
- #endif
442
+ public override void Write ( ReadOnlySpan < byte > buffer ) => readPipe . Write ( buffer ) ;
453
443
454
444
/// <inheritdoc/>
455
- protected override void Dispose ( bool manuallyDisposed )
445
+ public override ValueTask WriteAsync ( ReadOnlyMemory < byte > buffer , CancellationToken cancellationToken = default )
456
446
{
457
- disposed = true ;
458
- if ( ownPipe ) readPipe . Dispose ( ) ;
447
+ readPipe . Write ( buffer . Span ) ;
448
+ return default ;
459
449
}
450
+ #endif
460
451
461
- void AssertNotDisposed ( )
462
- {
463
- if ( disposed ) throw new ObjectDisposedException ( GetType ( ) . Name ) ;
464
- }
452
+ /// <inheritdoc/>
453
+ protected override void Dispose ( bool manuallyDisposed ) => readPipe . Finish ( ) ;
465
454
466
455
readonly Pipe readPipe ;
467
- bool disposed , ownPipe ;
468
456
}
469
457
#endregion
470
458
}
0 commit comments