Skip to content

Commit 73ccc67

Browse files
committed
Solve pool max threads problem for MacOS
1 parent 4606893 commit 73ccc67

File tree

2 files changed

+64
-26
lines changed

2 files changed

+64
-26
lines changed

src/task/blocking.rs

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crossbeam_channel::{bounded, Receiver, Sender};
1111
use lazy_static::lazy_static;
1212

1313
use crate::future::Future;
14+
use crate::io::ErrorKind;
1415
use crate::task::{Context, Poll};
1516
use crate::utils::abort_on_panic;
1617
use std::sync::{Arc, Mutex};
@@ -31,13 +32,13 @@ const FREQUENCY_QUEUE_SIZE: usize = 10;
3132
// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
3233
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
3334

34-
// Possible max threads (without OS contract)
35-
const MAX_THREADS: u64 = 10_000;
36-
3735
// Pool task frequency variable
3836
// Holds scheduled tasks onto the thread pool for the calculation window
3937
static FREQUENCY: AtomicU64 = AtomicU64::new(0);
4038

39+
// Possible max threads (without OS contract)
40+
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);
41+
4142
struct Pool {
4243
sender: Sender<async_task::Task<()>>,
4344
receiver: Receiver<async_task::Task<()>>,
@@ -89,14 +90,19 @@ lazy_static! {
8990
static ref POOL_SIZE: Arc<Mutex<u64>> = Arc::new(Mutex::new(LOW_WATERMARK));
9091
}
9192

92-
// Gets the current pool size
93-
// Used for pool size boundary checking in pool manager
94-
fn get_current_pool_size() -> u64 {
95-
let current_arc = POOL_SIZE.clone();
96-
let current_pool_size = *current_arc.lock().unwrap();
97-
LOW_WATERMARK.max(current_pool_size)
98-
}
99-
93+
// Exponentially Weighted Moving Average calculation
94+
//
95+
// This allows us to find the EMA value.
96+
// This value represents the trend of tasks mapped onto the thread pool.
97+
// Calculation is following:
98+
//
99+
// α :: EMA_COEFFICIENT :: smoothing factor between 0 and 1
100+
// Yt :: freq :: frequency sample at time t
101+
// St :: acc :: EMA at time t
102+
//
103+
// Under these definitions formula is following:
104+
// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St
105+
#[inline]
100106
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
101107
freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
102108
acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64))
@@ -124,11 +130,7 @@ fn scale_pool() {
124130
// Calculate message rate for the given time window
125131
let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64;
126132

127-
// Adapts the thread count of pool
128-
//
129-
// Sliding window of frequencies visited by the pool manager.
130-
// Select the maximum from the window and check against the current task dispatch frequency.
131-
// If current frequency is bigger, we will scale up.
133+
// Calculates current time window's EMA value (including last sample)
132134
let prev_ema_frequency = calculate_ema(&freq_queue);
133135

134136
// Add seen frequency data to the frequency histogram.
@@ -137,22 +139,35 @@ fn scale_pool() {
137139
freq_queue.pop_front();
138140
}
139141

142+
// Calculates current time window's EMA value (including last sample)
140143
let curr_ema_frequency = calculate_ema(&freq_queue);
141144

145+
// Adapts the thread count of pool
146+
//
147+
// Sliding window of frequencies visited by the pool manager.
148+
// Pool manager creates EMA value for previous window and current window.
149+
// Compare them to determine scaling amount based on the trends.
150+
// If current EMA value is bigger, we will scale up.
142151
if curr_ema_frequency > prev_ema_frequency {
152+
// "Scale by" amount can be seen as "how much load is coming".
153+
// "Scale" amount is "how many threads we should spawn".
143154
let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
144155
let scale = ((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as u64;
145156

146-
// Pool size shouldn't reach to max_threads anyway.
147-
// Pool manager backpressures itself while visiting message rate frequencies.
148-
// You will get an error before hitting to limits by OS.
157+
// It is time to scale the pool!
149158
(0..scale).for_each(|_| {
150159
create_blocking_thread();
151160
});
152-
} else if curr_ema_frequency == prev_ema_frequency && current_frequency != 0 {
161+
} else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
162+
&& current_frequency != 0
163+
{
153164
// Throughput is low. Allocate more threads to unblock flow.
165+
// If we fall into this case, the scheduler is congested by longhauling tasks.
166+
// To unblock the flow we should add some threads to the pool, but not so many as to
167+
// stagger the program's operation.
154168
let scale = LOW_WATERMARK * current_frequency + 1;
155169

170+
// Scale it up!
156171
(0..scale).for_each(|_| {
157172
create_blocking_thread();
158173
});
@@ -165,6 +180,16 @@ fn scale_pool() {
165180
// Dynamic threads will terminate themselves if they don't
166181
// receive any work after between one and ten seconds.
167182
fn create_blocking_thread() {
183+
// Check that thread is spawnable.
184+
// If it hits the OS limits, don't spawn it.
185+
{
186+
let current_arc = POOL_SIZE.clone();
187+
let pool_size = *current_arc.lock().unwrap();
188+
if pool_size >= MAX_THREADS.load(Ordering::SeqCst) {
189+
MAX_THREADS.store(10_000, Ordering::SeqCst);
190+
return;
191+
}
192+
}
168193
// We want to avoid having all threads terminate at
169194
// exactly the same time, causing thundering herd
170195
// effects. We want to stagger their destruction over
@@ -174,7 +199,7 @@ fn create_blocking_thread() {
174199
// Generate a simple random number of milliseconds
175200
let rand_sleep_ms = u64::from(random(10_000));
176201

177-
thread::Builder::new()
202+
let _ = thread::Builder::new()
178203
.name("async-blocking-driver-dynamic".to_string())
179204
.spawn(move || {
180205
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
@@ -192,15 +217,30 @@ fn create_blocking_thread() {
192217
*current_arc.lock().unwrap() -= 1;
193218
}
194219
})
195-
.expect("cannot start a dynamic thread driving blocking tasks");
220+
.map_err(|err| {
221+
match err.kind() {
222+
ErrorKind::WouldBlock => {
223+
// Maximum allowed threads per process is varying from system to system.
224+
// Some systems have it (like macOS), some don't (Linux).
225+
// This case is expected not to happen.
226+
// But if it does happen, it shouldn't cause a panic.
227+
let current_arc = POOL_SIZE.clone();
228+
MAX_THREADS.store(*current_arc.lock().unwrap() - 1, Ordering::SeqCst);
229+
}
230+
_ => eprintln!(
231+
"cannot start a dynamic thread driving blocking tasks: {}",
232+
err
233+
),
234+
}
235+
});
196236
}
197237

198238
// Enqueues work, attempting to send to the threadpool in a
199239
// nonblocking way and spinning up needed amount of threads
200240
// based on the previous statistics without relying on
201241
// if there is not a thread ready to accept the work or not.
202242
fn schedule(t: async_task::Task<()>) {
203-
// Add up for every incoming task schedule
243+
// Add up for every incoming scheduled task
204244
FREQUENCY.fetch_add(1, Ordering::Acquire);
205245

206246
if let Err(err) = POOL.sender.try_send(t) {

tests/thread_pool.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ fn longhauling_task_join() {
103103
let start = Instant::now();
104104

105105
// First batch of longhauling tasks
106-
let handles = (0..100_000)
106+
let _ = (0..100_000)
107107
.map(|_| {
108108
task::blocking::spawn(async {
109109
let duration = Duration::from_millis(1000);
@@ -112,8 +112,6 @@ fn longhauling_task_join() {
112112
})
113113
.collect::<Vec<JoinHandle<()>>>();
114114

115-
task::block_on(join_all(handles));
116-
117115
// Let them join to see how it behaves under different workloads.
118116
let duration = Duration::from_millis(thread_join_time_max);
119117
thread::sleep(duration);

0 commit comments

Comments
 (0)