Skip to content

Commit a85f62c

Browse files
committed
Update background jobs to use a single connection
1 parent 333554d commit a85f62c

File tree

3 files changed

+46
-65
lines changed

3 files changed

+46
-65
lines changed

src/background_jobs.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use reqwest::blocking::Client;
33
use std::panic::AssertUnwindSafe;
44
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
55

6-
use crate::db::DieselPool;
76
use crate::swirl::errors::EnqueueError;
87
use crate::swirl::PerformError;
98
use crate::uploaders::Uploader;
@@ -89,35 +88,30 @@ impl Job {
8988
pub(super) fn perform(
9089
self,
9190
env: &Option<Environment>,
92-
conn: &DieselPool,
91+
conn: &mut PgConnection,
9392
) -> Result<(), PerformError> {
9493
let env = env
9594
.as_ref()
9695
.expect("Application should configure a background runner environment");
9796
match self {
98-
Job::DailyDbMaintenance(_) => {
99-
conn.with_connection(&worker::perform_daily_db_maintenance)
100-
}
97+
Job::DailyDbMaintenance(_) => worker::perform_daily_db_maintenance(conn),
10198
Job::DumpDb(args) => worker::perform_dump_db(env, args.database_url, args.target_name),
102-
Job::IndexAddCrate(args) => conn
103-
.with_connection(&|conn| worker::perform_index_add_crate(env, conn, &args.krate)),
99+
Job::IndexAddCrate(args) => worker::perform_index_add_crate(env, conn, &args.krate),
104100
Job::IndexSquash(_) => worker::perform_index_squash(env),
105101
Job::IndexSyncToHttp(args) => worker::perform_index_sync_to_http(env, args.crate_name),
106-
Job::IndexUpdateYanked(args) => conn.with_connection(&|conn| {
102+
Job::IndexUpdateYanked(args) => {
107103
worker::perform_index_update_yanked(env, conn, &args.krate, &args.version_num)
108-
}),
109-
Job::RenderAndUploadReadme(args) => conn.with_connection(&|conn| {
110-
worker::perform_render_and_upload_readme(
111-
conn,
112-
env,
113-
args.version_id,
114-
&args.text,
115-
&args.readme_path,
116-
args.base_url.as_deref(),
117-
args.pkg_path_in_vcs.as_deref(),
118-
)
119-
}),
120-
Job::UpdateDownloads(_) => conn.with_connection(&worker::perform_update_downloads),
104+
}
105+
Job::RenderAndUploadReadme(args) => worker::perform_render_and_upload_readme(
106+
conn,
107+
env,
108+
args.version_id,
109+
&args.text,
110+
&args.readme_path,
111+
args.base_url.as_deref(),
112+
args.pkg_path_in_vcs.as_deref(),
113+
),
114+
Job::UpdateDownloads(_) => worker::perform_update_downloads(conn),
121115
}
122116
}
123117
}

src/db.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use diesel::prelude::*;
22
use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
3-
use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
43
use prometheus::Histogram;
5-
use std::sync::Arc;
4+
use std::sync::{Arc, Mutex, MutexGuard};
65
use std::{
7-
error::Error,
86
ops::{Deref, DerefMut},
97
time::Duration,
108
};
@@ -22,21 +20,10 @@ pub enum DieselPool {
2220
BackgroundJobPool {
2321
pool: r2d2::Pool<ConnectionManager<PgConnection>>,
2422
},
25-
Test(Arc<ReentrantMutex<PgConnection>>),
23+
Test(Arc<Mutex<PgConnection>>),
2624
}
2725

28-
type Callback<'a> = &'a dyn Fn(&mut PgConnection) -> Result<(), Box<dyn Error>>;
29-
3026
impl DieselPool {
31-
pub(crate) fn with_connection(&self, f: Callback<'_>) -> Result<(), Box<dyn Error>> {
32-
match self {
33-
DieselPool::Pool { pool, .. } | DieselPool::BackgroundJobPool { pool } => {
34-
f(&mut *pool.get()?)
35-
}
36-
DieselPool::Test(connection) => f(&mut connection.lock()),
37-
}
38-
}
39-
4027
pub(crate) fn new(
4128
url: &str,
4229
config: &config::DatabasePools,
@@ -78,7 +65,7 @@ impl DieselPool {
7865
.expect("failed to establish connection");
7966
conn.begin_test_transaction()
8067
.expect("failed to begin test transaction");
81-
DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
68+
DieselPool::Test(Arc::new(Mutex::new(conn)))
8269
}
8370

8471
pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
@@ -96,7 +83,7 @@ impl DieselPool {
9683
}
9784
}),
9885
DieselPool::BackgroundJobPool { pool } => Ok(DieselPooledConn::Pool(pool.get()?)),
99-
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
86+
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.try_lock().unwrap())),
10087
}
10188
}
10289

@@ -140,9 +127,10 @@ pub struct PoolState {
140127
pub idle_connections: u32,
141128
}
142129

