Skip to content

Commit 1f41f50

Browse files
jtgeibel authored and Turbo87 committed
Update background jobs to use a single connection
This commit is the initial backport from my `diesel2` branch.
1 parent 951aa7b commit 1f41f50

File tree

4 files changed

+51
-74
lines changed

4 files changed

+51
-74
lines changed

src/background_jobs.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use reqwest::blocking::Client;
33
use std::panic::AssertUnwindSafe;
44
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
55

6-
use crate::db::DieselPool;
76
use crate::swirl::errors::EnqueueError;
87
use crate::swirl::PerformError;
98
use crate::uploaders::Uploader;
@@ -89,35 +88,30 @@ impl Job {
8988
pub(super) fn perform(
9089
self,
9190
env: &Option<Environment>,
92-
conn: &DieselPool,
91+
conn: &PgConnection,
9392
) -> Result<(), PerformError> {
9493
let env = env
9594
.as_ref()
9695
.expect("Application should configure a background runner environment");
9796
match self {
98-
Job::DailyDbMaintenance(_) => {
99-
conn.with_connection(&worker::perform_daily_db_maintenance)
100-
}
97+
Job::DailyDbMaintenance(_) => worker::perform_daily_db_maintenance(conn),
10198
Job::DumpDb(args) => worker::perform_dump_db(env, args.database_url, args.target_name),
102-
Job::IndexAddCrate(args) => conn
103-
.with_connection(&|conn| worker::perform_index_add_crate(env, conn, &args.krate)),
99+
Job::IndexAddCrate(args) => worker::perform_index_add_crate(env, conn, &args.krate),
104100
Job::IndexSquash(_) => worker::perform_index_squash(env),
105101
Job::IndexSyncToHttp(args) => worker::perform_index_sync_to_http(env, args.crate_name),
106-
Job::IndexUpdateYanked(args) => conn.with_connection(&|conn| {
102+
Job::IndexUpdateYanked(args) => {
107103
worker::perform_index_update_yanked(env, conn, &args.krate, &args.version_num)
108-
}),
109-
Job::RenderAndUploadReadme(args) => conn.with_connection(&|conn| {
110-
worker::perform_render_and_upload_readme(
111-
conn,
112-
env,
113-
args.version_id,
114-
&args.text,
115-
&args.readme_path,
116-
args.base_url.as_deref(),
117-
args.pkg_path_in_vcs.as_deref(),
118-
)
119-
}),
120-
Job::UpdateDownloads(_) => conn.with_connection(&worker::perform_update_downloads),
104+
}
105+
Job::RenderAndUploadReadme(args) => worker::perform_render_and_upload_readme(
106+
conn,
107+
env,
108+
args.version_id,
109+
&args.text,
110+
&args.readme_path,
111+
args.base_url.as_deref(),
112+
args.pkg_path_in_vcs.as_deref(),
113+
),
114+
Job::UpdateDownloads(_) => worker::perform_update_downloads(conn),
121115
}
122116
}
123117
}

src/db.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use diesel::prelude::*;
22
use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
3-
use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
43
use prometheus::Histogram;
5-
use std::sync::Arc;
6-
use std::{error::Error, ops::Deref, time::Duration};
4+
use std::sync::{Arc, Mutex, MutexGuard};
5+
use std::{ops::Deref, time::Duration};
76
use thiserror::Error;
87
use url::Url;
98

@@ -18,21 +17,10 @@ pub enum DieselPool {
1817
BackgroundJobPool {
1918
pool: r2d2::Pool<ConnectionManager<PgConnection>>,
2019
},
21-
Test(Arc<ReentrantMutex<PgConnection>>),
20+
Test(Arc<Mutex<PgConnection>>),
2221
}
2322

24-
type Callback<'a> = &'a dyn Fn(&PgConnection) -> Result<(), Box<dyn Error>>;
25-
2623
impl DieselPool {
27-
pub(crate) fn with_connection(&self, f: Callback<'_>) -> Result<(), Box<dyn Error>> {
28-
match self {
29-
DieselPool::Pool { pool, .. } | DieselPool::BackgroundJobPool { pool } => {
30-
f(&*pool.get()?)
31-
}
32-
DieselPool::Test(connection) => f(&connection.lock()),
33-
}
34-
}
35-
3624
pub(crate) fn new(
3725
url: &str,
3826
config: &config::DatabasePools,
@@ -74,7 +62,7 @@ impl DieselPool {
7462
.expect("failed to establish connection");
7563
conn.begin_test_transaction()
7664
.expect("failed to begin test transaction");
77-
DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
65+
DieselPool::Test(Arc::new(Mutex::new(conn)))
7866
}
7967

8068
pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
@@ -92,7 +80,7 @@ impl DieselPool {
9280
}
9381
}),
9482
DieselPool::BackgroundJobPool { pool } => Ok(DieselPooledConn::Pool(pool.get()?)),
95-
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
83+
DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.try_lock().unwrap())),
9684
}
9785
}
9886

@@ -136,9 +124,10 @@ pub struct PoolState {
136124
pub idle_connections: u32,
137125
}
138126

127+
#[allow(clippy::large_enum_variant)]
139128
pub enum DieselPooledConn<'a> {
140129
Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
141-
Test(ReentrantMutexGuard<'a, PgConnection>),
130+
Test(MutexGuard<'a, PgConnection>),
142131
}
143132

144133
impl Deref for DieselPooledConn<'_> {

src/swirl/runner.rs

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,6 @@ impl Runner {
6868
}
6969
}
7070

