Skip to content

Commit d9a2ef6

Browse files
committed
Support exec over the SPDY protocol
1 parent a5d691b commit d9a2ef6

File tree

7 files changed

+848
-28
lines changed

7 files changed

+848
-28
lines changed

src/KubernetesClient/ByteBuffer.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55

66
namespace k8s
77
{
8-
// There may be already an async implementation that we can use:
9-
// https://github.com/StephenCleary/AsyncEx/wiki/AsyncProducerConsumerQueue
10-
// However, they focus on individual objects and may not be a good choice for use with fixed-with byte buffers
8+
// Pipe could be used instead for an async implementation. However, since this class is public there's no telling whether somebody
9+
// has referenced it, so removing this class would be a breaking change. Neither could we easily replace the implementation of
10+
// this class with Pipe's implementation, since this class exposes various public members (like WriteWaterMark and the OnResize event)
11+
// that don't make sense in Pipe's context. (It seems they are used for unit testing, but unfortunately they were made public rather
12+
// than internal, so removing them is a breaking change.)
1113

1214
/// <summary>
1315
/// Represents a bounded buffer. A dedicated thread can send bytes to this buffer (the producer); while another thread can

src/KubernetesClient/Kubernetes.ConfigInit.cs

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public Kubernetes(KubernetesClientConfiguration config, HttpClient httpClient, b
5656
/// Optional. The delegating handlers to add to the http client pipeline.
5757
/// </param>
5858
public Kubernetes(KubernetesClientConfiguration config, params DelegatingHandler[] handlers)
59-
: this(handlers)
6059
{
60+
CreateHttpClient(handlers);
6161
ValidateConfig(config);
6262
this.config = config;
6363
InitializeFromConfig();
@@ -77,6 +77,18 @@ public KubernetesScheme Scheme
7777
}
7878
}
7979

80+
// TODO: uncomment if we update to a later version of Microsoft.Rest.ClientRuntime
81+
/*/// <inheritdoc/>
82+
protected override DelegatingHandler CreateHttpHandlerPipeline(HttpClientHandler httpClientHandler, params DelegatingHandler[] handlers)
83+
{
84+
HttpMessageHandler rootHandler = RemoveRetryHandler(base.CreateHttpHandlerPipeline(httpClientHandler, handlers));
85+
return rootHandler as DelegatingHandler ?? new ForwardingHandler(rootHandler);
86+
}
87+
88+
private sealed class ForwardingHandler : DelegatingHandler
89+
{
90+
public ForwardingHandler(HttpMessageHandler handler) : base(handler) { }
91+
}*/
8092

8193
private void ValidateConfig(KubernetesClientConfiguration config)
8294
{
@@ -171,15 +183,21 @@ partial void CustomInitialize()
171183
#if NET452
172184
ServicePointManager.SecurityProtocol |= SecurityProtocolType.Tls12;
173185
#endif
186+
FirstMessageHandler = RemoveRetryHandler(FirstMessageHandler);
174187
AppendDelegatingHandler<WatcherDelegatingHandler>();
175188
DeserializationSettings.Converters.Add(new V1Status.V1StatusObjectViewConverter());
176189
}
177190

178191
private void AppendDelegatingHandler<T>() where T : DelegatingHandler, new()
179192
{
180193
var cur = FirstMessageHandler as DelegatingHandler;
194+
if (cur == null)
195+
{
196+
FirstMessageHandler = new T() { InnerHandler = FirstMessageHandler };
197+
return;
198+
}
181199

182-
while (cur != null)
200+
do
183201
{
184202
var next = cur.InnerHandler as DelegatingHandler;
185203

@@ -195,7 +213,52 @@ partial void CustomInitialize()
195213
}
196214

197215
cur = next;
216+
} while (cur != null);
217+
}
218+
219+
// NOTE: this method replicates the logic that the base ServiceClient uses except that it doesn't insert the RetryDelegatingHandler.
220+
// (see RemoveRetryHandler below for why we don't want it.) it seems that depending on your framework version and/or
221+
// Microsoft.Rest.ClientRuntime version and/or which constructor gets called, there are at least two ways that the retry handler
222+
// gets added: 1) when the HTTP client is constructed (handled here), 2) in CreateHttpHandlerPipeline (handled in an override), and
223+
// possibly 3) based on the handlers set in CustomInitialize
224+
private void CreateHttpClient(DelegatingHandler[] handlers)
225+
{
226+
FirstMessageHandler = HttpClientHandler = CreateRootHandler();
227+
if(handlers != null)
228+
{
229+
for(int i = handlers.Length - 1; i >= 0; i--)
230+
{
231+
DelegatingHandler handler = handlers[i];
232+
while(handler.InnerHandler is DelegatingHandler d) handler = d;
233+
handler.InnerHandler = FirstMessageHandler;
234+
FirstMessageHandler = handlers[i];
235+
}
236+
}
237+
HttpClient = new HttpClient(FirstMessageHandler, false);
238+
}
239+
240+
/// <summary>Removes the retry handler added by the Microsoft.Rest.ClientRuntime.</summary>
241+
// NOTE: we remove the RetryDelegatingHandler for two reasons. first, it has a very broad definition of what's considered a failed
242+
// request. it considers everything outside 2xx to be failed, including 1xx (e.g. 101 Switching Protocols) and 3xx. and, it wants to
243+
// retry 4xx errors, almost none of which are really retriable except 429 Too Many Requests and /maybe/ 423 Locked (but we're not
244+
// dealing with WebDAV here). really, i don't think we want retry in a Kubernetes client, and as for 429 Too Many Requests, there's
245+
// already a separate RetryAfterDelegatingHandler installed to handle 429 requests. a further problem with the RetryDelegatingHandler
246+
// is that upon seeing a non-2xx status code it tries to read the entire response body as a string. this doesn't work well with
247+
// streaming responses like watches and upgraded (SPDY / web socket) connections.
248+
private HttpMessageHandler RemoveRetryHandler(HttpMessageHandler rootHandler)
249+
{
250+
for (DelegatingHandler prev = null, handler = rootHandler as DelegatingHandler; handler != null;)
251+
{
252+
if (handler is RetryDelegatingHandler)
253+
{
254+
if (prev == null) rootHandler = handler.InnerHandler; // if 'handler' is at the head of the chain, just return the next item
255+
else prev.InnerHandler = handler.InnerHandler; // otherwise, unlink 'handler' from the chain
256+
break;
257+
}
258+
prev = handler;
259+
handler = handler.InnerHandler as DelegatingHandler;
198260
}
261+
return rootHandler;
199262
}
200263

