3
3
using System . ComponentModel ;
4
4
using System . IO . Pipelines ;
5
5
using System . Linq ;
6
+ using System . Reactive ;
6
7
using System . Reactive . Concurrency ;
7
8
using System . Reactive . Disposables ;
8
9
using System . Reactive . Linq ;
9
10
using System . Reactive . Subjects ;
10
11
using System . Text ;
11
12
using System . Threading ;
13
+ using System . Threading . Channels ;
12
14
using System . Threading . Tasks ;
13
15
using Microsoft . Extensions . Logging ;
14
16
@@ -20,11 +22,16 @@ public class OutputHandler : IOutputHandler
20
22
private readonly ISerializer _serializer ;
21
23
private readonly IEnumerable < IOutputFilter > _outputFilters ;
22
24
private readonly ILogger < OutputHandler > _logger ;
23
- private readonly Subject < object > _queue ;
24
- private readonly ReplaySubject < object > _delayedQueue ;
25
+
26
+
27
+ private readonly ChannelReader < object > _queue ;
28
+ private readonly Queue < object > _delayedQueue ;
25
29
private readonly TaskCompletionSource < object ? > _outputIsFinished ;
26
30
private readonly CompositeDisposable _disposable ;
27
31
private bool _delayComplete ;
32
+ private readonly CancellationTokenSource _stopProcessing ;
33
+ private readonly Channel < object > _channel ;
34
+ private readonly ChannelWriter < object > _writer ;
28
35
29
36
public OutputHandler (
30
37
PipeWriter pipeWriter ,
@@ -38,22 +45,26 @@ ILogger<OutputHandler> logger
38
45
_serializer = serializer ;
39
46
_outputFilters = outputFilters . ToArray ( ) ;
40
47
_logger = logger ;
41
- _queue = new Subject < object > ( ) ;
42
- _delayedQueue = new ReplaySubject < object > ( ) ;
48
+ _delayedQueue = new Queue < object > ( ) ;
43
49
_outputIsFinished = new TaskCompletionSource < object ? > ( ) ;
44
50
51
+ _channel = Channel . CreateUnbounded < object > (
52
+ new UnboundedChannelOptions ( ) {
53
+ AllowSynchronousContinuations = true ,
54
+ SingleReader = true ,
55
+ SingleWriter = false
56
+ }
57
+ ) ;
58
+ _queue = _channel . Reader ;
59
+ _writer = _channel . Writer ;
60
+
61
+ _stopProcessing = new CancellationTokenSource ( ) ;
45
62
_disposable = new CompositeDisposable {
46
- _queue
47
- . ObserveOn ( scheduler )
48
- . Select ( value => Observable . FromAsync ( ct => ProcessOutputStream ( value , ct ) ) )
49
- . Concat ( )
50
- . Subscribe ( ) ,
51
- _delayedQueue
52
- . ToArray ( )
53
- . SelectMany ( z => z )
54
- . Subscribe ( _queue . OnNext ) ,
55
- _queue ,
56
- _delayedQueue
63
+ Disposable . Create ( ( ) => _stopProcessing . Cancel ( ) ) ,
64
+ _stopProcessing ,
65
+ Observable . FromAsync ( ( ) => ProcessOutputStream ( _stopProcessing . Token ) )
66
+ . Do ( _ => { } , e => _logger . LogCritical ( e , "unhandled exception" ) )
67
+ . Subscribe ( )
57
68
} ;
58
69
}
59
70
@@ -64,34 +75,39 @@ private bool ShouldSend(object value)
64
75
65
76
public void Send ( object ? value )
66
77
{
67
- // _logger.LogTrace("Writing out value {@Value} ({Type})", value, value?.GetType().FullName);
68
-
69
78
try
70
79
{
71
- if ( _queue . IsDisposed || _disposable . IsDisposed || value == null ) return ;
72
- if ( ! ShouldSend ( value ) )
80
+ if ( _disposable . IsDisposed || value == null ) return ;
81
+ if ( ! ShouldSend ( value ) && ! _delayComplete )
73
82
{
74
- if ( _delayComplete || _delayedQueue . IsDisposed || ! _delayedQueue . HasObservers ) return ;
75
- _delayedQueue . OnNext ( value ) ;
83
+ _delayedQueue . Enqueue ( value ) ;
76
84
}
77
85
else
78
86
{
79
- _queue . OnNext ( value ) ;
87
+ _writer . TryWrite ( value ) ;
80
88
}
81
89
}
82
- catch ( ObjectDisposedException ) { }
90
+ catch ( ObjectDisposedException )
91
+ {
92
+ }
83
93
}
84
94
85
95
public void Initialized ( )
86
96
{
87
- if ( _delayComplete || _delayedQueue . IsDisposed || ! _delayedQueue . HasObservers ) return ;
88
- _delayedQueue . OnCompleted ( ) ;
97
+ if ( _delayComplete ) return ;
98
+ while ( _delayedQueue . Count > 0 )
99
+ {
100
+ var item = _delayedQueue . Dequeue ( ) ;
101
+ _writer . TryWrite ( item ) ;
102
+ }
103
+
89
104
_delayComplete = true ;
90
- _delayedQueue . Dispose ( ) ;
105
+ _delayedQueue . Clear ( ) ;
91
106
}
92
107
93
108
public async Task StopAsync ( )
94
109
{
110
+ _channel . Writer . TryComplete ( ) ;
95
111
await _pipeWriter . CompleteAsync ( ) . ConfigureAwait ( false ) ;
96
112
_disposable . Dispose ( ) ;
97
113
}
@@ -107,17 +123,21 @@ internal async Task WriteAndFlush()
107
123
await _pipeWriter . CompleteAsync ( ) . ConfigureAwait ( false ) ;
108
124
}
109
125
110
- private async Task ProcessOutputStream ( object value , CancellationToken cancellationToken )
126
+ private async Task ProcessOutputStream ( CancellationToken cancellationToken )
111
127
{
112
128
try
113
129
{
130
+ do
131
+ {
132
+ var value = await _queue . ReadAsync ( cancellationToken ) ;
114
133
// _logger.LogTrace("Writing out {@Value}", value);
115
- // TODO: this will be part of the serialization refactor to make streaming first class
116
- var content = _serializer . SerializeObject ( value ) ;
117
- var contentBytes = Encoding . UTF8 . GetBytes ( content ) . AsMemory ( ) ;
118
- await _pipeWriter . WriteAsync ( Encoding . UTF8 . GetBytes ( $ "Content-Length: { contentBytes . Length } \r \n \r \n ") , cancellationToken ) . ConfigureAwait ( false ) ;
119
- await _pipeWriter . WriteAsync ( contentBytes , cancellationToken ) . ConfigureAwait ( false ) ;
120
- await _pipeWriter . FlushAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
134
+ // TODO: this will be part of the serialization refactor to make streaming first class
135
+ var content = _serializer . SerializeObject ( value ) ;
136
+ var contentBytes = Encoding . UTF8 . GetBytes ( content ) . AsMemory ( ) ;
137
+ await _pipeWriter . WriteAsync ( Encoding . UTF8 . GetBytes ( $ "Content-Length: { contentBytes . Length } \r \n \r \n ") , cancellationToken ) . ConfigureAwait ( false ) ;
138
+ await _pipeWriter . WriteAsync ( contentBytes , cancellationToken ) . ConfigureAwait ( false ) ;
139
+ await _pipeWriter . FlushAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
140
+ } while ( true ) ;
121
141
}
122
142
catch ( OperationCanceledException ex ) when ( ex . CancellationToken != cancellationToken )
123
143
{
@@ -136,12 +156,14 @@ private async Task ProcessOutputStream(object value, CancellationToken cancellat
136
156
private void Error ( Exception ex )
137
157
{
138
158
_outputIsFinished . TrySetResult ( ex ) ;
159
+ _writer . TryComplete ( ) ;
139
160
_disposable . Dispose ( ) ;
140
161
}
141
162
142
163
public void Dispose ( )
143
164
{
144
165
_outputIsFinished . TrySetResult ( null ) ;
166
+ _writer . TryComplete ( ) ;
145
167
_disposable . Dispose ( ) ;
146
168
}
147
169
}
0 commit comments