From 18d4fb40b2290a3d8e7fa8dffe6a2afc41c67e3f Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Mar 2022 20:38:09 +0800 Subject: [PATCH 1/2] more flexible in_parallel closures due to FnMut (#301) Previously they were Sync so FnMut wasn't so useful. Now they are thread-local, and thus should be mutable for max flexibility. --- git-features/src/parallel/in_parallel.rs | 26 ++++++++++++++++++++---- git-features/src/parallel/serial.rs | 4 ++-- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/git-features/src/parallel/in_parallel.rs b/git-features/src/parallel/in_parallel.rs index 2d3b69290dc..afd8d10ee5c 100644 --- a/git-features/src/parallel/in_parallel.rs +++ b/git-features/src/parallel/in_parallel.rs @@ -34,8 +34,8 @@ where pub fn in_parallel( input: impl Iterator + Send, thread_limit: Option, - new_thread_state: impl Fn(usize) -> S + Send + Clone, - consume: impl Fn(I, &mut S) -> O + Send + Clone, + new_thread_state: impl FnMut(usize) -> S + Send + Clone, + consume: impl FnMut(I, &mut S) -> O + Send + Clone, mut reducer: R, ) -> Result<::Output, ::Error> where @@ -52,8 +52,8 @@ where s.spawn({ let send_result = send_result.clone(); let receive_input = receive_input.clone(); - let new_thread_state = new_thread_state.clone(); - let consume = consume.clone(); + let mut new_thread_state = new_thread_state.clone(); + let mut consume = consume.clone(); move |_| { let mut state = new_thread_state(thread_id); for item in receive_input { @@ -81,3 +81,21 @@ where }) .unwrap() } + +// #[allow(missing_docs)] // TODO: docs +// pub fn in_parallel_with_mut_slice( +// input: &mut [I], +// thread_limit: Option, +// new_thread_state: impl Fn(usize) -> S + Send + Clone, +// consume: impl Fn(I, &mut S) -> Result + Send + Clone, +// finalize: impl FnMut(&mut [I], Produce) -> FinalResult, +// ) -> Result +// where +// I: Send, +// O: Send, +// E: Send, +// Produce: Iterator, +// { +// let num_threads = num_threads(thread_limit); +// crossbeam_utils::thread::scope(move |s| todo!()).unwrap() +// } diff --git a/git-features/src/parallel/serial.rs b/git-features/src/parallel/serial.rs index eb92aa0a724..b21d56cadb2 100644 --- a/git-features/src/parallel/serial.rs +++ b/git-features/src/parallel/serial.rs @@ -72,8 +72,8 @@ pub use not_parallel::{join, threads, Scope, ScopedJoinHandle}; pub fn in_parallel( input: impl Iterator, _thread_limit: Option, - new_thread_state: impl Fn(usize) -> S, - consume: impl Fn(I, &mut S) -> O, + new_thread_state: impl FnMut(usize) -> S, + consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<::Output, ::Error> where From 79d5f64d93200d8b11e1c3162809f2a03453d9f4 Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Sat, 12 Mar 2022 21:24:06 +0800 Subject: [PATCH 2/2] A sketch for work-stealing parallelization that has mostly uncontended locks (#301) --- git-features/src/parallel/in_parallel.rs | 91 +++++++++++++++++++----- git-features/src/parallel/mod.rs | 2 +- git-features/src/parallel/serial.rs | 4 +- 3 files changed, 77 insertions(+), 20 deletions(-) diff --git a/git-features/src/parallel/in_parallel.rs b/git-features/src/parallel/in_parallel.rs index afd8d10ee5c..d3efa413bee 100644 --- a/git-features/src/parallel/in_parallel.rs +++ b/git-features/src/parallel/in_parallel.rs @@ -1,4 +1,5 @@ use crate::parallel::{num_threads, Reduce}; +use std::sync::atomic::{AtomicBool, Ordering}; /// Runs `left` and `right` in parallel, returning their output when both are done. pub fn join(left: impl FnOnce() -> O1 + Send, right: impl FnOnce() -> O2 + Send) -> (O1, O2) { @@ -82,20 +83,76 @@ where .unwrap() } -// #[allow(missing_docs)] // TODO: docs -// pub fn in_parallel_with_mut_slice( -// input: &mut [I], -// thread_limit: Option, -// new_thread_state: impl Fn(usize) -> S + Send + Clone, -// consume: impl Fn(I, &mut S) -> Result + Send + Clone, -// finalize: impl FnMut(&mut [I], Produce) -> FinalResult, -// ) -> Result -// where -// I: Send, -// O: Send, -// E: Send, -// Produce: Iterator, -// { -// let num_threads = num_threads(thread_limit); -// crossbeam_utils::thread::scope(move |s| todo!()).unwrap() -// } +#[allow(missing_docs)] // TODO: docs +pub fn in_parallel_with_mut_slice( + input: &mut [I], + thread_limit: Option, + new_thread_state: impl FnMut(usize) -> S + Send + Clone, + consume: impl FnMut(&mut I, &mut S) -> Result + Send + Clone, + mut periodic: impl FnMut() -> std::time::Duration + Send, + mut finalize: impl FnMut(&[I], Vec) -> Result, +) -> Result +where + I: Send, + O: Send, + E: Send, +{ + let num_threads = num_threads(thread_limit); + let input = &*input; // downgrade to immutable + let results: Vec>>> = (0..input.len()).map(|_| Default::default()).collect(); + let stop_periodic = &AtomicBool::default(); + + crossbeam_utils::thread::scope({ + let results = &results; + move |s| { + s.spawn({ + move |_| loop { + if stop_periodic.load(Ordering::Relaxed) { + break; + } + + std::thread::sleep(periodic()); + } + }); + + let threads: Vec<_> = (0..num_threads) + .map(|n| { + s.spawn({ + let mut new_thread_state = new_thread_state.clone(); + let mut _consume = consume.clone(); + move |_| { + let _state = new_thread_state(n); + let mut item = 0; + while let Some(res) = &results.get(num_threads * item + n) { + item += 1; + if let Some(mut guard) = res.try_lock() { + match guard.as_mut() { + Some(_) => { + // somebody stole our work, assume all future work is done, too + return; + } + None => { + todo!("make input mutable and consume/process") + } + } + } + } + // TODO: work-stealing logic once we are out of bounds. Pose as prior thread and walk backwards. + } + }) + }) + .collect(); + for thread in threads { + thread.join().expect("must not panic"); + } + + stop_periodic.store(true, Ordering::Relaxed); + } + }) + .unwrap(); + let mut unwrapped_results = Vec::with_capacity(results.len()); + for res in results { + unwrapped_results.push(res.into_inner().expect("result obtained, item processed")?); + } + finalize(input, unwrapped_results) +} diff --git a/git-features/src/parallel/mod.rs b/git-features/src/parallel/mod.rs index ebdedd3f308..cde597b355b 100644 --- a/git-features/src/parallel/mod.rs +++ b/git-features/src/parallel/mod.rs @@ -35,7 +35,7 @@ #[cfg(feature = "parallel")] mod in_parallel; #[cfg(feature = "parallel")] -pub use in_parallel::{in_parallel, join, threads}; +pub use in_parallel::{in_parallel, in_parallel_with_mut_slice, join, threads}; mod serial; #[cfg(not(feature = "parallel"))] diff --git a/git-features/src/parallel/serial.rs b/git-features/src/parallel/serial.rs index b21d56cadb2..b3afc36943e 100644 --- a/git-features/src/parallel/serial.rs +++ b/git-features/src/parallel/serial.rs @@ -72,8 +72,8 @@ pub use not_parallel::{join, threads, Scope, ScopedJoinHandle}; pub fn in_parallel( input: impl Iterator, _thread_limit: Option, - new_thread_state: impl FnMut(usize) -> S, - consume: impl FnMut(I, &mut S) -> O, + mut new_thread_state: impl FnMut(usize) -> S, + mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<::Output, ::Error> where