Skip to content

Reuse existing connection when executing each background job #5837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits on Jan 8, 2023 (source and target branch names lost during page extraction)
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions src/background_jobs.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,7 +3,6 @@ use reqwest::blocking::Client;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

use crate::db::DieselPool;
use crate::swirl::errors::EnqueueError;
use crate::swirl::PerformError;
use crate::uploaders::Uploader;
Expand Down Expand Up @@ -94,34 +93,31 @@ impl Job {
pub(super) fn perform(
self,
env: &Option<Environment>,
conn: &DieselPool,
conn: &PgConnection,
) -> Result<(), PerformError> {
let env = env
.as_ref()
.expect("Application should configure a background runner environment");
match self {
Job::DailyDbMaintenance => conn.with_connection(&worker::perform_daily_db_maintenance),
Job::DailyDbMaintenance => worker::perform_daily_db_maintenance(conn),
Job::DumpDb(args) => worker::perform_dump_db(env, args.database_url, args.target_name),
Job::IndexAddCrate(args) => conn
.with_connection(&|conn| worker::perform_index_add_crate(env, conn, &args.krate)),
Job::IndexAddCrate(args) => worker::perform_index_add_crate(env, conn, &args.krate),
Job::IndexSquash => worker::perform_index_squash(env),
Job::IndexSyncToHttp(args) => worker::perform_index_sync_to_http(env, args.crate_name),
Job::IndexUpdateYanked(args) => conn.with_connection(&|conn| {
Job::IndexUpdateYanked(args) => {
worker::perform_index_update_yanked(env, conn, &args.krate, &args.version_num)
}),
}
Job::NormalizeIndex(args) => worker::perform_normalize_index(env, args),
Job::RenderAndUploadReadme(args) => conn.with_connection(&|conn| {
worker::perform_render_and_upload_readme(
conn,
env,
args.version_id,
&args.text,
&args.readme_path,
args.base_url.as_deref(),
args.pkg_path_in_vcs.as_deref(),
)
}),
Job::UpdateDownloads => conn.with_connection(&worker::perform_update_downloads),
Job::RenderAndUploadReadme(args) => worker::perform_render_and_upload_readme(
conn,
env,
args.version_id,
&args.text,
&args.readme_path,
args.base_url.as_deref(),
args.pkg_path_in_vcs.as_deref(),
),
Job::UpdateDownloads => worker::perform_update_downloads(conn),
}
}
}
Expand Down
25 changes: 7 additions & 18 deletions src/db.rs
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,8 @@
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
use prometheus::Histogram;
use std::sync::Arc;
use std::{error::Error, ops::Deref, time::Duration};
use std::sync::{Arc, Mutex, MutexGuard};
use std::{ops::Deref, time::Duration};
use thiserror::Error;
use url::Url;

Expand All @@ -18,21 +17,10 @@ pub enum DieselPool {
BackgroundJobPool {
pool: r2d2::Pool<ConnectionManager<PgConnection>>,
},
Test(Arc<ReentrantMutex<PgConnection>>),
Test(Arc<Mutex<PgConnection>>),
}

type Callback<'a> = &'a dyn Fn(&PgConnection) -> Result<(), Box<dyn Error>>;

impl DieselPool {
pub(crate) fn with_connection(&self, f: Callback<'_>) -> Result<(), Box<dyn Error>> {
match self {
DieselPool::Pool { pool, .. } | DieselPool::BackgroundJobPool { pool } => {
f(&*pool.get()?)
}
DieselPool::Test(connection) => f(&connection.lock()),
}
}

pub(crate) fn new(
url: &str,
config: &config::DatabasePools,
Expand Down Expand Up @@ -74,7 +62,7 @@ impl DieselPool {
.expect("failed to establish connection");
conn.begin_test_transaction()
.expect("failed to begin test transaction");
DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
DieselPool::Test(Arc::new(Mutex::new(conn)))
}

pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
Expand All @@ -92,7 +80,7 @@ impl DieselPool {
}
}),
DieselPool::BackgroundJobPool { pool } => Ok(DieselPooledConn::Pool(pool.get()?)),
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.try_lock().unwrap())),
}
}

Expand Down Expand Up @@ -136,9 +124,10 @@ pub struct PoolState {
pub idle_connections: u32,
}

#[allow(clippy::large_enum_variant)]
pub enum DieselPooledConn<'a> {
Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
Test(ReentrantMutexGuard<'a, PgConnection>),
Test(MutexGuard<'a, PgConnection>),
}

impl Deref for DieselPooledConn<'_> {
Expand Down
90 changes: 63 additions & 27 deletions src/swirl/runner.rs
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
use diesel::connection::{AnsiTransactionManager, TransactionManager};
use diesel::prelude::*;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
Expand Down Expand Up @@ -68,14 +69,6 @@ impl Runner {
}
}

impl Runner {
#[doc(hidden)]
/// For use in integration tests
pub(super) fn connection_pool(&self) -> &DieselPool {
&self.connection_pool
}
}

