Skip to content

Commit 5de474c

Browse files
authored
Add in-process concurrency support to the PS worker (#123)
The major changes are: - Use `PSThreadOptions.ReuseThread` for the `InitialSessionState` when creating `Runspace`, so that every `PowerShellManager` only creates one thread and then reuse it afterwards. The default behavior is to create a new thread every time `PowerShell.Invoke` is called. - Update `RequestProcessor` to process `InvocationRequest` in asynchronously via tasks. - Implement `PowerShellManagerPool` using `BlockingCollection` - make upper bound of the pool configurable via an environment variable `PSWorkerInProcConcurrencyUpperBound` - make the pool able to expand in a lazy way - checkout `PowerShellManager` via `CheckoutIdleWorker` on the main thread. Once getting an idle instance back, the main thread will queue a task to process an invocation request on a thread-pool thread and forget about it -- the main thread then can go ahead to process the next message. - Update the `RpcLogger` and make every `PowerShellManager` have its own logger instance. - also update the way to set the `RequestId` and `InvocationId` for logger. The original way to setup the context only works for single-thread design. - Update `MessagingStream` to use a `BlockingCollection` to hold all messages that are about to be written out, then use a single thread-pool thread to take out items and write them to the gRPC channel. - currently, the way we write out response/log messages is completely task-based/async, using a semaphore for synchronization. However, this approach doesn't guarantee the order of the message. - this is because there could be multiple tasks blocked on the semaphore, and releasing the semaphore allows a blocked task to enter the semaphore, but there is no guaranteed order, such as first-in-first-out, for blocked threads to enter the semaphore. - so, the unblocked task could be a random one, and thus change the arrival order of the message when writing the message to the gRPC channel. - Remove the two system logging we have in our worker, because they drastically worsen the processing time per an invocation request when there are a lot in-coming invocation requests. - the logging for "TriggerMetadata" parameter is not that useful, and should be removed - the execution time logging is good to have, but not necessary, especially when it impact the throughput.
1 parent 6406040 commit 5de474c

13 files changed

+180
-213
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ obj/
1717
# Visual Studio IDE directory
1818
.vs/
1919

20+
# VSCode directories that are not at the repository root
21+
/**/.vscode/
22+
2023
# Project Rider IDE files
2124
.idea.powershell/
2225

src/Logging/ILogger.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Utility
1111
internal interface ILogger
1212
{
1313
void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false);
14+
void SetContext(string requestId, string invocationId);
15+
void ResetContext();
1416
}
1517
}

src/Logging/RpcLogger.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,33 @@
1212

