Skip to content

Share the existing background job connection with (only) some jobs #5918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 49 additions & 19 deletions src/background_jobs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use reqwest::blocking::Client;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

use crate::db::DieselPool;
use crate::db::ConnectionPool;
use crate::swirl::errors::EnqueueError;
use crate::swirl::PerformError;
use crate::uploaders::Uploader;
Expand All @@ -23,6 +24,20 @@ pub enum Job {
UpdateDownloads,
}

/// Database state that is passed to `Job::perform()`.
pub(crate) struct PerformState<'a> {
    /// The existing connection used to lock the background job.
    ///
    /// Most jobs can reuse the existing connection, however it will already be within a
    /// transaction and is thus not appropriate in all cases.
    pub(crate) conn: &'a PgConnection,
    /// A connection pool for obtaining a unique connection.
    ///
    /// This will be `None` within our standard test framework, as there everything is expected to
    /// run within a single transaction.
    pub(crate) pool: Option<ConnectionPool>,
}

impl Job {
const DAILY_DB_MAINTENANCE: &str = "daily_db_maintenance";
const DUMP_DB: &str = "dump_db";
Expand Down Expand Up @@ -94,38 +109,53 @@ impl Job {
pub(super) fn perform(
self,
env: &Option<Environment>,
conn: &DieselPool,
state: PerformState<'_>,
) -> Result<(), PerformError> {
let PerformState { conn, pool } = state;
let env = env
.as_ref()
.expect("Application should configure a background runner environment");
match self {
Job::DailyDbMaintenance => conn.with_connection(&worker::perform_daily_db_maintenance),
Job::DailyDbMaintenance => {
worker::perform_daily_db_maintenance(&*fresh_connection(pool)?)
}
Job::DumpDb(args) => worker::perform_dump_db(env, args.database_url, args.target_name),
Job::IndexAddCrate(args) => conn
.with_connection(&|conn| worker::perform_index_add_crate(env, conn, &args.krate)),
Job::IndexAddCrate(args) => worker::perform_index_add_crate(env, conn, &args.krate),
Job::IndexSquash => worker::perform_index_squash(env),
Job::IndexSyncToHttp(args) => worker::perform_index_sync_to_http(env, args.crate_name),
Job::IndexUpdateYanked(args) => conn.with_connection(&|conn| {
Job::IndexUpdateYanked(args) => {
worker::perform_index_update_yanked(env, conn, &args.krate, &args.version_num)
}),
}
Job::NormalizeIndex(args) => worker::perform_normalize_index(env, args),
Job::RenderAndUploadReadme(args) => conn.with_connection(&|conn| {
worker::perform_render_and_upload_readme(
conn,
env,
args.version_id,
&args.text,
&args.readme_path,
args.base_url.as_deref(),
args.pkg_path_in_vcs.as_deref(),
)
}),
Job::UpdateDownloads => conn.with_connection(&worker::perform_update_downloads),
Job::RenderAndUploadReadme(args) => worker::perform_render_and_upload_readme(
conn,
env,
args.version_id,
&args.text,
&args.readme_path,
args.base_url.as_deref(),
args.pkg_path_in_vcs.as_deref(),
),
Job::UpdateDownloads => worker::perform_update_downloads(&*fresh_connection(pool)?),
}
}
}

/// A helper function for jobs needing a fresh connection (i.e. not already within a transaction).
///
/// This will error when run from our main test framework, as there most work is expected to be
/// done within an existing transaction.
fn fresh_connection(
    pool: Option<ConnectionPool>,
) -> Result<PooledConnection<ConnectionManager<PgConnection>>, PerformError> {
    match pool {
        Some(pool) => Ok(pool.get()?),
        // In production a pool should be available. This can only be hit in tests, which don't
        // provide the pool.
        None => Err(String::from("Database pool was unavailable").into()),
    }
}

#[derive(Serialize, Deserialize)]
pub struct DumpDbJob {
pub(super) database_url: String,
Expand Down
38 changes: 18 additions & 20 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,28 @@
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
use prometheus::Histogram;
use std::sync::Arc;
use std::{error::Error, ops::Deref, time::Duration};
use std::sync::{Arc, Mutex, MutexGuard};
use std::{ops::Deref, time::Duration};
use thiserror::Error;
use url::Url;

use crate::config;

pub type ConnectionPool = r2d2::Pool<ConnectionManager<PgConnection>>;

#[derive(Clone)]
pub enum DieselPool {
Pool {
pool: r2d2::Pool<ConnectionManager<PgConnection>>,
pool: ConnectionPool,
time_to_obtain_connection_metric: Histogram,
},
BackgroundJobPool {
pool: r2d2::Pool<ConnectionManager<PgConnection>>,
pool: ConnectionPool,
},
Test(Arc<ReentrantMutex<PgConnection>>),
Test(Arc<Mutex<PgConnection>>),
}

type Callback<'a> = &'a dyn Fn(&PgConnection) -> Result<(), Box<dyn Error>>;

impl DieselPool {
pub(crate) fn with_connection(&self, f: Callback<'_>) -> Result<(), Box<dyn Error>> {
match self {
DieselPool::Pool { pool, .. } | DieselPool::BackgroundJobPool { pool } => {
f(&*pool.get()?)
}
DieselPool::Test(connection) => f(&connection.lock()),
}
}

pub(crate) fn new(
url: &str,
config: &config::DatabasePools,
Expand Down Expand Up @@ -69,12 +59,19 @@ impl DieselPool {
Self::BackgroundJobPool { pool }
}

pub(crate) fn to_real_pool(&self) -> Option<ConnectionPool> {
match self {
Self::Pool { pool, .. } | Self::BackgroundJobPool { pool } => Some(pool.clone()),
_ => None,
}
}

pub(crate) fn new_test(config: &config::DatabasePools, url: &str) -> DieselPool {
let conn = PgConnection::establish(&connection_url(config, url))
.expect("failed to establish connection");
conn.begin_test_transaction()
.expect("failed to begin test transaction");
DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
DieselPool::Test(Arc::new(Mutex::new(conn)))
}

pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
Expand All @@ -92,7 +89,7 @@ impl DieselPool {
}
}),
DieselPool::BackgroundJobPool { pool } => Ok(DieselPooledConn::Pool(pool.get()?)),
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.try_lock().unwrap())),
}
}

