Skip to content

Commit cf483fc

Browse files
authored
Improve throughput a bit by avoiding spinning up new threads when it's not needed (Azure#189)
1 parent 35f9c1e commit cf483fc

File tree

4 files changed

+146
-129
lines changed

4 files changed

+146
-129
lines changed

src/Messaging/MessagingStream.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
//
55

66
using System;
7-
using System.Collections.Concurrent;
87
using System.Threading;
98
using System.Threading.Tasks;
109

@@ -16,15 +15,12 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging
1615
internal class MessagingStream
1716
{
1817
private readonly AsyncDuplexStreamingCall<StreamingMessage, StreamingMessage> _call;
19-
private readonly BlockingCollection<StreamingMessage> _msgQueue;
18+
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(initialCount: 1, maxCount: 1);
2019

2120
internal MessagingStream(string host, int port)
2221
{
2322
Channel channel = new Channel(host, port, ChannelCredentials.Insecure);
2423
_call = new FunctionRpc.FunctionRpcClient(channel).EventStream();
25-
26-
_msgQueue = new BlockingCollection<StreamingMessage>();
27-
Task.Run(WriteImplAsync);
2824
}
2925

3026
/// <summary>
@@ -38,19 +34,23 @@ internal MessagingStream(string host, int port)
3834
internal async Task<bool> MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None);
3935

4036
/// <summary>
41-
/// Write the outgoing message to the buffer.
37+
/// Write the outgoing message.
4238
/// </summary>
43-
internal void Write(StreamingMessage message) => _msgQueue.Add(message);
39+
internal void Write(StreamingMessage message) => WriteImplAsync(message).ConfigureAwait(false);
4440

4541
/// <summary>
4642
/// Take a message from the buffer and write to the gRPC channel.
4743
/// </summary>
48-
private async Task WriteImplAsync()
44+
private async Task WriteImplAsync(StreamingMessage message)
4945
{
50-
while (true)
46+
try
47+
{
48+
await _semaphoreSlim.WaitAsync();
49+
await _call.RequestStream.WriteAsync(message);
50+
}
51+
finally
5152
{
52-
StreamingMessage msg = _msgQueue.Take();
53-
await _call.RequestStream.WriteAsync(msg);
53+
_semaphoreSlim.Release();
5454
}
5555
}
5656
}

src/PowerShell/PowerShellManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ internal PowerShellManager(ILogger logger, Action<PowerShell, ILogger> initActio
5353
}
5454

5555
var initialSessionState = InitialSessionState.CreateDefault();
56-
initialSessionState.ThreadOptions = PSThreadOptions.ReuseThread;
56+
initialSessionState.ThreadOptions = PSThreadOptions.UseCurrentThread;
5757
initialSessionState.EnvironmentVariables.Add(
5858
new SessionStateVariableEntry("PSModulePath", FunctionLoader.FunctionModulePath, null));
5959