130+
#[allow(clippy::large_enum_variant)]
143131
pub enum DieselPooledConn<'a> {
144132
Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
145-
Test(ReentrantMutexGuard<'a, PgConnection>),
133+
Test(MutexGuard<'a, PgConnection>),
146134
}
147135

148136
impl Deref for DieselPooledConn<'_> {

src/swirl/runner.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,6 @@ impl Runner {
6868
}
6969
}
7070

71-
impl Runner {
72-
#[doc(hidden)]
73-
/// For use in integration tests
74-
pub(super) fn connection_pool(&self) -> &DieselPool {
75-
&self.connection_pool
76-
}
77-
}
78-
7971
impl Runner {
8072
/// Runs all pending jobs in the queue
8173
///
@@ -120,20 +112,18 @@ impl Runner {
120112

121113
fn run_single_job(&self, sender: SyncSender<Event>) {
122114
let environment = self.environment.clone();
123-
// FIXME: https://github.com/sfackler/r2d2/pull/70
124-
let connection_pool = AssertUnwindSafe(self.connection_pool().clone());
125-
self.get_single_job(sender, move |job| {
115+
self.get_single_job(sender, move |job, conn| {
126116
let job = Job::from_value(&job.job_type, job.data)?;
127-
128-
// Make sure to move the whole `AssertUnwindSafe`
129-
let connection_pool = connection_pool;
130-
job.perform(&environment, &connection_pool.0)
117+
job.perform(&environment, conn)
131118
})
132119
}
133120

134121
fn get_single_job<F>(&self, sender: SyncSender<Event>, f: F)
135122
where
136-
F: FnOnce(storage::BackgroundJob) -> Result<(), PerformError> + Send + UnwindSafe + 'static,
123+
F: FnOnce(storage::BackgroundJob, &mut PgConnection) -> Result<(), PerformError>
124+
+ Send
125+
+ UnwindSafe
126+
+ 'static,
137127
{
138128
use diesel::result::Error::RollbackTransaction;
139129

@@ -166,9 +156,18 @@ impl Runner {
166156
};
167157
let job_id = job.id;
168158

169-
let result = catch_unwind(|| f(job))
170-
.map_err(|e| try_to_extract_panic_info(&e))
171-
.and_then(|r| r);
159+
let result = conn
160+
.transaction(|conn| {
161+
let conn = AssertUnwindSafe(conn);
162+
catch_unwind(|| {
163+
// Ensure the whole `AssertUnwindSafe(_)` is moved
164+
let conn = conn;
165+
f(job, conn.0)
166+
})
167+
.map_err(|e| try_to_extract_panic_info(&e))
168+
})
169+
// TODO: Replace with flatten() once that stabilizes
170+
.and_then(std::convert::identity);
172171

173172
match result {
174173
Ok(_) => storage::delete_successful_job(conn, job_id)?,
@@ -269,15 +268,15 @@ mod tests {
269268
let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
270269
let return_barrier2 = return_barrier.clone();
271270

272-
runner.get_single_job(dummy_sender(), move |job| {
271+
runner.get_single_job(dummy_sender(), move |job, _| {
273272
fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
274273
assert_eq!(first_job_id, job.id);
275274
return_barrier.0.wait(); // Wait for thread 2 to lock its job
276275
Ok(())
277276
});
278277

279278
fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
280-
runner.get_single_job(dummy_sender(), move |job| {
279+
runner.get_single_job(dummy_sender(), move |job, _| {
281280
assert_eq!(second_job_id, job.id);
282281
return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
283282
Ok(())
@@ -293,7 +292,7 @@ mod tests {
293292
let runner = runner();
294293
create_dummy_job(&runner);
295294

296-
runner.get_single_job(dummy_sender(), |_| Ok(()));
295+
runner.get_single_job(dummy_sender(), |_, _| Ok(()));
297296
runner.wait_for_jobs().unwrap();
298297

299298
let remaining_jobs = background_jobs
@@ -311,10 +310,10 @@ mod tests {
311310
let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
312311
let barrier2 = barrier.clone();
313312

314-
runner.get_single_job(dummy_sender(), move |_| {
313+
runner.get_single_job(dummy_sender(), move |_, _| {
315314
barrier.0.wait();
316-
// error so the job goes back into the queue
317-
Err("nope".into())
315+
// The job should go back into the queue after a panic
316+
panic!();
318317
});
319318

320319
let conn = &mut *runner.connection().unwrap();
@@ -350,7 +349,7 @@ mod tests {
350349
let runner = runner();
351350
let job_id = create_dummy_job(&runner).id;
352351

353-
runner.get_single_job(dummy_sender(), |_| panic!());
352+
runner.get_single_job(dummy_sender(), |_, _| panic!());
354353
runner.wait_for_jobs().unwrap();
355354

356355
let tries = background_jobs

0 commit comments

Comments (0)