Skip to content

Commit c5f3fc8

Browse files
committed
use existing concurrency primitive in_parallel
1 parent bf8a7a4 commit c5f3fc8

File tree

4 files changed

+18
-161
lines changed

4 files changed

+18
-161
lines changed

gix-features/src/parallel/in_parallel.rs

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -34,76 +34,6 @@ pub fn build_thread() -> std::thread::Builder {
3434
std::thread::Builder::new()
3535
}
3636

37-
/// Read items from `input` and `consume` them in multiple threads,
38-
/// whose output output is collected by a `reducer`. Its task is to
39-
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
40-
///
41-
/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
42-
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
43-
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
44-
/// created by `new_thread_state(…)`.
45-
/// * For `reducer`, see the [`Reduce`] trait
46-
pub fn in_parallel_chunks<'a, I, S, O, R>(
47-
input: &'a mut [I],
48-
chunk_size: usize,
49-
thread_limit: Option<usize>,
50-
new_thread_state: impl Fn(usize) -> S + Send + Clone,
51-
consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
52-
mut reducer: R,
53-
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
54-
where
55-
R: Reduce<Input = O>,
56-
I: Send,
57-
O: Send,
58-
{
59-
let num_threads = num_threads(thread_limit);
60-
std::thread::scope(move |s| {
61-
let receive_result = {
62-
let (send_input, receive_input) = crossbeam_channel::bounded::<&mut [I]>(num_threads);
63-
let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
64-
for thread_id in 0..num_threads {
65-
std::thread::Builder::new()
66-
.name(format!("gitoxide.in_parallel.produce.{thread_id}"))
67-
.spawn_scoped(s, {
68-
let send_result = send_result.clone();
69-
let receive_input = receive_input.clone();
70-
let new_thread_state = new_thread_state.clone();
71-
let consume = consume.clone();
72-
move || {
73-
let mut state = new_thread_state(thread_id);
74-
for chunk in receive_input {
75-
for item in chunk {
76-
if let Some(output) = consume(item, &mut state) {
77-
if send_result.send(output).is_err() {
78-
break;
79-
}
80-
}
81-
}
82-
}
83-
}
84-
})
85-
.expect("valid name");
86-
}
87-
std::thread::Builder::new()
88-
.name("gitoxide.in_parallel.feed".into())
89-
.spawn_scoped(s, move || {
90-
for chunk in input.chunks_mut(chunk_size) {
91-
if send_input.send(chunk).is_err() {
92-
break;
93-
}
94-
}
95-
})
96-
.expect("valid name");
97-
receive_result
98-
};
99-
100-
for item in receive_result {
101-
drop(reducer.feed(item)?);
102-
}
103-
reducer.finalize()
104-
})
105-
}
106-
10737
/// Read items from `input` and `consume` them in multiple threads,
10838
/// whose output output is collected by a `reducer`. Its task is to
10939
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.

gix-features/src/parallel/mod.rs

Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@
3535
#[cfg(feature = "parallel")]
3636
mod in_parallel;
3737
#[cfg(feature = "parallel")]
38-
pub use in_parallel::{build_thread, in_parallel, in_parallel_chunks, in_parallel_with_slice, join, threads};
38+
pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
3939

4040
mod serial;
4141
#[cfg(not(feature = "parallel"))]
42-
pub use serial::{build_thread, in_parallel, in_parallel_chunks, in_parallel_with_slice, join, threads};
42+
pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
4343

4444
mod in_order;
4545
pub use in_order::{InOrderIter, SequenceId};
@@ -128,53 +128,6 @@ pub fn num_threads(thread_limit: Option<usize>) -> usize {
128128
.unwrap_or(logical_cores)
129129
}
130130

