|
1 | 1 | using System;
|
| 2 | +using System.Collections.Concurrent; |
| 3 | +using System.Collections.Generic; |
| 4 | +using System.Linq; |
2 | 5 | using System.Threading;
|
3 | 6 |
|
4 | 7 | namespace NHibernate.Test
|
5 | 8 | {
|
6 | 9 | public class MultiThreadRunner<T>
|
7 | 10 | {
|
8 | 11 | public delegate void ExecuteAction(T subject);
|
9 |
| - private readonly int numThreads; |
10 |
| - private readonly ExecuteAction[] actions; |
11 |
| - private readonly Random rnd = new Random(); |
12 |
| - private bool running; |
13 |
| - private int timeout = 1000; |
14 |
| - private int timeoutBetweenThreadStart = 30; |
15 | 12 |
|
16 |
| - public MultiThreadRunner(int numThreads, ExecuteAction[] actions) |
| 13 | + private readonly int _numThreads; |
| 14 | + private readonly ExecuteAction[] _actions; |
| 15 | + private readonly Random _rnd = new Random(); |
| 16 | + private volatile bool _running; |
| 17 | + private ConcurrentQueue<Exception> _errors = new ConcurrentQueue<Exception>(); |
| 18 | + |
| 19 | + public MultiThreadRunner(int numThreads, params ExecuteAction[] actions) |
17 | 20 | {
|
18 |
| - if(numThreads < 1) |
| 21 | + if (numThreads < 1) |
19 | 22 | {
|
20 |
| - throw new ArgumentOutOfRangeException("numThreads",numThreads,"Must be GT 1"); |
| 23 | + throw new ArgumentOutOfRangeException(nameof(numThreads), numThreads, "Must be GTE 1"); |
21 | 24 | }
|
22 | 25 | if (actions == null || actions.Length == 0)
|
23 | 26 | {
|
24 |
| - throw new ArgumentNullException("actions"); |
| 27 | + throw new ArgumentNullException(nameof(actions)); |
25 | 28 | }
|
26 |
| - foreach (ExecuteAction action in actions) |
| 29 | + if (actions.Any(action => action == null)) |
27 | 30 | {
|
28 |
| - if(action==null) |
29 |
| - throw new ArgumentNullException("actions", "null delegate"); |
| 31 | + throw new ArgumentNullException(nameof(actions), "null delegate"); |
30 | 32 | }
|
31 |
| - this.numThreads = numThreads; |
32 |
| - this.actions = actions; |
| 33 | + _numThreads = numThreads; |
| 34 | + _actions = actions; |
33 | 35 | }
|
34 | 36 |
|
35 |
| - public int EndTimeout |
36 |
| - { |
37 |
| - get { return timeout; } |
38 |
| - set { timeout = value; } |
39 |
| - } |
| 37 | + public int EndTimeout { get; set; } = 1000; |
40 | 38 |
|
41 |
| - public int TimeoutBetweenThreadStart |
42 |
| - { |
43 |
| - get { return timeoutBetweenThreadStart; } |
44 |
| - set { timeoutBetweenThreadStart = value; } |
45 |
| - } |
| 39 | + public int TimeoutBetweenThreadStart { get; set; } = 30; |
46 | 40 |
|
47 |
| - public void Run(T subjectInstance) |
48 |
| - { |
49 |
| - running = true; |
50 |
| - Thread[] t = new Thread[numThreads]; |
51 |
| - for (int i = 0; i < numThreads; i++) |
52 |
| - { |
53 |
| - t[i] = new Thread(ThreadProc); |
54 |
| - t[i].Name = i.ToString(); |
55 |
| - t[i].Start(subjectInstance); |
56 |
| - if (i > 2) |
57 |
| - Thread.Sleep(timeoutBetweenThreadStart); |
58 |
| - } |
| 41 | + public Exception[] GetErrors() => _errors.ToArray(); |
| 42 | + public void ClearErrors() => _errors = new ConcurrentQueue<Exception>(); |
59 | 43 |
|
60 |
| - Thread.Sleep(timeout); |
| 44 | + public int Run(T subjectInstance) |
| 45 | + { |
| 46 | + var allThreads = new List<ThreadHolder<T>>(); |
61 | 47 |
|
62 |
| - // Tell the threads to shut down, then wait until they all |
63 |
| - // finish. |
64 |
| - running = false; |
65 |
| - for (int i = 0; i < numThreads; i++) |
| 48 | + var launcher = new Thread( |
| 49 | + () => |
| 50 | + { |
| 51 | + try |
| 52 | + { |
| 53 | + for (var i = 0; i < _numThreads; i++) |
| 54 | + { |
| 55 | + var threadHolder = new ThreadHolder<T> |
| 56 | + { |
| 57 | + Thread = new Thread(ThreadProc) { Name = i.ToString() }, |
| 58 | + Subject = subjectInstance |
| 59 | + }; |
| 60 | + threadHolder.Thread.Start(threadHolder); |
| 61 | + allThreads.Add(threadHolder); |
| 62 | + if (i > 2 && TimeoutBetweenThreadStart > 0) |
| 63 | + Thread.Sleep(TimeoutBetweenThreadStart); |
| 64 | + } |
| 65 | + } |
| 66 | + catch (Exception e) |
| 67 | + { |
| 68 | + _errors.Enqueue(e); |
| 69 | + throw; |
| 70 | + } |
| 71 | + }); |
| 72 | + var totalLoops = 0; |
| 73 | + _running = true; |
| 74 | + // Use a separated thread for launching in case too many threads are asked: the inner Start will freeze |
| 75 | + // but would be able to resume once _running would have been set to false, causing first threads to stop. |
| 76 | + launcher.Start(); |
| 77 | + // Sleep for the required timeout, taking into account the start delay (if all threads are launchable without |
| 78 | + // having to wait due to thread starvation). |
| 79 | + Thread.Sleep(TimeoutBetweenThreadStart * _numThreads + EndTimeout); |
| 80 | + // Tell the threads to shut down, then wait until they all finish. |
| 81 | + _running = false; |
| 82 | + launcher.Join(); |
| 83 | + foreach (var threadHolder in allThreads.Where(t => t != null)) |
66 | 84 | {
|
67 |
| - t[i].Join(); |
| 85 | + threadHolder.Thread.Join(); |
| 86 | + totalLoops += threadHolder.LoopsDone; |
68 | 87 | }
|
| 88 | + return totalLoops; |
69 | 89 | }
|
70 | 90 |
|
71 | 91 | private void ThreadProc(object arg)
|
72 | 92 | {
|
73 |
| - T subjectInstance = (T) arg; |
74 |
| - while (running) |
| 93 | + try |
| 94 | + { |
| 95 | + var holder = (ThreadHolder<T>) arg; |
| 96 | + while (_running) |
| 97 | + { |
| 98 | + var actionIdx = _rnd.Next(0, _actions.Length); |
| 99 | + _actions[actionIdx](holder.Subject); |
| 100 | + holder.LoopsDone++; |
| 101 | + } |
| 102 | + } |
| 103 | + catch (Exception e) |
75 | 104 | {
|
76 |
| - int actionIdx = rnd.Next(0, actions.Length); |
77 |
| - actions[actionIdx](subjectInstance); |
| 105 | + _errors.Enqueue(e); |
| 106 | + throw; |
78 | 107 | }
|
79 | 108 | }
|
| 109 | + |
| 110 | + private class ThreadHolder<TH> |
| 111 | + { |
| 112 | + public Thread Thread { get; set; } |
| 113 | + public int LoopsDone { get; set; } |
| 114 | + public TH Subject { get; set; } |
| 115 | + } |
80 | 116 | }
|
81 | 117 | }
|
0 commit comments