From a862c3bee77fbd7b5eba251b9853932541bef635 Mon Sep 17 00:00:00 2001 From: Shariff Faleel Date: Fri, 2 May 2025 19:10:29 -0700 Subject: [PATCH] Add multithreading for saving tracker data (Fix #163) This adds multithreading when saving data. This becomes an issues when large amounts of data is being written out, where the methods in SaveData do alot of string processing. To ensure data is correctly written out, this also wait for the threads to complete when session is ending. --- Assets/UXF/Scripts/Etc/ResultsDict.cs | 23 +++++--- Assets/UXF/Scripts/Etc/Trial.cs | 81 +++++++++++++++++++++++++-- Assets/UXF/Scripts/Session.cs | 2 + 3 files changed, 93 insertions(+), 13 deletions(-) diff --git a/Assets/UXF/Scripts/Etc/ResultsDict.cs b/Assets/UXF/Scripts/Etc/ResultsDict.cs index 00308aac..edf290ba 100644 --- a/Assets/UXF/Scripts/Etc/ResultsDict.cs +++ b/Assets/UXF/Scripts/Etc/ResultsDict.cs @@ -13,6 +13,7 @@ public class ResultsDictionary { private Dictionary baseDict; private bool allowAdHocAdding; + private Object lockObject = new Object(); /// /// Dictionary of results for a trial. @@ -25,7 +26,10 @@ public ResultsDictionary(IEnumerable initialKeys, bool allowAdHocAdding) this.allowAdHocAdding = allowAdHocAdding; foreach (var key in initialKeys) { - baseDict.Add(key, string.Empty); + lock (lockObject) + { + baseDict.Add(key, string.Empty); + } } } @@ -38,13 +42,16 @@ public object this[string key] { get { return baseDict[key]; } set { - if (allowAdHocAdding || baseDict.ContainsKey(key)) - { - baseDict[key] = value; - } - else + lock (lockObject) { - throw new KeyNotFoundException(string.Format("Custom header \"{0}\" does not exist!", key)); + if (allowAdHocAdding || baseDict.ContainsKey(key)) + { + baseDict[key] = value; + } + else + { + throw new KeyNotFoundException(string.Format("Custom header \"{0}\" does not exist!", key)); + } } } } @@ -73,4 +80,4 @@ public bool ContainsKey(string key) } -} \ No newline at end of file +} diff --git a/Assets/UXF/Scripts/Etc/Trial.cs b/Assets/UXF/Scripts/Etc/Trial.cs index 9ff67a06..8d819cc3 100644 --- a/Assets/UXF/Scripts/Etc/Trial.cs +++ b/Assets/UXF/Scripts/Etc/Trial.cs @@ -5,7 +5,9 @@ using System.Threading; using UnityEngine; using System.Collections.Specialized; - +using System.Threading.Tasks; +using System.Collections.Concurrent; +using System.IO; namespace UXF { @@ -62,6 +64,11 @@ public bool saveData /// public ResultsDictionary result; + // Used by the worker task + private static BlockingQueue blockingQueue = new BlockingQueue(); + private static Task workerTask; + private static bool quitting = false; + /// /// Manually create a trial. When doing this you need to add this trial to a block with block.trials.Add(trial) /// @@ -277,7 +284,12 @@ private void SaveData() tracker.StopRecording(); if (tracker.Data.CountRows() > 0) { - SaveDataTable(tracker.Data, tracker.DataName, dataType: UXFDataType.Trackers); + UXFDataTable table = tracker.Data; + string name = tracker.DataName; + ManageInWorker(() => + { + SaveDataTable(table, name, dataType: UXFDataType.Trackers); + }); } } catch (NullReferenceException) @@ -292,9 +304,68 @@ private void SaveData() result[s] = settings.GetObject(s, string.Empty); } } - } - + /// + /// Adds a new command to a queue which is executed in a separate worker thread when it is available. + /// Warning: The Unity Engine API is not thread safe, so do not attempt to put any Unity commands here. + /// + /// + public static void ManageInWorker(System.Action action) + { + if (workerTask == null) + { + workerTask = Task.Run(Worker); + quitting = false; + } + + blockingQueue.Enqueue(action); + } + + /// + /// The worker thread used when is called. + /// + private static void Worker() + { + // performs FileIO tasks in seperate thread + foreach (var action in blockingQueue) + { + try + { + action.Invoke(); + } + catch (ThreadAbortException) + { + break; + } + catch (IOException e) + { + Utilities.UXFDebugLogError(string.Format("Error, file may be in use! Exception: {0}", e)); + } + catch (System.Exception e) + { + // stops thread aborting upon an exception + Debug.LogException(e); + } + + if (quitting && blockingQueue.NumItems() == 0) + { + break; + } + } + } + + /// + /// Wait for all tasks scheduled through + /// + public static void WaitForTasks() + { + Utilities.UXFDebugLog("Waiting for tasks to finish"); + quitting = true; + blockingQueue.Enqueue(() => {}); // ensures bq breaks from foreach loop + workerTask?.Wait(); + Utilities.UXFDebugLog("Tasks finished"); + } + } /// /// Status of a trial @@ -307,4 +378,4 @@ public enum TrialStatus } -} \ No newline at end of file +} diff --git a/Assets/UXF/Scripts/Session.cs b/Assets/UXF/Scripts/Session.cs index 519d6887..0340c125 100644 --- a/Assets/UXF/Scripts/Session.cs +++ b/Assets/UXF/Scripts/Session.cs @@ -695,6 +695,8 @@ public void End() void SaveResults() { + Trial.WaitForTasks(); + // generate list of all headers possible // hashset keeps unique set of keys HashSet resultsHeaders = new HashSet();