Skip to content

Support compiling codegen units in parallel #1271

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ use rustc_index::vec::IndexVec;
use rustc_middle::ty::adjustment::PointerCast;
use rustc_middle::ty::layout::FnAbiOf;
use rustc_middle::ty::print::with_no_trimmed_paths;
use rustc_middle::ty::SymbolName;

use crate::constant::ConstantCx;
use crate::debuginfo::FunctionDebugContext;
use crate::prelude::*;
use crate::pretty_clif::CommentWriter;

struct CodegenedFunction<'tcx> {
symbol_name: SymbolName<'tcx>,
pub(crate) struct CodegenedFunction {
symbol_name: String,
func_id: FuncId,
func: Function,
clif_comments: CommentWriter,
func_debug_cx: Option<FunctionDebugContext>,
}

#[cfg_attr(not(feature = "jit"), allow(dead_code))]
pub(crate) fn codegen_and_compile_fn<'tcx>(
tcx: TyCtxt<'tcx>,
cx: &mut crate::CodegenCx,
Expand All @@ -36,13 +36,13 @@ pub(crate) fn codegen_and_compile_fn<'tcx>(
compile_fn(cx, cached_context, module, codegened_func);
}

fn codegen_fn<'tcx>(
pub(crate) fn codegen_fn<'tcx>(
tcx: TyCtxt<'tcx>,
cx: &mut crate::CodegenCx,
cached_func: Function,
module: &mut dyn Module,
instance: Instance<'tcx>,
) -> CodegenedFunction<'tcx> {
) -> CodegenedFunction {
debug_assert!(!instance.substs.needs_infer());

let mir = tcx.instance_mir(instance.def);
Expand All @@ -56,9 +56,9 @@ fn codegen_fn<'tcx>(
});

// Declare function
let symbol_name = tcx.symbol_name(instance);
let symbol_name = tcx.symbol_name(instance).name.to_string();
let sig = get_function_sig(tcx, module.isa().triple(), instance);
let func_id = module.declare_function(symbol_name.name, Linkage::Local, &sig).unwrap();
let func_id = module.declare_function(&symbol_name, Linkage::Local, &sig).unwrap();

// Make the FunctionBuilder
let mut func_ctx = FunctionBuilderContext::new();
Expand All @@ -81,7 +81,7 @@ fn codegen_fn<'tcx>(
let clif_comments = crate::pretty_clif::CommentWriter::new(tcx, instance);

let func_debug_cx = if let Some(debug_context) = &mut cx.debug_context {
Some(debug_context.define_function(tcx, symbol_name.name, mir.span))
Some(debug_context.define_function(tcx, &symbol_name, mir.span))
} else {
None
};
Expand Down Expand Up @@ -113,6 +113,7 @@ fn codegen_fn<'tcx>(
tcx.sess.time("codegen clif ir", || codegen_fn_body(&mut fx, start_block));

// Recover all necessary data from fx, before accessing func will prevent future access to it.
let symbol_name = fx.symbol_name;
let clif_comments = fx.clif_comments;
let func_debug_cx = fx.func_debug_cx;

