Skip to content

Add multithreading for saving tracker data #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions Assets/UXF/Scripts/Etc/ResultsDict.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class ResultsDictionary
{
private Dictionary<string, object> baseDict;
private bool allowAdHocAdding;
private Object lockObject = new Object();

/// <summary>
/// Dictionary of results for a trial.
Expand All @@ -25,7 +26,10 @@ public ResultsDictionary(IEnumerable<string> initialKeys, bool allowAdHocAdding)
this.allowAdHocAdding = allowAdHocAdding;
foreach (var key in initialKeys)
{
baseDict.Add(key, string.Empty);
lock (lockObject)
{
baseDict.Add(key, string.Empty);
}
}
}

Expand All @@ -38,13 +42,16 @@ public object this[string key]
{
get { return baseDict[key]; }
set {
if (allowAdHocAdding || baseDict.ContainsKey(key))
{
baseDict[key] = value;
}
else
lock (lockObject)
{
throw new KeyNotFoundException(string.Format("Custom header \"{0}\" does not exist!", key));
if (allowAdHocAdding || baseDict.ContainsKey(key))
{
baseDict[key] = value;
}
else
{
throw new KeyNotFoundException(string.Format("Custom header \"{0}\" does not exist!", key));
}
}
}
}
Expand Down Expand Up @@ -73,4 +80,4 @@ public bool ContainsKey(string key)

}

}
}
81 changes: 76 additions & 5 deletions Assets/UXF/Scripts/Etc/Trial.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using System.Threading;
using UnityEngine;
using System.Collections.Specialized;

using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.IO;

namespace UXF
{
Expand Down Expand Up @@ -62,6 +64,11 @@ public bool saveData
/// </summary>
public ResultsDictionary result;

// Used by the worker task
private static BlockingQueue<System.Action> blockingQueue = new BlockingQueue<System.Action>();
private static Task workerTask;
private static bool quitting = false;

/// <summary>
/// Manually create a trial. When doing this you need to add this trial to a block with block.trials.Add(trial)
/// </summary>
Expand Down Expand Up @@ -277,7 +284,12 @@ private void SaveData()
tracker.StopRecording();
if (tracker.Data.CountRows() > 0)
{
SaveDataTable(tracker.Data, tracker.DataName, dataType: UXFDataType.Trackers);
UXFDataTable table = tracker.Data;
string name = tracker.DataName;
ManageInWorker(() =>
{
SaveDataTable(table, name, dataType: UXFDataType.Trackers);
});
}
}
catch (NullReferenceException)
Expand All @@ -292,9 +304,68 @@ private void SaveData()
result[s] = settings.GetObject(s, string.Empty);
}
}
}


/// <summary>
/// Adds a new command to a queue which is executed in a separate worker thread when it is available.
/// Warning: The Unity Engine API is not thread safe, so do not attempt to put any Unity commands here.
/// </summary>
/// <param name="action"></param>
public static void ManageInWorker(System.Action action)
{
if (workerTask == null)
{
workerTask = Task.Run(Worker);
quitting = false;
}

blockingQueue.Enqueue(action);
}

/// <summary>
/// The worker thread used when <see cref="ManageInWorker"/> is called.
/// </summary>
private static void Worker()
{
// performs FileIO tasks in seperate thread
foreach (var action in blockingQueue)
{
try
{
action.Invoke();
}
catch (ThreadAbortException)
{
break;
}
catch (IOException e)
{
Utilities.UXFDebugLogError(string.Format("Error, file may be in use! Exception: {0}", e));
}
catch (System.Exception e)
{
// stops thread aborting upon an exception
Debug.LogException(e);
}

if (quitting && blockingQueue.NumItems() == 0)
{
break;
}
}
}

/// <summary>
/// Wait for all tasks scheduled through <see cref="ManageInWorker"/>
/// </summary>
public static void WaitForTasks()
{
Utilities.UXFDebugLog("Waiting for tasks to finish");
quitting = true;
blockingQueue.Enqueue(() => {}); // ensures bq breaks from foreach loop
workerTask?.Wait();
Utilities.UXFDebugLog("Tasks finished");
}
}

/// <summary>
/// Status of a trial
Expand All @@ -307,4 +378,4 @@ public enum TrialStatus
}


}
}
2 changes: 2 additions & 0 deletions Assets/UXF/Scripts/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ public void End()

void SaveResults()
{
Trial.WaitForTasks();

// generate list of all headers possible
// hashset keeps unique set of keys
HashSet<string> resultsHeaders = new HashSet<string>();
Expand Down