Skip to content

Commit 1d7f988

Browse files
authored
Add Channeler example (#1851)
1 parent 4f06b60 commit 1d7f988

12 files changed

+403
-1
lines changed

examples/Channeler/Channeler.sln

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.29230.61
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Server", "Server\Server.csproj", "{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}"
7+
EndProject
8+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Client", "Client\Client.csproj", "{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}"
9+
EndProject
10+
Global
11+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
12+
Debug|Any CPU = Debug|Any CPU
13+
Release|Any CPU = Release|Any CPU
14+
EndGlobalSection
15+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
16+
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
17+
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
18+
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
19+
{534AC5F8-2DF2-40BD-87A5-B3D8310118C4}.Release|Any CPU.Build.0 = Release|Any CPU
20+
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
21+
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Debug|Any CPU.Build.0 = Debug|Any CPU
22+
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Release|Any CPU.ActiveCfg = Release|Any CPU
23+
{48A1D3BC-A14B-436A-8822-6DE2BEF8B747}.Release|Any CPU.Build.0 = Release|Any CPU
24+
EndGlobalSection
25+
GlobalSection(SolutionProperties) = preSolution
26+
HideSolutionNode = FALSE
27+
EndGlobalSection
28+
GlobalSection(ExtensibilityGlobals) = postSolution
29+
SolutionGuid = {D22B3129-3BFB-41FA-9FCE-E45EBEF8C2DD}
30+
EndGlobalSection
31+
EndGlobal
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net6.0</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<Protobuf Include="..\Proto\data_channel.proto" GrpcServices="Client" Link="Protos\data_channel.proto" />
10+
11+
<PackageReference Include="Google.Protobuf" Version="$(GoogleProtobufPackageVersion)" />
12+
<PackageReference Include="Grpc.Net.Client" Version="$(GrpcDotNetPackageVersion)" />
13+
<PackageReference Include="Grpc.Tools" Version="$(GrpcToolsPackageVersion)" PrivateAssets="All" />
14+
</ItemGroup>
15+
16+
</Project>

examples/Channeler/Client/Program.cs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
using System.Text;
20+
using DataChannel;
21+
using Google.Protobuf;
22+
using Grpc.Core;
23+
using Grpc.Net.Client;
24+
25+
namespace Client
26+
{
27+
public partial class Program
28+
{
29+
public static readonly byte[] TestData = Encoding.UTF8.GetBytes("The quick brown fox jumped over the lazy dog.");
30+
31+
static async Task Main(string[] args)
32+
{
33+
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
34+
var client = new DataChanneler.DataChannelerClient(channel);
35+
36+
await UploadDataAsync(client);
37+
38+
await DownloadResultsAsync(client);
39+
40+
Console.WriteLine("Shutting down");
41+
Console.WriteLine("Press any key to exit...");
42+
Console.ReadKey();
43+
}
44+
45+
private static async Task UploadDataAsync(DataChanneler.DataChannelerClient client)
46+
{
47+
var call = client.UploadData();
48+
49+
var dataChunks = TestData.Chunk(5);
50+
foreach (var chunk in dataChunks)
51+
{
52+
Console.WriteLine($"Uploading chunk: {chunk.Length} bytes");
53+
await call.RequestStream.WriteAsync(new DataRequest { Value = ByteString.CopyFrom(chunk) });
54+
}
55+
56+
await call.RequestStream.CompleteAsync();
57+
58+
var result = await call;
59+
Console.WriteLine($"Total upload processed: {result.BytesProcessed} bytes");
60+
}
61+
62+
private static async Task DownloadResultsAsync(DataChanneler.DataChannelerClient client)
63+
{
64+
var call = client.DownloadResults(new DataRequest { Value = ByteString.CopyFrom(TestData) });
65+
66+
await foreach (var result in call.ResponseStream.ReadAllAsync())
67+
{
68+
Console.WriteLine($"Downloaded bytes processed result: {result.BytesProcessed}");
69+
}
70+
}
71+
}
72+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2019 The gRPC Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
syntax = "proto3";
16+
17+
package data_channel;
18+
19+
service DataChanneler {
20+
rpc UploadData (stream DataRequest) returns (DataResult);
21+
rpc DownloadResults (DataRequest) returns (stream DataResult);
22+
}
23+
24+
message DataRequest {
25+
bytes value = 1;
26+
}
27+
28+
message DataResult {
29+
int32 bytes_processed = 1;
30+
}

examples/Channeler/Server/Program.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
using System.IO;
20+
using Microsoft.AspNetCore.Hosting;
21+
using Microsoft.Extensions.Hosting;
22+
23+
namespace Server
24+
{
25+
public class Program
26+
{
27+
public static void Main(string[] args)
28+
{
29+
CreateHostBuilder(args).Build().Run();
30+
}
31+
32+
public static IHostBuilder CreateHostBuilder(string[] args) =>
33+
Host.CreateDefaultBuilder(args)
34+
.ConfigureWebHostDefaults(webBuilder =>
35+
{
36+
webBuilder.UseStartup<Startup>();
37+
});
38+
}
39+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"profiles": {
3+
"Server": {
4+
"commandName": "Project",
5+
"launchBrowser": false,
6+
"applicationUrl": "https://localhost:5001",
7+
"environmentVariables": {
8+
"ASPNETCORE_ENVIRONMENT": "Development"
9+
}
10+
}
11+
}
12+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<Project Sdk="Microsoft.NET.Sdk.Web">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net6.0</TargetFramework>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<Protobuf Include="..\Proto\data_channel.proto" GrpcServices="Server" Link="Protos\data_channel.proto" />
9+
10+
<PackageReference Include="Grpc.AspNetCore" Version="$(GrpcDotNetPackageVersion)" />
11+
</ItemGroup>
12+
13+
</Project>
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
using System.Threading.Channels;
20+
using DataChannel;
21+
using Grpc.Core;
22+
23+
namespace Server
24+
{
25+
public class DataChannelerService : DataChanneler.DataChannelerBase
26+
{
27+
private readonly ILogger _logger;
28+
29+
public DataChannelerService(ILoggerFactory loggerFactory)
30+
{
31+
_logger = loggerFactory.CreateLogger<DataChannelerService>();
32+
}
33+
34+
public override async Task<DataResult> UploadData(
35+
IAsyncStreamReader<DataRequest> requestStream, ServerCallContext context)
36+
{
37+
var channel = Channel.CreateBounded<DataRequest>(new BoundedChannelOptions(capacity: 5)
38+
{
39+
SingleReader = false,
40+
SingleWriter = true
41+
});
42+
43+
var readTask = Task.Run(async () =>
44+
{
45+
await foreach (var message in requestStream.ReadAllAsync())
46+
{
47+
await channel.Writer.WriteAsync(message);
48+
}
49+
50+
channel.Writer.Complete();
51+
});
52+
53+
// Process incoming messages on three threads.
54+
var bytesProcessedByThread = await Task.WhenAll(
55+
ProcessMessagesAsync(channel.Reader, _logger),
56+
ProcessMessagesAsync(channel.Reader, _logger),
57+
ProcessMessagesAsync(channel.Reader, _logger));
58+
59+
await readTask;
60+
61+
return new DataResult { BytesProcessed = bytesProcessedByThread.Sum() };
62+
63+
static async Task<int> ProcessMessagesAsync(ChannelReader<DataRequest> reader, ILogger logger)
64+
{
65+
var total = 0;
66+
await foreach (var message in reader.ReadAllAsync())
67+
{
68+
total += message.Value.Length;
69+
}
70+
return total;
71+
}
72+
}
73+
74+
public override async Task DownloadResults(DataRequest request,
75+
IServerStreamWriter<DataResult> responseStream, ServerCallContext context)
76+
{
77+
var channel = Channel.CreateBounded<DataResult>(new BoundedChannelOptions(capacity: 5)
78+
{
79+
SingleReader = true,
80+
SingleWriter = false
81+
});
82+
83+
var consumerTask = Task.Run(async () =>
84+
{
85+
// Consume messages from channel and write to response stream.
86+
await foreach (var message in channel.Reader.ReadAllAsync())
87+
{
88+
await responseStream.WriteAsync(message);
89+
}
90+
});
91+
92+
var dataChunks = request.Value.Chunk(size: 10);
93+
94+
// Write messages to channel from multiple threads.
95+
await Task.WhenAll(dataChunks.Select(
96+
async c =>
97+
{
98+
var message = new DataResult { BytesProcessed = c.Length };
99+
await channel.Writer.WriteAsync(message);
100+
}));
101+
102+
// Complete writing and wait for consumer to complete.
103+
channel.Writer.Complete();
104+
await consumerTask;
105+
}
106+
}
107+
}

examples/Channeler/Server/Startup.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
using Microsoft.AspNetCore.Builder;
20+
using Microsoft.AspNetCore.Hosting;
21+
using Microsoft.Extensions.DependencyInjection;
22+
using Microsoft.Extensions.Hosting;
23+
24+
namespace Server
25+
{
26+
public class Startup
27+
{
28+
public void ConfigureServices(IServiceCollection services)
29+
{
30+
services.AddGrpc();
31+
}
32+
33+
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
34+
{
35+
if (env.IsDevelopment())
36+
{
37+
app.UseDeveloperExceptionPage();
38+
}
39+
40+
app.UseRouting();
41+
42+
app.UseEndpoints(endpoints =>
43+
{
44+
endpoints.MapGrpcService<DataChannelerService>();
45+
});
46+
}
47+
}
48+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Debug",
5+
"System": "Information",
6+
"Grpc": "Information",
7+
"Microsoft": "Information"
8+
}
9+
}
10+
}

0 commit comments

Comments
 (0)