Skip to content

Commit 98cbf7f

Browse files
committed
Restore task::spawn_blocking
1 parent 84e5c5f commit 98cbf7f

File tree

1 file changed

+85
-2
lines changed

1 file changed

+85
-2
lines changed

src/task/spawn_blocking.rs

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
use crate::task::{spawn, JoinHandle};
1+
use std::sync::atomic::{AtomicUsize, Ordering};
2+
use std::thread;
3+
use std::time::Duration;
4+
5+
use crossbeam_channel::{unbounded, Receiver, Sender};
6+
use once_cell::sync::Lazy;
7+
8+
use crate::task::{JoinHandle, Task};
9+
use crate::utils::abort_on_panic;
210

311
/// Spawns a blocking task.
412
///
@@ -35,5 +43,80 @@ where
3543
F: FnOnce() -> T + Send + 'static,
3644
T: Send + 'static,
3745
{
38-
spawn(async { f() })
46+
let schedule = |task| POOL.sender.send(task).unwrap();
47+
let (task, handle) = async_task::spawn(async { f() }, schedule, Task::new(None));
48+
task.schedule();
49+
JoinHandle::new(handle)
50+
}
51+
52+
type Runnable = async_task::Task<Task>;
53+
54+
struct Pool {
55+
sender: Sender<Runnable>,
56+
receiver: Receiver<Runnable>,
57+
}
58+
59+
/// The number of sleeping worker threads.
60+
static SLEEPING: AtomicUsize = AtomicUsize::new(0);
61+
62+
static POOL: Lazy<Pool> = Lazy::new(|| {
63+
// Start a single worker thread waiting for the first task.
64+
start_thread();
65+
66+
let (sender, receiver) = unbounded();
67+
Pool { sender, receiver }
68+
});
69+
70+
fn start_thread() {
71+
SLEEPING.fetch_add(1, Ordering::SeqCst);
72+
let timeout = Duration::from_secs(1);
73+
74+
thread::Builder::new()
75+
.name("async-std/blocking".to_string())
76+
.spawn(move || {
77+
loop {
78+
let mut task = match POOL.receiver.recv_timeout(timeout) {
79+
Ok(task) => task,
80+
Err(_) => {
81+
// Check whether this is the last sleeping thread.
82+
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
83+
// If so, then restart the thread to make sure there is always at least
84+
// one sleeping thread.
85+
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
86+
continue;
87+
}
88+
}
89+
90+
// Stop the thread.
91+
return;
92+
}
93+
};
94+
95+
// If there are no sleeping threads, then start one to make sure there is always at
96+
// least one sleeping thread.
97+
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
98+
start_thread();
99+
}
100+
101+
loop {
102+
// Run the task.
103+
abort_on_panic(|| task.run());
104+
105+
// Try taking another task if there are any available.
106+
task = match POOL.receiver.try_recv() {
107+
Ok(task) => task,
108+
Err(_) => break,
109+
};
110+
}
111+
112+
// If there is at least one sleeping thread, stop this thread instead of putting it
113+
// to sleep.
114+
if SLEEPING.load(Ordering::SeqCst) > 0 {
115+
return;
116+
}
117+
118+
SLEEPING.fetch_add(1, Ordering::SeqCst);
119+
}
120+
})
121+
.expect("cannot start a blocking thread");
39122
}

0 commit comments

Comments
 (0)