diff --git a/src/background_jobs.rs b/src/background_jobs.rs index 9a5fc44a30c..033b74fc448 100644 --- a/src/background_jobs.rs +++ b/src/background_jobs.rs @@ -3,7 +3,6 @@ use reqwest::blocking::Client; use std::panic::AssertUnwindSafe; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; -use crate::db::DieselPool; use crate::swirl::errors::EnqueueError; use crate::swirl::PerformError; use crate::uploaders::Uploader; @@ -94,34 +93,31 @@ impl Job { pub(super) fn perform( self, env: &Option, - conn: &DieselPool, + conn: &PgConnection, ) -> Result<(), PerformError> { let env = env .as_ref() .expect("Application should configure a background runner environment"); match self { - Job::DailyDbMaintenance => conn.with_connection(&worker::perform_daily_db_maintenance), + Job::DailyDbMaintenance => worker::perform_daily_db_maintenance(conn), Job::DumpDb(args) => worker::perform_dump_db(env, args.database_url, args.target_name), - Job::IndexAddCrate(args) => conn - .with_connection(&|conn| worker::perform_index_add_crate(env, conn, &args.krate)), + Job::IndexAddCrate(args) => worker::perform_index_add_crate(env, conn, &args.krate), Job::IndexSquash => worker::perform_index_squash(env), Job::IndexSyncToHttp(args) => worker::perform_index_sync_to_http(env, args.crate_name), - Job::IndexUpdateYanked(args) => conn.with_connection(&|conn| { + Job::IndexUpdateYanked(args) => { worker::perform_index_update_yanked(env, conn, &args.krate, &args.version_num) - }), + } Job::NormalizeIndex(args) => worker::perform_normalize_index(env, args), - Job::RenderAndUploadReadme(args) => conn.with_connection(&|conn| { - worker::perform_render_and_upload_readme( - conn, - env, - args.version_id, - &args.text, - &args.readme_path, - args.base_url.as_deref(), - args.pkg_path_in_vcs.as_deref(), - ) - }), - Job::UpdateDownloads => conn.with_connection(&worker::perform_update_downloads), + Job::RenderAndUploadReadme(args) => worker::perform_render_and_upload_readme( + conn, + env, + args.version_id, + &args.text, + &args.readme_path, + args.base_url.as_deref(), + args.pkg_path_in_vcs.as_deref(), + ), + Job::UpdateDownloads => worker::perform_update_downloads(conn), } } } diff --git a/src/db.rs b/src/db.rs index 9bd8760dce2..69be5d0b400 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,9 +1,8 @@ use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager, CustomizeConnection}; -use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; use prometheus::Histogram; -use std::sync::Arc; -use std::{error::Error, ops::Deref, time::Duration}; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::{ops::Deref, time::Duration}; use thiserror::Error; use url::Url; @@ -18,21 +17,10 @@ pub enum DieselPool { BackgroundJobPool { pool: r2d2::Pool>, }, - Test(Arc>), + Test(Arc>), } -type Callback<'a> = &'a dyn Fn(&PgConnection) -> Result<(), Box>; - impl DieselPool { - pub(crate) fn with_connection(&self, f: Callback<'_>) -> Result<(), Box> { - match self { - DieselPool::Pool { pool, .. } | DieselPool::BackgroundJobPool { pool } => { - f(&*pool.get()?) - } - DieselPool::Test(connection) => f(&connection.lock()), - } - } - pub(crate) fn new( url: &str, config: &config::DatabasePools, @@ -74,7 +62,7 @@ impl DieselPool { .expect("failed to establish connection"); conn.begin_test_transaction() .expect("failed to begin test transaction"); - DieselPool::Test(Arc::new(ReentrantMutex::new(conn))) + DieselPool::Test(Arc::new(Mutex::new(conn))) } pub fn get(&self) -> Result, PoolError> { @@ -92,7 +80,7 @@ impl DieselPool { } }), DieselPool::BackgroundJobPool { pool } => Ok(DieselPooledConn::Pool(pool.get()?)), - DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())), + DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.try_lock().unwrap())), } } @@ -136,9 +124,10 @@ pub struct PoolState { pub idle_connections: u32, } +#[allow(clippy::large_enum_variant)] pub enum DieselPooledConn<'a> { Pool(r2d2::PooledConnection>), - Test(ReentrantMutexGuard<'a, PgConnection>), + Test(MutexGuard<'a, PgConnection>), } impl Deref for DieselPooledConn<'_> { diff --git a/src/swirl/runner.rs b/src/swirl/runner.rs index d23c60314f7..040b5e81488 100644 --- a/src/swirl/runner.rs +++ b/src/swirl/runner.rs @@ -1,3 +1,4 @@ +use diesel::connection::{AnsiTransactionManager, TransactionManager}; use diesel::prelude::*; use diesel::r2d2; use diesel::r2d2::ConnectionManager; @@ -68,14 +69,6 @@ impl Runner { } } -impl Runner { - #[doc(hidden)] - /// For use in integration tests - pub(super) fn connection_pool(&self) -> &DieselPool { - &self.connection_pool - } -} - impl Runner { /// Runs all pending jobs in the queue /// @@ -120,20 +113,18 @@ impl Runner { fn run_single_job(&self, sender: SyncSender) { let environment = self.environment.clone(); - // FIXME: https://github.com/sfackler/r2d2/pull/70 - let connection_pool = AssertUnwindSafe(self.connection_pool().clone()); - self.get_single_job(sender, move |job| { + self.get_single_job(sender, move |job, conn| { let job = Job::from_value(&job.job_type, job.data)?; - - // Make sure to move the whole `AssertUnwindSafe` - let connection_pool = connection_pool; - job.perform(&environment, &connection_pool.0) + job.perform(&environment, conn) }) } fn get_single_job(&self, sender: SyncSender, f: F) where - F: FnOnce(storage::BackgroundJob) -> Result<(), PerformError> + Send + UnwindSafe + 'static, + F: FnOnce(storage::BackgroundJob, &PgConnection) -> Result<(), PerformError> + + Send + + UnwindSafe + + 'static, { use diesel::result::Error::RollbackTransaction; @@ -166,9 +157,52 @@ impl Runner { }; let job_id = job.id; - let result = catch_unwind(|| f(job)) - .map_err(|e| try_to_extract_panic_info(&e)) - .and_then(|r| r); + let transaction_manager = conn.transaction_manager(); + let initial_depth = >::get_transaction_depth( + transaction_manager + ); + if initial_depth != 1 { + warn!("Initial transaction depth is not 1. This is very unexpected"); + } + + let result = conn + .transaction(|| { + let conn = AssertUnwindSafe(conn); + catch_unwind(|| { + // Ensure the whole `AssertUnwindSafe(_)` is moved + let conn = conn; + f(job, conn.0) + }) + .map_err(|e| try_to_extract_panic_info(&e)) + }) + // TODO: Replace with flatten() once that stabilizes + .and_then(std::convert::identity); + + loop { + let depth = >::get_transaction_depth( + transaction_manager + ); + if depth == initial_depth { + break; + } + warn!("Rolling back a transaction due to a panic in a background task"); + match transaction_manager + .rollback_transaction(conn) + { + Ok(_) => (), + Err(e) => { + error!("Leaking a thread and database connection because of an error while rolling back transaction: {e}"); + loop { + std::thread::sleep(Duration::from_secs(24 * 60 * 60)); + error!("How am I still alive?"); + } + } + } + } match result { Ok(_) => storage::delete_successful_job(conn, job_id)?, @@ -269,7 +303,7 @@ mod tests { let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); let return_barrier2 = return_barrier.clone(); - runner.get_single_job(dummy_sender(), move |job| { + runner.get_single_job(dummy_sender(), move |job, _| { fetch_barrier.0.wait(); // Tell thread 2 it can lock its job assert_eq!(first_job_id, job.id); return_barrier.0.wait(); // Wait for thread 2 to lock its job @@ -277,7 +311,7 @@ mod tests { }); fetch_barrier2.0.wait(); // Wait until thread 1 locks its job - runner.get_single_job(dummy_sender(), move |job| { + runner.get_single_job(dummy_sender(), move |job, _| { assert_eq!(second_job_id, job.id); return_barrier2.0.wait(); // Tell thread 1 it can unlock its job Ok(()) @@ -293,7 +327,7 @@ mod tests { let runner = runner(); create_dummy_job(&runner); - runner.get_single_job(dummy_sender(), |_| Ok(())); + runner.get_single_job(dummy_sender(), |_, _| Ok(())); runner.wait_for_jobs().unwrap(); let remaining_jobs = background_jobs @@ -311,10 +345,12 @@ mod tests { let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2))); let barrier2 = barrier.clone(); - runner.get_single_job(dummy_sender(), move |_| { - barrier.0.wait(); - // error so the job goes back into the queue - Err("nope".into()) + runner.get_single_job(dummy_sender(), move |_, conn| { + conn.transaction(|| { + barrier.0.wait(); + // The job should go back into the queue after a panic + panic!(); + }) }); let conn = &*runner.connection().unwrap(); @@ -350,7 +386,7 @@ mod tests { let runner = runner(); let job_id = create_dummy_job(&runner).id; - runner.get_single_job(dummy_sender(), |_| panic!()); + runner.get_single_job(dummy_sender(), |_, _| panic!()); runner.wait_for_jobs().unwrap(); let tries = background_jobs diff --git a/src/tests/all.rs b/src/tests/all.rs index 489e0168740..d1f42d5d88e 100644 --- a/src/tests/all.rs +++ b/src/tests/all.rs @@ -162,16 +162,11 @@ fn new_category<'a>(category: &'a str, slug: &'a str, description: &'a str) -> N // This reflects the configuration of our test environment. In the production application, this // does not hold true. #[test] -fn multiple_live_references_to_the_same_connection_can_be_checked_out() { - use std::ptr; - +#[should_panic] +fn recursive_get_of_db_conn_in_tests_will_panic() { let (app, _) = TestApp::init().empty(); let app = app.as_inner(); - let conn1 = app.primary_database.get().unwrap(); - let conn2 = app.primary_database.get().unwrap(); - let conn1_ref: &PgConnection = &conn1; - let conn2_ref: &PgConnection = &conn2; - - assert!(ptr::eq(conn1_ref, conn2_ref)); + let _conn1 = app.primary_database.get().unwrap(); + let _conn2 = app.primary_database.get().unwrap(); }