131-
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
132-
///
133-
/// For parameters, see the documentation of [`in_parallel()`]
134-
#[cfg(feature = "parallel")]
135-
pub fn in_parallel_chunks_if<'a, I, S, O, R>(
136-
condition: impl FnOnce() -> bool,
137-
input: &'a mut [I],
138-
chunk_size: usize,
139-
thread_limit: Option<usize>,
140-
new_thread_state: impl Fn(usize) -> S + Send + Clone,
141-
consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
142-
reducer: R,
143-
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
144-
where
145-
R: Reduce<Input = O>,
146-
I: Send,
147-
O: Send,
148-
{
149-
if num_threads(thread_limit) > 1 && condition() {
150-
in_parallel_chunks(input, chunk_size, thread_limit, new_thread_state, consume, reducer)
151-
} else {
152-
serial::in_parallel_chunks(input, chunk_size, thread_limit, new_thread_state, consume, reducer)
153-
}
154-
}
155-
156-
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
157-
///
158-
/// For parameters, see the documentation of [`in_parallel()`]
159-
///
160-
/// Note that the non-parallel version is equivalent to [`in_parallel()`].
161-
#[cfg(not(feature = "parallel"))]
162-
pub fn in_parallel_chunks_if<'a, I, S, O, R>(
163-
_condition: impl FnOnce() -> bool,
164-
input: &'a mut [I],
165-
chunk_size: usize,
166-
thread_limit: Option<usize>,
167-
new_thread_state: impl Fn(usize) -> S + Send + Clone,
168-
consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
169-
reducer: R,
170-
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
171-
where
172-
R: Reduce<Input = O>,
173-
I: Send,
174-
O: Send,
175-
{
176-
serial::in_parallel_chunks(input, chunk_size, thread_limit, new_thread_state, consume, reducer)
177-
}
178131
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
179132
///
180133
/// For parameters, see the documentation of [`in_parallel()`]

gix-features/src/parallel/serial.rs

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -133,35 +133,3 @@ where
133133
}
134134
reducer.finalize()
135135
}
136-
137-
/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
138-
/// whose task is to aggregate these outputs into the final result returned by this function.
139-
///
140-
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
141-
/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
142-
/// * For `reducer`, see the [`Reduce`] trait
143-
/// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
144-
/// similar to the parallel version.
145-
///
146-
/// **This serial version performing all calculations on the current thread.**
147-
pub fn in_parallel_chunks<'a, I, S, O, R>(
148-
input: &'a mut [I],
149-
_chunk_size: usize,
150-
_thread_limit: Option<usize>,
151-
new_thread_state: impl Fn(usize) -> S + Send + Clone,
152-
consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
153-
mut reducer: R,
154-
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
155-
where
156-
R: Reduce<Input = O>,
157-
I: Send,
158-
O: Send,
159-
{
160-
let mut state = new_thread_state(0);
161-
for item in input {
162-
if let Some(res) = consume(item, &mut state) {
163-
drop(reducer.feed(res)?);
164-
}
165-
}
166-
reducer.finalize()
167-
}

gix-worktree/src/index/status/worktree.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::index::status::{Collector, Status};
77
use crate::{fs, read};
88
use bstr::BStr;
99
use filetime::FileTime;
10-
use gix_features::parallel::{in_parallel_chunks_if, Reduce};
10+
use gix_features::parallel::{in_parallel_if, Reduce};
1111
use gix_index as index;
1212
use gix_path as path;
1313

@@ -84,10 +84,9 @@ pub fn status<'index, T: Send>(
8484
None,
8585
);
8686
let path_backing = index.path_backing.as_slice();
87-
in_parallel_chunks_if(
88-
|| true,
89-
&mut index.entries,
90-
chunk_size,
87+
in_parallel_if(
88+
|| true, // TODO: heuristic: when is parallelization not worth it?
89+
index.entries.chunks_mut(chunk_size),
9190
thread_limit,
9291
|_| State {
9392
buf: Vec::new(),
@@ -96,7 +95,12 @@ pub fn status<'index, T: Send>(
9695
worktree,
9796
options: &options,
9897
},
99-
|entry, state| state.process(entry, diff),
98+
|entries, state| {
99+
entries
100+
.iter_mut()
101+
.filter_map(|entry| state.process(entry, diff))
102+
.collect()
103+
},
100104
Reducer {
101105
collector,
102106
phantom: PhantomData,
@@ -234,17 +238,19 @@ struct Reducer<'a, 'index, T: Collector<'index>> {
234238
}
235239

236240
impl<'index, T, C: Collector<'index, Diff = T>> Reduce for Reducer<'_, 'index, C> {
237-
type Input = StatusResult<'index, T>;
241+
type Input = Vec<StatusResult<'index, T>>;
238242

239243
type FeedProduce = ();
240244

241245
type Output = ();
242246

243247
type Error = Error;
244248

245-
fn feed(&mut self, item: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
246-
let (entry, path, status, conflict) = item?;
247-
self.collector.visit_entry(entry, path, status, conflict);
249+
fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
250+
for item in items {
251+
let (entry, path, status, conflict) = item?;
252+
self.collector.visit_entry(entry, path, status, conflict);
253+
}
248254
Ok(())
249255
}
250256

0 commit comments

Comments
 (0)