Lines changed: 121 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,116 +1,121 @@
1-
//
2-
// Copyright (c) Microsoft. All rights reserved.
3-
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4-
//
5-
6-
using System;
7-
using System.Collections.Concurrent;
8-
using System.Threading;
9-
using System.Threading.Tasks;
10-
using Microsoft.Azure.Functions.PowerShellWorker.Messaging;
11-
using Microsoft.Azure.Functions.PowerShellWorker.Utility;
12-
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
13-
14-
namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell
15-
{
16-
using System.Management.Automation;
17-
18-
/// <summary>
19-
/// The PowerShellManager pool for the in-proc concurrency support.
20-
/// </summary>
21-
internal class PowerShellManagerPool
22-
{
23-
private readonly int _upperBound;
24-
private readonly MessagingStream _msgStream;
25-
private readonly BlockingCollection<PowerShellManager> _pool;
26-
private int _poolSize;
27-
28-
/// <summary>
29-
/// Constructor of the pool.
30-
/// </summary>
31-
internal PowerShellManagerPool(MessagingStream msgStream)
32-
{
33-
string upperBound = Environment.GetEnvironmentVariable("PSWorkerInProcConcurrencyUpperBound");
34-
if (string.IsNullOrEmpty(upperBound) || !int.TryParse(upperBound, out _upperBound))
35-
{
36-
_upperBound = 1;
37-
}
38-
39-
_msgStream = msgStream;
40-
_pool = new BlockingCollection<PowerShellManager>(_upperBound);
41-
RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.LogConcurrencyUpperBound, _upperBound.ToString()));
42-
}
43-
44-
/// <summary>
45-
/// Initialize the pool and populate it with PowerShellManager instances.
46-
/// We instantiate PowerShellManager instances in a lazy way, starting from size 1 and increase the number of workers as needed.
47-
/// </summary>
48-
internal void Initialize(string requestId, Action<PowerShell, ILogger> initAction = null)
49-
{
50-
var logger = new RpcLogger(_msgStream);
51-
52-
try
53-
{
54-
logger.SetContext(requestId, invocationId: null);
55-
_pool.Add(new PowerShellManager(logger, initAction));
56-
_poolSize = 1;
57-
}
58-
finally
59-
{
60-
logger.ResetContext();
61-
}
62-
}
63-
64-
/// <summary>
65-
/// Checkout an idle PowerShellManager instance in a non-blocking asynchronous way.
66-
/// </summary>
67-
internal PowerShellManager CheckoutIdleWorker(StreamingMessage request, AzFunctionInfo functionInfo)
68-
{
69-
PowerShellManager psManager = null;
70-
string requestId = request.RequestId;
71-
string invocationId = request.InvocationRequest?.InvocationId;
72-
73-
// If the pool has an idle one, just use it.
74-
if (!_pool.TryTake(out psManager))
75-
{
76-
// The pool doesn't have an idle one.
77-
if (_poolSize < _upperBound &&
78-
Interlocked.Increment(ref _poolSize) <= _upperBound)
79-
{
80-
// If the pool hasn't reached its bounded capacity yet, then
81-
// we create a new item and return it.
82-
var logger = new RpcLogger(_msgStream);
83-
logger.SetContext(requestId, invocationId);
84-
psManager = new PowerShellManager(logger);
85-
86-
RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.LogNewPowerShellManagerCreated, _poolSize.ToString()));
87-
}
88-
else
89-
{
90-
// If the pool has reached its bounded capacity, then the thread
91-
// should be blocked until an idle one becomes available.
92-
psManager = _pool.Take();
93-
}
94-
}
95-
96-
// Register the function with the Runspace before returning the idle PowerShellManager.
97-
FunctionMetadata.RegisterFunctionMetadata(psManager.InstanceId, functionInfo);
98-
psManager.Logger.SetContext(requestId, invocationId);
99-
return psManager;
100-
}
101-
102-
/// <summary>
103-
/// Return a used PowerShellManager instance to the pool.
104-
/// </summary>
105-
internal void ReclaimUsedWorker(PowerShellManager psManager)
106-
{
107-
if (psManager != null)
108-
{
109-
// Unregister the Runspace before reclaiming the used PowerShellManager.
110-
FunctionMetadata.UnregisterFunctionMetadata(psManager.InstanceId);
111-
psManager.Logger.ResetContext();
112-
_pool.Add(psManager);
113-
}
114-
}
115-
}
116-
}
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
using System;
7+
using System.Collections.Concurrent;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.Azure.Functions.PowerShellWorker.Messaging;
11+
using Microsoft.Azure.Functions.PowerShellWorker.Utility;
12+
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
13+
14+
namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell
15+
{
16+
using System.Management.Automation;
17+
18+
/// <summary>
19+
/// The PowerShellManager pool for the in-proc concurrency support.
20+
/// </summary>
21+
internal class PowerShellManagerPool
22+
{
23+
private readonly int _upperBound;
24+
private readonly MessagingStream _msgStream;
25+
private readonly BlockingCollection<PowerShellManager> _pool;
26+
private int _poolSize;
27+
28+
/// <summary>
29+
/// Gets the concurrency upper bound.
30+
/// </summary>
31+
internal int UpperBound => _upperBound;
32+
33+
/// <summary>
34+
/// Constructor of the pool.
35+
/// </summary>
36+
internal PowerShellManagerPool(MessagingStream msgStream)
37+
{
38+
string upperBound = Environment.GetEnvironmentVariable("PSWorkerInProcConcurrencyUpperBound");
39+
if (string.IsNullOrEmpty(upperBound) || !int.TryParse(upperBound, out _upperBound))
40+
{
41+
_upperBound = 1;
42+
}
43+
44+
_msgStream = msgStream;
45+
_pool = new BlockingCollection<PowerShellManager>(_upperBound);
46+
RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.LogConcurrencyUpperBound, _upperBound.ToString()));
47+
}
48+
49+
/// <summary>
50+
/// Initialize the pool and populate it with PowerShellManager instances.
51+
/// We instantiate PowerShellManager instances in a lazy way, starting from size 1 and increase the number of workers as needed.
52+
/// </summary>
53+
internal void Initialize(string requestId, Action<PowerShell, ILogger> initAction = null)
54+
{
55+
var logger = new RpcLogger(_msgStream);
56+
57+
try
58+
{
59+
logger.SetContext(requestId, invocationId: null);
60+
_pool.Add(new PowerShellManager(logger, initAction));
61+
_poolSize = 1;
62+
}
63+
finally
64+
{
65+
logger.ResetContext();
66+
}
67+
}
68+
69+
/// <summary>
70+
/// Checkout an idle PowerShellManager instance in a non-blocking asynchronous way.
71+
/// </summary>
72+
internal PowerShellManager CheckoutIdleWorker(StreamingMessage request, AzFunctionInfo functionInfo)
73+
{
74+
PowerShellManager psManager = null;
75+
string requestId = request.RequestId;
76+
string invocationId = request.InvocationRequest?.InvocationId;
77+
78+
// If the pool has an idle one, just use it.
79+
if (!_pool.TryTake(out psManager))
80+
{
81+
// The pool doesn't have an idle one.
82+
if (_poolSize < _upperBound &&
83+
Interlocked.Increment(ref _poolSize) <= _upperBound)
84+
{
85+
// If the pool hasn't reached its bounded capacity yet, then
86+
// we create a new item and return it.
87+
var logger = new RpcLogger(_msgStream);
88+
logger.SetContext(requestId, invocationId);
89+
psManager = new PowerShellManager(logger);
90+
91+
RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.LogNewPowerShellManagerCreated, _poolSize.ToString()));
92+
}
93+
else
94+
{
95+
// If the pool has reached its bounded capacity, then the thread
96+
// should be blocked until an idle one becomes available.
97+
psManager = _pool.Take();
98+
}
99+
}
100+
101+
// Register the function with the Runspace before returning the idle PowerShellManager.
102+
FunctionMetadata.RegisterFunctionMetadata(psManager.InstanceId, functionInfo);
103+
psManager.Logger.SetContext(requestId, invocationId);
104+
return psManager;
105+
}
106+
107+
/// <summary>
108+
/// Return a used PowerShellManager instance to the pool.
109+
/// </summary>
110+
internal void ReclaimUsedWorker(PowerShellManager psManager)
111+
{
112+
if (psManager != null)
113+
{
114+
// Unregister the Runspace before reclaiming the used PowerShellManager.
115+
FunctionMetadata.UnregisterFunctionMetadata(psManager.InstanceId);
116+
psManager.Logger.ResetContext();
117+
_pool.Add(psManager);
118+
}
119+
}
120+
}
121+
}

src/RequestProcessor.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,19 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request)
208208
{
209209
functionInfo = _functionLoader.GetFunctionInfo(request.InvocationRequest.FunctionId);
210210
psManager = _powershellPool.CheckoutIdleWorker(request, functionInfo);
211-
Task.Run(() => ProcessInvocationRequestImpl(request, functionInfo, psManager));
211+
212+
if (_powershellPool.UpperBound == 1)
213+
{
214+
// When the concurrency upper bound is 1, we can handle only one invocation at a time anyways,
215+
// so it's better to just do it on the current thread to reduce the required synchronization.
216+
ProcessInvocationRequestImpl(request, functionInfo, psManager);
217+
}
218+
else
219+
{
220+
// When the concurrency upper bound is more than 1, we have to handle the invocation in a worker
221+
// thread, so multiple invocations can make progress at the same time, even though by time-sharing.
222+
Task.Run(() => ProcessInvocationRequestImpl(request, functionInfo, psManager));
223+
}
212224
}
213225
catch (Exception e)
214226
{

0 commit comments

Comments
 (0)