1313
namespace Microsoft.Azure.Functions.PowerShellWorker.Utility
1414
{
15-
internal class RpcLogger : ILogger, IDisposable
15+
internal class RpcLogger : ILogger
1616
{
1717
private const string SystemLogPrefix = "LanguageWorkerConsoleLog";
1818
private readonly MessagingStream _msgStream;
1919
private readonly StringBuilder _systemLogMsg;
2020
private string _invocationId;
2121
private string _requestId;
2222

23-
public RpcLogger(MessagingStream msgStream)
23+
internal RpcLogger(MessagingStream msgStream)
2424
{
2525
_msgStream = msgStream;
2626
_systemLogMsg = new StringBuilder();
2727
}
2828

29-
public IDisposable BeginScope(string requestId, string invocationId)
29+
public void SetContext(string requestId, string invocationId)
3030
{
3131
_requestId = requestId;
3232
_invocationId = invocationId;
33-
return this;
3433
}
3534

36-
public void Dispose()
35+
public void ResetContext()
3736
{
3837
_requestId = null;
3938
_invocationId = null;
4039
}
4140

42-
public async void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false)
41+
public void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false)
4342
{
4443
if (isUserLog)
4544
{
@@ -56,7 +55,7 @@ public async void Log(LogLevel logLevel, string message, Exception exception = n
5655
}
5756
};
5857

59-
await _msgStream.WriteAsync(logMessage);
58+
_msgStream.Write(logMessage);
6059
}
6160
else
6261
{

src/Messaging/MessagingStream.cs

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//
55

66
using System;
7+
using System.Collections.Concurrent;
78
using System.Threading;
89
using System.Threading.Tasks;
910

@@ -12,47 +13,44 @@
1213

1314
namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging
1415
{
15-
internal class MessagingStream : IDisposable
16+
internal class MessagingStream
1617
{
17-
private SemaphoreSlim _writeSemaphore = new SemaphoreSlim(1, 1);
18-
private AsyncDuplexStreamingCall<StreamingMessage, StreamingMessage> _call;
19-
private bool isDisposed;
18+
private readonly AsyncDuplexStreamingCall<StreamingMessage, StreamingMessage> _call;
19+
private readonly BlockingCollection<StreamingMessage> _msgQueue;
2020

21-
public MessagingStream(string host, int port)
21+
internal MessagingStream(string host, int port)
2222
{
2323
Channel channel = new Channel(host, port, ChannelCredentials.Insecure);
2424
_call = new FunctionRpc.FunctionRpcClient(channel).EventStream();
25-
}
2625

27-
public void Dispose()
28-
{
29-
if (!isDisposed)
30-
{
31-
isDisposed = true;
32-
_call.Dispose();
33-
}
26+
_msgQueue = new BlockingCollection<StreamingMessage>();
27+
Task.Run(WriteImplAsync);
3428
}
3529

36-
public StreamingMessage GetCurrentMessage() =>
37-
isDisposed ? null : _call.ResponseStream.Current;
38-
39-
public async Task<bool> MoveNext() =>
40-
!isDisposed && await _call.ResponseStream.MoveNext(CancellationToken.None);
41-
42-
public async Task WriteAsync(StreamingMessage message)
30+
/// <summary>
31+
/// Get the current message.
32+
/// </summary>
33+
internal StreamingMessage GetCurrentMessage() => _call.ResponseStream.Current;
34+
35+
/// <summary>
36+
/// Move to the next message.
37+
/// </summary>
38+
internal async Task<bool> MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None);
39+
40+
/// <summary>
41+
/// Write the outgoing message to the buffer.
42+
/// </summary>
43+
internal void Write(StreamingMessage message) => _msgQueue.Add(message);
44+
45+
/// <summary>
46+
/// Take a message from the buffer and write to the gRPC channel.
47+
/// </summary>
48+
private async Task WriteImplAsync()
4349
{
44-
if(isDisposed) return;
45-
46-
// Wait for the handle to be released because we can't have
47-
// more than one message being sent at the same time
48-
await _writeSemaphore.WaitAsync();
49-
try
50-
{
51-
await _call.RequestStream.WriteAsync(message);
52-
}
53-
finally
50+
while (true)
5451
{
55-
_writeSemaphore.Release();
52+
StreamingMessage msg = _msgQueue.Take();
53+
await _call.RequestStream.WriteAsync(msg);
5654
}
5755
}
5856
}

src/PowerShell/PowerShellManager.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ internal class PowerShellManager
2828
/// </summary>
2929
internal Guid InstanceId => _pwsh.Runspace.InstanceId;
3030

31+
/// <summary>
32+
/// Gets the associated logger.
33+
/// </summary>
34+
internal ILogger Logger => _logger;
35+
3136
static PowerShellManager()
3237
{
3338
// Set the type accelerators for 'HttpResponseContext' and 'HttpResponseContext'.
@@ -48,6 +53,7 @@ internal PowerShellManager(ILogger logger)
4853
}
4954

5055
var initialSessionState = InitialSessionState.CreateDefault();
56+
initialSessionState.ThreadOptions = PSThreadOptions.ReuseThread;
5157
initialSessionState.EnvironmentVariables.Add(
5258
new SessionStateVariableEntry("PSModulePath", FunctionLoader.FunctionModulePath, null));
5359

@@ -154,16 +160,11 @@ internal Hashtable InvokeFunction(
154160
// Gives access to additional Trigger Metadata if the user specifies TriggerMetadata
155161
if(functionInfo.FuncParameters.Contains(AzFunctionInfo.TriggerMetadata))
156162
{
157-
_logger.Log(LogLevel.Debug, "Parameter '-TriggerMetadata' found.");
158163
_pwsh.AddParameter(AzFunctionInfo.TriggerMetadata, triggerMetadata);
159164
}
160165

161-
Collection<object> pipelineItems = null;
162-
using (ExecutionTimer.Start(_logger, "Execution of the user's function completed."))
163-
{
164-
pipelineItems = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject")
165-
.InvokeAndClearCommands<object>();
166-
}
166+
Collection<object> pipelineItems = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject")
167+
.InvokeAndClearCommands<object>();
167168

168169
var result = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Get-OutputBinding")
169170
.AddParameter("Purge", true)

src/PowerShell/PowerShellManagerPool.cs

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
//
55

66
using System;
7+
using System.Collections.Concurrent;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.Azure.Functions.PowerShellWorker.Messaging;
711
using Microsoft.Azure.Functions.PowerShellWorker.Utility;
12+
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
813

914
namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell
1015
{
@@ -15,37 +20,80 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell
1520
/// </summary>
1621
internal class PowerShellManagerPool
1722
{
18-
private readonly ILogger _logger;
19-
// Today we don't really support the in-proc concurrency. We just hold an instance of PowerShellManager in this field.
20-
private PowerShellManager _psManager;
23+
private readonly int _upperBound;
24+
private readonly MessagingStream _msgStream;
25+
private readonly BlockingCollection<PowerShellManager> _pool;
26+
private int _poolSize;
2127

2228
/// <summary>
2329
/// Constructor of the pool.
2430
/// </summary>
25-
internal PowerShellManagerPool(ILogger logger)
31+
internal PowerShellManagerPool(MessagingStream msgStream)
2632
{
27-
_logger = logger;
33+
string upperBound = Environment.GetEnvironmentVariable("PSWorkerInProcConcurrencyUpperBound");
34+
if (string.IsNullOrEmpty(upperBound) || !int.TryParse(upperBound, out _upperBound))
35+
{
36+
_upperBound = 1;
37+
}
38+
39+
_msgStream = msgStream;
40+
_pool = new BlockingCollection<PowerShellManager>(_upperBound);
2841
}
2942

3043
/// <summary>
3144
/// Initialize the pool and populate it with PowerShellManager instances.
32-
/// When it's time to really implement this pool, we probably should instantiate PowerShellManager instances in a lazy way.
33-
/// Maybe start from size 1 and increase the number of workers as needed.
45+
/// We instantiate PowerShellManager instances in a lazy way, starting from size 1 and increase the number of workers as needed.
3446
/// </summary>
35-
internal void Initialize()
47+
internal void Initialize(string requestId)
3648
{
37-
_psManager = new PowerShellManager(_logger);
49+
var logger = new RpcLogger(_msgStream);
50+
51+
try
52+
{
53+
logger.SetContext(requestId, invocationId: null);
54+
_pool.Add(new PowerShellManager(logger));
55+
_poolSize = 1;
56+
}
57+
finally
58+
{
59+
logger.ResetContext();
60+
}
3861
}
3962

4063
/// <summary>
41-
/// Checkout an idle PowerShellManager instance.
42-
/// When it's time to really implement this pool, this method is supposed to block when there is no idle instance available.
64+
/// Checkout an idle PowerShellManager instance in a non-blocking asynchronous way.
4365
/// </summary>
44-
internal PowerShellManager CheckoutIdleWorker(AzFunctionInfo functionInfo)
66+
internal PowerShellManager CheckoutIdleWorker(StreamingMessage request, AzFunctionInfo functionInfo)
4567
{
68+
PowerShellManager psManager = null;
69+
string requestId = request.RequestId;
70+
string invocationId = request.InvocationRequest?.InvocationId;
71+
72+
// If the pool has an idle one, just use it.
73+
if (!_pool.TryTake(out psManager))
74+
{
75+
// The pool doesn't have an idle one.
76+
if (_poolSize < _upperBound &&
77+
Interlocked.Increment(ref _poolSize) <= _upperBound)
78+
{
79+
// If the pool hasn't reached its bounded capacity yet, then
80+
// we create a new item and return it.
81+
var logger = new RpcLogger(_msgStream);
82+
logger.SetContext(requestId, invocationId);
83+
psManager = new PowerShellManager(logger);
84+
}
85+
else
86+
{
87+
// If the pool has reached its bounded capacity, then the thread
88+
// should be blocked until an idle one becomes available.
89+
psManager = _pool.Take();
90+
}
91+
}
92+
4693
// Register the function with the Runspace before returning the idle PowerShellManager.
47-
FunctionMetadata.RegisterFunctionMetadata(_psManager.InstanceId, functionInfo);
48-
return _psManager;
94+
FunctionMetadata.RegisterFunctionMetadata(psManager.InstanceId, functionInfo);
95+
psManager.Logger.SetContext(requestId, invocationId);
96+
return psManager;
4997
}
5098

5199
/// <summary>
@@ -57,6 +105,8 @@ internal void ReclaimUsedWorker(PowerShellManager psManager)
57105
{
58106
// Unregister the Runspace before reclaiming the used PowerShellManager.
59107
FunctionMetadata.UnregisterFunctionMetadata(psManager.InstanceId);
108+
psManager.Logger.ResetContext();
109+
_pool.Add(psManager);
60110
}
61111
}
62112
}

0 commit comments

Comments
 (0)