@@ -21,15 +21,16 @@ const LOW_WATERMARK: u64 = 2;
21
21
22
22
// Pool managers interval time (milliseconds)
23
23
// This is the actual interval which makes adaptation calculation
24
- const MANAGER_POLL_INTERVAL : u64 = 50 ;
25
-
26
- // Frequency scale factor for thread adaptation calculation
27
- const FREQUENCY_SCALE_FACTOR : u64 = 200 ;
24
+ const MANAGER_POLL_INTERVAL : u64 = 200 ;
28
25
29
26
// Frequency histogram's sliding window size
30
27
// Defines how many frequencies will be considered for adaptation.
31
28
const FREQUENCY_QUEUE_SIZE : usize = 10 ;
32
29
30
+ // Exponential moving average smoothing coefficient for limited window
31
+ // Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
32
+ const EMA_COEFFICIENT : f64 = 2_f64 / ( FREQUENCY_QUEUE_SIZE as f64 + 1_f64 ) ;
33
+
33
34
// Possible max threads (without OS contract)
34
35
const MAX_THREADS : u64 = 10_000 ;
35
36
@@ -96,6 +97,12 @@ fn get_current_pool_size() -> u64 {
96
97
LOW_WATERMARK . max ( current_pool_size)
97
98
}
98
99
100
+ fn calculate_ema ( freq_queue : & VecDeque < u64 > ) -> f64 {
101
+ freq_queue. iter ( ) . enumerate ( ) . fold ( 0_f64 , |acc, ( i, freq) | {
102
+ acc + ( ( * freq as f64 ) * ( ( 1_f64 - EMA_COEFFICIENT ) . powf ( i as f64 ) as f64 ) )
103
+ } ) * EMA_COEFFICIENT as f64
104
+ }
105
+
99
106
// Adaptive pool scaling function
100
107
//
101
108
// This allows to spawn new threads to make room for incoming task pressure.
@@ -114,41 +121,43 @@ fn scale_pool() {
114
121
freq_queue. push_back ( 0 ) ;
115
122
}
116
123
117
- // Calculate message rate at the given time window
118
- // Factor is there for making adaptation linear.
119
- // Exponential bursts may create idle threads which won't be utilized.
120
- // They may actually cause slowdown.
121
- let current_freq_factorized = current_frequency as f64 * FREQUENCY_SCALE_FACTOR as f64 ;
122
- let frequency = ( current_freq_factorized / MANAGER_POLL_INTERVAL as f64 ) as u64 ;
124
+ // Calculate message rate for the given time window
125
+ let frequency = ( current_frequency as f64 / MANAGER_POLL_INTERVAL as f64 ) as u64 ;
123
126
124
127
// Adapts the thread count of pool
125
128
//
126
129
// Sliding window of frequencies visited by the pool manager.
127
130
// Select the maximum from the window and check against the current task dispatch frequency.
128
131
// If current frequency is bigger, we will scale up.
129
- if let Some ( & max_measurement) = freq_queue. iter ( ) . max ( ) {
130
- if frequency > max_measurement {
131
- // Don't spawn more than cores.
132
- // Default behaviour of most of the linear adapting thread pools.
133
- let scale_by = num_cpus:: get ( ) . max ( LOW_WATERMARK as usize ) as u64 ;
134
-
135
- // Pool size can't reach to max_threads anyway.
136
- // Pool manager backpressures itself while visiting message rate frequencies.
137
- // You will get an error before hitting to limits by OS.
138
- if get_current_pool_size ( ) < MAX_THREADS {
139
- ( 0 ..scale_by) . for_each ( |_| {
140
- create_blocking_thread ( ) ;
141
- } ) ;
142
- }
143
- }
144
- }
132
+ let prev_ema_frequency = calculate_ema ( & freq_queue) ;
145
133
146
134
// Add seen frequency data to the frequency histogram.
147
135
freq_queue. push_back ( frequency) ;
148
136
if freq_queue. len ( ) == FREQUENCY_QUEUE_SIZE + 1 {
149
137
freq_queue. pop_front ( ) ;
150
138
}
151
139
140
+ let curr_ema_frequency = calculate_ema ( & freq_queue) ;
141
+
142
+ if curr_ema_frequency > prev_ema_frequency {
143
+ let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
144
+ let scale = ( ( LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64 ) as u64 ;
145
+
146
+ // Pool size shouldn't reach to max_threads anyway.
147
+ // Pool manager backpressures itself while visiting message rate frequencies.
148
+ // You will get an error before hitting to limits by OS.
149
+ ( 0 ..scale) . for_each ( |_| {
150
+ create_blocking_thread ( ) ;
151
+ } ) ;
152
+ } else if curr_ema_frequency == prev_ema_frequency && current_frequency != 0 {
153
+ // Throughput is low. Allocate more threads to unblock flow.
154
+ let scale = LOW_WATERMARK * current_frequency + 1 ;
155
+
156
+ ( 0 ..scale) . for_each ( |_| {
157
+ create_blocking_thread ( ) ;
158
+ } ) ;
159
+ }
160
+
152
161
FREQUENCY . store ( 0 , Ordering :: Release ) ;
153
162
}
154
163
0 commit comments