Skip to content

Add dynamic threadpool #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
8 commits merged into from
Aug 15, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions src/task/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@

use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;

use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;

use crate::future::Future;
use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;

const MAX_THREADS: u64 = 10_000;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);

struct Pool {
sender: Sender<async_task::Task<()>>,
receiver: Receiver<async_task::Task<()>>,
Expand All @@ -29,11 +35,69 @@ lazy_static! {
.expect("cannot start a thread driving blocking tasks");
}

let (sender, receiver) = unbounded();
// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
// the number of buffers that work must flow through
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
};
}

// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't
// receive any work after one second.
fn maybe_create_another_blocking_thread() {
// We use a `Relaxed` atomic operation because
// it's just a heuristic, and would not lose correctness
// even if it's random.
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
if workers >= MAX_THREADS {
return;
}

thread::Builder::new()
.name("async-blocking-driver-dynamic".to_string())
.spawn(|| {
let wait_limit = Duration::from_secs(1);

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
})
.expect("cannot start a dynamic thread driving blocking tasks");
}

// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
fn schedule(t: async_task::Task<()>) {
let first_try_result = POOL.sender.try_send(t);
match first_try_result {
Ok(()) => {
// NICEEEE
}
Err(crossbeam::channel::TrySendError::Full(t)) => {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
// retry sending while blocking.
maybe_create_another_blocking_thread();
POOL.sender.send(t).unwrap()
}
Err(crossbeam::channel::TrySendError::Disconnected(_)) => {
panic!(
"unable to send to blocking threadpool \
due to receiver disconnection"
);
}
}
}

/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
Expand All @@ -42,7 +106,6 @@ where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let schedule = |t| POOL.sender.send(t).unwrap();
let (task, handle) = async_task::spawn(future, schedule, ());
task.schedule();
JoinHandle(handle)
Expand Down