Skip to content

Commit 9972b3e

Browse files
committed
Add a Future type
1 parent 13910d4 commit 9972b3e

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

src/librustc_data_structures/sync.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
2929
pub use std::sync::atomic::Ordering;
3030
pub use std::sync::atomic::Ordering::SeqCst;
3131

32+
pub mod future;
33+
3234
pub fn catch<R>(
3335
store: &Lock<Option<Box<dyn Any + std::marker::Send + 'static>>>,
3436
f: impl FnOnce() -> R,
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use crate::jobserver;
2+
use crate::sync;
3+
use std::mem::ManuallyDrop;
4+
use std::panic::{self, AssertUnwindSafe};
5+
use std::sync::Arc;
6+
use std::sync::Condvar;
7+
use std::sync::Mutex;
8+
use std::thread;
9+
10+
struct FutureData<'a, R> {
11+
// ManuallyDrop is needed here to ensure the destructor of FutureData cannot refer to 'a
12+
func: Mutex<ManuallyDrop<Option<Box<dyn FnOnce() -> R + sync::Send + 'a>>>>,
13+
result: Mutex<Option<thread::Result<R>>>,
14+
waiter: Condvar,
15+
}
16+
pub struct Future<'a, R> {
17+
data: Arc<FutureData<'a, R>>,
18+
}
19+
20+
fn create<'a, R>(f: impl FnOnce() -> R + sync::Send + 'a) -> Future<'a, R> {
21+
let data = Arc::new(FutureData {
22+
func: Mutex::new(ManuallyDrop::new(Some(Box::new(f)))),
23+
result: Mutex::new(None),
24+
waiter: Condvar::new(),
25+
});
26+
Future { data: data.clone() }
27+
}
28+
29+
fn run<R>(data: &FutureData<'_, R>) {
30+
if let Some(func) = data.func.lock().unwrap().take() {
31+
// Execute the function if it has not yet been joined
32+
let r = panic::catch_unwind(AssertUnwindSafe(func));
33+
*data.result.lock().unwrap() = Some(r);
34+
data.waiter.notify_all();
35+
}
36+
}
37+
38+
impl<R: sync::Send + 'static> Future<'static, R> {
39+
pub fn spawn(f: impl FnOnce() -> R + sync::Send + 'static) -> Self {
40+
let result = create(f);
41+
let data = result.data.clone();
42+
sync::spawn(move || run(&data));
43+
result
44+
}
45+
}
46+
47+
impl<'a, R: sync::Send + 'a> Future<'a, R> {
48+
pub fn join(self) -> R {
49+
if let Some(func) = self.data.func.lock().unwrap().take() {
50+
// The function was not executed yet by Rayon, just run it
51+
func()
52+
} else {
53+
// The function has started executing, wait for it to complete
54+
jobserver::release_thread();
55+
let mut result = self
56+
.data
57+
.waiter
58+
.wait_while(self.data.result.lock().unwrap(), |result| result.is_none())
59+
.unwrap();
60+
jobserver::acquire_thread();
61+
match result.take().unwrap() {
62+
Ok(r) => {
63+
return r;
64+
}
65+
Err(err) => panic::resume_unwind(err),
66+
}
67+
}
68+
}
69+
}

0 commit comments

Comments
 (0)