Expand Down Expand Up @@ -136,9 +133,10 @@ pub struct PoolState {
pub idle_connections: u32,
}

#[allow(clippy::large_enum_variant)]
pub enum DieselPooledConn<'a> {
Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
Test(ReentrantMutexGuard<'a, PgConnection>),
Test(MutexGuard<'a, PgConnection>),
}

impl Deref for DieselPooledConn<'_> {
Expand Down
97 changes: 65 additions & 32 deletions src/swirl/runner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use diesel::connection::{AnsiTransactionManager, TransactionManager};
use diesel::prelude::*;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
Expand All @@ -11,7 +12,7 @@ use threadpool::ThreadPool;

use super::errors::*;
use super::storage;
use crate::background_jobs::{Environment, Job};
use crate::background_jobs::{Environment, Job, PerformState};
use crate::db::{DieselPool, DieselPooledConn};
use event::Event;

Expand Down Expand Up @@ -55,9 +56,7 @@ impl Runner {
job_start_timeout: Duration::from_secs(10),
}
}
}

impl Runner {
pub fn test_runner(environment: Environment, connection_pool: DieselPool) -> Self {
Self {
connection_pool,
Expand All @@ -66,17 +65,7 @@ impl Runner {
job_start_timeout: Duration::from_secs(5),
}
}
}

impl Runner {
#[doc(hidden)]
/// For use in integration tests
pub(super) fn connection_pool(&self) -> &DieselPool {
&self.connection_pool
}
}

impl Runner {
/// Runs all pending jobs in the queue
///
/// This function will return once all jobs in the queue have begun running,
Expand Down Expand Up @@ -120,20 +109,18 @@ impl Runner {

fn run_single_job(&self, sender: SyncSender<Event>) {
let environment = self.environment.clone();
// FIXME: https://github.com/sfackler/r2d2/pull/70
let connection_pool = AssertUnwindSafe(self.connection_pool().clone());
self.get_single_job(sender, move |job| {
self.get_single_job(sender, move |job, state| {
let job = Job::from_value(&job.job_type, job.data)?;

// Make sure to move the whole `AssertUnwindSafe`
let connection_pool = connection_pool;
job.perform(&environment, &connection_pool.0)
job.perform(&environment, state)
})
}

fn get_single_job<F>(&self, sender: SyncSender<Event>, f: F)
where
F: FnOnce(storage::BackgroundJob) -> Result<(), PerformError> + Send + UnwindSafe + 'static,
F: FnOnce(storage::BackgroundJob, PerformState<'_>) -> Result<(), PerformError>
+ Send
+ UnwindSafe
+ 'static,
{
use diesel::result::Error::RollbackTransaction;

Expand Down Expand Up @@ -166,9 +153,53 @@ impl Runner {
};
let job_id = job.id;

let result = catch_unwind(|| f(job))
.map_err(|e| try_to_extract_panic_info(&e))
.and_then(|r| r);
let transaction_manager = conn.transaction_manager();
let initial_depth = <AnsiTransactionManager as TransactionManager<
PgConnection,
>>::get_transaction_depth(
transaction_manager
);
if initial_depth != 1 {
warn!("Initial transaction depth is not 1. This is very unexpected");
}

let result = conn
.transaction(|| {
let pool = pool.to_real_pool();
let state = AssertUnwindSafe(PerformState {conn, pool});
catch_unwind(|| {
// Ensure the whole `AssertUnwindSafe(_)` is moved
let state = state;
f(job, state.0)
})
.map_err(|e| try_to_extract_panic_info(&e))
})
// TODO: Replace with flatten() once that stabilizes
.and_then(std::convert::identity);

loop {
let depth = <AnsiTransactionManager as TransactionManager<
PgConnection,
>>::get_transaction_depth(
transaction_manager
);
if depth == initial_depth {
break;
}
warn!("Rolling back a transaction due to a panic in a background task");
match transaction_manager
.rollback_transaction(conn)
{
Ok(_) => (),
Err(e) => {
error!("Leaking a thread and database connection because of an error while rolling back transaction: {e}");
loop {
std::thread::sleep(Duration::from_secs(24 * 60 * 60));
error!("How am I still alive?");
}
}
}
}

match result {
Ok(_) => storage::delete_successful_job(conn, job_id)?,
Expand Down Expand Up @@ -269,15 +300,15 @@ mod tests {
let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
let return_barrier2 = return_barrier.clone();

runner.get_single_job(dummy_sender(), move |job| {
runner.get_single_job(dummy_sender(), move |job, _| {
fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
assert_eq!(first_job_id, job.id);
return_barrier.0.wait(); // Wait for thread 2 to lock its job
Ok(())
});

fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
runner.get_single_job(dummy_sender(), move |job| {
runner.get_single_job(dummy_sender(), move |job, _| {
assert_eq!(second_job_id, job.id);
return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
Ok(())
Expand All @@ -293,7 +324,7 @@ mod tests {
let runner = runner();
create_dummy_job(&runner);

runner.get_single_job(dummy_sender(), |_| Ok(()));
runner.get_single_job(dummy_sender(), |_, _| Ok(()));
runner.wait_for_jobs().unwrap();

let remaining_jobs = background_jobs
Expand All @@ -311,10 +342,12 @@ mod tests {
let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
let barrier2 = barrier.clone();

runner.get_single_job(dummy_sender(), move |_| {
barrier.0.wait();
// error so the job goes back into the queue
Err("nope".into())
runner.get_single_job(dummy_sender(), move |_, state| {
state.conn.transaction(|| {
barrier.0.wait();
// The job should go back into the queue after a panic
panic!();
})
});

let conn = &*runner.connection().unwrap();
Expand Down Expand Up @@ -350,7 +383,7 @@ mod tests {
let runner = runner();
let job_id = create_dummy_job(&runner).id;

runner.get_single_job(dummy_sender(), |_| panic!());
runner.get_single_job(dummy_sender(), |_, _| panic!());
runner.wait_for_jobs().unwrap();

let tries = background_jobs
Expand Down
13 changes: 4 additions & 9 deletions src/tests/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,11 @@ fn new_category<'a>(category: &'a str, slug: &'a str, description: &'a str) -> N
// This reflects the configuration of our test environment. In the production application, this
// does not hold true.
#[test]
fn multiple_live_references_to_the_same_connection_can_be_checked_out() {
use std::ptr;

#[should_panic]
fn recursive_get_of_db_conn_in_tests_will_panic() {
let (app, _) = TestApp::init().empty();
let app = app.as_inner();

let conn1 = app.primary_database.get().unwrap();
let conn2 = app.primary_database.get().unwrap();
let conn1_ref: &PgConnection = &conn1;
let conn2_ref: &PgConnection = &conn2;

assert!(ptr::eq(conn1_ref, conn2_ref));
let _conn1 = app.primary_database.get().unwrap();
let _conn2 = app.primary_database.get().unwrap();
}