Skip to content

Refactor the code base to make it easy to support concurrency within a worker in future #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/FunctionLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal class FunctionLoader

internal static string FunctionAppRootPath { get; private set; }
internal static string FunctionAppProfilePath { get; private set; }
internal static string FunctionAppModulesPath { get; private set; }
internal static string FunctionModulePath { get; private set; }

/// <summary>
/// Query for function metadata can happen in parallel.
Expand Down Expand Up @@ -51,9 +51,14 @@ internal void LoadFunction(FunctionLoadRequest request)
/// </summary>
internal static void SetupWellKnownPaths(FunctionLoadRequest request)
{
// Resolve the FunctionApp root path
FunctionAppRootPath = Path.GetFullPath(Path.Join(request.Metadata.Directory, ".."));
FunctionAppModulesPath = Path.Join(FunctionAppRootPath, "Modules");
// Resolve module paths
var appLevelModulesPath = Path.Join(FunctionAppRootPath, "Modules");
var workerLevelModulesPath = Path.Join(AppDomain.CurrentDomain.BaseDirectory, "Modules");
FunctionModulePath = $"{appLevelModulesPath}{Path.PathSeparator}{workerLevelModulesPath}";

// Resolve the FunctionApp profile path
var options = new EnumerationOptions { MatchCasing = MatchCasing.CaseInsensitive };
var profiles = Directory.EnumerateFiles(FunctionAppRootPath, "profile.ps1", options);
FunctionAppProfilePath = profiles.FirstOrDefault();
Expand Down
71 changes: 30 additions & 41 deletions src/PowerShell/PowerShellManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,33 @@ internal class PowerShellManager
private readonly ILogger _logger;
private readonly PowerShell _pwsh;

/// <summary>
/// Gets the Runspace InstanceId.
/// </summary>
internal Guid InstanceId => _pwsh.Runspace.InstanceId;

static PowerShellManager()
{
// Set the type accelerators for 'HttpResponseContext' and 'HttpResponseContext'.
// We probably will expose more public types from the worker in future for the interop between worker and the 'PowerShellWorker' module.
// But it's most likely only 'HttpResponseContext' and 'HttpResponseContext' are supposed to be used directly by users, so we only add
// type accelerators for these two explicitly.
var accelerator = typeof(PSObject).Assembly.GetType("System.Management.Automation.TypeAccelerators");
var addMethod = accelerator.GetMethod("Add", new Type[] { typeof(string), typeof(Type) });
addMethod.Invoke(null, new object[] { "HttpResponseContext", typeof(HttpResponseContext) });
addMethod.Invoke(null, new object[] { "HttpRequestContext", typeof(HttpRequestContext) });
}

internal PowerShellManager(ILogger logger)
{
if (FunctionLoader.FunctionAppRootPath == null)
{
throw new InvalidOperationException($"The FunctionApp root hasn't been resolved yet!");
}

var initialSessionState = InitialSessionState.CreateDefault();
initialSessionState.EnvironmentVariables.Add(
new SessionStateVariableEntry("PSModulePath", FunctionLoader.FunctionModulePath, null));

// Setting the execution policy on macOS and Linux throws an exception so only update it on Windows
if(Platform.IsWindows)
Expand All @@ -34,8 +58,9 @@ internal PowerShellManager(ILogger logger)
// Windows client versions. This is needed if a user is testing their function locally with the func CLI
initialSessionState.ExecutionPolicy = Microsoft.PowerShell.ExecutionPolicy.Unrestricted;
}
_pwsh = PowerShell.Create(initialSessionState);

_logger = logger;
_pwsh = PowerShell.Create(initialSessionState);

// Setup Stream event listeners
var streamHandler = new StreamHandler(logger);
Expand All @@ -45,34 +70,17 @@ internal PowerShellManager(ILogger logger)
_pwsh.Streams.Progress.DataAdding += streamHandler.ProgressDataAdding;
_pwsh.Streams.Verbose.DataAdding += streamHandler.VerboseDataAdding;
_pwsh.Streams.Warning.DataAdding += streamHandler.WarningDataAdding;
}

/// <summary>
/// This method performs the one-time initialization at the worker process level.
/// </summary>
internal void PerformWorkerLevelInitialization()
{
// Set the type accelerators for 'HttpResponseContext' and 'HttpResponseContext'.
// We probably will expose more public types from the worker in future for the interop between worker and the 'PowerShellWorker' module.
// But it's most likely only 'HttpResponseContext' and 'HttpResponseContext' are supposed to be used directly by users, so we only add
// type accelerators for these two explicitly.
var accelerator = typeof(PSObject).Assembly.GetType("System.Management.Automation.TypeAccelerators");
var addMethod = accelerator.GetMethod("Add", new Type[] { typeof(string), typeof(Type) });
addMethod.Invoke(null, new object[] { "HttpResponseContext", typeof(HttpResponseContext) });
addMethod.Invoke(null, new object[] { "HttpRequestContext", typeof(HttpRequestContext) });

// Set the PSModulePath
var workerModulesPath = Path.Join(AppDomain.CurrentDomain.BaseDirectory, "Modules");
Environment.SetEnvironmentVariable("PSModulePath", $"{FunctionLoader.FunctionAppModulesPath}{Path.PathSeparator}{workerModulesPath}");
// Initialize the Runspace
InvokeProfile(FunctionLoader.FunctionAppProfilePath);
}

