@@ -29,9 +29,7 @@ This code is longer than the `IAsyncEnumerable<T>` code, because C# doesn't have
29
29
30
30
``` csharp
31
31
using System ;
32
- using System .Diagnostics ;
33
32
using System .Threading ;
34
- using System .Threading .Tasks ;
35
33
36
34
namespace Grpc .Core
37
35
{
@@ -43,14 +41,14 @@ namespace Grpc.Core
43
41
44
42
public GrpcStreamObservable (IAsyncStreamReader <T > reader , CancellationToken token = default )
45
43
{
46
- _reader = reader ;
44
+ _reader = reader ?? throw new ArgumentNullException ( nameof ( reader )) ;
47
45
_token = token ;
48
46
_used = 0 ;
49
47
}
50
48
51
49
public IDisposable Subscribe (IObserver <T > observer ) =>
52
50
Interlocked .Exchange (ref _used , 1 ) == 0
53
- ? new GrpcStreamSubscription (_reader , observer , _token )
51
+ ? new GrpcStreamSubscription < T > (_reader , observer , _token )
54
52
: throw new InvalidOperationException (" Subscribe can only be called once." );
55
53
56
54
}
@@ -63,28 +61,35 @@ namespace Grpc.Core
63
61
The ` GrpcStreamSubscription ` class handles the enumeration of the ` IAsyncStreamReader ` :
64
62
65
63
``` csharp
66
- public class GrpcStreamSubscription : IDisposable
64
+ public class GrpcStreamSubscription < T > : IDisposable
67
65
{
68
- private readonly Task _task ;
66
+ private readonly IAsyncStreamReader <T > _reader ;
67
+ private readonly IObserver <T > _observer ;
68
+
69
69
private readonly CancellationTokenSource _tokenSource ;
70
+
71
+ private readonly Task _task ;
72
+
70
73
private bool _completed ;
71
74
72
- public GrpcStreamSubscription (IAsyncStreamReader <T > reader , IObserver <T > observer , CancellationToken token )
75
+ public GrpcStreamSubscription (IAsyncStreamReader <T > reader , IObserver <T > observer , CancellationToken token = default )
73
76
{
74
- Debug .Assert (reader != null );
75
- Debug .Assert (observer != null );
77
+ _reader = reader ?? throw new ArgumentNullException (nameof (reader ));
78
+ _observer = observer ?? throw new ArgumentNullException (nameof (observer ));
79
+
76
80
_tokenSource = new CancellationTokenSource ();
77
81
token .Register (_tokenSource .Cancel );
78
- _task = Run (reader , observer , _tokenSource .Token );
82
+
83
+ _task = Run (_tokenSource .Token );
79
84
}
80
85
81
- private async Task Run (IAsyncStreamReader < T > reader , IObserver < T > observer , CancellationToken token )
86
+ private async Task Run (CancellationToken token )
82
87
{
83
88
while (! token .IsCancellationRequested )
84
89
{
85
90
try
86
91
{
87
- if (! await reader .MoveNext (token )) break ;
92
+ if (! await _reader .MoveNext (token )) break ;
88
93
}
89
94
catch (RpcException e ) when (e .StatusCode == Grpc .Core .StatusCode .NotFound )
90
95
{
@@ -96,16 +101,16 @@ public class GrpcStreamSubscription : IDisposable
96
101
}
97
102
catch (Exception e )
98
103
{
99
- observer .OnError (e );
104
+ _observer .OnError (e );
100
105
_completed = true ;
101
106
return ;
102
107
}
103
108
104
- observer .OnNext (reader .Current );
109
+ _observer .OnNext (_reader .Current );
105
110
}
106
111
107
112
_completed = true ;
108
- observer .OnCompleted ();
113
+ _observer .OnCompleted ();
109
114
}
110
115
111
116
public void Dispose ()
@@ -126,16 +131,16 @@ All that is required now is a simple extension method to create the observable f
126
131
127
132
``` csharp
128
133
using System ;
129
- using System .Diagnostics ;
130
134
using System .Threading ;
131
- using System .Threading .Tasks ;
132
135
133
136
namespace Grpc .Core
134
137
{
135
138
public static class AsyncStreamReaderObservableExtensions
136
139
{
137
- public static IObservable <T > AsObservable <T >(this IAsyncStreamReader <T > reader ) =>
138
- new GrpcStreamObservable <T >(reader );
140
+ public static IObservable <T > AsObservable <T >(
141
+ this IAsyncStreamReader <T > reader ,
142
+ CancellationToken cancellationToken = default ) =>
143
+ new GrpcStreamObservable <T >(reader , cancellationToken );
139
144
}
140
145
}
141
146
```
0 commit comments