//! A thread pool for running blocking functions asynchronously.
+ //!
+ //! The blocking thread pool consists of four elements:
+ //! * Frequency Detector
+ //! * Trend Estimator
+ //! * Predictive Upscaler
+ //! * Time-based Downscaler
+ //!
+ //! ## Frequency Detector
+ //! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
+ //! The pool manager thread takes this sample every 200 milliseconds.
+ //! The sampled value feeds the trend estimation phase.
+ //!
+ //! ## Trend Estimator
+ //! Holds up to a fixed number of frequency samples to form an estimation.
+ //! This pool holds 10 frequencies at a time.
+ //! Estimation and prediction use the Exponentially Weighted Moving Average (EWMA) algorithm.
+ //!
+ //! The algorithm is altered and adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61).
+ //!
+ //! ## Predictive Upscaler
+ //! Selects the upscaling amount based on the estimation, or, when throughput hogs occur, based on the number of tasks mapped onto the pool.
+ //! Throughput hogs are detected from a combination of the job-in/job-out frequency and the current scheduler task assignment frequency.
+ //!
+ //! ## Time-based Downscaler
+ //! Dynamic threads spawned by the upscaler keep working for between 1 and 10 seconds.
+ //! Threads that receive no work from the channels within that window join back (terminate).
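Taken together, these four elements form a sample, estimate, scale cycle driven by the pool manager thread. The snippet below is a minimal, self-contained sketch of the sampling half of that cycle using only the standard library; the shortened 10 ms interval, the fixed window size of 10, and the `main` harness are illustrative assumptions, not the code in this commit.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;

// Tasks submitted by the scheduler since the last sample
// (mirrors the FREQUENCY counter defined later in this file).
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

fn main() {
    let mut freq_queue: VecDeque<u64> = VecDeque::with_capacity(10 + 1);

    // Stand-in for the scheduler: bump the counter a few times.
    for _ in 0..25 {
        FREQUENCY.fetch_add(1, Ordering::SeqCst);
    }

    // A few iterations of the manager's sampling loop (200 ms in the
    // real pool, shortened here so the sketch finishes quickly).
    for _ in 0..3 {
        thread::sleep(Duration::from_millis(10));

        // Frequency detector: read and reset the per-interval counter.
        let sampled = FREQUENCY.swap(0, Ordering::SeqCst);

        // Keep a bounded sliding window for the trend estimator.
        freq_queue.push_front(sampled);
        freq_queue.truncate(10);

        println!("sampled {} tasks this interval, window = {:?}", sampled, freq_queue);
    }
}
```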

use std::collections::VecDeque;
use std::fmt;
@@ -16,35 +42,37 @@ use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;
use std::sync::{Arc, Mutex};

- // Low watermark value, defines the bare minimum of the pool.
- // Spawns initial thread set.
+ /// Low watermark value, defines the bare minimum of the pool.
+ /// Spawns the initial thread set.
const LOW_WATERMARK: u64 = 2;

- // Pool managers interval time (milliseconds)
- // This is the actual interval which makes adaptation calculation
+ /// Pool manager's interval time (milliseconds).
+ /// This is the actual interval at which the adaptation calculation is made.
const MANAGER_POLL_INTERVAL: u64 = 200;

- // Frequency histogram's sliding window size
- // Defines how many frequencies will be considered for adaptation.
+ /// Frequency histogram's sliding window size.
+ /// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

- // Exponential moving average smoothing coefficient for limited window
- // Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
+ /// Exponential moving average smoothing coefficient for the limited window.
+ /// The smoothing factor is estimated with `2 / (N + 1)`, where `N` is the sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
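With the window size used here, the smoothing factor works out to `2 / (10 + 1) ≈ 0.18`: each new frequency sample contributes roughly 18% of the updated average, while the remaining ~82% is carried over from the accumulated history of the window.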

- // Pool task frequency variable
- // Holds scheduled tasks onto the thread pool for the calculation window
+ /// Pool task frequency variable.
+ /// Counts the tasks scheduled onto the thread pool within the calculation window.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

- // Possible max threads (without OS contract)
+ /// Possible maximum number of threads (without an OS contract).
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);

+ /// Pool interface between the scheduler and thread pool
struct Pool {
    sender: Sender<async_task::Task<()>>,
    receiver: Receiver<async_task::Task<()>>,
}

lazy_static! {
+     /// Blocking pool with static starting thread count.
    static ref POOL: Pool = {
        for _ in 0..LOW_WATERMARK {
            thread::Builder::new()
@@ -81,45 +109,50 @@ lazy_static! {
        Pool { sender, receiver }
    };

-     // Pool task frequency calculation variables
+     /// Sliding window for pool task frequency calculation
    static ref FREQ_QUEUE: Arc<Mutex<VecDeque<u64>>> = {
        Arc::new(Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1)))
    };

-     // Pool size variable
+     /// Dynamic pool thread count variable
    static ref POOL_SIZE: Arc<Mutex<u64>> = Arc::new(Mutex::new(LOW_WATERMARK));
}

- // Exponentially Weighted Moving Average calculation
- //
- // This allows us to find the EMA value.
- // This value represents the trend of tasks mapped onto the thread pool.
- // Calculation is following:
- //
- // +--------+-----------------+----------------------------------+
- // | Symbol | Identifier      | Explanation                      |
- // +--------+-----------------+----------------------------------+
- // | α      | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
- // | Yt     | freq            | frequency sample at time t       |
- // | St     | acc             | EMA at time t                    |
- // +--------+-----------------+----------------------------------+
- //
- // Under these definitions formula is following:
- // EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St
+ /// Exponentially Weighted Moving Average calculation
+ ///
+ /// This allows us to find the EMA value.
+ /// This value represents the trend of tasks mapped onto the thread pool.
+ /// The calculation is as follows:
+ /// ```text
+ /// +--------+-----------------+----------------------------------+
+ /// | Symbol | Identifier      | Explanation                      |
+ /// +--------+-----------------+----------------------------------+
+ /// | α      | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
+ /// | Yt     | freq            | frequency sample at time t       |
+ /// | St     | acc             | EMA at time t                    |
+ /// +--------+-----------------+----------------------------------+
+ /// ```
+ /// Under these definitions, the formula is as follows:
+ /// ```text
+ /// EMA = St = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ]
+ /// ```
+ /// # Arguments
+ ///
+ /// * `freq_queue` - Sliding window of frequency samples
#[inline]
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64))
    }) * EMA_COEFFICIENT as f64
}
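As a standalone illustration of the fold above (not a doctest in the commit), the same computation can be run over a small sample window. The three sample values and the `main` wrapper are assumptions; the constants mirror the ones defined earlier in this file.

```rust
use std::collections::VecDeque;

fn main() {
    // Same constants as in the surrounding file.
    const FREQUENCY_QUEUE_SIZE: usize = 10;
    const EMA_COEFFICIENT: f64 = 2.0 / (FREQUENCY_QUEUE_SIZE as f64 + 1.0); // ~0.18

    // A small sliding window of frequency samples; index 0 carries the
    // largest weight because (1 - α)^0 = 1.
    let freq_queue: VecDeque<u64> = VecDeque::from(vec![30, 20, 10]);

    let ema = freq_queue
        .iter()
        .enumerate()
        .fold(0.0, |acc, (i, freq)| {
            acc + (*freq as f64) * (1.0 - EMA_COEFFICIENT).powi(i as i32)
        })
        * EMA_COEFFICIENT;

    // 0.1818 * (30 + 0.818*20 + 0.669*10) ≈ 0.1818 * 53.06 ≈ 9.65
    println!("EMA over {:?} = {:.2}", freq_queue, ema);
}
```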

- // Adaptive pool scaling function
- //
- // This allows to spawn new threads to make room for incoming task pressure.
- // Works in the background detached from the pool system and scales up the pool based
- // on the request rate.
- //
- // It uses frequency based calculation to define work. Utilizing average processing rate.
+ /// Adaptive pool scaling function
+ ///
+ /// This allows the pool to spawn new threads to make room for incoming task pressure.
+ /// It works in the background, detached from the pool system, and scales up the pool
+ /// based on the request rate.
+ ///
+ /// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool() {
    // Fetch the current frequency; it matters that operations are ordered in this approach.
    let current_frequency = FREQUENCY.load(Ordering::SeqCst);
@@ -180,9 +213,9 @@ fn scale_pool() {
    FREQUENCY.store(0, Ordering::Release);
}

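Most of `scale_pool()` falls inside the elided portion of the hunk above. Purely to illustrate the kind of EMA-driven upscaling decision the doc comment describes, here is a hypothetical helper; the name `threads_to_add`, its signature, and the gap-based rule are assumptions for illustration, not the body of the commit's function.

```rust
/// Hypothetical decision rule: if the sampled frequency for this interval
/// outpaces the smoothed trend, grow the pool by the gap (at least one
/// thread, never past the thread cap); otherwise leave the pool alone.
fn threads_to_add(current_frequency: u64, trend: f64, pool_size: u64, max_threads: u64) -> u64 {
    if pool_size >= max_threads {
        return 0;
    }
    let gap = current_frequency as f64 - trend;
    if gap > 0.0 {
        (gap.ceil() as u64).max(1).min(max_threads - pool_size)
    } else {
        0
    }
}

fn main() {
    // Trend (EMA over the sliding window) lagging behind a burst of work.
    println!("{}", threads_to_add(20, 9.6, 2, 10_000)); // 11
    // Demand at or below the trend: no upscaling.
    println!("{}", threads_to_add(5, 9.6, 2, 10_000)); // 0
}
```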
- // Creates yet another thread to receive tasks.
- // Dynamic threads will terminate themselves if they don't
- // receive any work after between one and ten seconds.
+ /// Creates a blocking thread to receive tasks.
+ /// Dynamic threads will terminate themselves if they don't
+ /// receive any work after between one and ten seconds.
fn create_blocking_thread() {
    // Check that the thread is spawnable.
    // If it hits the OS limits, don't spawn it.
@@ -239,10 +272,10 @@ fn create_blocking_thread() {
    });
}

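The body of `create_blocking_thread()` is largely elided by the hunk above. As a rough illustration of the time-based downscaling idea (a worker that exits once it has been idle past its deadline), here is a sketch built on `std::sync::mpsc` rather than the channel type used by the pool; the 500 ms `idle_deadline` is a stand-in for the real pool's one-to-ten-second window.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

fn main() {
    // Channel carrying boxed blocking jobs to a single dynamic worker.
    let (tx, rx) = channel::<Box<dyn FnOnce() + Send>>();

    // Hypothetical per-thread deadline; the real pool picks a value
    // between one and ten seconds.
    let idle_deadline = Duration::from_millis(500);

    let worker = thread::spawn(move || loop {
        match rx.recv_timeout(idle_deadline) {
            Ok(job) => job(),
            // No work arrived before the deadline: the dynamic thread
            // "joins back", i.e. terminates and shrinks the pool.
            Err(RecvTimeoutError::Timeout) => break,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    });

    tx.send(Box::new(|| println!("ran a blocking job"))).unwrap();
    worker.join().unwrap();
}
```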
- // Enqueues work, attempting to send to the thread pool in a
- // nonblocking way and spinning up needed amount of threads
- // based on the previous statistics without relying on
- // if there is not a thread ready to accept the work or not.
+ /// Enqueues work, attempting to send it to the thread pool in a
+ /// nonblocking way and spinning up the needed number of threads
+ /// based on the previous statistics, without relying on
+ /// whether there is a thread ready to accept the work or not.
fn schedule(t: async_task::Task<()>) {
    // Add up for every incoming scheduled task.
    FREQUENCY.fetch_add(1, Ordering::Acquire);
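Only the first statement of `schedule()` is visible in this hunk. To illustrate the "hand off without waiting for a ready thread" idea in isolation: the bounded `std::sync::mpsc` channel and the `spawn_more()` helper below are hypothetical stand-ins, not the remainder of the commit's function.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::Duration;

// Hypothetical hook standing in for the pool's upscaling path.
fn spawn_more() {
    println!("channel full: would spawn another blocking thread here");
}

fn main() {
    // Bounded channel so try_send can actually report Full.
    let (tx, rx) = sync_channel::<&'static str>(1);

    // A single slow worker draining the queue.
    let worker = thread::spawn(move || {
        while let Ok(job) = rx.recv() {
            thread::sleep(Duration::from_millis(50)); // simulate blocking work
            println!("worker ran {}", job);
        }
    });

    for job in ["job-1", "job-2", "job-3"] {
        match tx.try_send(job) {
            // Hand-off succeeded without waiting for a ready thread.
            Ok(()) => println!("{} handed off without blocking", job),
            // Queue is full: react by scaling up instead of silently blocking.
            Err(TrySendError::Full(job)) => {
                spawn_more();
                tx.send(job).expect("worker alive"); // simplified fallback
            }
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    drop(tx);
    worker.join().unwrap();
}
```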