impl Runner {
/// Runs all pending jobs in the queue
///
Expand Down Expand Up @@ -120,20 +113,18 @@ impl Runner {

fn run_single_job(&self, sender: SyncSender<Event>) {
let environment = self.environment.clone();
// FIXME: https://github.com/sfackler/r2d2/pull/70
let connection_pool = AssertUnwindSafe(self.connection_pool().clone());
self.get_single_job(sender, move |job| {
self.get_single_job(sender, move |job, conn| {
let job = Job::from_value(&job.job_type, job.data)?;

// Make sure to move the whole `AssertUnwindSafe`
let connection_pool = connection_pool;
job.perform(&environment, &connection_pool.0)
job.perform(&environment, conn)
})
}

fn get_single_job<F>(&self, sender: SyncSender<Event>, f: F)
where
F: FnOnce(storage::BackgroundJob) -> Result<(), PerformError> + Send + UnwindSafe + 'static,
F: FnOnce(storage::BackgroundJob, &PgConnection) -> Result<(), PerformError>
+ Send
+ UnwindSafe
+ 'static,
{
use diesel::result::Error::RollbackTransaction;

Expand Down Expand Up @@ -166,9 +157,52 @@ impl Runner {
};
let job_id = job.id;

let result = catch_unwind(|| f(job))
.map_err(|e| try_to_extract_panic_info(&e))
.and_then(|r| r);
let transaction_manager = conn.transaction_manager();
let initial_depth = <AnsiTransactionManager as TransactionManager<
PgConnection,
>>::get_transaction_depth(
transaction_manager
);
if initial_depth != 1 {
warn!("Initial transaction depth is not 1. This is very unexpected");
}

let result = conn
.transaction(|| {
let conn = AssertUnwindSafe(conn);
catch_unwind(|| {
// Ensure the whole `AssertUnwindSafe(_)` is moved
let conn = conn;
f(job, conn.0)
})
.map_err(|e| try_to_extract_panic_info(&e))
})
// TODO: Replace with flatten() once that stabilizes
.and_then(std::convert::identity);

loop {
let depth = <AnsiTransactionManager as TransactionManager<
PgConnection,
>>::get_transaction_depth(
transaction_manager
);
if depth == initial_depth {
break;
}
warn!("Rolling back a transaction due to a panic in a background task");
match transaction_manager
.rollback_transaction(conn)
{
Ok(_) => (),
Err(e) => {
error!("Leaking a thread and database connection because of an error while rolling back transaction: {e}");
loop {
std::thread::sleep(Duration::from_secs(24 * 60 * 60));
error!("How am I still alive?");
}
}
}
}

match result {
Ok(_) => storage::delete_successful_job(conn, job_id)?,
Expand Down Expand Up @@ -269,15 +303,15 @@ mod tests {
let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
let return_barrier2 = return_barrier.clone();

runner.get_single_job(dummy_sender(), move |job| {
runner.get_single_job(dummy_sender(), move |job, _| {
fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
assert_eq!(first_job_id, job.id);
return_barrier.0.wait(); // Wait for thread 2 to lock its job
Ok(())
});

fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
runner.get_single_job(dummy_sender(), move |job| {
runner.get_single_job(dummy_sender(), move |job, _| {
assert_eq!(second_job_id, job.id);
return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
Ok(())
Expand All @@ -293,7 +327,7 @@ mod tests {
let runner = runner();
create_dummy_job(&runner);

runner.get_single_job(dummy_sender(), |_| Ok(()));
runner.get_single_job(dummy_sender(), |_, _| Ok(()));
runner.wait_for_jobs().unwrap();

let remaining_jobs = background_jobs
Expand All @@ -311,10 +345,12 @@ mod tests {
let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
let barrier2 = barrier.clone();

runner.get_single_job(dummy_sender(), move |_| {
barrier.0.wait();
// error so the job goes back into the queue
Err("nope".into())
runner.get_single_job(dummy_sender(), move |_, conn| {
conn.transaction(|| {
barrier.0.wait();
// The job should go back into the queue after a panic
panic!();
})
});

let conn = &*runner.connection().unwrap();
Expand Down Expand Up @@ -350,7 +386,7 @@ mod tests {
let runner = runner();
let job_id = create_dummy_job(&runner).id;

runner.get_single_job(dummy_sender(), |_| panic!());
runner.get_single_job(dummy_sender(), |_, _| panic!());
runner.wait_for_jobs().unwrap();

let tries = background_jobs
Expand Down
13 changes: 4 additions & 9 deletions src/tests/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,11 @@ fn new_category<'a>(category: &'a str, slug: &'a str, description: &'a str) -> N
// This reflects the configuration of our test environment. In the production application, this
// does not hold true.
#[test]
fn multiple_live_references_to_the_same_connection_can_be_checked_out() {
use std::ptr;

#[should_panic]
fn recursive_get_of_db_conn_in_tests_will_panic() {
let (app, _) = TestApp::init().empty();
let app = app.as_inner();

let conn1 = app.primary_database.get().unwrap();
let conn2 = app.primary_database.get().unwrap();
let conn1_ref: &PgConnection = &conn1;
let conn2_ref: &PgConnection = &conn2;

assert!(ptr::eq(conn1_ref, conn2_ref));
let _conn1 = app.primary_database.get().unwrap();
let _conn2 = app.primary_database.get().unwrap();
}