Skip to content

Commit a862c3b

Browse files
committed
Add multithreading for saving tracker data (Fix #163)
This adds multithreading when saving data. This becomes an issues when large amounts of data is being written out, where the methods in SaveData do alot of string processing. To ensure data is correctly written out, this also wait for the threads to complete when session is ending.
1 parent 6f43243 commit a862c3b

File tree

3 files changed

+93
-13
lines changed

3 files changed

+93
-13
lines changed

Assets/UXF/Scripts/Etc/ResultsDict.cs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public class ResultsDictionary
1313
{
1414
private Dictionary<string, object> baseDict;
1515
private bool allowAdHocAdding;
16+
private Object lockObject = new Object();
1617

1718
/// <summary>
1819
/// Dictionary of results for a trial.
@@ -25,7 +26,10 @@ public ResultsDictionary(IEnumerable<string> initialKeys, bool allowAdHocAdding)
2526
this.allowAdHocAdding = allowAdHocAdding;
2627
foreach (var key in initialKeys)
2728
{
28-
baseDict.Add(key, string.Empty);
29+
lock (lockObject)
30+
{
31+
baseDict.Add(key, string.Empty);
32+
}
2933
}
3034
}
3135

@@ -38,13 +42,16 @@ public object this[string key]
3842
{
3943
get { return baseDict[key]; }
4044
set {
41-
if (allowAdHocAdding || baseDict.ContainsKey(key))
42-
{
43-
baseDict[key] = value;
44-
}
45-
else
45+
lock (lockObject)
4646
{
47-
throw new KeyNotFoundException(string.Format("Custom header \"{0}\" does not exist!", key));
47+
if (allowAdHocAdding || baseDict.ContainsKey(key))
48+
{
49+
baseDict[key] = value;
50+
}
51+
else
52+
{
53+
throw new KeyNotFoundException(string.Format("Custom header \"{0}\" does not exist!", key));
54+
}
4855
}
4956
}
5057
}
@@ -73,4 +80,4 @@ public bool ContainsKey(string key)
7380

7481
}
7582

76-
}
83+
}

Assets/UXF/Scripts/Etc/Trial.cs

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
using System.Threading;
66
using UnityEngine;
77
using System.Collections.Specialized;
8-
8+
using System.Threading.Tasks;
9+
using System.Collections.Concurrent;
10+
using System.IO;
911

1012
namespace UXF
1113
{
@@ -62,6 +64,11 @@ public bool saveData
6264
/// </summary>
6365
public ResultsDictionary result;
6466

67+
// Used by the worker task
68+
private static BlockingQueue<System.Action> blockingQueue = new BlockingQueue<System.Action>();
69+
private static Task workerTask;
70+
private static bool quitting = false;
71+
6572
/// <summary>
6673
/// Manually create a trial. When doing this you need to add this trial to a block with block.trials.Add(trial)
6774
/// </summary>
@@ -277,7 +284,12 @@ private void SaveData()
277284
tracker.StopRecording();
278285
if (tracker.Data.CountRows() > 0)
279286
{
280-
SaveDataTable(tracker.Data, tracker.DataName, dataType: UXFDataType.Trackers);
287+
UXFDataTable table = tracker.Data;
288+
string name = tracker.DataName;
289+
ManageInWorker(() =>
290+
{
291+
SaveDataTable(table, name, dataType: UXFDataType.Trackers);
292+
});
281293
}
282294
}
283295
catch (NullReferenceException)
@@ -292,9 +304,68 @@ private void SaveData()
292304
result[s] = settings.GetObject(s, string.Empty);
293305
}
294306
}
295-
}
296307

297-
308+
/// <summary>
309+
/// Adds a new command to a queue which is executed in a separate worker thread when it is available.
310+
/// Warning: The Unity Engine API is not thread safe, so do not attempt to put any Unity commands here.
311+
/// </summary>
312+
/// <param name="action"></param>
313+
public static void ManageInWorker(System.Action action)
314+
{
315+
if (workerTask == null)
316+
{
317+
workerTask = Task.Run(Worker);
318+
quitting = false;
319+
}
320+
321+
blockingQueue.Enqueue(action);
322+
}
323+
324+
/// <summary>
325+
/// The worker thread used when <see cref="ManageInWorker"/> is called.
326+
/// </summary>
327+
private static void Worker()
328+
{
329+
// performs FileIO tasks in seperate thread
330+
foreach (var action in blockingQueue)
331+
{
332+
try
333+
{
334+
action.Invoke();
335+
}
336+
catch (ThreadAbortException)
337+
{
338+
break;
339+
}
340+
catch (IOException e)
341+
{
342+
Utilities.UXFDebugLogError(string.Format("Error, file may be in use! Exception: {0}", e));
343+
}
344+
catch (System.Exception e)
345+
{
346+
// stops thread aborting upon an exception
347+
Debug.LogException(e);
348+
}
349+
350+
if (quitting && blockingQueue.NumItems() == 0)
351+
{
352+
break;
353+
}
354+
}
355+
}
356+
357+
/// <summary>
358+
/// Wait for all tasks scheduled through <see cref="ManageInWorker"/>
359+
/// </summary>
360+
public static void WaitForTasks()
361+
{
362+
Utilities.UXFDebugLog("Waiting for tasks to finish");
363+
quitting = true;
364+
blockingQueue.Enqueue(() => {}); // ensures bq breaks from foreach loop
365+
workerTask?.Wait();
366+
Utilities.UXFDebugLog("Tasks finished");
367+
}
368+
}
298369

299370
/// <summary>
300371
/// Status of a trial
@@ -307,4 +378,4 @@ public enum TrialStatus
307378
}
308379

309380

310-
}
381+
}

Assets/UXF/Scripts/Session.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,8 @@ public void End()
695695

696696
void SaveResults()
697697
{
698+
Trial.WaitForTasks();
699+
698700
// generate list of all headers possible
699701
// hashset keeps unique set of keys
700702
HashSet<string> resultsHeaders = new HashSet<string>();

0 commit comments

Comments
 (0)