201264
/// <summary>

src/KubernetesClient/Kubernetes.Exec.cs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,17 @@ public async Task<int> NamespacedPodExecAsync(string name, string @namespace, st
2020
throw new ArgumentNullException(nameof(action));
2121
}
2222

23-
try
23+
Stream stdIn = new PipeStream(), stdOut = new PipeStream(), stdErr = new PipeStream();
24+
Task<V1Status> execTask = Request<V1Pod>(@namespace, name).Body(stdIn)
25+
.ExecCommandAsync(command.First(), command.Skip(1).ToArray(), container, stdOut, stdErr, tty, false, cancellationToken);
26+
await action(stdIn, stdOut, stdErr).ConfigureAwait(false);
27+
stdIn.Dispose(); // close STDIN just in case the action failed to do so and the remote process is waiting for it
28+
var status = await execTask.ConfigureAwait(false);
29+
if (status.Code.Value < 0)
2430
{
25-
using (var muxedStream = await this.MuxedStreamNamespacedPodExecAsync(name: name, @namespace: @namespace, command: command, container: container, tty: tty, cancellationToken: cancellationToken).ConfigureAwait(false))
26-
using (Stream stdIn = muxedStream.GetStream(null, ChannelIndex.StdIn))
27-
using (Stream stdOut= muxedStream.GetStream(ChannelIndex.StdOut, null))
28-
using (Stream stdErr = muxedStream.GetStream(ChannelIndex.StdErr, null))
29-
using (Stream error = muxedStream.GetStream(ChannelIndex.Error, null))
30-
using (StreamReader errorReader = new StreamReader(error))
31-
{
32-
muxedStream.Start();
33-
34-
await action(stdIn, stdOut, stdErr).ConfigureAwait(false);
35-
36-
var errors = await errorReader.ReadToEndAsync().ConfigureAwait(false);
37-
38-
// StatusError is defined here:
39-
// https://github.com/kubernetes/kubernetes/blob/068e1642f63a1a8c48c16c18510e8854a4f4e7c5/staging/src/k8s.io/apimachinery/pkg/api/errors/errors.go#L37
40-
var returnMessage = SafeJsonConvert.DeserializeObject<V1Status>(errors);
41-
return GetExitCodeOrThrow(returnMessage);
42-
}
43-
}
44-
catch (HttpOperationException httpEx) when (httpEx.Body is V1Status)
45-
{
46-
throw new KubernetesException((V1Status)httpEx.Body);
31+
throw new KubernetesException(status);
4732
}
33+
return status.Code.Value;
4834
}
4935

5036
/// <summary>

src/KubernetesClient/KubernetesClient.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
<PackageReference Include="Microsoft.Rest.ClientRuntime" Version="2.3.10" />
3838
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" Condition="'$(TargetFramework)' != 'netstandard2.0' and '$(TargetFramework)' != 'netcoreapp2.1'" />
3939
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'netcoreapp2.1'" />
40+
<PackageReference Include="SPDY.net" Version="1.0.0" />
4041
<PackageReference Include="YamlDotNet" Version="6.0.0" />
4142
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
4243
</ItemGroup>

0 commit comments

Comments
 (0)