From bde2a86862c4887c163519227bac7bb4fdfa899b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?John=20K=C3=A5re=20Alsaker?= Date: Sun, 24 Sep 2023 01:34:45 +0200 Subject: [PATCH] Create the previous dep graph index on a background thread --- compiler/rustc_data_structures/src/marker.rs | 2 + .../rustc_incremental/src/persist/load.rs | 23 +- .../rustc_incremental/src/persist/save.rs | 4 +- compiler/rustc_interface/src/passes.rs | 2 +- .../rustc_query_system/src/dep_graph/graph.rs | 8 +- .../rustc_query_system/src/dep_graph/mod.rs | 4 +- .../src/dep_graph/serialized.rs | 208 ++++++++++++++---- 7 files changed, 194 insertions(+), 57 deletions(-) diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs index e0df1b232e134..07bba0e297e88 100644 --- a/compiler/rustc_data_structures/src/marker.rs +++ b/compiler/rustc_data_structures/src/marker.rs @@ -70,6 +70,7 @@ macro_rules! impl_dyn_send { } impl_dyn_send!( + [std::thread::JoinHandle where T] [std::sync::atomic::AtomicPtr where T] [std::sync::Mutex where T: ?Sized+ DynSend] [std::sync::mpsc::Sender where T: DynSend] @@ -152,6 +153,7 @@ macro_rules! impl_dyn_sync { } impl_dyn_sync!( + [std::thread::JoinHandle where T] [std::sync::atomic::AtomicPtr where T] [std::sync::OnceLock where T: DynSend + DynSync] [std::sync::Mutex where T: ?Sized + DynSend] diff --git a/compiler/rustc_incremental/src/persist/load.rs b/compiler/rustc_incremental/src/persist/load.rs index 0e646b136c452..4ad4ff285cb3b 100644 --- a/compiler/rustc_incremental/src/persist/load.rs +++ b/compiler/rustc_incremental/src/persist/load.rs @@ -35,9 +35,9 @@ pub enum LoadResult { LoadDepGraph(PathBuf, std::io::Error), } -impl LoadResult { +impl LoadResult { /// Accesses the data returned in [`LoadResult::Ok`]. - pub fn open(self, sess: &Session) -> T { + pub fn open(self, sess: &Session, fallback: impl FnOnce() -> T) -> T { // Check for errors when using `-Zassert-incremental-state` match (sess.opts.assert_incr_state, &self) { (Some(IncrementalStateAssertion::NotLoaded), LoadResult::Ok { .. }) => { @@ -55,14 +55,14 @@ impl LoadResult { match self { LoadResult::LoadDepGraph(path, err) => { sess.dcx().emit_warn(errors::LoadDepGraph { path, err }); - Default::default() + fallback() } LoadResult::DataOutOfDate => { if let Err(err) = delete_all_session_dir_contents(sess) { sess.dcx() .emit_err(errors::DeleteIncompatible { path: dep_graph_path(sess), err }); } - Default::default() + fallback() } LoadResult::Ok { data } => data, } @@ -93,13 +93,15 @@ fn delete_dirty_work_product(sess: &Session, swp: SerializedWorkProduct) { fn load_dep_graph( sess: &Session, - deps: &DepsType, -) -> LoadResult<(Arc, WorkProductMap)> { + deps: &Arc, +) -> LoadResult<(Arc>, WorkProductMap)> { let prof = sess.prof.clone(); if sess.opts.incremental.is_none() { // No incremental compilation. - return LoadResult::Ok { data: Default::default() }; + return LoadResult::Ok { + data: (Arc::new(SerializedDepGraph::empty(deps, sess)), Default::default()), + }; } let _timer = sess.prof.generic_activity("incr_comp_prepare_load_dep_graph"); @@ -174,7 +176,7 @@ fn load_dep_graph( return LoadResult::DataOutOfDate; } - let dep_graph = SerializedDepGraph::decode::(&mut decoder, deps); + let dep_graph = SerializedDepGraph::decode(&mut decoder, deps, sess); LoadResult::Ok { data: (dep_graph, prev_work_products) } } @@ -208,7 +210,7 @@ pub fn load_query_result_cache(sess: &Session) -> Option { /// Setups the dependency graph by loading an existing graph from disk and set up streaming of a /// new graph to an incremental session directory. -pub fn setup_dep_graph(sess: &Session, crate_name: Symbol, deps: &DepsType) -> DepGraph { +pub fn setup_dep_graph(sess: &Session, crate_name: Symbol, deps: &Arc) -> DepGraph { // `load_dep_graph` can only be called after `prepare_session_directory`. prepare_session_directory(sess, crate_name); @@ -227,7 +229,8 @@ pub fn setup_dep_graph(sess: &Session, crate_name: Symbol, deps: &DepsType) -> D } res.and_then(|result| { - let (prev_graph, prev_work_products) = result.open(sess); + let (prev_graph, prev_work_products) = result + .open(sess, || (Arc::new(SerializedDepGraph::empty(deps, sess)), Default::default())); build_dep_graph(sess, prev_graph, prev_work_products) }) .unwrap_or_else(DepGraph::new_disabled) diff --git a/compiler/rustc_incremental/src/persist/save.rs b/compiler/rustc_incremental/src/persist/save.rs index 58fea3278a839..23e43d19457f1 100644 --- a/compiler/rustc_incremental/src/persist/save.rs +++ b/compiler/rustc_incremental/src/persist/save.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use rustc_data_structures::fx::FxIndexMap; use rustc_data_structures::sync::join; use rustc_middle::dep_graph::{ - DepGraph, SerializedDepGraph, WorkProduct, WorkProductId, WorkProductMap, + DepGraph, DepsType, SerializedDepGraph, WorkProduct, WorkProductId, WorkProductMap, }; use rustc_middle::ty::TyCtxt; use rustc_serialize::Encodable as RustcEncodable; @@ -144,7 +144,7 @@ fn encode_query_cache(tcx: TyCtxt<'_>, encoder: FileEncoder) -> FileEncodeResult /// and moves it to the permanent dep-graph path pub(crate) fn build_dep_graph( sess: &Session, - prev_graph: Arc, + prev_graph: Arc>, prev_work_products: WorkProductMap, ) -> Option { if sess.opts.incremental.is_none() { diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs index f4d11a7c0be2d..cfeae2302ee56 100644 --- a/compiler/rustc_interface/src/passes.rs +++ b/compiler/rustc_interface/src/passes.rs @@ -828,7 +828,7 @@ pub fn create_and_enter_global_ctxt FnOnce(TyCtxt<'tcx>) -> T>( let outputs = util::build_output_filenames(&pre_configured_attrs, sess); - let dep_type = DepsType { dep_names: rustc_query_impl::dep_kind_names() }; + let dep_type = Arc::new(DepsType { dep_names: rustc_query_impl::dep_kind_names() }); let dep_graph = setup_dep_graph(sess, crate_name, &dep_type); let cstore = diff --git a/compiler/rustc_query_system/src/dep_graph/graph.rs b/compiler/rustc_query_system/src/dep_graph/graph.rs index 3ae56cef2c421..b9e494354e374 100644 --- a/compiler/rustc_query_system/src/dep_graph/graph.rs +++ b/compiler/rustc_query_system/src/dep_graph/graph.rs @@ -91,7 +91,7 @@ pub(crate) struct DepGraphData { /// The dep-graph from the previous compilation session. It contains all /// nodes and edges as well as all fingerprints of nodes that have them. - previous: Arc, + previous: Arc>, colors: DepNodeColorMap, @@ -121,7 +121,7 @@ where impl DepGraph { pub fn new( session: &Session, - prev_graph: Arc, + prev_graph: Arc>, prev_work_products: WorkProductMap, encoder: FileEncoder, ) -> DepGraph { @@ -1171,7 +1171,7 @@ impl CurrentDepGraph { session: &Session, prev_graph_node_count: usize, encoder: FileEncoder, - previous: Arc, + previous: Arc>, ) -> Self { let mut stable_hasher = StableHasher::new(); previous.session_count().hash(&mut stable_hasher); @@ -1260,7 +1260,7 @@ impl CurrentDepGraph { #[inline] fn debug_assert_not_in_new_nodes( &self, - prev_graph: &SerializedDepGraph, + prev_graph: &SerializedDepGraph, prev_index: SerializedDepNodeIndex, ) { if let Some(ref nodes_in_current_session) = self.nodes_in_current_session { diff --git a/compiler/rustc_query_system/src/dep_graph/mod.rs b/compiler/rustc_query_system/src/dep_graph/mod.rs index 89d1db878095f..1a1dcd2170cea 100644 --- a/compiler/rustc_query_system/src/dep_graph/mod.rs +++ b/compiler/rustc_query_system/src/dep_graph/mod.rs @@ -12,7 +12,7 @@ pub(crate) use graph::DepGraphData; pub use graph::{DepGraph, DepNodeIndex, TaskDepsRef, WorkProduct, WorkProductMap, hash_result}; pub use query::DepGraphQuery; use rustc_data_structures::profiling::SelfProfilerRef; -use rustc_data_structures::sync::DynSync; +use rustc_data_structures::sync::{DynSend, DynSync}; use rustc_session::Session; pub use serialized::{SerializedDepGraph, SerializedDepNodeIndex}; use tracing::instrument; @@ -90,7 +90,7 @@ pub trait DepContext: Copy { } } -pub trait Deps: DynSync { +pub trait Deps: DynSend + DynSync + Send + Sync + 'static { /// Execute the operation with provided dependencies. fn with_deps(deps: TaskDepsRef<'_>, op: OP) -> R where diff --git a/compiler/rustc_query_system/src/dep_graph/serialized.rs b/compiler/rustc_query_system/src/dep_graph/serialized.rs index f1b609a3ca906..dd1a9eac389a0 100644 --- a/compiler/rustc_query_system/src/dep_graph/serialized.rs +++ b/compiler/rustc_query_system/src/dep_graph/serialized.rs @@ -42,16 +42,18 @@ use std::cell::RefCell; use std::cmp::max; use std::marker::PhantomData; -use std::sync::Arc; use std::sync::atomic::Ordering; -use std::{iter, mem, u64}; +use std::sync::{Arc, OnceLock}; +use std::thread::JoinHandle; +use std::{iter, mem, panic, thread, u64}; +use parking_lot::Mutex; use rustc_data_structures::fingerprint::{Fingerprint, PackedFingerprint}; use rustc_data_structures::fx::FxHashMap; -use rustc_data_structures::outline; use rustc_data_structures::profiling::SelfProfilerRef; use rustc_data_structures::sync::{AtomicU64, Lock, WorkerLocal, broadcast}; use rustc_data_structures::unhash::UnhashMap; +use rustc_data_structures::{jobserver, outline}; use rustc_index::IndexVec; use rustc_serialize::opaque::mem_encoder::MemEncoder; use rustc_serialize::opaque::{FileEncodeResult, FileEncoder, IntEncodedWithFixedSize, MemDecoder}; @@ -87,29 +89,57 @@ const DEP_NODE_WIDTH_BITS: usize = DEP_NODE_SIZE / 2; /// /// There may be unused indices with DEP_KIND_NULL in this graph due to batch allocation of /// indices to threads. -#[derive(Debug, Default)] -pub struct SerializedDepGraph { +pub struct SerializedDepGraph { /// The set of all DepNodes in the graph nodes: IndexVec, + /// The set of all Fingerprints in the graph. Each Fingerprint corresponds to /// the DepNode at the same index in the nodes vector. fingerprints: IndexVec, + /// For each DepNode, stores the list of edges originating from that /// DepNode. Encoded as a [start, end) pair indexing into edge_list_data, /// which holds the actual DepNodeIndices of the target nodes. edge_list_indices: IndexVec, + /// A flattened list of all edge targets in the graph, stored in the same /// varint encoding that we use on disk. Edge sources are implicit in edge_list_indices. edge_list_data: Vec, + /// Stores a map from fingerprints to nodes per dep node kind. - /// This is the reciprocal of `nodes`. - index: Vec>, + /// This is the reciprocal of `nodes`. This is computed on demand for each dep kind. + /// The entire index is also computed in a background thread. + index: Vec>>, + + /// Stores the number of node for each dep node kind. + index_sizes: Vec, + /// The number of previous compilation sessions. This is used to generate /// unique anon dep nodes per session. session_count: u64, + + /// Used to lookup names of dep kinds. + deps: Arc, + + /// A profiler reference for used in the index prefetching thread. + prof: SelfProfilerRef, + + /// A handle to the prefetcher thread. + handle: Mutex>>, +} + +impl Drop for SerializedDepGraph { + fn drop(&mut self) { + // Join the prefetcher thread if present to avoid leaking the jobserver token. + if let Some(handle) = self.handle.get_mut().take() { + if let Err(e) = handle.join() { + panic::resume_unwind(e); + } + } + } } -impl SerializedDepGraph { +impl SerializedDepGraph { #[inline] pub fn edge_targets_from( &self, @@ -139,7 +169,14 @@ impl SerializedDepGraph { #[inline] pub fn node_to_index_opt(&self, dep_node: &DepNode) -> Option { - self.index.get(dep_node.kind.as_usize())?.get(&dep_node.hash).cloned() + let index = self.index.get(dep_node.kind.as_usize())?; + let index = index.get().unwrap_or_else(|| { + outline(|| { + self.setup_index(dep_node.kind); + self.index[dep_node.kind.as_usize()].get().unwrap() + }) + }); + index.get(&dep_node.hash).cloned() } #[inline] @@ -156,6 +193,92 @@ impl SerializedDepGraph { pub fn session_count(&self) -> u64 { self.session_count } + + #[inline] + fn add_to_index( + &self, + index: &mut UnhashMap, + node: &DepNode, + idx: SerializedDepNodeIndex, + ) { + if index.insert(node.hash, idx).is_some() { + // Empty nodes and side effect nodes can have duplicates + if node.kind != D::DEP_KIND_NULL && node.kind != D::DEP_KIND_SIDE_EFFECT { + let name = self.deps.name(node.kind); + panic!( + "Error: A dep graph node ({name}) does not have an unique index. \ + Running a clean build on a nightly compiler with `-Z incremental-verify-ich` \ + can help narrow down the issue for reporting. A clean build may also work around the issue.\n + DepNode: {node:?}" + ) + } + } + } + + /// This computes and sets up the index for just the specified `DepKind`. + fn setup_index(&self, dep_kind: DepKind) { + let _timer = self.prof.generic_activity("incr_comp_dep_graph_setup_index"); + + let mut index = UnhashMap::with_capacity_and_hasher( + self.index_sizes[dep_kind.as_usize()], + Default::default(), + ); + + for (idx, node) in self.nodes.iter_enumerated() { + if node.kind == dep_kind { + self.add_to_index(&mut index, node, idx); + } + } + + // This may race with the prefetching thread, but that will set the same value. + self.index[dep_kind.as_usize()].set(index).ok(); + } + + fn prefetch(&self) { + let _timer = self.prof.generic_activity("incr_comp_prefetch_dep_graph_index"); + + let mut index: Vec<_> = self + .index_sizes + .iter() + .map(|&n| UnhashMap::with_capacity_and_hasher(n, Default::default())) + .collect(); + + // Use a single loop to build indices for all kinds, unlike `setup_index` which builds + // a single index for each loop over the nodes. + for (idx, node) in self.nodes.iter_enumerated() { + self.add_to_index(&mut index[node.kind.as_usize()], node, idx); + } + + for (i, index) in index.into_iter().enumerate() { + // This may race with `setup_index`, but that will set the same value. + self.index[i].set(index).ok(); + } + } + + /// This spawns a thread that prefetches the index. + fn spawn_prefetch_thread(self: &Arc) -> Option> { + if !self.index.is_empty() { + let client = jobserver::client(); + // This should ideally use `try_acquire` to avoid races on the tokens, + // but the jobserver crate doesn't support that operation. + if let Ok(tokens) = client.available() + && tokens > 0 + { + let this = Arc::clone(self); + let handle = thread::spawn(move || { + let _token = client.acquire(); + this.prefetch(); + }); + Some(handle) + } else { + // Prefetch the index on the current thread if we don't have a token available. + self.prefetch(); + None + } + } else { + None + } + } } /// A packed representation of an edge's start index and byte width. @@ -190,9 +313,28 @@ fn mask(bits: usize) -> usize { usize::MAX >> ((size_of::() * 8) - bits) } -impl SerializedDepGraph { - #[instrument(level = "debug", skip(d, deps))] - pub fn decode(d: &mut MemDecoder<'_>, deps: &D) -> Arc { +impl SerializedDepGraph { + pub fn empty(deps: &Arc, sess: &Session) -> Self { + SerializedDepGraph { + nodes: Default::default(), + fingerprints: Default::default(), + edge_list_indices: Default::default(), + edge_list_data: Default::default(), + index: Default::default(), + index_sizes: Default::default(), + session_count: 0, + deps: Arc::clone(deps), + prof: sess.prof.clone(), + handle: Mutex::new(None), + } + } + + #[instrument(level = "debug", skip(d, deps, sess))] + pub fn decode( + d: &mut MemDecoder<'_>, + deps: &Arc, + sess: &Session, + ) -> Arc> { // The last 16 bytes are the node count and edge count. debug!("position: {:?}", d.position()); @@ -269,36 +411,26 @@ impl SerializedDepGraph { // end of the array. This padding ensure it doesn't. edge_list_data.extend(&[0u8; DEP_NODE_PAD]); - // Read the number of each dep kind and use it to create an hash map with a suitable size. - let mut index: Vec<_> = (0..(D::DEP_KIND_MAX + 1)) - .map(|_| UnhashMap::with_capacity_and_hasher(d.read_u32() as usize, Default::default())) - .collect(); + // Read the number of nodes for each dep kind. + let index_sizes: Vec<_> = + (0..(D::DEP_KIND_MAX + 1)).map(|_| d.read_u32() as usize).collect(); let session_count = d.read_u64(); - for (idx, node) in nodes.iter_enumerated() { - if index[node.kind.as_usize()].insert(node.hash, idx).is_some() { - // Empty nodes and side effect nodes can have duplicates - if node.kind != D::DEP_KIND_NULL && node.kind != D::DEP_KIND_SIDE_EFFECT { - let name = deps.name(node.kind); - panic!( - "Error: A dep graph node ({name}) does not have an unique index. \ - Running a clean build on a nightly compiler with `-Z incremental-verify-ich` \ - can help narrow down the issue for reporting. A clean build may also work around the issue.\n - DepNode: {node:?}" - ) - } - } - } - - Arc::new(SerializedDepGraph { + let result = Arc::new(SerializedDepGraph { nodes, fingerprints, edge_list_indices, edge_list_data, - index, + index: (0..index_sizes.len()).map(|_| OnceLock::new()).collect(), + index_sizes, session_count, - }) + deps: Arc::clone(deps), + prof: sess.prof.clone(), + handle: Mutex::new(None), + }); + *result.handle.lock() = result.spawn_prefetch_thread(); + result } } @@ -487,7 +619,7 @@ impl NodeInfo { fingerprint: Fingerprint, prev_index: SerializedDepNodeIndex, colors: &DepNodeColorMap, - previous: &SerializedDepGraph, + previous: &SerializedDepGraph, ) -> usize { let edges = previous.edge_targets_from(prev_index); let edge_count = edges.size_hint().0; @@ -545,7 +677,7 @@ struct LocalEncoderResult { struct EncoderState { next_node_index: AtomicU64, - previous: Arc, + previous: Arc>, file: Lock>, local: WorkerLocal>, stats: Option>>, @@ -553,7 +685,7 @@ struct EncoderState { } impl EncoderState { - fn new(encoder: FileEncoder, record_stats: bool, previous: Arc) -> Self { + fn new(encoder: FileEncoder, record_stats: bool, previous: Arc>) -> Self { Self { previous, next_node_index: AtomicU64::new(0), @@ -844,7 +976,7 @@ impl GraphEncoder { sess: &Session, encoder: FileEncoder, prev_node_count: usize, - previous: Arc, + previous: Arc>, ) -> Self { let record_graph = sess .opts