basic checkout parallelism #346


Merged: 37 commits, Mar 9, 2022

Commits (37)
bacc654
switch worktree to thiserror (#301)
Byron Mar 6, 2022
f9beac0
return proper errors during checkout object lookup (#301)
Byron Mar 6, 2022
99de1ef
mior refactor and notes towards parallelization (#301)
Byron Mar 7, 2022
805c0da
a stab at making file writes safer… (#301)
Byron Mar 7, 2022
9f9d36d
try to fix tests on linux (#301)
Byron Mar 7, 2022
d3d7a7c
Allow symlinks to dirs to be returned, too (#301)
Byron Mar 7, 2022
ccd25cb
fix case-insensitive tests (#301)
Byron Mar 7, 2022
cd6e086
prepare for first overwrite test… (#301)
Byron Mar 7, 2022
77b053d
delayed symlink creation for windows, but… (#301)
Byron Mar 7, 2022
ab5cd3d
delayed symlink creation for everyone, but…(#301)
Byron Mar 7, 2022
52c0058
Fix dir-cache to properly handle its valiity which fixes test (#301)
Byron Mar 7, 2022
49d1d34
overwrite-existing support with tests (#301)
Byron Mar 7, 2022
facad25
better symlink checking on ubuntu (#301)
Byron Mar 7, 2022
ea561e6
delete directories recursively on overwrite-existing (#301)
Byron Mar 7, 2022
0bc9489
See if we can remove symlinks this way on windows (#301)
Byron Mar 7, 2022
8f3bc5a
debug mode for windows (#301)
Byron Mar 7, 2022
0c18443
some more debugging on windows (#301)
Byron Mar 7, 2022
9ea1e44
refactor (#301)
Byron Mar 7, 2022
ff95265
try to fix windows once again (#301)
Byron Mar 7, 2022
81bcb8d
fix windows test expecations for good (#301)
Byron Mar 7, 2022
c3c31af
refactor (#301)
Byron Mar 8, 2022
542f49b
refactor (#301)
Byron Mar 8, 2022
0f0d390
fix `interrupt::Iter` (#301)
Byron Mar 8, 2022
8945d95
feat!: `interrupt::Iter`, rename `interrupt::Iter` -> `interrupt::Ite…
Byron Mar 8, 2022
8cbe85d
add thread-count and chunk-size computation; interrupt capability (#301)
Byron Mar 8, 2022
7575a58
proper handling of interruptions during checkout (#301)
Byron Mar 8, 2022
e5f6943
switch index checkout to chunk-based operation (#301)
Byron Mar 8, 2022
1cd7eb3
parallel and non-parallel tests (#301)
Byron Mar 8, 2022
9ecdade
decouple amount of bytes written from progress (#301)
Byron Mar 8, 2022
5f29c0f
basic parallelization, without proper reducer, just so it compiles (#…
Byron Mar 8, 2022
c19331e
conversions from Rc to arc for Handle (#301)
Byron Mar 8, 2022
6bfd865
call chunk processing in threaded processor (#301)
Byron Mar 8, 2022
e83079d
a reducer which produces progress reporting each time it feeds (#301)
Byron Mar 9, 2022
232832f
run multi-threaded tests as well for worktree (#301)
Byron Mar 9, 2022
07a4094
thanks clippy
Byron Mar 9, 2022
07e9081
pass thread-limit along to checkout (#301)
Byron Mar 9, 2022
21d6f88
stabilize assertions in parallel mode (#301)
Byron Mar 9, 2022
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Makefile
@@ -149,6 +149,8 @@ unit-tests: ## run all unit tests
&& cargo test --features "internal-testing-git-features-parallel"
cd git-index && cargo test --features internal-testing-to-avoid-being-run-by-cargo-test-all \
&& cargo test --features "internal-testing-git-features-parallel"
cd git-worktree && cargo test --features internal-testing-to-avoid-being-run-by-cargo-test-all \
&& cargo test --features "internal-testing-git-features-parallel"
cd git-packetline && cargo test \
&& cargo test --features blocking-io,maybe-async/is_sync --test blocking-packetline \
&& cargo test --features "async-io" --test async-packetline
4 changes: 2 additions & 2 deletions etc/check-package-size.sh
@@ -19,7 +19,7 @@ echo "in root: gitoxide CLI"
(enter cargo-smart-release && indent cargo diet -n --package-size-limit 85KB)
(enter git-actor && indent cargo diet -n --package-size-limit 5KB)
(enter git-index && indent cargo diet -n --package-size-limit 30KB)
-(enter git-worktree && indent cargo diet -n --package-size-limit 15KB)
+(enter git-worktree && indent cargo diet -n --package-size-limit 20KB)
(enter git-revision && indent cargo diet -n --package-size-limit 10KB)
(enter git-bitmap && indent cargo diet -n --package-size-limit 5KB)
(enter git-tempfile && indent cargo diet -n --package-size-limit 25KB)
@@ -36,7 +36,7 @@ echo "in root: gitoxide CLI"
(enter git-object && indent cargo diet -n --package-size-limit 25KB)
(enter git-commitgraph && indent cargo diet -n --package-size-limit 25KB)
(enter git-pack && indent cargo diet -n --package-size-limit 115KB)
-(enter git-odb && indent cargo diet -n --package-size-limit 115KB)
+(enter git-odb && indent cargo diet -n --package-size-limit 120KB)
(enter git-protocol && indent cargo diet -n --package-size-limit 50KB)
(enter git-packetline && indent cargo diet -n --package-size-limit 35KB)
(enter git-repository && indent cargo diet -n --package-size-limit 80KB)
49 changes: 44 additions & 5 deletions git-features/src/interrupt.rs
@@ -4,31 +4,70 @@ use std::{
sync::atomic::{AtomicBool, Ordering},
};

/// A wrapper for an inner iterator which will check for interruptions on each iteration, stopping the iteration when
/// that is requested.
pub struct Iter<'a, I> {
/// The actual iterator to yield elements from.
pub inner: I,
should_interrupt: &'a AtomicBool,
}

impl<'a, I> Iter<'a, I>
where
I: Iterator,
{
/// Create a new iterator over `inner` which checks for interruptions on each iteration on `should_interrupt`.
///
/// Note that this means the consumer of the iterator data should also be able to access `should_interrupt` and
/// consider it when producing the final result to avoid claiming success even though the operation is only partially
/// complete.
pub fn new(inner: I, should_interrupt: &'a AtomicBool) -> Self {
Iter {
inner,
should_interrupt,
}
}
}

impl<'a, I> Iterator for Iter<'a, I>
where
I: Iterator,
{
type Item = I::Item;

fn next(&mut self) -> Option<Self::Item> {
if self.should_interrupt.load(Ordering::Relaxed) {
return None;
}
self.inner.next()
}
}
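To illustrate the behavior of the wrapper added above, here is a self-contained sketch (the stand-in type mirrors `Iter`; the flag and data are illustrative):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in mirroring the `Iter` wrapper above: it stops yielding as soon as the flag is set.
struct Interruptible<'a, I> {
    inner: I,
    should_interrupt: &'a AtomicBool,
}

impl<'a, I: Iterator> Iterator for Interruptible<'a, I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        if self.should_interrupt.load(Ordering::Relaxed) {
            return None; // silently end the iteration
        }
        self.inner.next()
    }
}

fn main() {
    let flag = AtomicBool::new(false);
    let mut iter = Interruptible { inner: 0..5, should_interrupt: &flag };
    assert_eq!(iter.next(), Some(0));
    flag.store(true, Ordering::Relaxed); // e.g. set from a signal handler
    assert_eq!(iter.next(), None); // remaining items are dropped
}
```

Note the caveat from the doc comment: because the iteration simply ends, the consumer must check the flag itself to avoid reporting a partial result as success.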

/// A wrapper for an inner iterator which will check for interruptions on each iteration.
-pub struct Iter<'a, I, EFN> {
+pub struct IterWithErr<'a, I, EFN> {
/// The actual iterator to yield elements from.
pub inner: I,
make_err: Option<EFN>,
should_interrupt: &'a AtomicBool,
}

-impl<'a, I, EFN, E> Iter<'a, I, EFN>
+impl<'a, I, EFN, E> IterWithErr<'a, I, EFN>
where
I: Iterator,
EFN: FnOnce() -> E,
{
/// Create a new iterator over `inner` which checks for interruptions on each iteration and calls `make_err()` to
/// signal an interruption happened, causing no further items to be iterated from that point on.
pub fn new(inner: I, make_err: EFN, should_interrupt: &'a AtomicBool) -> Self {
-Iter {
+IterWithErr {
inner,
make_err: Some(make_err),
should_interrupt,
}
}
}

-impl<'a, I, EFN, E> Iterator for Iter<'a, I, EFN>
+impl<'a, I, EFN, E> Iterator for IterWithErr<'a, I, EFN>
where
I: Iterator,
EFN: FnOnce() -> E,
@@ -38,7 +77,7 @@ where
fn next(&mut self) -> Option<Self::Item> {
self.make_err.as_ref()?;
if self.should_interrupt.load(Ordering::Relaxed) {
-return Some(Err(self.make_err.take().expect("no bug")()));
+return self.make_err.take().map(|f| Err(f()));
}
match self.inner.next() {
Some(next) => Some(Ok(next)),
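A sketch of how the renamed `IterWithErr` behaves: unlike plain `Iter`, it surfaces the interruption as a final `Err` item, exactly once (stand-in type, illustrative data and error):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in for `IterWithErr`: wraps items in Ok, and emits the error exactly once on interrupt.
struct IterWithErr<'a, I, EFN> {
    inner: I,
    make_err: Option<EFN>,
    should_interrupt: &'a AtomicBool,
}

impl<'a, I, EFN, E> Iterator for IterWithErr<'a, I, EFN>
where
    I: Iterator,
    EFN: FnOnce() -> E,
{
    type Item = Result<I::Item, E>;
    fn next(&mut self) -> Option<Self::Item> {
        self.make_err.as_ref()?; // once the error was produced, stay exhausted
        if self.should_interrupt.load(Ordering::Relaxed) {
            return self.make_err.take().map(|f| Err(f()));
        }
        match self.inner.next() {
            Some(next) => Some(Ok(next)),
            None => {
                self.make_err = None;
                None
            }
        }
    }
}

fn main() {
    let flag = AtomicBool::new(false);
    let mut iter = IterWithErr {
        inner: 0..3,
        make_err: Some(|| "interrupted"),
        should_interrupt: &flag,
    };
    assert_eq!(iter.next(), Some(Ok(0)));
    flag.store(true, Ordering::Relaxed);
    assert_eq!(iter.next(), Some(Err("interrupted"))); // the error is reported once
    assert_eq!(iter.next(), None); // then the iterator is exhausted
}
```

This is why the `take().map(...)` fix above matters: after the error was handed out, further calls must yield `None` instead of panicking.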
32 changes: 32 additions & 0 deletions git-features/src/lib.rs
@@ -35,3 +35,35 @@ pub mod zlib;
///
#[cfg(feature = "time")]
pub mod time;

///
pub mod util {
/// An iterator over chunks of input, producing `Vec<Item>` with a size of `size`, with the last chunk being the remainder and thus
/// potentially smaller than `size`.
pub struct Chunks<I> {
/// The inner iterator to ask for items.
pub inner: I,
/// The size of chunks to produce
pub size: usize,
}

impl<I, Item> Iterator for Chunks<I>
where
I: Iterator<Item = Item>,
{
type Item = Vec<Item>;

fn next(&mut self) -> Option<Self::Item> {
let mut res = Vec::with_capacity(self.size);
let mut items_left = self.size;
for item in &mut self.inner {
res.push(item);
items_left -= 1;
if items_left == 0 {
break;
}
}
(!res.is_empty()).then(|| res)
}
}
}
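Since `Chunks` moved here from `git-pack` to be shared, a quick usage sketch showing the remainder behavior described in the doc comment (illustrative data):

```rust
// Copy of the `Chunks` iterator added above, for a self-contained demonstration.
pub struct Chunks<I> {
    pub inner: I,
    pub size: usize,
}

impl<I, Item> Iterator for Chunks<I>
where
    I: Iterator<Item = Item>,
{
    type Item = Vec<Item>;
    fn next(&mut self) -> Option<Self::Item> {
        let mut res = Vec::with_capacity(self.size);
        let mut items_left = self.size;
        for item in &mut self.inner {
            res.push(item);
            items_left -= 1;
            if items_left == 0 {
                break;
            }
        }
        (!res.is_empty()).then(|| res)
    }
}

fn main() {
    let chunks: Vec<Vec<u32>> = Chunks { inner: 0..7, size: 3 }.collect();
    // The last chunk holds the remainder and may be smaller than `size`.
    assert_eq!(chunks, vec![vec![0, 1, 2], vec![3, 4, 5], vec![6]]);
}
```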
3 changes: 2 additions & 1 deletion git-features/src/parallel/in_parallel.rs
@@ -13,7 +13,8 @@ pub fn join<O1: Send, O2: Send>(left: impl FnOnce() -> O1 + Send, right: impl Fn
/// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
/// That way it's possible to handle threads without needing the 'static lifetime for data they interact with.
///
-/// Note that the threads should not rely on actual parallelism as threading might be turned off entirely.
+/// Note that the threads should not rely on actual parallelism as threading might be turned off entirely, hence they should not
+/// be connected to each other with channels, as that would deadlock in single-threaded mode.
pub fn threads<'env, F, R>(f: F) -> std::thread::Result<R>
where
F: FnOnce(&crossbeam_utils::thread::Scope<'env>) -> R,
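The `threads` helper here wraps `crossbeam_utils::thread::scope`. As an analogous, self-contained sketch, the standard library's scoped threads (`std::thread::scope`, Rust 1.63+) likewise let workers borrow non-`'static` local data, which is the point of the scope-based API (data and chunking below are illustrative):

```rust
fn main() {
    let data = vec![1, 2, 3, 4];
    let mut sums = vec![0; 2];
    // Scoped threads may borrow `data` and `sums` without 'static lifetimes;
    // the scope joins all workers before returning.
    std::thread::scope(|s| {
        for (chunk, sum) in data.chunks(2).zip(sums.iter_mut()) {
            s.spawn(move || {
                *sum = chunk.iter().sum();
            });
        }
    });
    assert_eq!(sums, vec![3, 7]);
}
```

Each worker writes into a disjoint slot, so no channels are needed, which also sidesteps the single-threaded deadlock concern from the doc comment.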
6 changes: 3 additions & 3 deletions git-features/src/parallel/reduce.rs
@@ -10,7 +10,7 @@ mod stepped {
receive_result: std::sync::mpsc::Receiver<Reduce::Input>,
/// `join()` will be called on these guards to assure every thread tries to send through a closed channel. When
/// that happens, they break out of their loops.
-_threads: Vec<std::thread::JoinHandle<()>>,
+threads: Vec<std::thread::JoinHandle<()>>,
/// The reducer is called only in the thread using the iterator, dropping it has no side effects.
reducer: Option<Reduce>,
}
@@ -21,7 +21,7 @@ mod stepped {
drop(std::mem::replace(&mut self.receive_result, sink));

let mut last_err = None;
-for handle in std::mem::take(&mut self._threads) {
+for handle in std::mem::take(&mut self.threads) {
if let Err(err) = handle.join() {
last_err = Some(err);
};
@@ -82,7 +82,7 @@ mod stepped {
receive_result
};
Stepwise {
-_threads: threads,
+threads,
receive_result,
reducer: Some(reducer),
}
22 changes: 22 additions & 0 deletions git-odb/src/cache.rs
@@ -1,3 +1,4 @@
use std::rc::Rc;
use std::{
cell::RefCell,
ops::{Deref, DerefMut},
@@ -16,6 +17,27 @@ pub type ObjectCache = dyn git_pack::cache::Object + Send + 'static;
/// A constructor for boxed object caches.
pub type NewObjectCacheFn = dyn Fn() -> Box<ObjectCache> + Send + Sync + 'static;

impl Cache<crate::store::Handle<Rc<crate::Store>>> {
/// Convert this cache's handle into one that keeps its store in an arc. This creates an entirely new store,
/// so should be done early to avoid unnecessary work (and mappings).
pub fn into_arc(self) -> std::io::Result<Cache<crate::store::Handle<Arc<crate::Store>>>> {
let inner = self.inner.into_arc()?;
Ok(Cache {
inner,
new_pack_cache: self.new_pack_cache,
new_object_cache: self.new_object_cache,
pack_cache: self.pack_cache,
object_cache: self.object_cache,
})
}
}
impl Cache<crate::store::Handle<Arc<crate::Store>>> {
/// No op, as we are containing an arc handle already.
pub fn into_arc(self) -> std::io::Result<Cache<crate::store::Handle<Arc<crate::Store>>>> {
Ok(self)
}
}

impl<S> Cache<S> {
/// Dissolve this instance, discard all caches, and return the inner implementation.
pub fn into_inner(self) -> S {
35 changes: 35 additions & 0 deletions git-odb/src/store_impls/dynamic/handle.rs
@@ -1,3 +1,5 @@
use std::convert::{TryFrom, TryInto};
use std::rc::Rc;
use std::{
cell::RefCell,
ops::Deref,
@@ -324,6 +326,39 @@
}
}

impl TryFrom<&super::Store> for super::Store {
type Error = std::io::Error;

fn try_from(s: &super::Store) -> Result<Self, Self::Error> {
super::Store::at_opts(
s.path(),
crate::store::init::Options {
slots: crate::store::init::Slots::Given(s.files.len().try_into().expect("BUG: too many slots")),
object_hash: Default::default(),
use_multi_pack_index: false,
},
)
}
}

impl super::Handle<Rc<super::Store>> {
/// Convert a ref counted store into one that is ref-counted and thread-safe, by creating a new Store.
pub fn into_arc(self) -> std::io::Result<super::Handle<Arc<super::Store>>> {
let store = Arc::new(super::Store::try_from(self.store_ref())?);
let mut cache = store.to_handle_arc();
cache.refresh = self.refresh;
cache.max_recursion_depth = self.max_recursion_depth;
Ok(cache)
}
}

impl super::Handle<Arc<super::Store>> {
/// Convert a ref counted store into one that is ref-counted and thread-safe, by creating a new Store
pub fn into_arc(self) -> std::io::Result<super::Handle<Arc<super::Store>>> {
Ok(self)
}
}

impl<S> Clone for super::Handle<S>
where
S: Deref<Target = super::Store> + Clone,
4 changes: 2 additions & 2 deletions git-pack/src/data/output/count/objects/mod.rs
@@ -61,8 +61,8 @@ where
thread_limit,
None,
);
-let chunks = util::Chunks {
-    iter: objects_ids,
+let chunks = git_features::util::Chunks {
+    inner: objects_ids,
size: chunk_size,
};
let seen_objs = dashmap::DashSet::<ObjectId, HashBuildHasher>::default();
25 changes: 0 additions & 25 deletions git-pack/src/data/output/count/objects/util.rs
@@ -22,28 +22,3 @@ mod trait_impls {
}
}
}

pub struct Chunks<I> {
pub size: usize,
pub iter: I,
}

impl<I, Item> Iterator for Chunks<I>
where
I: Iterator<Item = Item>,
{
type Item = Vec<Item>;

fn next(&mut self) -> Option<Self::Item> {
let mut res = Vec::with_capacity(self.size);
let mut items_left = self.size;
for item in &mut self.iter {
res.push(item);
items_left -= 1;
if items_left == 0 {
break;
}
}
(!res.is_empty()).then(|| res)
}
}
4 changes: 2 additions & 2 deletions git-repository/src/interrupt.rs
@@ -62,7 +62,7 @@ pub use init::init_handler
/// A wrapper for an inner iterator which will check for interruptions on each iteration.
pub struct Iter<I, EFN> {
/// The actual iterator to yield elements from.
-inner: git_features::interrupt::Iter<'static, I, EFN>,
+inner: git_features::interrupt::IterWithErr<'static, I, EFN>,
}

impl<I, EFN, E> Iter<I, EFN>
@@ -74,7 +74,7 @@ where
/// signal an interruption happened, causing no further items to be iterated from that point on.
pub fn new(inner: I, make_err: EFN) -> Self {
Iter {
-inner: git_features::interrupt::Iter::new(inner, make_err, &IS_INTERRUPTED),
+inner: git_features::interrupt::IterWithErr::new(inner, make_err, &IS_INTERRUPTED),
}
}

6 changes: 1 addition & 5 deletions git-repository/tests/easy/id.rs
@@ -12,11 +12,7 @@ fn prefix() -> crate::Result {

// TODO: do this in-memory (with or without writing to disk)
assert!(
-std::process::Command::new("git")
-    .current_dir(worktree_dir.path())
-    .args(["config", "--int", "core.abbrev", "5"])
-    .status()?
-    .success(),
+git_testtools::run_git(worktree_dir.path(), &["config", "--int", "core.abbrev", "5"])?.success(),
"set core abbrev value successfully"
);

21 changes: 18 additions & 3 deletions git-worktree/Cargo.toml
@@ -10,12 +10,24 @@ edition = "2018"
[lib]
doctest = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[test]]
name = "multi-threaded"
path = "tests/worktree-multi-threaded.rs"
required-features = ["internal-testing-git-features-parallel"]

[[test]]
name = "single-threaded"
path = "tests/worktree-single-threaded.rs"
required-features = ["internal-testing-to-avoid-being-run-by-cargo-test-all"]


[features]
## Data structures implement `serde::Serialize` and `serde::Deserialize`.
serde1 = [ "serde", "bstr/serde1", "git-index/serde1", "git-hash/serde1", "git-object/serde1" ]

internal-testing-git-features-parallel = ["git-features/parallel"]
internal-testing-to-avoid-being-run-by-cargo-test-all = []

[dependencies]
git-index = { version = "^0.1.0", path = "../git-index" }
git-hash = { version = "^0.9.0", path = "../git-hash" }
Expand All @@ -24,15 +36,18 @@ git-features = { version = "^0.19.1", path = "../git-features" }

serde = { version = "1.0.114", optional = true, default-features = false, features = ["derive"]}

-quick-error = "2.0.1"
+thiserror = "1.0.26"
bstr = { version = "0.2.13", default-features = false }

document-features = { version = "0.2.0", optional = true }
symlink = "0.1.0"

[target.'cfg(unix)'.dependencies]
libc = "0.2.119"

[dev-dependencies]
git-testtools = { path = "../tests/tools" }
git-odb = { path = "../git-odb" }
symlink = "0.1.0"

walkdir = "2.3.2"
tempfile = "3.2.0"