/// <summary>
/// This method performs initialization that has to be done for each Runspace, e.g. running the Function App's profile.ps1.
/// This method invokes the FunctionApp's profile.ps1.
/// </summary>
internal void PerformRunspaceLevelInitialization()
internal void InvokeProfile(string profilePath)
{
Exception exception = null;
string profilePath = FunctionLoader.FunctionAppProfilePath;
if (profilePath == null)
{
_logger.Log(LogLevel.Trace, $"No 'profile.ps1' is found at the FunctionApp root folder: {FunctionLoader.FunctionAppRootPath}");
Expand Down Expand Up @@ -195,25 +203,6 @@ internal string ConvertToJson(object fromObj)
.InvokeAndClearCommands<string>()[0];
}

/// <summary>
/// Helper method to set the output binding metadata for the function that is about to run.
/// </summary>
internal void RegisterFunctionMetadata(AzFunctionInfo functionInfo)
{
var outputBindings = functionInfo.OutputBindings;
FunctionMetadata.OutputBindingCache.AddOrUpdate(_pwsh.Runspace.InstanceId,
outputBindings,
(key, value) => outputBindings);
}

/// <summary>
/// Helper method to clear the output binding metadata for the function that has done running.
/// </summary>
internal void UnregisterFunctionMetadata()
{
FunctionMetadata.OutputBindingCache.TryRemove(_pwsh.Runspace.InstanceId, out _);
}

private void ResetRunspace(string moduleName)
{
// Reset the runspace to the Initial Session State
Expand Down
63 changes: 63 additions & 0 deletions src/PowerShell/PowerShellManagerPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

using System;
using Microsoft.Azure.Functions.PowerShellWorker.Utility;

namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell
{
using System.Management.Automation;

/// <summary>
/// The PowerShellManager pool for the in-proc concurrency support.
/// </summary>
internal class PowerShellManagerPool
{
private readonly ILogger _logger;
// Today we don't really support the in-proc concurrency. We just hold an instance of PowerShellManager in this field.
private PowerShellManager _psManager;

/// <summary>
/// Constructor of the pool.
/// </summary>
internal PowerShellManagerPool(ILogger logger)
{
_logger = logger;
}

/// <summary>
/// Initialize the pool and populate it with PowerShellManager instances.
/// When it's time to really implement this pool, we probably should instantiate PowerShellManager instances in a lazy way.
/// Maybe start from size 1 and increase the number of workers as needed.
/// </summary>
internal void Initialize()
{
_psManager = new PowerShellManager(_logger);
}

/// <summary>
/// Checkout an idle PowerShellManager instance.
/// When it's time to really implement this pool, this method is supposed to block when there is no idle instance available.
/// </summary>
internal PowerShellManager CheckoutIdleWorker(AzFunctionInfo functionInfo)
{
// Register the function with the Runspace before returning the idle PowerShellManager.
FunctionMetadata.RegisterFunctionMetadata(_psManager.InstanceId, functionInfo);
return _psManager;
}

/// <summary>
/// Return a used PowerShellManager instance to the pool.
/// </summary>
internal void ReclaimUsedWorker(PowerShellManager psManager)
{
if (psManager != null)
{
// Unregister the Runspace before reclaiming the used PowerShellManager.
FunctionMetadata.UnregisterFunctionMetadata(psManager.InstanceId);
}
}
}
}
17 changes: 17 additions & 0 deletions src/Public/FunctionMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,22 @@ public static ReadOnlyDictionary<string, ReadOnlyBindingInfo> GetOutputBindingIn
OutputBindingCache.TryGetValue(runspaceInstanceId, out outputBindings);
return outputBindings;
}

/// <summary>
/// Helper method to set the output binding metadata for the function that is about to run.
/// </summary>
internal static void RegisterFunctionMetadata(Guid instanceId, AzFunctionInfo functionInfo)
{
var outputBindings = functionInfo.OutputBindings;
OutputBindingCache.AddOrUpdate(instanceId, outputBindings, (key, value) => outputBindings);
}

/// <summary>
/// Helper method to clear the output binding metadata for the function that has done running.
/// </summary>
internal static void UnregisterFunctionMetadata(Guid instanceId)
{
OutputBindingCache.TryRemove(instanceId, out _);
}
}
}
38 changes: 17 additions & 21 deletions src/RequestProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal class RequestProcessor
private readonly FunctionLoader _functionLoader;
private readonly RpcLogger _logger;
private readonly MessagingStream _msgStream;
private readonly PowerShellManager _powerShellManager;
private readonly PowerShellManagerPool _powershellPool;

// Indicate whether the FunctionApp has been initialized.
private bool _isFunctionAppInitialized;
Expand All @@ -29,7 +29,7 @@ internal RequestProcessor(MessagingStream msgStream)
{
_msgStream = msgStream;
_logger = new RpcLogger(msgStream);
_powerShellManager = new PowerShellManager(_logger);
_powershellPool = new PowerShellManagerPool(_logger);
_functionLoader = new FunctionLoader();
}

