Skip to content

Commit 0e6b30f

Browse files
committed
EMA based statistically adaptive blocking thread pool
1 parent c9d8b19 commit 0e6b30f

File tree

3 files changed

+94
-21
lines changed

3 files changed

+94
-21
lines changed

benches/blocking.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#![feature(test)]
2+
3+
extern crate test;
4+
5+
use async_std::task;
6+
use async_std::task_local;
7+
use test::{black_box, Bencher};
8+
use std::thread;
9+
use std::time::Duration;
10+
use async_std::task::blocking::JoinHandle;
11+
use futures::future::{join_all};
12+
13+
14+
#[bench]
15+
fn blocking(b: &mut Bencher) {
16+
b.iter(|| {
17+
let handles = (0..10_000).map(|_| {
18+
task::blocking::spawn(async {
19+
let duration = Duration::from_millis(1);
20+
thread::sleep(duration);
21+
})
22+
}).collect::<Vec<JoinHandle<()>>>();
23+
24+
task::block_on(join_all(handles));
25+
});
26+
}

src/task/blocking.rs

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,34 @@ use crate::future::Future;
1313
use crate::task::{Context, Poll};
1414
use crate::utils::abort_on_panic;
1515

16+
const LOW_WATERMARK: u64 = 2;
1617
const MAX_THREADS: u64 = 10_000;
1718

18-
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
19+
// Pool task frequency calculation variables
20+
static AVR_FREQUENCY: AtomicU64 = AtomicU64::new(0);
21+
static FREQUENCY: AtomicU64 = AtomicU64::new(0);
22+
23+
// Pool speedup calculation variables
24+
static SPEEDUP: AtomicU64 = AtomicU64::new(0);
25+
26+
// Pool size variables
27+
static EXPECTED_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK);
28+
static CURRENT_POOL_SIZE: AtomicU64 = AtomicU64::new(LOW_WATERMARK);
1929

2030
struct Pool {
2131
sender: Sender<async_task::Task<()>>,
22-
receiver: Receiver<async_task::Task<()>>,
32+
receiver: Receiver<async_task::Task<()>>
2333
}
2434

2535
lazy_static! {
2636
static ref POOL: Pool = {
27-
for _ in 0..2 {
37+
for _ in 0..LOW_WATERMARK {
2838
thread::Builder::new()
2939
.name("async-blocking-driver".to_string())
3040
.spawn(|| abort_on_panic(|| {
3141
for task in &POOL.receiver {
3242
task.run();
43+
calculate_dispatch_frequency();
3344
}
3445
}))
3546
.expect("cannot start a thread driving blocking tasks");
@@ -47,18 +58,30 @@ lazy_static! {
4758
};
4859
}
4960

50-
// Create up to MAX_THREADS dynamic blocking task worker threads.
51-
// Dynamic threads will terminate themselves if they don't
52-
// receive any work after between one and ten seconds.
53-
fn maybe_create_another_blocking_thread() {
54-
// We use a `Relaxed` atomic operation because
55-
// it's just a heuristic, and would not lose correctness
56-
// even if it's random.
57-
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
58-
if workers >= MAX_THREADS {
59-
return;
61+
fn calculate_dispatch_frequency() {
62+
// Calculate current message processing rate here
63+
let previous_freq = FREQUENCY.fetch_sub(1, Ordering::Relaxed);
64+
let avr_freq = AVR_FREQUENCY.load(Ordering::Relaxed);
65+
let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed);
66+
let frequency = (avr_freq as f64 + previous_freq as f64 / current_pool_size as f64) as u64;
67+
AVR_FREQUENCY.store(frequency, Ordering::Relaxed);
68+
69+
// Adapt the thread count of pool
70+
let speedup = SPEEDUP.load(Ordering::Relaxed);
71+
if frequency > speedup {
72+
// Speedup can be gained. Scale the pool up here.
73+
SPEEDUP.store(frequency, Ordering::Relaxed);
74+
EXPECTED_POOL_SIZE.store(current_pool_size + 1, Ordering::Relaxed);
75+
} else {
76+
// There is no need for the extra threads, schedule them to be closed.
77+
EXPECTED_POOL_SIZE.fetch_sub(2, Ordering::Relaxed);
6078
}
79+
}
6180

81+
// Creates yet another thread to receive tasks.
82+
// Dynamic threads will terminate themselves if they don't
83+
// receive any work after between one and ten seconds.
84+
fn create_blocking_thread() {
6285
// We want to avoid having all threads terminate at
6386
// exactly the same time, causing thundering herd
6487
// effects. We want to stagger their destruction over
@@ -73,25 +96,49 @@ fn maybe_create_another_blocking_thread() {
7396
.spawn(move || {
7497
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
7598

76-
DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
99+
CURRENT_POOL_SIZE.fetch_add(1, Ordering::Relaxed);
77100
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
78101
abort_on_panic(|| task.run());
102+
calculate_dispatch_frequency();
79103
}
80-
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
104+
CURRENT_POOL_SIZE.fetch_sub(1, Ordering::Relaxed);
81105
})
82106
.expect("cannot start a dynamic thread driving blocking tasks");
83107
}
84108

85109
// Enqueues work, attempting to send to the threadpool in a
86-
// nonblocking way and spinning up another worker thread if
87-
// there is not a thread ready to accept the work.
110+
// nonblocking way and spinning up needed amount of threads
111+
// based on the previous statistics without relying on
112+
// if there is not a thread ready to accept the work or not.
88113
fn schedule(t: async_task::Task<()>) {
114+
// Add up for every incoming task schedule
115+
FREQUENCY.fetch_add(1, Ordering::Relaxed);
116+
117+
// Calculate the amount of threads needed to spin up
118+
// then retry sending while blocking. It doesn't spin if
119+
// expected pool size is above the MAX_THREADS (which is a
120+
// case won't happen)
121+
let pool_size = EXPECTED_POOL_SIZE.load(Ordering::Relaxed);
122+
let current_pool_size = CURRENT_POOL_SIZE.load(Ordering::Relaxed);
123+
if pool_size > current_pool_size && pool_size <= MAX_THREADS {
124+
let needed = pool_size - current_pool_size;
125+
126+
// For safety, check boundaries before spawning threads
127+
if needed > 0 && (needed < pool_size || needed < current_pool_size) {
128+
(0..needed).for_each(|_| {
129+
create_blocking_thread();
130+
});
131+
}
132+
}
133+
89134
if let Err(err) = POOL.sender.try_send(t) {
90135
// We were not able to send to the channel without
91-
// blocking. Try to spin up another thread and then
92-
// retry sending while blocking.
93-
maybe_create_another_blocking_thread();
136+
// blocking.
94137
POOL.sender.send(err.into_inner()).unwrap();
138+
} else {
139+
// Every successful dispatch, rewarded with negative
140+
let reward = AVR_FREQUENCY.load(Ordering::Relaxed) as f64 / 2.0_f64;
141+
EXPECTED_POOL_SIZE.fetch_sub(reward as u64, Ordering::Relaxed);
95142
}
96143
}
97144

src/task/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,4 @@ mod pool;
3434
mod sleep;
3535
mod task;
3636

37-
pub(crate) mod blocking;
37+
pub mod blocking;

0 commit comments

Comments
 (0)