Expand All @@ -121,7 +122,7 @@ fn codegen_fn<'tcx>(
if cx.should_write_ir {
crate::pretty_clif::write_clif_file(
tcx.output_filenames(()),
symbol_name.name,
&symbol_name,
"unopt",
module.isa(),
&func,
Expand All @@ -135,11 +136,11 @@ fn codegen_fn<'tcx>(
CodegenedFunction { symbol_name, func_id, func, clif_comments, func_debug_cx }
}

fn compile_fn<'tcx>(
pub(crate) fn compile_fn(
cx: &mut crate::CodegenCx,
cached_context: &mut Context,
module: &mut dyn Module,
codegened_func: CodegenedFunction<'tcx>,
codegened_func: CodegenedFunction,
) {
let clif_comments = codegened_func.clif_comments;

Expand Down Expand Up @@ -195,7 +196,7 @@ fn compile_fn<'tcx>(
// Write optimized function to file for debugging
crate::pretty_clif::write_clif_file(
&cx.output_filenames,
codegened_func.symbol_name.name,
&codegened_func.symbol_name,
"opt",
module.isa(),
&context.func,
Expand All @@ -205,7 +206,7 @@ fn compile_fn<'tcx>(
if let Some(disasm) = &context.compiled_code().unwrap().disasm {
crate::pretty_clif::write_ir_file(
&cx.output_filenames,
&format!("{}.vcode", codegened_func.symbol_name.name),
&format!("{}.vcode", codegened_func.symbol_name),
|file| file.write_all(disasm.as_bytes()),
)
}
Expand Down
3 changes: 1 addition & 2 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use rustc_index::vec::IndexVec;
use rustc_middle::ty::layout::{
FnAbiError, FnAbiOfHelpers, FnAbiRequest, LayoutError, LayoutOfHelpers,
};
use rustc_middle::ty::SymbolName;
use rustc_span::SourceFile;
use rustc_target::abi::call::FnAbi;
use rustc_target::abi::{Integer, Primitive};
Expand Down Expand Up @@ -246,7 +245,7 @@ pub(crate) struct FunctionCx<'m, 'clif, 'tcx: 'm> {
pub(crate) func_debug_cx: Option<FunctionDebugContext>,

pub(crate) instance: Instance<'tcx>,
pub(crate) symbol_name: SymbolName<'tcx>,
pub(crate) symbol_name: String,
pub(crate) mir: &'tcx Body<'tcx>,
pub(crate) fn_abi: Option<&'tcx FnAbi<'tcx, Ty<'tcx>>>,

Expand Down
168 changes: 168 additions & 0 deletions src/concurrency_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::sync::{Arc, Condvar, Mutex};

use rustc_session::Session;

use jobserver::HelperThread;

// FIXME don't panic when a worker thread panics

pub(super) struct ConcurrencyLimiter {
helper_thread: Option<HelperThread>,
state: Arc<Mutex<state::ConcurrencyLimiterState>>,
available_token_condvar: Arc<Condvar>,
}

impl ConcurrencyLimiter {
pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
let available_token_condvar = Arc::new(Condvar::new());

let state_helper = state.clone();
let available_token_condvar_helper = available_token_condvar.clone();
let helper_thread = sess
.jobserver
.clone()
.into_helper_thread(move |token| {
let mut state = state_helper.lock().unwrap();
state.add_new_token(token.unwrap());
available_token_condvar_helper.notify_one();
})
.unwrap();
ConcurrencyLimiter {
helper_thread: Some(helper_thread),
state,
available_token_condvar: Arc::new(Condvar::new()),
}
}

pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
let mut state = self.state.lock().unwrap();
loop {
state.assert_invariants();

if state.try_start_job() {
return ConcurrencyLimiterToken {
state: self.state.clone(),
available_token_condvar: self.available_token_condvar.clone(),
};
}

self.helper_thread.as_mut().unwrap().request_token();
state = self.available_token_condvar.wait(state).unwrap();
}
}

pub(super) fn job_already_done(&mut self) {
let mut state = self.state.lock().unwrap();
state.job_already_done();
}
}

impl Drop for ConcurrencyLimiter {
fn drop(&mut self) {
//
self.helper_thread.take();

// Assert that all jobs have finished
let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
state.assert_done();
}
}

#[derive(Debug)]
pub(super) struct ConcurrencyLimiterToken {
state: Arc<Mutex<state::ConcurrencyLimiterState>>,
available_token_condvar: Arc<Condvar>,
}

impl Drop for ConcurrencyLimiterToken {
fn drop(&mut self) {
let mut state = self.state.lock().unwrap();
state.job_finished();
self.available_token_condvar.notify_one();
}
}

mod state {
use jobserver::Acquired;

#[derive(Debug)]
pub(super) struct ConcurrencyLimiterState {
pending_jobs: usize,
active_jobs: usize,

// None is used to represent the implicit token, Some to represent explicit tokens
tokens: Vec<Option<Acquired>>,
}

impl ConcurrencyLimiterState {
pub(super) fn new(pending_jobs: usize) -> Self {
ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
}

pub(super) fn assert_invariants(&self) {
// There must be no excess active jobs
assert!(self.active_jobs <= self.pending_jobs);

// There may not be more active jobs than there are tokens
assert!(self.active_jobs <= self.tokens.len());
}

pub(super) fn assert_done(&self) {
assert_eq!(self.pending_jobs, 0);
assert_eq!(self.active_jobs, 0);
}

pub(super) fn add_new_token(&mut self, token: Acquired) {
self.tokens.push(Some(token));
self.drop_excess_capacity();
}

pub(super) fn try_start_job(&mut self) -> bool {
if self.active_jobs < self.tokens.len() {
// Using existing token
self.job_started();
return true;
}

false
}

pub(super) fn job_started(&mut self) {
self.assert_invariants();
self.active_jobs += 1;
self.drop_excess_capacity();
self.assert_invariants();
}

pub(super) fn job_finished(&mut self) {
self.assert_invariants();
self.pending_jobs -= 1;
self.active_jobs -= 1;
self.assert_invariants();
self.drop_excess_capacity();
self.assert_invariants();
}

pub(super) fn job_already_done(&mut self) {
self.assert_invariants();
self.pending_jobs -= 1;
self.assert_invariants();
self.drop_excess_capacity();
self.assert_invariants();
}

fn drop_excess_capacity(&mut self) {
self.assert_invariants();

// Drop all tokens that can never be used anymore
self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));

// Keep some excess tokens to satisfy requests faster
const MAX_EXTRA_CAPACITY: usize = 2;
self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));

self.assert_invariants();
}
}
}
Loading