71-
impl Runner {
72-
#[doc(hidden)]
73-
/// For use in integration tests
74-
pub(super) fn connection_pool(&self) -> &DieselPool {
75-
&self.connection_pool
76-
}
77-
}
78-
7971
impl Runner {
8072
/// Runs all pending jobs in the queue
8173
///
@@ -120,20 +112,18 @@ impl Runner {
120112

121113
fn run_single_job(&self, sender: SyncSender<Event>) {
122114
let environment = self.environment.clone();
123-
// FIXME: https://github.com/sfackler/r2d2/pull/70
124-
let connection_pool = AssertUnwindSafe(self.connection_pool().clone());
125-
self.get_single_job(sender, move |job| {
115+
self.get_single_job(sender, move |job, conn| {
126116
let job = Job::from_value(&job.job_type, job.data)?;
127-
128-
// Make sure to move the whole `AssertUnwindSafe`
129-
let connection_pool = connection_pool;
130-
job.perform(&environment, &connection_pool.0)
117+
job.perform(&environment, conn)
131118
})
132119
}
133120

134121
fn get_single_job<F>(&self, sender: SyncSender<Event>, f: F)
135122
where
136-
F: FnOnce(storage::BackgroundJob) -> Result<(), PerformError> + Send + UnwindSafe + 'static,
123+
F: FnOnce(storage::BackgroundJob, &PgConnection) -> Result<(), PerformError>
124+
+ Send
125+
+ UnwindSafe
126+
+ 'static,
137127
{
138128
use diesel::result::Error::RollbackTransaction;
139129

@@ -166,9 +156,18 @@ impl Runner {
166156
};
167157
let job_id = job.id;
168158

169-
let result = catch_unwind(|| f(job))
170-
.map_err(|e| try_to_extract_panic_info(&e))
171-
.and_then(|r| r);
159+
let result = conn
160+
.transaction(|| {
161+
let conn = AssertUnwindSafe(conn);
162+
catch_unwind(|| {
163+
// Ensure the whole `AssertUnwindSafe(_)` is moved
164+
let conn = conn;
165+
f(job, conn.0)
166+
})
167+
.map_err(|e| try_to_extract_panic_info(&e))
168+
})
169+
// TODO: Replace with flatten() once that stabilizes
170+
.and_then(std::convert::identity);
172171

173172
match result {
174173
Ok(_) => storage::delete_successful_job(conn, job_id)?,
@@ -269,15 +268,15 @@ mod tests {
269268
let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
270269
let return_barrier2 = return_barrier.clone();
271270

272-
runner.get_single_job(dummy_sender(), move |job| {
271+
runner.get_single_job(dummy_sender(), move |job, _| {
273272
fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
274273
assert_eq!(first_job_id, job.id);
275274
return_barrier.0.wait(); // Wait for thread 2 to lock its job
276275
Ok(())
277276
});
278277

279278
fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
280-
runner.get_single_job(dummy_sender(), move |job| {
279+
runner.get_single_job(dummy_sender(), move |job, _| {
281280
assert_eq!(second_job_id, job.id);
282281
return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
283282
Ok(())
@@ -293,7 +292,7 @@ mod tests {
293292
let runner = runner();
294293
create_dummy_job(&runner);
295294

296-
runner.get_single_job(dummy_sender(), |_| Ok(()));
295+
runner.get_single_job(dummy_sender(), |_, _| Ok(()));
297296
runner.wait_for_jobs().unwrap();
298297

299298
let remaining_jobs = background_jobs
@@ -311,10 +310,10 @@ mod tests {
311310
let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
312311
let barrier2 = barrier.clone();
313312

314-
runner.get_single_job(dummy_sender(), move |_| {
313+
runner.get_single_job(dummy_sender(), move |_, _| {
315314
barrier.0.wait();
316-
// error so the job goes back into the queue
317-
Err("nope".into())
315+
// The job should go back into the queue after a panic
316+
panic!();
318317
});
319318

320319
let conn = &*runner.connection().unwrap();
@@ -350,7 +349,7 @@ mod tests {
350349
let runner = runner();
351350
let job_id = create_dummy_job(&runner).id;
352351

353-
runner.get_single_job(dummy_sender(), |_| panic!());
352+
runner.get_single_job(dummy_sender(), |_, _| panic!());
354353
runner.wait_for_jobs().unwrap();
355354

356355
let tries = background_jobs

src/tests/all.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -162,16 +162,11 @@ fn new_category<'a>(category: &'a str, slug: &'a str, description: &'a str) -> N
162162
// This reflects the configuration of our test environment. In the production application, this
163163
// does not hold true.
164164
#[test]
165-
fn multiple_live_references_to_the_same_connection_can_be_checked_out() {
166-
use std::ptr;
167-
165+
#[should_panic]
166+
fn recursive_get_of_db_conn_in_tests_will_panic() {
168167
let (app, _) = TestApp::init().empty();
169168
let app = app.as_inner();
170169

171-
let conn1 = app.primary_database.get().unwrap();
172-
let conn2 = app.primary_database.get().unwrap();
173-
let conn1_ref: &PgConnection = &conn1;
174-
let conn2_ref: &PgConnection = &conn2;
175-
176-
assert!(ptr::eq(conn1_ref, conn2_ref));
170+
let _conn1 = app.primary_database.get().unwrap();
171+
let _conn2 = app.primary_database.get().unwrap();
177172
}

0 commit comments

Comments (0)