Expand Down Expand Up @@ -98,9 +98,7 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request)
if (!_isFunctionAppInitialized)
{
FunctionLoader.SetupWellKnownPaths(functionLoadRequest);
_powerShellManager.PerformWorkerLevelInitialization();
_powerShellManager.PerformRunspaceLevelInitialization();

_powershellPool.Initialize();
_isFunctionAppInitialized = true;
}

Expand All @@ -122,6 +120,7 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request)
/// </summary>
internal StreamingMessage ProcessInvocationRequest(StreamingMessage request)
{
PowerShellManager psManager = null;
InvocationRequest invocationRequest = request.InvocationRequest;

StreamingMessage response = NewStreamingMessageTemplate(
Expand All @@ -130,18 +129,18 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request)
out StatusResult status);
response.InvocationResponse.InvocationId = invocationRequest.InvocationId;

// Invoke powershell logic and return hashtable of out binding data
try
{
// Load information about the function
var functionInfo = _functionLoader.GetFunctionInfo(invocationRequest.FunctionId);
_powerShellManager.RegisterFunctionMetadata(functionInfo);
psManager = _powershellPool.CheckoutIdleWorker(functionInfo);

// Invoke the function and return a hashtable of out binding data
Hashtable results = functionInfo.Type == AzFunctionType.OrchestrationFunction
? InvokeOrchestrationFunction(functionInfo, invocationRequest)
: InvokeSingleActivityFunction(functionInfo, invocationRequest);
? InvokeOrchestrationFunction(psManager, functionInfo, invocationRequest)
: InvokeSingleActivityFunction(psManager, functionInfo, invocationRequest);

BindOutputFromResult(response.InvocationResponse, functionInfo, results);
BindOutputFromResult(psManager, response.InvocationResponse, functionInfo, results);
}
catch (Exception e)
{
Expand All @@ -150,7 +149,7 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request)
}
finally
{
_powerShellManager.UnregisterFunctionMetadata();
_powershellPool.ReclaimUsedWorker(psManager);
}

return response;
Expand Down Expand Up @@ -188,15 +187,15 @@ private StreamingMessage NewStreamingMessageTemplate(string requestId, Streaming
/// <summary>
/// Invoke an orchestration function.
/// </summary>
private Hashtable InvokeOrchestrationFunction(AzFunctionInfo functionInfo, InvocationRequest invocationRequest)
private Hashtable InvokeOrchestrationFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest)
{
throw new NotImplementedException("Durable function is not yet supported for PowerShell");
}

/// <summary>
/// Invoke a regular function or an activity function.
/// </summary>
private Hashtable InvokeSingleActivityFunction(AzFunctionInfo functionInfo, InvocationRequest invocationRequest)
private Hashtable InvokeSingleActivityFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest)
{
// Bundle all TriggerMetadata into Hashtable to send down to PowerShell
var triggerMetadata = new Hashtable(StringComparer.OrdinalIgnoreCase);
Expand All @@ -210,16 +209,13 @@ private Hashtable InvokeSingleActivityFunction(AzFunctionInfo functionInfo, Invo
}
}

return _powerShellManager.InvokeFunction(
functionInfo,
triggerMetadata,
invocationRequest.InputData);
return psManager.InvokeFunction(functionInfo, triggerMetadata, invocationRequest.InputData);
}

/// <summary>
/// Set the 'ReturnValue' and 'OutputData' based on the invocation results appropriately.
/// </summary>
private void BindOutputFromResult(InvocationResponse response, AzFunctionInfo functionInfo, Hashtable results)
private void BindOutputFromResult(PowerShellManager psManager, InvocationResponse response, AzFunctionInfo functionInfo, Hashtable results)
{
switch (functionInfo.Type)
{
Expand All @@ -231,14 +227,14 @@ private void BindOutputFromResult(InvocationResponse response, AzFunctionInfo fu
string outBindingName = binding.Key;
if(string.Equals(outBindingName, AzFunctionInfo.DollarReturn, StringComparison.OrdinalIgnoreCase))
{
response.ReturnValue = results[outBindingName].ToTypedData(_powerShellManager);
response.ReturnValue = results[outBindingName].ToTypedData(psManager);
continue;
}

ParameterBinding paramBinding = new ParameterBinding()
{
Name = outBindingName,
Data = results[outBindingName].ToTypedData(_powerShellManager)
Data = results[outBindingName].ToTypedData(psManager)
};

response.OutputData.Add(paramBinding);
Expand All @@ -247,7 +243,7 @@ private void BindOutputFromResult(InvocationResponse response, AzFunctionInfo fu

case AzFunctionType.OrchestrationFunction:
case AzFunctionType.ActivityFunction:
response.ReturnValue = results[AzFunctionInfo.DollarReturn].ToTypedData(_powerShellManager);
response.ReturnValue = results[AzFunctionInfo.DollarReturn].ToTypedData(psManager);
break;

default:
Expand Down
Loading