diff --git a/src/background_jobs.rs b/src/background_jobs.rs
index 033b74fc448..9a5fc44a30c 100644
--- a/src/background_jobs.rs
+++ b/src/background_jobs.rs
@@ -3,6 +3,7 @@ use reqwest::blocking::Client;
 use std::panic::AssertUnwindSafe;
 use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
 
+use crate::db::DieselPool;
 use crate::swirl::errors::EnqueueError;
 use crate::swirl::PerformError;
 use crate::uploaders::Uploader;
@@ -93,31 +94,34 @@ impl Job {
     pub(super) fn perform(
         self,
         env: &Option<Environment>,
-        conn: &PgConnection,
+        conn: &DieselPool,
     ) -> Result<(), PerformError> {
         let env = env
             .as_ref()
             .expect("Application should configure a background runner environment");
         match self {
-            Job::DailyDbMaintenance => worker::perform_daily_db_maintenance(conn),
+            Job::DailyDbMaintenance => conn.with_connection(&worker::perform_daily_db_maintenance),
             Job::DumpDb(args) => worker::perform_dump_db(env, args.database_url, args.target_name),
-            Job::IndexAddCrate(args) => worker::perform_index_add_crate(env, conn, &args.krate),
+            Job::IndexAddCrate(args) => conn
+                .with_connection(&|conn| worker::perform_index_add_crate(env, conn, &args.krate)),
             Job::IndexSquash => worker::perform_index_squash(env),
             Job::IndexSyncToHttp(args) => worker::perform_index_sync_to_http(env, args.crate_name),
-            Job::IndexUpdateYanked(args) => {
+            Job::IndexUpdateYanked(args) => conn.with_connection(&|conn| {
                 worker::perform_index_update_yanked(env, conn, &args.krate, &args.version_num)
-            }
+            }),
             Job::NormalizeIndex(args) => worker::perform_normalize_index(env, args),
-            Job::RenderAndUploadReadme(args) => worker::perform_render_and_upload_readme(
-                conn,
-                env,
-                args.version_id,
-                &args.text,
-                &args.readme_path,
-                args.base_url.as_deref(),
-                args.pkg_path_in_vcs.as_deref(),
-            ),
-            Job::UpdateDownloads => worker::perform_update_downloads(conn),
+            Job::RenderAndUploadReadme(args) => conn.with_connection(&|conn| {
+                worker::perform_render_and_upload_readme(
+                    conn,
+                    env,
+                    args.version_id,
+                    &args.text,
+                    &args.readme_path,
+                    args.base_url.as_deref(),
+                    args.pkg_path_in_vcs.as_deref(),
+                )
+            }),
+            Job::UpdateDownloads => conn.with_connection(&worker::perform_update_downloads),
         }
     }
 }
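For orientation before the `src/db.rs` changes below: `Job::perform` now borrows the whole pool and funnels every database-touching job through `with_connection`. A minimal, self-contained sketch of that callback shape, using stand-in types (`PgConnection`, `DieselPool`, and `PerformError` here are simplified placeholders, not the real crates.io definitions):

use std::error::Error;

// Stand-ins so this sketch compiles without diesel or crates.io itself;
// the real types live in src/db.rs and src/swirl.
struct PgConnection;

type PerformError = Box<dyn Error>;
type Callback<'a> = &'a dyn Fn(&PgConnection) -> Result<(), PerformError>;

struct DieselPool {
    // The real pool holds r2d2 pools or the test mutex.
    conn: PgConnection,
}

impl DieselPool {
    fn with_connection(&self, f: Callback<'_>) -> Result<(), PerformError> {
        // The real implementation checks a connection out of r2d2
        // (or locks the test mutex) before invoking the callback.
        f(&self.conn)
    }
}

// A worker function with the matching signature coerces straight to the
// `&dyn Fn` callback, mirroring
// `conn.with_connection(&worker::perform_update_downloads)` in the diff.
fn perform_update_downloads(_conn: &PgConnection) -> Result<(), PerformError> {
    Ok(())
}

fn main() -> Result<(), PerformError> {
    let pool = DieselPool { conn: PgConnection };
    pool.with_connection(&perform_update_downloads)?;

    // Jobs that capture extra state pass a closure instead:
    let version_id = 42;
    pool.with_connection(&|conn: &PgConnection| {
        let _ = (conn, version_id);
        Ok(())
    })
}

Taking `&dyn Fn(...)` rather than a generic parameter keeps `with_connection` non-generic, so plain worker functions can be passed by reference without an adapter closure.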
diff --git a/src/db.rs b/src/db.rs
index 69be5d0b400..9bd8760dce2 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,8 +1,9 @@
 use diesel::prelude::*;
 use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
+use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
 use prometheus::Histogram;
-use std::sync::{Arc, Mutex, MutexGuard};
-use std::{ops::Deref, time::Duration};
+use std::sync::Arc;
+use std::{error::Error, ops::Deref, time::Duration};
 use thiserror::Error;
 use url::Url;
 
@@ -17,10 +18,21 @@ pub enum DieselPool {
     BackgroundJobPool {
         pool: r2d2::Pool<ConnectionManager<PgConnection>>,
     },
-    Test(Arc<Mutex<PgConnection>>),
+    Test(Arc<ReentrantMutex<PgConnection>>),
 }
 
+type Callback<'a> = &'a dyn Fn(&PgConnection) -> Result<(), Box<dyn Error>>;
+
 impl DieselPool {
+    pub(crate) fn with_connection(&self, f: Callback<'_>) -> Result<(), Box<dyn Error>> {
+        match self {
+            DieselPool::Pool { pool, .. } | DieselPool::BackgroundJobPool { pool } => {
+                f(&*pool.get()?)
+            }
+            DieselPool::Test(connection) => f(&connection.lock()),
+        }
+    }
+
     pub(crate) fn new(
         url: &str,
         config: &config::DatabasePools,
@@ -62,7 +74,7 @@ impl DieselPool {
             .expect("failed to establish connection");
         conn.begin_test_transaction()
             .expect("failed to begin test transaction");
-        DieselPool::Test(Arc::new(Mutex::new(conn)))
+        DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
     }
 
     pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
@@ -80,7 +92,7 @@ impl DieselPool {
                 }
             }),
             DieselPool::BackgroundJobPool { pool } => Ok(DieselPooledConn::Pool(pool.get()?)),
-            DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.try_lock().unwrap())),
+            DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
         }
     }
 
@@ -124,10 +136,9 @@ pub struct PoolState {
     pub idle_connections: u32,
 }
 
-#[allow(clippy::large_enum_variant)]
 pub enum DieselPooledConn<'a> {
     Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
-    Test(MutexGuard<'a, PgConnection>),
+    Test(ReentrantMutexGuard<'a, PgConnection>),
 }
 
 impl Deref for DieselPooledConn<'_> {
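The test-pool change from `std::sync::Mutex` to `parking_lot::ReentrantMutex` is what makes recursive checkouts safe: the same thread may lock a `ReentrantMutex` repeatedly, so `get()` can drop the old `try_lock().unwrap()`. A minimal demonstration of that behavior, assuming `parking_lot` 0.12 (the version pinned in the actual lockfile may differ):

// Demonstration of the parking_lot behavior the test pool now relies on:
// the same thread can lock a ReentrantMutex more than once, and every
// guard aliases the same value.
use parking_lot::ReentrantMutex;

fn main() {
    let mutex = ReentrantMutex::new(5i32);

    let guard1 = mutex.lock();
    // With std::sync::Mutex, a second lock on the same thread would
    // deadlock (the old code used try_lock().unwrap() and panicked).
    let guard2 = mutex.lock();

    // Both guards alias the same underlying value.
    assert!(std::ptr::eq(&*guard1, &*guard2));
}

Note that a `ReentrantMutexGuard` only ever derefs to `&T`, never `&mut T`; that is sufficient here because the diesel 1.x `PgConnection` takes `&self` in its query methods.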
diff --git a/src/swirl/runner.rs b/src/swirl/runner.rs
index 040b5e81488..d23c60314f7 100644
--- a/src/swirl/runner.rs
+++ b/src/swirl/runner.rs
@@ -1,4 +1,3 @@
-use diesel::connection::{AnsiTransactionManager, TransactionManager};
 use diesel::prelude::*;
 use diesel::r2d2;
 use diesel::r2d2::ConnectionManager;
@@ -69,6 +68,14 @@ impl Runner {
     }
 }
 
+impl Runner {
+    #[doc(hidden)]
+    /// For use in integration tests
+    pub(super) fn connection_pool(&self) -> &DieselPool {
+        &self.connection_pool
+    }
+}
+
 impl Runner {
     /// Runs all pending jobs in the queue
     ///
@@ -113,18 +120,20 @@ impl Runner {
     fn run_single_job(&self, sender: SyncSender<Event>) {
         let environment = self.environment.clone();
-        self.get_single_job(sender, move |job, conn| {
+        // FIXME: https://github.com/sfackler/r2d2/pull/70
+        let connection_pool = AssertUnwindSafe(self.connection_pool().clone());
+        self.get_single_job(sender, move |job| {
             let job = Job::from_value(&job.job_type, job.data)?;
-            job.perform(&environment, conn)
+
+            // Make sure to move the whole `AssertUnwindSafe`
+            let connection_pool = connection_pool;
+            job.perform(&environment, &connection_pool.0)
         })
     }
 
     fn get_single_job<F>(&self, sender: SyncSender<Event>, f: F)
     where
-        F: FnOnce(storage::BackgroundJob, &PgConnection) -> Result<(), PerformError>
-            + Send
-            + UnwindSafe
-            + 'static,
+        F: FnOnce(storage::BackgroundJob) -> Result<(), PerformError> + Send + UnwindSafe + 'static,
     {
         use diesel::result::Error::RollbackTransaction;
@@ -157,52 +166,9 @@ impl Runner {
         };
 
         let job_id = job.id;
-        let transaction_manager = conn.transaction_manager();
-        let initial_depth = <AnsiTransactionManager as TransactionManager<PgConnection>>::get_transaction_depth(
-            transaction_manager
-        );
-        if initial_depth != 1 {
-            warn!("Initial transaction depth is not 1. This is very unexpected");
-        }
-
-        let result = conn
-            .transaction(|| {
-                let conn = AssertUnwindSafe(conn);
-                catch_unwind(|| {
-                    // Ensure the whole `AssertUnwindSafe(_)` is moved
-                    let conn = conn;
-                    f(job, conn.0)
-                })
-                .map_err(|e| try_to_extract_panic_info(&e))
-            })
-            // TODO: Replace with flatten() once that stabilizes
-            .and_then(std::convert::identity);
-
-        loop {
-            let depth = <AnsiTransactionManager as TransactionManager<PgConnection>>::get_transaction_depth(
-                transaction_manager
-            );
-            if depth == initial_depth {
-                break;
-            }
-            warn!("Rolling back a transaction due to a panic in a background task");
-            match transaction_manager
-                .rollback_transaction(conn)
-            {
-                Ok(_) => (),
-                Err(e) => {
-                    error!("Leaking a thread and database connection because of an error while rolling back transaction: {e}");
-                    loop {
-                        std::thread::sleep(Duration::from_secs(24 * 60 * 60));
-                        error!("How am I still alive?");
-                    }
-                }
-            }
-        }
+        let result = catch_unwind(|| f(job))
+            .map_err(|e| try_to_extract_panic_info(&e))
+            .and_then(|r| r);
 
         match result {
             Ok(_) => storage::delete_successful_job(conn, job_id)?,
@@ -303,7 +269,7 @@ mod tests {
         let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
         let return_barrier2 = return_barrier.clone();
 
-        runner.get_single_job(dummy_sender(), move |job, _| {
+        runner.get_single_job(dummy_sender(), move |job| {
             fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
             assert_eq!(first_job_id, job.id);
             return_barrier.0.wait(); // Wait for thread 2 to lock its job
@@ -311,7 +277,7 @@ mod tests {
         });
 
         fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
-        runner.get_single_job(dummy_sender(), move |job, _| {
+        runner.get_single_job(dummy_sender(), move |job| {
             assert_eq!(second_job_id, job.id);
             return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
             Ok(())
@@ -327,7 +293,7 @@ mod tests {
         let runner = runner();
         create_dummy_job(&runner);
 
-        runner.get_single_job(dummy_sender(), |_, _| Ok(()));
+        runner.get_single_job(dummy_sender(), |_| Ok(()));
         runner.wait_for_jobs().unwrap();
 
         let remaining_jobs = background_jobs
@@ -345,12 +311,10 @@ mod tests {
        let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
         let barrier2 = barrier.clone();
 
-        runner.get_single_job(dummy_sender(), move |_, conn| {
-            conn.transaction(|| {
-                barrier.0.wait();
-                // The job should go back into the queue after a panic
-                panic!();
-            })
+        runner.get_single_job(dummy_sender(), move |_| {
+            barrier.0.wait();
+            // error so the job goes back into the queue
+            Err("nope".into())
        });
 
         let conn = &*runner.connection().unwrap();
@@ -386,7 +350,7 @@ mod tests {
         let runner = runner();
         let job_id = create_dummy_job(&runner).id;
 
-        runner.get_single_job(dummy_sender(), |_, _| panic!());
+        runner.get_single_job(dummy_sender(), |_| panic!());
         runner.wait_for_jobs().unwrap();
 
         let tries = background_jobs
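Because each job now checks its own connection out of the pool inside `f(job)`, a panicking job can no longer leave an open transaction on the runner's shared connection, and the whole depth-tracking rollback loop above disappears: the runner only has to turn a panic into a `PerformError`. A sketch of that flattening step, with an assumed approximation of `try_to_extract_panic_info` (the real body lives in swirl and may differ):

use std::any::Any;
use std::panic::catch_unwind;

type PerformError = Box<dyn std::error::Error>;

// Assumed approximation of swirl's try_to_extract_panic_info: pull a
// readable message out of the panic payload.
fn try_to_extract_panic_info(info: &(dyn Any + Send)) -> PerformError {
    if let Some(msg) = info.downcast_ref::<&'static str>() {
        format!("job panicked: {msg}").into()
    } else if let Some(msg) = info.downcast_ref::<String>() {
        format!("job panicked: {msg}").into()
    } else {
        "job panicked".into()
    }
}

fn main() {
    // catch_unwind yields Result<Result<(), PerformError>, Box<dyn Any + Send>>;
    // `.and_then(|r| r)` flattens it because Result::flatten is unstable.
    // (The default panic hook still prints the panic message to stderr.)
    let result: Result<(), PerformError> = catch_unwind(|| panic!("boom"))
        .map_err(|e| try_to_extract_panic_info(&*e))
        .and_then(|r| r);

    assert!(result.is_err());
}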
diff --git a/src/tests/all.rs b/src/tests/all.rs
index d1f42d5d88e..489e0168740 100644
--- a/src/tests/all.rs
+++ b/src/tests/all.rs
@@ -162,11 +162,16 @@ fn new_category<'a>(category: &'a str, slug: &'a str, description: &'a str) -> NewCategory<'a> {
 // This reflects the configuration of our test environment. In the production application, this
 // does not hold true.
 #[test]
-#[should_panic]
-fn recursive_get_of_db_conn_in_tests_will_panic() {
+fn multiple_live_references_to_the_same_connection_can_be_checked_out() {
+    use std::ptr;
+
     let (app, _) = TestApp::init().empty();
     let app = app.as_inner();
-    let _conn1 = app.primary_database.get().unwrap();
-    let _conn2 = app.primary_database.get().unwrap();
+    let conn1 = app.primary_database.get().unwrap();
+    let conn2 = app.primary_database.get().unwrap();
+    let conn1_ref: &PgConnection = &conn1;
+    let conn2_ref: &PgConnection = &conn2;
+
+    assert!(ptr::eq(conn1_ref, conn2_ref));
 }
 
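The rewritten test can hold two live guards at once because both `get()` calls lock the same `ReentrantMutex`, and both `DieselPooledConn::Test` guards deref to the same `PgConnection`, which `ptr::eq` then confirms. A hypothetical stand-in for that `Deref` plumbing (`PooledConn` and the stand-in `PgConnection` below are illustrative only, not the real types):

use std::ops::Deref;

struct PgConnection; // stand-in for diesel's connection type

// Hypothetical stand-in for DieselPooledConn: one enum over both guard
// kinds, so callers only ever see &PgConnection.
#[allow(dead_code)]
enum PooledConn<'a> {
    Owned(Box<PgConnection>),   // stands in for the r2d2 pooled connection
    Borrowed(&'a PgConnection), // stands in for the ReentrantMutexGuard
}

impl Deref for PooledConn<'_> {
    type Target = PgConnection;

    fn deref(&self) -> &Self::Target {
        match self {
            PooledConn::Owned(conn) => conn,
            PooledConn::Borrowed(conn) => conn,
        }
    }
}

fn main() {
    let shared = PgConnection;

    // Two "checkouts" of the same test connection:
    let conn1 = PooledConn::Borrowed(&shared);
    let conn2 = PooledConn::Borrowed(&shared);

    // Deref makes `&conn1` usable as `&PgConnection`, and both refer to
    // the same underlying connection, as the rewritten test asserts.
    let conn1_ref: &PgConnection = &conn1;
    let conn2_ref: &PgConnection = &conn2;
    assert!(std::ptr::eq(conn1_ref, conn2_ref));
}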